Effortless Scalability: How Asynchronous Lambda Invocation Transforms AWS Workflows

Akash Mathur
AWS in Plain English
8 min read · Oct 24, 2023


Situation: While working with files for multiple countries/regions (one file dedicated to each country), we needed to run a specific set of data cleaning, validation, transformation, and enrichment tasks for each of these countries. To streamline this process and enable concurrent execution, I harnessed the capabilities of AWS Lambda, S3, and ECR to build a highly efficient workflow.

Task: The task at hand is to set up a serverless system that can concurrently and seamlessly trigger multiple AWS Lambda invocations, each based on one of the rows in the configuration file. To accomplish this, it’s essential to create a well-coordinated system where one Lambda function acts as the orchestrator and another acts as the worker for these tasks.

Action: Here’s how you can build this using only AWS Lambda:

1. Create a Configuration File: The configuration file, containing a distinct row for each country/region, will serve as the blueprint for the individual Lambda invocations. Each row represents a unique payload to be executed by AWS Lambda. Store this file in an S3 bucket, or within your Lambda function itself if it’s not too large.

The configuration file (config.csv, shown below) is uploaded to S3, and its arrival triggers the orchestrator’s lambda_handler function via an S3 PUT event.

config.csv
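For illustration, the configuration file might look like this; the column names match those used in the orchestrator code below, and the values are purely hypothetical:

country,column_1,column_2
DE,10,100
FR,20,200
IN,30,300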

2. Create an Orchestrator Lambda Function: I start by creating a Lambda function that will serve as the orchestrator. This function will read your configuration file and trigger the individual worker Lambda functions. Ensure that this Lambda function has the necessary permissions to read the configuration file and invoke other Lambda functions; a sketch of such a policy is shown below.
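As a minimal sketch of those permissions (the bucket name, worker function name, and role name are hypothetical), you could attach an inline policy to the orchestrator’s execution role like this:

import json
import boto3

iam_client = boto3.client('iam')

# Minimal permissions for the orchestrator: read the config file from S3 and
# invoke the worker Lambda (all resource ARNs below are hypothetical)
orchestrator_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::your-config-bucket/*"
        },
        {
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction"],
            "Resource": "arn:aws:lambda:*:*:function:worker-lambda"
        }
    ]
}

iam_client.put_role_policy(
    RoleName='orchestrator-lambda-execution-role',
    PolicyName='orchestrator-permissions',
    PolicyDocument=json.dumps(orchestrator_policy)
)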

3. Code the Orchestrator Lambda Function: In the orchestrator Lambda function, write code to:

  • Read the configuration file.
  • Iterate through each row in the configuration file.
  • For each row, trigger an individual Lambda function using the invoke method of the AWS SDK for Lambda (boto3). You can do this concurrently by making asynchronous calls to the Lambda functions.

Before we deep dive into the code, let’s look at what the terms concurrency and asynchronous mean here. Together they mean that you are triggering multiple Lambda functions to execute simultaneously, and that you are doing so in an asynchronous manner. Let’s break this down in more detail:

  • Concurrency: This means that you are executing multiple tasks or operations at the same time, without waiting for one to finish before starting the next. In the context of AWS Lambda, it means that you are triggering multiple Lambda function invocations simultaneously, and these invocations can run in parallel.
  • Asynchronous: In this context, asynchronous refers to a style of communication or execution where the caller (the orchestrator Lambda function, in this case) doesn’t wait for the called function (the worker Lambda function) to complete its work before moving on to the next task. Instead, it initiates the task and proceeds immediately without blocking.
  • The InvocationType parameter is set to 'Event', indicating that the invocation will be asynchronous. This means that the invoking Lambda function won't wait for a response from the target Lambda but will continue its execution immediately.
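Before the full orchestrator code, here is a minimal sketch contrasting the two invocation types; the function name and payload are hypothetical:

import json
import boto3

lambda_client = boto3.client('lambda')

# Synchronous invocation: the caller blocks until the worker returns its response
sync_response = lambda_client.invoke(
    FunctionName='my-worker-lambda',          # hypothetical function name
    InvocationType='RequestResponse',
    Payload=json.dumps({"country": "DE"})
)
print(sync_response['StatusCode'])            # 200, response payload is available

# Asynchronous invocation: Lambda queues the event and returns immediately
async_response = lambda_client.invoke(
    FunctionName='my-worker-lambda',          # hypothetical function name
    InvocationType='Event',
    Payload=json.dumps({"country": "DE"})
)
print(async_response['StatusCode'])           # 202 (Accepted), no response payload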

The code below reads the configuration file from S3, iterates through each row, and asynchronously triggers the worker Lambda function with the row data as the payload.

import json
import numpy as np
import boto3
import awswrangler as wr

def lambda_handler(event, context):

    # Get the details of the file that has been PUT into the designated bucket
    BUCKET = event['Records'][0]['s3']['bucket']['name']
    KEY = event['Records'][0]['s3']['object']['key']

    print(BUCKET, KEY)
    # The object key from the S3 event already includes any prefix within the bucket
    config_file_path = f"s3://{BUCKET}/{KEY}"

    try:
        # Read the configuration file
        config = wr.s3.read_csv(path=config_file_path)
    except Exception as e:
        print(f"An error occurred in reading the file: {e}")
        raise

    print("Creating a Lambda client")
    # Create an AWS Lambda client
    lambda_client = boto3.client('lambda')

    for idx in np.arange(config.shape[0]):
        row = config.iloc[idx]

        country = str(row["country"])
        column_1 = int(row["column_1"])
        column_2 = int(row["column_2"])

        # Customize the payload with the necessary data
        payload = {
            "country": country,
            "column_1": column_1,
            "column_2": column_2
        }

        # Invoke the worker Lambda function for each row without waiting for a response
        lambda_client.invoke(
            FunctionName='arn_of_worker_lambda',
            InvocationType='Event',  # Asynchronous invocation
            Payload=json.dumps(payload)
        )
        print(f"Lambda invoked for {country}")

    return {
        "statusCode": 200,
        "body": json.dumps("Process completed!")
    }
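To sanity-check the orchestrator locally before deploying it, you can feed the handler a minimal S3 PUT event; the bucket and key below are placeholders:

# A minimal, hypothetical S3 PUT event for testing the orchestrator handler
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-config-bucket"},
                "object": {"key": "config.csv"}
            }
        }
    ]
}

# The second argument (context) is not used by the handler, so None is fine here
lambda_handler(test_event, None)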

4. Code the Worker Lambda Function: The worker Lambda function is designed to handle incoming events and process data, with the output being saved to Amazon S3. Here’s a summary of the code:

import json
import pandas as pd
import numpy as np
import awswrangler as wr

def lambda_handler(event, context):

    # The event parameter already contains the payload provided when this Lambda
    # was invoked (Lambda deserializes the JSON payload into a dict for you)
    country = event["country"]
    column_1 = event["column_1"]
    column_2 = event["column_2"]

    """
    ENTER YOUR CODE HERE OR IMPORT A PYTHON MODULE
    """

    print("Saving the results to S3.")
    # wr.s3.to_csv(df=df,
    #              path=f"s3://saved-path.csv",
    #              index=False)

    print(f"{country} Completed.")

    return {
        "statusCode": 200,
        "body": json.dumps(f"{country} Completed.")
    }

a) The Lambda function is named lambda_handler and is configured to accept an event and context as parameters. The event parameter contains the payload sent when the Lambda function is invoked.

b) The code starts by reading the payload from the event, which Lambda has already deserialized from JSON into a dictionary, and extracting specific values such as “country,” “column_1,” and “column_2”.

c) Next, you would perform the specific data transformations or computations required by your use case.

d) The processed data is then saved to an S3 bucket as a CSV file using the wr.s3.to_csv function provided by the awswrangler library.
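For illustration, here is a minimal sketch of what that middle section could look like; the transformation, output bucket, and helper function are all hypothetical:

import pandas as pd
import awswrangler as wr

def transform_and_save(country, column_1, column_2):
    # Hypothetical transformation: build a small result frame from the payload values
    df = pd.DataFrame({"value": [1, 2, 3]})
    df["scaled"] = df["value"] * column_1
    df["offset"] = df["value"] + column_2
    df["country"] = country

    # Save the result to a hypothetical output location, one file per country
    wr.s3.to_csv(
        df=df,
        path=f"s3://your-output-bucket/results/{country}.csv",
        index=False
    )

# Inside the worker's lambda_handler you would call:
# transform_and_save(country, column_1, column_2)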

Below is a high-level folder structure for this project.

# FOLDER STRUCTURE 

lambda_application/
├── orchestrator_lambda/
│ ├── lambda_function.py # Orchestrator Lambda function code
│ ├── requirements.txt # Dependencies for the orchestrator Lambda (if needed)
│ └── Dockerfile # Dockerfile

├── worker_lambda/
│ ├── lambda_function.py # Worker Lambda function code
│ ├── script.py # Supporting Python code or modules used by the worker Lambda
│ ├── requirements.txt # Dependencies for the worker Lambda (if needed)
│ └── Dockerfile # Dockerfile

└── config.csv # Configuration file used by the orchestrator Lambda

For more details, refer to this GitHub repository.

Integration of Docker and ECR

To make this workflow scalable, we integrated Docker and ECR before this code is executed as part of an AWS Lambda function. Let me explain how these services fit into the overall architecture:

  1. Docker: Docker is a containerization platform that allows you to package an application and its dependencies into a container image. In this context, Docker would be used to create a container image that encapsulates the code and dependencies required to run the processing tasks for the AWS Lambda. This container image is typically defined in a Dockerfile.
  2. Amazon Elastic Container Registry (ECR): ECR is a fully managed container registry provided by AWS. It is used to store and manage Docker container images. After creating a Docker image, you can push it to an ECR repository. Once the image is stored in ECR, it can be easily accessed by AWS Lambda for running containerized tasks.

Here’s a high-level explanation of how the integration would work in the overall workflow:

  1. You would create a Dockerfile to define the environment and dependencies for your processing tasks.
  2. You would build a Docker image from the Dockerfile. This image encapsulates your code, dependencies, and runtime environment.
  3. You would push the Docker image to an ECR repository. This makes the image available for use by AWS services.
  4. While creating your Lambda, you would specify the ECR repository and the specific Docker image to use.
  5. When the Lambda is initiated, it pulls the Docker image from the ECR repository and runs the task using the defined image.
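For step 4, you can point the function at the image either in the console or programmatically. Here is a minimal sketch using boto3; the function name, image URI, and role ARN are hypothetical:

import boto3

lambda_client = boto3.client('lambda')

# Create a Lambda function from a container image stored in ECR
# (function name, image URI, and role ARN below are hypothetical)
lambda_client.create_function(
    FunctionName='worker-lambda',
    PackageType='Image',
    Code={'ImageUri': '123456789012.dkr.ecr.us-east-1.amazonaws.com/worker-lambda:latest'},
    Role='arn:aws:iam::123456789012:role/worker-lambda-execution-role',
    Timeout=900,      # the maximum Lambda timeout, in seconds
    MemorySize=1024   # adjust to your workload
)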

Lastly, you can monitor and manage your invocations from the Lambda console or using the AWS CLI. You can view invocation status, logs, and other execution details in CloudWatch, and use SQS if you want to enable notifications for internal stakeholders.
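For example, a quick way to scan the worker’s logs from code is the CloudWatch Logs API; the log group name below assumes the default Lambda naming convention /aws/lambda/<function-name> and a hypothetical function name:

import boto3

logs_client = boto3.client('logs')

# Look for the worker's completion messages in its CloudWatch log group
response = logs_client.filter_log_events(
    logGroupName='/aws/lambda/worker-lambda',   # hypothetical function name
    filterPattern='Completed'
)
for log_event in response['events']:
    print(log_event['message'])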

Choosing between Lambda and Batch

The choice between AWS Lambda and AWS Batch depends on the nature of your workload, specific requirements, and the characteristics of your application.

Here are some factors to consider when deciding between AWS Lambda and AWS Batch:

Choose AWS Lambda When:

  1. Event-Driven and Stateless Tasks: Lambda is ideal for event-driven and stateless tasks. It’s designed for short, isolated, and stateless functions that respond to events in real-time. If your workloads involve handling events like API requests, file uploads, or messages from event sources like S3, SNS, or API Gateway, Lambda is a good choice.
  2. Low to Moderate Computational Workloads: Lambda functions are designed for relatively short and lightweight tasks. If your tasks can be completed within a few minutes and don’t require heavy computational resources, Lambda is cost-effective and efficient.
  3. Stateless Parallel Processing: When you need to perform parallel, stateless processing of data or tasks, Lambda can easily handle concurrent executions, making it suitable for parallelization of tasks.

Choose AWS Batch When:

  1. Batch Processing Workloads: AWS Batch is purpose-built for batch processing workloads, where tasks are typically long-running, resource-intensive, and require fine-grained control over execution.
  2. High Computational Workloads: If your tasks involve heavy computational work, such as data processing, scientific computing, or rendering, AWS Batch allows you to provision and manage containerized resources for these tasks.
  3. Resource Management: AWS Batch allows you to control and manage the resources (CPU, memory, GPU) allocated to each task, ensuring that they have the necessary resources to execute efficiently.
  4. Job Dependency and Priority: AWS Batch provides features for managing job dependencies and setting task priorities, making it suitable for complex workflows with interdependent tasks.
  5. Distributed and HPC Workloads: If you have high-performance computing (HPC) or distributed computing workloads that require job scheduling, task parallelism, and job queues, AWS Batch is designed to handle such scenarios.

I have explained how we can optimize AWS Batch workloads by executing Batch jobs concurrently for high computational workloads. Refer to my earlier Medium article.

Hope you learned something new today!

If you enjoyed reading this article, comment “Hell Yes!” in the comments section and let me know if you have any feedback.

Feel free to follow me on Medium and GitHub, or say hi on LinkedIn. I am excited to discuss AI, ML, NLP, and MLOps!

Appendix —

  1. The end-to-end code for this module is saved to this GitHub repository.
