// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 package com.amazonaws.kda.flink.benchmarking.util; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.StringTokenizer; import java.util.UUID; import java.util.stream.Collectors; import com.amazonaws.kda.flink.benchmarking.model.Event; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; import com.amazonaws.services.kinesis.model.PutRecordsResult; import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.gson.Gson; public class KDSProducerUtil { /** * This method creates sample InteractionIds * * @param numInteractions * @return */ public static List createInteractions(int numInteractions) { List interactionList = new ArrayList(); for (int i = 0; i < numInteractions; i++) { String interactionId = UUID.randomUUID().toString(); // System.out.printf("Interaction_id: %s \n", interactionId); interactionList.add(interactionId); } return interactionList; } public static Iterator createRotatableInteractions(int numInteractions) { List interactionList = new ArrayList(); for (int i = 0; i < numInteractions; i++) { String interactionId = UUID.randomUUID().toString(); System.out.printf("Interaction_id: %s \n", interactionId); interactionList.add(interactionId); } return Iterables.cycle(interactionList).iterator(); } /** * This method create a list of sample events * * @param eventList * @param numEvents * @param interactionId * @return */ public static List createEvents(List eventList, int numEvents, String sessionId) { // clear the eventList and build it up again! eventList.clear(); long createTime = System.currentTimeMillis(); for (int i = 1; i < numEvents + 1; i++) { Event event = new Event(); event.setAttr_1("my_attr_1_" + i); event.setAttr_2("my_attr_2_" + i); event.setAttr_3("my_attr_3_" + i); event.setAttr_4("my_attr_4_" + i); event.setAttr_5("my_attr_5_" + i); event.setAttr_6(sessionId); event.setAttr_7(createTime); event.setSession_id(sessionId); event.setTimestamp(createTime); eventList.add(new Gson().toJson(event)); } return eventList; } /** * This method demonstrates writing a single messages to Kinesis Data Stream * using PutRecord API. * * Partition key is needed and it can be an empty string. When both Partition * Key and explicit Hash Key are set, explicit Hash Key takes precedence. * Calling hashKeyIterator.next() provides a Hash Key belongs to a shard. * * Retry logic: PutRecord throws ProvisionedThroughputExceededException when a * stream is throttled. The retry logic used here handles the exception and * re-writes the failed record. * * @param record * @param streamName * @param kinesis */ public static void writeSingleMessageToKinesis(String record, String streamName, AmazonKinesis kinesis, String startingHashKey) { PutRecordRequest putRecReq = new PutRecordRequest(); try { putRecReq.setStreamName(streamName); putRecReq.setData(ByteBuffer.wrap(record.getBytes())); putRecReq.setExplicitHashKey(startingHashKey); putRecReq.setPartitionKey("reqiredButHasNoEffect-when-setExplicitHashKey-isUsed"); kinesis.putRecord(putRecReq); } catch (ProvisionedThroughputExceededException exception) { try { System.out.println("ERROR: Throughput Exception Thrown."); exception.printStackTrace(); System.out.println("Retrying after a short delay."); Thread.sleep(100); kinesis.putRecord(putRecReq); } catch (ProvisionedThroughputExceededException e) { e.printStackTrace(); System.out.println("Kinesis Put operation failed after re-try due to second consecutive " + "ProvisionedThroughputExceededException"); } catch (Exception e) { e.printStackTrace(); System.out.println("Exception thrown while writing a record to Kinesis."); } } catch (Exception e) { e.printStackTrace(); System.out.println("Exception thrown while writing a record to Kinesis."); } } public static void writeMessagesToKinesis(AmazonKinesis kinesis, String streamName, List recordList, Iterator hashKeyIterator) { PutRecordsRequest putRecsReq = new PutRecordsRequest(); List putRecReqEntryList = new ArrayList(); PutRecordsResult putRecsRes = new PutRecordsResult(); // Make sure you write messages in a batch of 500 messages List> listofSmallerLists = Lists.partition(recordList, 500); for (List smallerList : listofSmallerLists) { putRecReqEntryList.clear(); for (String message : smallerList) { PutRecordsRequestEntry putRecsReqEntry = new PutRecordsRequestEntry(); putRecsReqEntry.setData(ByteBuffer.wrap(message.getBytes())); putRecsReqEntry.setPartitionKey("reqiredButHasNoEffect-when-setExplicitHashKey-isUsed"); putRecsReqEntry.setExplicitHashKey(hashKeyIterator.next()); putRecReqEntryList.add(putRecsReqEntry); } try { putRecsReq.setStreamName(streamName); putRecsReq.setRecords(putRecReqEntryList); putRecsRes = kinesis.putRecords(putRecsReq); while (putRecsRes.getFailedRecordCount() > 0) { System.out.println("Processing rejected records"); // TODO: For simplicity, the backoff implemented as a constant 100ms sleep // For production-grade, consider using CoralRetry's Exponential Jittered // Backoff retry strategy // Ref: // https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ Thread.sleep(100); final List failedRecordsList = new ArrayList(); final List putRecsResEntryList = putRecsRes.getRecords(); for (int i = 0; i < putRecsResEntryList.size(); i++) { final PutRecordsRequestEntry putRecordReqEntry = putRecReqEntryList.get(i); final PutRecordsResultEntry putRecordsResEntry = putRecsResEntryList.get(i); if (putRecordsResEntry.getErrorCode() != null) { failedRecordsList.add(putRecordReqEntry); } } putRecReqEntryList = failedRecordsList; putRecsReq.setRecords(putRecReqEntryList); putRecsRes = kinesis.putRecords(putRecsReq); } // end of while loop System.out.println("Number of messages written: " + smallerList.size()); } catch (Exception e) { System.out.println("Exception in Kinesis Batch Insert: " + e.getMessage()); } } } public static List tokenizeStrings(String str, String separator) { List tokenList = Collections.list(new StringTokenizer(str, separator)).stream() .map(token -> (String) token).collect(Collectors.toList()); return tokenList; } }