Building a Serverless Video Transcoding and Analysis Pipeline with Step Functions

Orchestrating Scalable Video Workflows and Enabling AI-Driven Content Analysis

Brad Duncan
AWS in Plain English

--

📘 Introduction

The exponential growth of user-submitted video content has created significant challenges in processing and analysing video affordably and at scale.

Traditional video processing workflows often struggle to keep pace with the increasing volume and complexity of video data, and startups in particular want to avoid extremely high upfront investments in infrastructure. This is where serverless video processing pipelines come into play.

🔧 Designing the Video Transcoding and Analysis Pipeline

When architecting a video transcoding and analysis pipeline, there are some essential considerations, particularly around handling the complexities of video processing effectively. In this article, I’ve detailed a general approach, segmented into three distinct phases: ingestion, transcoding, and content analysis.

Ingestion: The initial phase involves the collection and storage of the raw video files. In this case, the obvious answer in AWS is S3 storage. Before moving on to further stages, some thought has to be given to which video formats and codecs are allowed to be ingested into your environment, and how different formats should be handled.

Transcoding: Video platforms are now burdened with the responsibility of supporting a wide range of devices, all with different format and codec requirements. For example, x265 allows you to achieve much smaller file sizes than x264 without a large compromise on video quality. x265 sounds great, right? So why not set and forget? This is largely down to compatibility: x264 has been around longer and has lower processing requirements when it comes to encoding/decoding. For example, I can play x264 content on my TV through Plex, but x265 doesn’t work. So it’s key to identify which devices will be playing back content, so that you can define different profiles and configurations for each of them.
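To make that concrete, a profile map might look something like the sketch below. The values are purely illustrative (not taken from the demo later in this article); each entry would ultimately map onto a MediaConvert output configuration.

# Illustrative only: playback targets mapped to rough codec/bitrate choices.
# 'H_264' and 'H_265' are the MediaConvert codec identifiers.
OUTPUT_PROFILES = {
    'legacy_tv':      {'codec': 'H_264', 'bitrate': 5_000_000, 'height': 1080},
    'mobile':         {'codec': 'H_264', 'bitrate': 2_500_000, 'height': 720},
    'modern_devices': {'codec': 'H_265', 'bitrate': 3_000_000, 'height': 1080},
}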

There are different methodologies for transcoding video, and plenty of online comparisons arguing to death over CPU vs GPU and NVENC vs HEVC, all of which fall outside the scope of this article. AWS does have an excellent tool we can utilise for video transcoding: AWS Elemental MediaConvert.

Content Analysis: The final stage leverages Amazon Rekognition for deep content analysis, using AWS’s ML models to identify objects, scenes, and activities within the video. Rekognition draws on thousands of classifiers/labels (3,083 at the time of writing), and returns a confidence score between 0 and 100 for each applied label, indicating the probability that the label is applicable to the content.

By mapping each of these stages to a specific State in the AWS Step Functions workflow, we can create an automated pipeline.

🔍 Example Use-cases

Here are a few example use-cases for a video transcoding/analysis pipeline:

Streaming Services

Creating a transcoding and analysis pipeline can automate the process of preparing content for multi-platform distribution, and the classifiers/labels can be used to aid search and categorisation. Amazon Prime Video has a cool piece of functionality called X-Ray, which lets you see the actors in a given scene while you’re watching. Something similar could be achieved with the celebrity recognition API, whose response provides the specific timecodes at which those people appear in the scene.
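For illustration, here’s a rough sketch of how the celebrity recognition API could be used to get those timecodes. It follows the same asynchronous start/get pattern used later in this article; the polling loop is a stand-in for the Step Functions wait/choice states, and the bucket/key arguments are placeholders.

import time

import boto3

rekognition = boto3.client('rekognition')

def find_celebrity_timecodes(bucket, key):
    # Start an asynchronous celebrity recognition job on a video stored in S3
    job = rekognition.start_celebrity_recognition(
        Video={'S3Object': {'Bucket': bucket, 'Name': key}}
    )

    # Poll until the job leaves the IN_PROGRESS state
    while True:
        response = rekognition.get_celebrity_recognition(JobId=job['JobId'])
        if response['JobStatus'] != 'IN_PROGRESS':
            break
        time.sleep(10)

    # Each entry carries the timestamp (in milliseconds) at which the person was recognised
    return [(item['Timestamp'], item['Celebrity']['Name'])
            for item in response.get('Celebrities', [])]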

Content Moderation for Social Media

Social media platforms can deploy the pipeline to automatically screen uploaded videos for inappropriate content, reducing the need for manual human moderation of offensive or harmful visual material.
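Rekognition has a dedicated API for this. Below is a minimal sketch of starting a moderation job; the SNS topic and role environment variables mirror the label-detection Lambda later in this article and are assumptions here, not part of the demo.

import os

import boto3

rekognition = boto3.client('rekognition')

def start_moderation_job(bucket, key):
    # Start an asynchronous content moderation job; results are fetched later
    # with get_content_moderation(JobId=...) once the SNS notification arrives.
    response = rekognition.start_content_moderation(
        Video={'S3Object': {'Bucket': bucket, 'Name': key}},
        MinConfidence=60,
        NotificationChannel={
            'SNSTopicArn': os.environ['SNS_TOPIC'],
            'RoleArn': os.environ['REKOGNITION_NOTIFICATION_ROLE_ARN']
        }
    )
    return response['JobId']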

Security and Surveillance

The pipeline can process surveillance footage to detect specific activities, objects, or individuals. This can significantly enhance the efficiency of security operations by automating the analysis of vast amounts of video data, enabling quicker response times to security incidents.
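As a simple illustration, once a label-detection job (like the one built later in this article) has finished, its output can be filtered down to the labels that matter for a given deployment; the label names below are just examples.

def filter_events(label_entries, labels_of_interest=('Weapon', 'Person', 'Car')):
    # label_entries is the 'Labels' list returned by get_label_detection
    events = []
    for entry in label_entries:
        label = entry['Label']
        if label['Name'] in labels_of_interest:
            events.append((entry['Timestamp'], label['Name'], label['Confidence']))
    return sorted(events)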

📺 Demo

Below is an example pipeline created as a Step Function. The stages outlined are Transcode -> Analyse -> GetResults.

Example Step Function State Machine

Execution of the Step Function

Below is the input passed to my Step Function at the point of execution, containing the bucket where the video is stored and the key of the file.

{
  "inputVideo": {
    "bucket": "serverlessvideotranscode-demo-input",
    "key": "ForBiggerBlazes.mp4"
  }
}

Transcode

Here, the Step Function passes the input values to Lambda, which takes the values and creates a MediaConvert job to transcode the video and write the output to a different S3 bucket. Note that at the time of writing, MediaConvert does not have a direct create_job integration in Step Functions, yet weirdly its predecessor, Elastic Transcoder, is integrated. I’m sure an update about that integration will appear in an RSS feed at some point, but as it stands, if you’re using MediaConvert, Lambda is the way (Elastic Transcoder does not support 4K video, and lacks support for a number of codecs and output formats).

Below is an example of a Lambda which handles this, and applies a defined output configuration:

import os
import boto3
from botocore.exceptions import ClientError

# Note: some accounts/regions may require an account-specific MediaConvert
# endpoint_url (retrievable via describe_endpoints).
mediaconvert = boto3.client('mediaconvert')

def lambda_handler(event, context):
    try:
        # Get the input video file details from the event
        input_video = event['inputVideo']
        input_bucket = input_video['bucket']
        input_key = input_video['key']

        # Set up the MediaConvert job settings
        job_settings = {
            'Role': os.environ['MEDIA_CONVERT_ROLE_ARN'],
            'Settings': {
                'Inputs': [
                    {
                        'AudioSelectors': {
                            'Audio Selector 1': {
                                'Offset': 0,
                                'DefaultSelection': 'NOT_DEFAULT',
                                'ProgramSelection': 1
                            }
                        },
                        'VideoSelector': {},
                        'TimecodeSource': 'EMBEDDED',
                        'FileInput': f's3://{input_bucket}/{input_key}'
                    }
                ],
                'OutputGroups': [
                    {
                        'Name': 'File Group',
                        'OutputGroupSettings': {
                            'Type': 'FILE_GROUP_SETTINGS',
                            'FileGroupSettings': {
                                'Destination': f's3://{os.environ["OUTPUT_BUCKET"]}/'
                            }
                        },
                        'Outputs': [
                            {
                                'VideoDescription': {
                                    'ScalingBehavior': 'DEFAULT',
                                    'TimecodeInsertion': 'DISABLED',
                                    'AntiAlias': 'ENABLED',
                                    'Sharpness': 50,
                                    'CodecSettings': {
                                        'Codec': 'H_264',
                                        'H264Settings': {
                                            'InterlaceMode': 'PROGRESSIVE',
                                            'NumberReferenceFrames': 3,
                                            'Syntax': 'DEFAULT',
                                            'Softness': 0,
                                            'GopClosedCadence': 1,
                                            'GopSize': 90,
                                            'Slices': 1,
                                            'GopBReference': 'DISABLED',
                                            'SlowPal': 'DISABLED',
                                            'SpatialAdaptiveQuantization': 'ENABLED',
                                            'TemporalAdaptiveQuantization': 'ENABLED',
                                            'FlickerAdaptiveQuantization': 'DISABLED',
                                            'EntropyEncoding': 'CABAC',
                                            'Bitrate': 5000000,
                                            'FramerateControl': 'SPECIFIED',
                                            'RateControlMode': 'CBR',
                                            'CodecProfile': 'MAIN',
                                            'Telecine': 'NONE',
                                            'MinIInterval': 0,
                                            'AdaptiveQuantization': 'HIGH',
                                            'CodecLevel': 'AUTO',
                                            'FieldEncoding': 'PAFF',
                                            'SceneChangeDetect': 'ENABLED',
                                            'QualityTuningLevel': 'SINGLE_PASS',
                                            'FramerateConversionAlgorithm': 'DUPLICATE_DROP',
                                            'UnregisteredSeiTimecode': 'DISABLED',
                                            'GopSizeUnits': 'FRAMES',
                                            'ParControl': 'SPECIFIED',
                                            'NumberBFramesBetweenReferenceFrames': 2,
                                            'RepeatPps': 'DISABLED',
                                            'DynamicSubGop': 'STATIC',
                                            # ParNumerator is required when ParControl is SPECIFIED
                                            'ParNumerator': 1,
                                            'ParDenominator': 1,
                                            'FramerateNumerator': 30,
                                            'FramerateDenominator': 1
                                        }
                                    },
                                    'AfdSignaling': 'NONE',
                                    'DropFrameTimecode': 'ENABLED',
                                    'RespondToAfd': 'NONE',
                                    'ColorMetadata': 'INSERT'
                                },
                                'AudioDescriptions': [
                                    {
                                        'AudioTypeControl': 'FOLLOW_INPUT',
                                        'CodecSettings': {
                                            'Codec': 'AAC',
                                            'AacSettings': {
                                                'AudioDescriptionBroadcasterMix': 'NORMAL',
                                                'RateControlMode': 'CBR',
                                                'CodecProfile': 'LC',
                                                'CodingMode': 'CODING_MODE_2_0',
                                                'RawFormat': 'NONE',
                                                'SampleRate': 48000,
                                                'Specification': 'MPEG4',
                                                'Bitrate': 64000
                                            }
                                        },
                                        'LanguageCodeControl': 'FOLLOW_INPUT',
                                        'AudioSourceName': 'Audio Selector 1'
                                    }
                                ],
                                'ContainerSettings': {
                                    'Container': 'MP4',
                                    'Mp4Settings': {
                                        'CslgAtom': 'INCLUDE',
                                        'FreeSpaceBox': 'EXCLUDE',
                                        'MoovPlacement': 'PROGRESSIVE_DOWNLOAD'
                                    }
                                }
                            }
                        ]
                    }
                ],
                'TimecodeConfig': {
                    'Source': 'EMBEDDED'
                }
            }
        }

        # Create the MediaConvert job
        mediaconvert_result = mediaconvert.create_job(**job_settings)
        print(f'MediaConvert job created: {mediaconvert_result["Job"]["Id"]}')

        return {
            'jobId': mediaconvert_result['Job']['Id'],
            'outputBucket': os.environ['OUTPUT_BUCKET'],
            # Assumes the transcoded output keeps the input's file name
            'transcodedVideoKey': f"{input_key}"
        }

    except ClientError as e:
        print(f'Error: {e}')
        raise e

Analyse

Once the transcoding Lambda has completed, it passes transcodedVideoKey and outputBucket as outputs to the Analyse state. I initially thought about having this execute within the Transcode state, though since the introduction of Redrive from Failure, it makes a lot more sense to modularise this process.

The analyse state simply submits the video key and bucket, and creates a Rekognition job, which returns a JobId:

import os
import boto3
from botocore.exceptions import ClientError

rekognition = boto3.client('rekognition')

def lambda_handler(event, context):
    try:
        output_bucket = event['outputBucket']
        transcoded_video_key = event['transcodedVideoKey']

        response = rekognition.start_label_detection(
            Video={
                'S3Object': {
                    'Bucket': output_bucket,
                    'Name': transcoded_video_key
                }
            },
            MinConfidence=50,
            NotificationChannel={
                'SNSTopicArn': os.environ['SNS_TOPIC'],
                'RoleArn': os.environ['REKOGNITION_NOTIFICATION_ROLE_ARN']
            }
        )

        return {
            'rekognitionJobId': response['JobId']
        }

    except ClientError as e:
        print(f'Error: {e}')
        raise e

WaitBeforeCheckingStatus

AWS Rekognition can take some time to process the video, so at this point we introduce a wait of 30 seconds before passing to the next step.

CheckJobStatus

This checks the status of the Rekognition job using the output from the Analyse lambda state (rekognitionJobId).

import boto3

def lambda_handler(event, context):
    rekognition = boto3.client('rekognition')

    # Extract the Rekognition job ID from the event
    rekognition_job_id = event['rekognitionJobId']

    # Check the status of the Rekognition job
    response = rekognition.get_label_detection(JobId=rekognition_job_id)

    # Extract the job status
    job_status = response['JobStatus']

    # Return the job status along with the rekognitionJobId
    return {
        'rekognitionJobId': rekognition_job_id,
        'statusCode': 200,
        'status': job_status
    }

IsJobDone (GetResults/FailState)

This is a Choice state, which evaluates the output from CheckJobStatus. If the result is “SUCCEEDED”, it passes to GetResults; if “IN_PROGRESS”, it passes back to WaitBeforeCheckingStatus; otherwise, it passes to FailState to capture the error.
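To show how those states hang together, here’s roughly what the state machine definition (Amazon States Language) could look like. The Lambda ARNs are placeholders, and the field paths assume the Lambda outputs shown in this article.

{
  "Comment": "Sketch of the transcode/analyse pipeline with a polling loop",
  "StartAt": "Transcode",
  "States": {
    "Transcode": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:TranscodeFunction",
      "Next": "Analyse"
    },
    "Analyse": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:AnalyseFunction",
      "Next": "WaitBeforeCheckingStatus"
    },
    "WaitBeforeCheckingStatus": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "CheckJobStatus"
    },
    "CheckJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:CheckJobStatusFunction",
      "Next": "IsJobDone"
    },
    "IsJobDone": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.status", "StringEquals": "SUCCEEDED", "Next": "GetResults" },
        { "Variable": "$.status", "StringEquals": "IN_PROGRESS", "Next": "WaitBeforeCheckingStatus" }
      ],
      "Default": "FailState"
    },
    "GetResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:GetResultsFunction",
      "End": true
    },
    "FailState": {
      "Type": "Fail",
      "Error": "RekognitionJobFailed",
      "Cause": "The Rekognition job did not succeed"
    }
  }
}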

GetResults

The last state in the pipeline retrieves the results of the classifiers applied to the video content from Rekognition. In a real-world example, this would store the results in something like DynamoDB, correlating the S3 path of the video with its classification results.

import boto3
import json

def lambda_handler(event, context):
    rekognition = boto3.client('rekognition')

    try:
        # Extract the Rekognition job ID from the event
        rekognition_job_id = event['rekognitionJobId']

        # Fetch the analysis results
        response = rekognition.get_label_detection(JobId=rekognition_job_id)

        # Check the job status and proceed accordingly
        if response['JobStatus'] == 'SUCCEEDED':
            # Process and return the labels detected in the video
            # This example simplifies the response to include relevant details
            labels = response.get('Labels', [])
            return {
                'statusCode': 200,
                'body': json.dumps({'Labels': labels, 'JobStatus': response['JobStatus']}, default=str)
            }
        else:
            # Handle other job statuses as needed
            return {
                'statusCode': 500,
                'body': json.dumps({'message': 'Rekognition job did not succeed', 'JobStatus': response['JobStatus']})
            }
    except KeyError:
        return {
            'statusCode': 400,
            'body': json.dumps({'message': "Missing 'rekognitionJobId' in the input."})
        }
    except Exception as e:
        # Generic error handling
        return {
            'statusCode': 500,
            'body': json.dumps({'message': str(e)})
        }
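As noted above, a real-world pipeline would persist these results rather than just returning them. Here’s a minimal sketch of what that could look like with DynamoDB; the VideoAnalysisResults table name and its attribute names are hypothetical, not part of the demo.

import boto3

dynamodb = boto3.resource('dynamodb')
# Hypothetical table, keyed on the S3 path of the transcoded video
table = dynamodb.Table('VideoAnalysisResults')

def store_results(output_bucket, transcoded_video_key, labels):
    # Keep only the fields useful for search and categorisation
    simplified = [
        {
            'Name': entry['Label']['Name'],
            # DynamoDB doesn't accept floats, so store the confidence as a string
            'Confidence': str(entry['Label']['Confidence']),
            'Timestamp': entry['Timestamp']
        }
        for entry in labels
    ]

    table.put_item(Item={
        'videoPath': f's3://{output_bucket}/{transcoded_video_key}',
        'labels': simplified
    })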

📊 Results from Rekognition

Here are some example results generated from a test video, along with stills from the corresponding timestamps.

Fire

{
  "Label": {
    "Aliases": [],
    "Categories": [
      {
        "Name": "Public Safety"
      }
    ],
    "Confidence": 78.67972564697266,
    "Instances": [],
    "Name": "Fire",
    "Parents": []
  },
  "Timestamp": 4000
}

Person (with bounding box):

{
  "Label": {
    "Aliases": [],
    "Categories": [
      {
        "Name": "Person Description"
      }
    ],
    "Confidence": 71.37804412841797,
    "Instances": [
      {
        "BoundingBox": {
          "Height": 0.8409050107002258,
          "Left": 0.5057185888290405,
          "Top": 0.10198806971311569,
          "Width": 0.42175769805908203
        },
        "Confidence": 98.14481353759766
      }
    ],
    "Name": "Girl",
    "Parents": [
      {
        "Name": "Female"
      },
      {
        "Name": "Person"
      }
    ]
  },
  "Timestamp": 6500
}

Bonus Round: iPhone? Phone?

It’s 50% confident that this is an iPhone (it’s not, which means the confidence is accurately low):

{
  "Label": {
    "Aliases": [],
    "Categories": [
      {
        "Name": "Technology and Computing"
      }
    ],
    "Confidence": 50.055233001708984,
    "Instances": [],
    "Name": "Iphone",
    "Parents": [
      {
        "Name": "Electronics"
      },
      {
        "Name": "Mobile Phone"
      },
      {
        "Name": "Phone"
      }
    ]
  },
  "Timestamp": 0
}

And it’s 95% confident it’s a phone…

{
  "Label": {
    "Aliases": [
      {
        "Name": "Cell Phone"
      }
    ],
    "Categories": [
      {
        "Name": "Technology and Computing"
      }
    ],
    "Confidence": 95.14392852783203,
    "Instances": [],
    "Name": "Mobile Phone",
    "Parents": [
      {
        "Name": "Electronics"
      },
      {
        "Name": "Phone"
      }
    ]
  },
  "Timestamp": 0
}

Here’s the full output from the sample video: https://gist.github.com/XargsUK/2afa3a10bd2d14209683b5384a09b199

💡 My ‘Writing-an-Article’ Thoughts

This is my third article, and with each of them, I write with the aim that the reader can walk away with something. When writing on the topic of video transcoding and image recognition, there’s almost a dissatisfaction in not being able to provide an in-depth, neatly bundled solution that solves the issue the article set out to explore; though it would be naive of me to believe that a single article could cover a topic which is so vast.

However, here are a couple of tips and considerations…

Video vs Images

If you don’t require extreme precision when evaluating and classifying videos, then consider generating images of frames from a video (for example, one image every second). This is because video analysis in Rekognition is billed at $0.10 per minute, whereas an image processed by Rekognition costs $0.001, and the difference in cost between the two approaches can be significant. Take a 10 minute video as an example, which would cost $1.00 for the video API to process. If the video was broken down into 600 images (one per second), processing those would amount to $0.60, offering a 40% saving.

However, if you have a 60 second video shot at 120 frames per second and require frame-level precision, then processing each of the 7,200 frames as an image would cost $7.20, versus $0.10 for the video API.
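If you do go down the frame-sampling route, here’s a rough sketch of the idea. It assumes frames have already been extracted at one per second (for example with ffmpeg’s fps=1 filter) and uploaded to S3; the bucket and prefix are placeholders, and it uses the synchronous detect_labels image API rather than the video API.

import boto3

rekognition = boto3.client('rekognition')
s3 = boto3.client('s3')

# Placeholder bucket/prefix holding one extracted frame per second, e.g. produced with:
#   ffmpeg -i input.mp4 -vf fps=1 frames/frame_%04d.jpg
FRAMES_BUCKET = 'serverlessvideotranscode-demo-frames'
FRAMES_PREFIX = 'ForBiggerBlazes/'

def label_frames(min_confidence=50):
    results = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=FRAMES_BUCKET, Prefix=FRAMES_PREFIX):
        for obj in page.get('Contents', []):
            # One synchronous image call per frame, billed at the per-image rate
            response = rekognition.detect_labels(
                Image={'S3Object': {'Bucket': FRAMES_BUCKET, 'Name': obj['Key']}},
                MinConfidence=min_confidence
            )
            results.append({'frame': obj['Key'],
                            'labels': [l['Name'] for l in response['Labels']]})
    return results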

Ordering the Pipeline

The order of the pipeline may also shift depending on the desired results and the requirements. If utilising these services for transcoding and content moderation, it may be wiser to store the video first and classify a small sample of frames. If the results of this classification pass, you could then trigger the transcoding of the video, followed by the precision evaluation, so as to avoid running up costs transcoding video content that may later be removed for being deemed inappropriate by the classifiers.

Conclusion

Here we’ve explored the potential of serverless video transcoding and analysis pipelines using AWS services such as Step Functions, Lambda, MediaConvert, and Rekognition.

Designing an optimal video pipeline genuinely requires careful consideration of factors like precision, cost, video distribution, quality and more. The importance of making informed decisions early in the development of a pipeline cannot be stressed enough, as its performance and economics depend on them.

📚 Additional Reading

Video Encoding 101: A Comprehensive Guide

AWS: Rekognition Documentation

AWS: MediaConvert Documentation
