How to Integrate a DynamoDB Stream to AWS OpenSearch

A step-by-step tutorial on integrating a DynamoDB stream to AWS OpenSearch.

Dane Brear
AWS in Plain English

--

Photo by Domenico Loia on Unsplash

When designing an efficient backend system with a variety of search patterns, using an indexing and aggregation service can be a valuable tool. In this article, we will cover all steps to integrate a DynamoDB stream to OpenSearch, as well as querying an index.

I will be using a “board” entity to illustrate the example. Each board represents a social media post and will have tags, likes, boardId, and userId associated with it.

So whenever there is a change to a board in our DynamoDB table, we want to index the new item in our OpenSearch domain.

Our goal is to be able to search OpenSearch by keyword and to return any object that has a field matching our search.

Architecture

Screenshot by me

When we create a stream from DynamoDB, we essentially say we want to monitor changes for the items in a given table. We use AWS Lambda to handle these newly changed items and give them to our OpenSearch instance.

Creating the OpenSearch Instance

  1. Select ‘Create a Domain’ in the AWS OpenSearch Console
  2. Enter a name for your domain
  3. Leave Custom Endpoint unchecked
  4. Select the ‘Development and testing’ deployment type
  5. Leave default configuration for Auto-tune section
  6. Since we are creating a development instance, we do not need a lot of computing power. In the Data Nodes section, select the smallest instance type, ‘t3.small.search’. You can leave the default selections for the remainder of this section.
  7. Select ‘public access’ in the Network section
  8. Have ‘fine-grained access control’ enabled, and create a master user.
  9. Select ‘Only use fine-grained access control’ under the Access Policy section.
  10. Click Create.

Once the domain is created, it will take some time, but you should see an active domain status and the URLs for the dashboard and the endpoint.

Screenshot by me

Creating the Lambda Deployment Package

To handle the changes from DynamoDB and put them into our OpenSearch domain, we need to construct our lambda. Our lambda code will include packages that aren’t included in the AWS Python environment, so we need to create a deployment package as a .zip file.

  1. Create a new folder locally and make a file with the name lambda_function.py
  2. Inside this file, copy and paste the following code:
from opensearchpy import OpenSearch, RequestsHttpConnection
import boto3
import requests
from requests_aws4auth import AWS4Auth
region = 'us-east-1'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
host = 'search-test-z***ce.us-east-1.es.amazonaws.com'
index = 'boards'
type = '_doc'
url = 'https://' + host + '/' + index + '/' + type + '/'
headers = { "Content-Type": "application/json" }def lambda_handler(event, context):
print(event)
client = OpenSearch(
hosts = [{'host': host, 'port': 443}],
http_auth = awsauth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)

count = 0
if(event.get("Records", None) != None):

for record in event['Records']:
if record['eventName'] == 'REMOVE':
id = record['dynamodb']['Keys']['boardId']['S']
document = record['dynamodb']['NewImage']
print("Removing this document: ", document)

response = client.delete(
index = index,
id = id,
)
count += 1
print(response)
else:
id = record['dynamodb']['Keys']['boardId']['S']
document = record['dynamodb']['NewImage']
print("Indexing this document: ", document)

response = client.index(
index = index,
body = document,
id = id,
refresh = True
)
count += 1
print(response)

else:
response = client.search(
body={
'size': 1,
'query': {
'multi_match': {
'query': event['queryStringParameters']['search']
}
}
}
)
print(response)
return {
'statusCode': 200,
'body': json.dumps(response)
}
return str(count) + ' records processed.'

For simplicity, we are going to handle both index and query events in our Lambda function. Each request to index an item from DynamoDB looks like this:

{
"Records": [
{
"eventID": "43582764cd0b8affb3f7d3406824a79b",
"eventName": "MODIFY",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"ApproximateCreationDateTime": 1637040719,
"Keys": {
"boardId": {
"S": "B53DC9CA-68F8-432A-AA1A-A1FE812624E0"
}
},
"NewImage": {
"tags": {
"L": []
},
"likes": {
"L": []
},
"boardId": {
"S": "B53DC9CA-68F8-432A-AA1A-A1FE812624E0"
},
"userId": {
"S": "test@gmail.com"
}
},
"SequenceNumber": "616204600000000014034774311",
"SizeBytes": 206,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:900000000000:table/test/stream/2021-10-15T20:28:33.007"
}
]
}

In our case, since our query requests will be coming from API Gateway, we can safely assume only our requests to index an item use the Records field.

