Serverless Data Processing on AWS

Stream Processing

In this module, you’ll use AWS Lambda to process data from the wildrydes-summary Amazon Kinesis stream created earlier. You’ll create and configure a Lambda function to read from the stream and write records to an Amazon DynamoDB table as they arrive.

Implementation

1. Create an Amazon DynamoDB table

Use the Amazon DynamoDB console to create a new DynamoDB table. Call your table UnicornSensorData and give it a Partition key called Name of type String and a Sort key called StatusTime of type String. Use the defaults for all other settings.

After you’ve created the table, note the Amazon Resource Name (ARN) for use in the next section.

✅ Step-by-step Instructions

  1. Go to the AWS Management Console, choose Services, then select DynamoDB under Database.

  2. Click Create table.

  3. Enter UnicornSensorData for the Table name.

  4. Enter Name for the Partition key and select String for the key type.

  5. Tick the Add sort key checkbox. Enter StatusTime for the Sort key and select String for the key type.

  6. Leave the Use default settings box checked and choose Create.

  7. Scroll to the Table details section of your new table’s properties and note the Amazon Resource Name (ARN). You will use this in the next step.
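
If you prefer to script this step, the same table can be described in code. The following is a minimal sketch, assuming the AWS SDK for JavaScript (v2, the `aws-sdk` package) and credentials are available. The function names and the provisioned throughput values are illustrative assumptions and are not part of this module.

```javascript
// Sketch only: builds the request parameters for DynamoDB's CreateTable API.
// Assumption: the throughput values below are illustrative, not from the module.
function buildCreateTableParams(tableName) {
  return {
    TableName: tableName,
    // Only key attributes are declared; DynamoDB is schemaless otherwise.
    AttributeDefinitions: [
      { AttributeName: 'Name', AttributeType: 'S' },
      { AttributeName: 'StatusTime', AttributeType: 'S' },
    ],
    KeySchema: [
      { AttributeName: 'Name', KeyType: 'HASH' },        // partition key
      { AttributeName: 'StatusTime', KeyType: 'RANGE' }, // sort key
    ],
    ProvisionedThroughput: { ReadCapacityUnits: 5, WriteCapacityUnits: 5 },
  };
}

// Creates the table and returns its ARN. The SDK is loaded lazily so the
// parameter builder above can be used without the SDK installed.
async function createSensorTable(tableName) {
  const AWS = require('aws-sdk');
  const dynamodb = new AWS.DynamoDB();
  const result = await dynamodb
    .createTable(buildCreateTableParams(tableName))
    .promise();
  return result.TableDescription.TableArn;
}
```

Calling `createSensorTable('UnicornSensorData')` would return the ARN that the next section asks for.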

2. Create an IAM role for your Lambda function

Use the IAM console to create a new role. Name it WildRydesStreamProcessorRole and select Lambda for the role type. Attach the managed policy called AWSLambdaKinesisExecutionRole to this role in order to grant permissions for your function to read from Amazon Kinesis streams and to log to Amazon CloudWatch Logs. Create a policy that allows dynamodb:BatchWriteItem access to the DynamoDB table created in the last section and attach it to the new role.

✅ Step-by-step Instructions

  1. From the AWS Console, click on Services and then select IAM in the Security, Identity & Compliance section.

  2. Select Policies from the left navigation and then click Create policy.

  3. Using the Visual editor, we’re going to create an IAM policy to allow our Lambda function access to the DynamoDB table created in the last section. To begin, click Service, begin typing DynamoDB in Find a service, and click DynamoDB.

  4. Click Action, begin typing BatchWriteItem in Filter actions, and tick the BatchWriteItem checkbox.

  5. Click Resources, click Add ARN in table, and construct the ARN of the DynamoDB table you created in the previous section by specifying the Region, Account, and Table Name.

    In Region, enter the AWS Region in which you created the DynamoDB table in the previous section, e.g.: us-east-1.

    In Account, enter your AWS account ID, which is a twelve-digit number, e.g.: 123456789012. To find your AWS account ID in the AWS Management Console, click on Support in the navigation bar in the upper-right, and then click Support Center. The ID of the account you are currently signed in to appears in the upper-right corner below the Support menu.

    In Table Name, enter UnicornSensorData.

    You should see your ARN in the Specify ARN for table field. With the example values above, it should look similar to arn:aws:dynamodb:us-east-1:123456789012:table/UnicornSensorData.

  6. Click Add.

  7. Click Review policy.

  8. Enter WildRydesDynamoDBWritePolicy in the Name field.

  9. Click Create policy.

  10. Select Roles from the left navigation and then click Create role.

  11. Click Lambda for the role type from the AWS service section.

  12. Click Next: Permissions.

  13. Begin typing AWSLambdaKinesisExecutionRole in the Filter text box and check the box next to that policy.

  14. Begin typing WildRydesDynamoDBWritePolicy in the Filter text box and check the box next to that policy.

  15. Click Next: Review.

  16. Enter WildRydesStreamProcessorRole for the Role name.

  17. Click Create role.
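
For reference, the policy that the Visual editor produces in steps 3 to 9 is a small JSON document. The sketch below shows roughly what it should contain, built from the same three values you entered for the ARN. The helper names `buildTableArn` and `buildWritePolicy` are hypothetical, and the Region and account ID are the example values used above.

```javascript
// Sketch only: assembles the table ARN from its parts.
function buildTableArn(region, accountId, tableName) {
  return `arn:aws:dynamodb:${region}:${accountId}:table/${tableName}`;
}

// Builds an IAM policy document that allows only BatchWriteItem on one table.
function buildWritePolicy(region, accountId, tableName) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: 'dynamodb:BatchWriteItem',
        Resource: buildTableArn(region, accountId, tableName),
      },
    ],
  };
}

// Example values from this section; replace them with your own.
const policy = buildWritePolicy('us-east-1', '123456789012', 'UnicornSensorData');
console.log(JSON.stringify(policy, null, 2));
```

Scoping the policy to a single action on a single table keeps the function's permissions as narrow as this module needs.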

3. Create a Lambda function to process the stream

Create a Lambda function called WildRydesStreamProcessor that will be triggered whenever a new record is available in the wildrydes-summary stream. Use the provided index.js implementation for your function code. Create an environment variable with the key TABLE_NAME and the value UnicornSensorData. Configure the function to use the WildRydesStreamProcessorRole role created in the previous section.

✅ Step-by-step Instructions

  1. Go to the AWS Management Console, choose Services, then select Lambda under Compute.

  2. Click Create a function.

  3. Enter WildRydesStreamProcessor in the Function name field.

  4. Select Node.js 10.x from Runtime.

  5. Select WildRydesStreamProcessorRole from the Existing role dropdown.

  6. Click Create function.

  7. Scroll down to the Function code section.

  8. Copy and paste the JavaScript code below into the code editor.

  9. In the Environment variables section, enter an environment variable with Key TABLE_NAME and Value UnicornSensorData.

  10. In the Basic settings section, set the Timeout to 1 minute.

  11. Scroll up and select Kinesis from the Designer section.

  12. In the Configure triggers section, select wildrydes-summary from Kinesis Stream.

  13. Leave Batch size set to 100 and Starting position set to Latest.

  14. Click Add.

  15. Click Enabled to enable the trigger.

  16. Click Save.

4. Monitor the Lambda function

Verify that the trigger is properly executing the Lambda function. View the metrics emitted by the function and inspect the output from the Lambda function.

✅ Step-by-step Instructions

  1. Run the producer to start emitting sensor data to the stream with a unique unicorn name.

    ./producer -name Rocinante

  2. Click on the Monitoring tab and explore the metrics available to monitor the function. Click on Jump to Logs to explore the function’s log output.
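
The log output behind Jump to Logs lives in a CloudWatch Logs log group named after the function, /aws/lambda/WildRydesStreamProcessor. If you would rather read it from a script, the sketch below shows one way to do so, assuming the AWS SDK for JavaScript (v2) and credentials that are allowed to read logs. The helper names and the five-minute window are illustrative assumptions.

```javascript
// Sketch only: builds parameters for CloudWatch Logs' FilterLogEvents API.
// `now` is injectable so the time window can be computed deterministically.
function buildLogQuery(functionName, minutesBack, now = Date.now()) {
  return {
    logGroupName: `/aws/lambda/${functionName}`,
    startTime: now - minutesBack * 60 * 1000, // epoch milliseconds
  };
}

// Prints the first page of recent log events for the function.
async function printRecentLogs(functionName, minutesBack) {
  const AWS = require('aws-sdk');
  const logs = new AWS.CloudWatchLogs();
  const result = await logs
    .filterLogEvents(buildLogQuery(functionName, minutesBack))
    .promise();
  result.events.forEach((event) => {
    console.log(new Date(event.timestamp).toISOString(), event.message.trim());
  });
}
```

For example, `printRecentLogs('WildRydesStreamProcessor', 5)` would print log lines from roughly the last five minutes. Only the first page of results is read here.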

5. Query the DynamoDB table

Using the AWS Management Console, query the DynamoDB table for data for a specific unicorn. Use the producer to create data from a distinct unicorn name and verify those records are persisted.

✅ Step-by-step Instructions

  1. Click on Services, then select DynamoDB in the Database section.

  2. Click Tables from the left-hand navigation.

  3. Click on UnicornSensorData.

  4. Click on the Items tab. Here you should see one per-minute data point for each unicorn for which you’re running a producer.
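
The same lookup can be expressed as a DynamoDB Query on the partition key. The sketch below assumes the AWS SDK for JavaScript (v2) and credentials that allow dynamodb:Query. The role created in this module grants only BatchWriteItem, so it would not be sufficient. The function names are hypothetical.

```javascript
// Sketch only: builds Query parameters for all items of one unicorn.
function buildUnicornQuery(tableName, unicornName) {
  return {
    TableName: tableName,
    KeyConditionExpression: '#name = :name',
    // NAME is a DynamoDB reserved word, so the attribute needs an alias.
    ExpressionAttributeNames: { '#name': 'Name' },
    ExpressionAttributeValues: { ':name': unicornName },
  };
}

// Returns the first page of items for the unicorn, ordered by the sort key
// (StatusTime) in ascending order, which is the Query default.
async function queryUnicorn(tableName, unicornName) {
  const AWS = require('aws-sdk');
  const documentClient = new AWS.DynamoDB.DocumentClient();
  const result = await documentClient
    .query(buildUnicornQuery(tableName, unicornName))
    .promise();
  return result.Items;
}
```

For example, `queryUnicorn('UnicornSensorData', 'Rocinante')` should return the records written while the producer from the previous section was running.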

⭐️ Recap

🔑 You can subscribe Lambda functions to a Kinesis stream so that they automatically read batches of records and process them as they arrive.

🔧 In this module, you’ve created a Lambda function that reads from the Kinesis stream of summary unicorn data and saves each row to DynamoDB.

Next

✅ Proceed to the next module, Data Lake, wherein you’ll deliver the raw stream data from the Kinesis stream to Kinesis Data Firehose for delivery to Amazon S3 and use Amazon Athena to query that raw data.