// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package com.amazonaws.kinesis.blog.lambda.demo;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
//import com.amazonaws.kinesis.blog.demo.KDSUtil;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
/**
* Sample Lambda function that reads data from an S3 object and writes the
* records to a Kinesis data stream.
*
* @author Ravi Itha, Amazon Web Services, Inc.
*/
public class LambdaFunctionHandler implements RequestHandler<S3Event, String> {
private AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();
public LambdaFunctionHandler() {
}
// For testing purposes only.
LambdaFunctionHandler(AmazonS3 s3) {
this.s3 = s3;
}
/**
* This is the main handler method. It processes records from an object in an
* S3 bucket and writes those records to a target Kinesis data stream.
*/
@Override
public String handleRequest(S3Event event, Context context) {
context.getLogger().log("Received event: " + event);
String contentType = "";
String ddbTblName4HashKeys = Optional.ofNullable(System.getenv("tbl_kinesis_shard_hashkeys"))
.orElse("kinesis_hash_keys");
String targetKinesisStream = Optional.ofNullable(System.getenv("target_kinesis_stream"))
.orElse("stream_with_100_shards");
String region = Optional.ofNullable(System.getenv("region")).orElse("us-east-1");
DynamoDBUtil ddbUtil = new DynamoDBUtil();
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard().withRegion(region).build();
// Option 1: get hash keys from a pre-populated DynamoDB table
List<String> hashKeyListForOpenShards = ddbUtil.getHashkeys(ddbTblName4HashKeys, targetKinesisStream);
// Option 2: get hash keys directly from the Kinesis stream. Use this option when Option 1 is not used.
// A hedged sketch of such a lookup appears below in getHashKeysForOpenShardsSketch().
// KDSUtil kdsUtil = new KDSUtil();
// List<String> hashKeyListForOpenShards = kdsUtil.getHashKeysForOpenShards(kinesis, targetKinesisStream);
Iterator<String> hashKeyIterator = createRoundRobinListOfHashKeys(hashKeyListForOpenShards);
// Get the object from the event and show its content type
String bucket = event.getRecords().get(0).getS3().getBucket().getName();
String key = event.getRecords().get(0).getS3().getObject().getKey();
try (S3Object fullObject = s3.getObject(new GetObjectRequest(bucket, key))) {
contentType = fullObject.getObjectMetadata().getContentType();
context.getLogger().log("CONTENT TYPE: " + contentType);
processRecordsFromObject(fullObject.getObjectContent(), kinesis, targetKinesisStream, hashKeyIterator);
} catch (Exception e) {
e.printStackTrace();
context.getLogger().log(String.format("Error getting object %s from bucket %s. Make sure they exist and"
+ " your bucket is in the same region as this function.", key, bucket));
}
return contentType;
}
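/**
* Illustrative sketch only: the real lookup used by Option 2 lives in the
* separate KDSUtil class, which is not shown in this file. Assuming the AWS
* SDK v1 ListShards API, this is one way such a lookup could collect the
* starting hash key of every open shard; a shard is treated as open when its
* sequence number range has no ending sequence number.
*/
private List<String> getHashKeysForOpenShardsSketch(AmazonKinesis kinesis, String streamName) {
List<String> hashKeys = new ArrayList<>();
ListShardsRequest req = new ListShardsRequest().withStreamName(streamName);
ListShardsResult res;
do {
res = kinesis.listShards(req);
for (Shard shard : res.getShards()) {
// Open shards have a null ending sequence number.
if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
hashKeys.add(shard.getHashKeyRange().getStartingHashKey());
}
}
// When paginating, NextToken must be set without the stream name.
req = new ListShardsRequest().withNextToken(res.getNextToken());
} while (res.getNextToken() != null);
return hashKeys;
}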
private void processRecordsFromObject(InputStream input, AmazonKinesis kinesis, String targetKinesisStream,
Iterator<String> hashKeyIterator) throws IOException {
// Read the text input stream one line at a time and batch the lines into
// groups of 500, the maximum number of records per PutRecords request.
List<String> recordList = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String line = null;
while ((line = reader.readLine()) != null) {
recordList.add(line);
if (recordList.size() == 500) {
writeMessagesToKinesis(recordList, targetKinesisStream, kinesis, hashKeyIterator);
recordList.clear();
}
}
if (!recordList.isEmpty()) {
writeMessagesToKinesis(recordList, targetKinesisStream, kinesis, hashKeyIterator);
recordList.clear();
}
}
}
/**
* This method demonstrates writing multiple messages to a Kinesis data stream
* using the PutRecords API.
*
* Explicit hash keys: hash keys belonging to the stream's open shards are
* used to write records. A partition key is still required and must be
* non-empty; when both a partition key and an explicit hash key are set, the
* explicit hash key takes precedence. Each call to hashKeyIterator.next()
* returns a hash key that belongs to one of the open shards. (A sketch of how
* Kinesis derives hash keys from partition keys appears at the bottom of this
* class.)
*
* Retry logic: PutRecords is not atomic, so it can partially reject some
* records. Unlike PutRecord, PutRecords does not throw an exception; instead
* it rejects the records that failed to write. Records are rejected for two
* reasons: 1. ProvisionedThroughputExceededException 2. InternalFailure. The
* retry logic used here handles both of these errors.
*
* Input records used for demo purposes: the list of input messages is broken
* into smaller lists of up to 500 records each. Kinesis tip 1: each
* PutRecords request can support up to 500 records. Kinesis tip 2: each
* record in the request can be as large as 1 MiB, up to a limit of 5 MiB for
* the entire request, including partition keys.
*
* @param msgList list of messages to write
* @param streamName name of the target Kinesis data stream
* @param kinesis Kinesis client
* @param hashKeyIterator round-robin iterator over open-shard hash keys
*/
public void writeMessagesToKinesis(List<String> msgList, String streamName, AmazonKinesis kinesis,
Iterator<String> hashKeyIterator) {
PutRecordsRequest putRecsReq = new PutRecordsRequest();
List<PutRecordsRequestEntry> putRecReqEntryList = new ArrayList<>();
PutRecordsResult putRecsRes = new PutRecordsResult();
List<List<String>> listOfSmallerLists = Lists.partition(msgList, 500);
for (List<String> smallerList : listOfSmallerLists) {
putRecReqEntryList.clear();
for (String message : smallerList) {
PutRecordsRequestEntry putRecsReqEntry = new PutRecordsRequestEntry();
putRecsReqEntry.setData(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
putRecsReqEntry.setPartitionKey("requiredButHasNoEffect-when-setExplicitHashKey-isUsed");
putRecsReqEntry.setExplicitHashKey(hashKeyIterator.next());
putRecReqEntryList.add(putRecsReqEntry);
}
try {
putRecsReq.setStreamName(streamName);
putRecsReq.setRecords(putRecReqEntryList);
putRecsRes = kinesis.putRecords(putRecsReq);
while (putRecsRes.getFailedRecordCount() > 0) {
System.out.println("Processing rejected records");
// TODO: For simplicity, the backoff is implemented as a constant 100 ms sleep.
// For production-grade code, consider an exponential backoff with jitter
// (e.g., CoralRetry's Exponential Jittered Backoff retry strategy); a hedged
// sketch appears below in fullJitterBackoffMillisSketch().
// Ref: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
Thread.sleep(100);
final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
final List<PutRecordsResultEntry> putRecsResEntryList = putRecsRes.getRecords();
for (int i = 0; i < putRecsResEntryList.size(); i++) {
final PutRecordsRequestEntry putRecordReqEntry = putRecReqEntryList.get(i);
final PutRecordsResultEntry putRecordsResEntry = putRecsResEntryList.get(i);
if (putRecordsResEntry.getErrorCode() != null) {
failedRecordsList.add(putRecordReqEntry);
}
}
putRecReqEntryList = failedRecordsList;
putRecsReq.setRecords(putRecReqEntryList);
putRecsRes = kinesis.putRecords(putRecsReq);
}
System.out.printf("%d records inserted into the Kinesis stream successfully.%n", smallerList.size());
} catch (Exception e) {
System.out.println("Exception in Kinesis Batch Insert: " + e.getMessage());
}
}
}
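/**
* Illustrative sketch only; it is not wired into the retry loop above. It
* follows the "full jitter" strategy from the AWS blog post referenced in
* that loop: sleep a random duration between 0 and min(cap, base * 2^attempt)
* milliseconds. The base and cap values here are assumptions chosen purely
* for illustration.
*/
private long fullJitterBackoffMillisSketch(int attempt) {
final long baseMillis = 100L; // assumed initial backoff
final long capMillis = 10_000L; // assumed cap on any single sleep
long ceiling = Math.min(capMillis, baseMillis * (1L << Math.min(attempt, 20)));
// nextLong(bound) is exclusive of the bound, so add 1 to allow the ceiling itself.
return ThreadLocalRandom.current().nextLong(ceiling + 1);
}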
/**
* This method creates a circular (round-robin) iterator backed by a standard
* array list of hash keys.
*
* @param hashKeysList list of hash keys, one per open shard
* @return an iterator that cycles through the hash keys indefinitely
*/
public Iterator<String> createRoundRobinListOfHashKeys(List<String> hashKeysList) {
return Iterables.cycle(hashKeysList).iterator();
}
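/**
* Illustrative sketch only: shows how Kinesis derives a hash key from a
* partition key when no explicit hash key is set, which is why the explicit
* hash keys used above take precedence over the placeholder partition key.
* Kinesis maps a partition key to a 128-bit integer with an MD5 hash; this
* helper reproduces that mapping as a decimal string in [0, 2^128 - 1].
*/
private String partitionKeyToHashKeySketch(String partitionKey) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
// Treat the 16-byte digest as an unsigned 128-bit integer.
return new BigInteger(1, digest).toString();
} catch (NoSuchAlgorithmException e) {
// Every compliant JVM is required to provide MD5.
throw new IllegalStateException(e);
}
}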
}