Other notes about the code:

  • The code inside of the else statement represents the handler for making a query request. Since we want to make an endpoint in our API Gateway for queries to OpenSearch, we can use a query parameter to specify search criteria. This is represented by event[‘queryStringParameters’][‘search’] .
  • Replace the host with the OpenSearch domain endpoint that is displayed in the AWS console.
  • Replace the region with the appropriate region for your use case
  • The index field relates to your data, so change that value as necessary.
  • The index must be lowercase.

3. Ensure you have pip installed. Here is a guide:

4. Install the requests and AWS4Auth packages

pip install --target ./package requests 
pip install --target ./package requests_aws4auth
pip install --target ./package git+https://github.com/opensearch-project/opensearch-py.git

5. Create a .zip of the directory. Run the following commands in succession:

cd package
zip -r ../opensearch-deployment-package.zip .
cd ..
zip -g opensearch-deployment-package.zip lambda_function.py

You should now have a .zip file called opensearch-deployment-package.zip in your directory. We will be uploading this to our Lambda function in the AWS console.

Creating the Lambda Resource

We are going to create a Lambda function from scratch in the AWS console.

Screenshot by me

Once the Lambda is created, click the Upload From button on the top right of the Code Source box. Select the .zip file option, and upload the opensearch-deployment-package.zip from your code’s local folder.

Configuring Permissions

In Lambda

From your Lambda resource, navigate to the associated IAM role. You can do this under the Configuration tab, and select the hyperlink for your role name.

We will need the following policies attached to the role:

Screenshot by me
  • AWSLambdaBasicExecutionRole
  • AWSLambdaDynaoDBExecutionRole
  • AmazonOpenSearchFullAccess

Note: I am using a broader scope than necessary with these permissions. Please consider using the minimum security permissions necessary in your implementation.

In OpenSearch

Next, we need to configure a back-end role that is associated with our Lambda.

  1. Navigate to your OpenSearch domain in the AWS console, and click on the one we had just created.
  2. Click on the URL for the “OpenSearch Dashboards URL” section.
  3. Log in to the dashboard with the previously created master user for the OpenSearch instance.
  4. In the sidebar of the page, navigate to Security -> Roles.
  5. Under the list of all roles, select the all_access role, then navigate to the “Mapped Users” tab.
  6. Select Manage Mapping.
  7. Enter the ARN for the IAM role associated with our Lambda as a new Backend Role. Click Map.

Creating the Stream in DynamoDB

In the DynamoDB console, select the table we are creating a stream for. Then, navigate to the Exports and streams tab.

Screenshot by me

Here are the steps to create a stream:

  1. Click Enable under the DynamoDB stream details box.
  2. Select New image, then Enable Stream
  3. Now we need to create the lambda trigger. In the DynamoDB stream details box, click the Create trigger button.
  4. Select the lambda we had previously created, leave the remaining options to their defaults.
  5. Click Create trigger.

Now whenever an item changed in the DynamoDB table, the new image will be handed off to the lambda, where we put the item into the OpenSearch domain.

Indexing and Querying

Indexing

Test the function by changing an item’s value in your DynamoDB table, then check the Lambda function’s CloudWatch logs. If everything has been implemented correctly, you should see a response like this:

{
"_index": "boards",
"_type": "_doc",
"_id": "B53DC9CA-68F8-432A-AA1A-A1FE812624E0",
"_version": 1,
"result": "created",
"forced_refresh": true,
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}

Querying

As we mentioned earlier, we will be using API Gateway to use our Lambda function to make a query. As setting up the API Gateway is not the focus of this article, I am omitting the steps to set an API up for brevity.

Notes when setting up your endpoint:

  • Make sure to set your API Gateway GET method to use the Lambda Proxy integration.
  • I am using AWS IAM for authorization for this endpoint because I know I will be using this endpoint from within another Lambda function only, and not from the client.

From here, we can click the “Test” button for our method:

Screenshot by me

Remember to specify your query criteria with the search query parameter:

Screenshot by me

As you can see, our query was successful and returned an item from our OpenSearch instance!

Resources

Thanks for reading!

Here is a reference to the python OpenSearch SDK which we use in our Lambda function:

Lastly, here is an AWS guide for creating a lambda deployment package and stream from DynamoDB for OpenSearch, as a secondary source of information.

That’s it for this article. Thank you for reading.

More content at plainenglish.io

--

--