// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 package com.amazonaws.kda.flink.benchmarking.util; import java.util.List; import java.util.Optional; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.ListShardsRequest; import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.Shard; import com.google.common.collect.Lists; /** *

* This is a utility class with methods to fetch details about a Kinesis Stream. * The shard details include the following: shard id, starting Hash Key, and * ending Hash Key. *

*
* @author Ravi Itha, Amazon Web Service, Inc.
*
*/
public class KinesisStreamUtil {

    /**
     * Lists the shards of a Kinesis Data Stream and collects the starting hash
     * key of every open shard.
     *
     * <p>
     * A shard is treated as open when its sequence-number range has no ending
     * sequence number. All pages of the {@code ListShards} result are consumed by
     * following the returned next token until it is {@code null}.
     *
     * @param kinesis    the Kinesis client used to issue the {@code ListShards}
     *                   calls
     * @param streamName the name of the stream whose shards are listed
     * @return a mutable list with the starting hash keys of the open shards, in
     *         the order the service returned them; empty when no shard is open
     */
    public static List<String> getHashKeysForOpenShards(AmazonKinesis kinesis, String streamName) {
        List<String> hashKeyList = Lists.newArrayList();

        // The first request identifies the stream by name.
        ListShardsRequest listShardsRequest = new ListShardsRequest();
        listShardsRequest.setStreamName(streamName);

        String nextToken;
        do {
            ListShardsResult listShardsResult = kinesis.listShards(listShardsRequest);
            collectOpenShardStartingHashKeys(listShardsResult.getShards(), hashKeyList);

            nextToken = listShardsResult.getNextToken();
            if (nextToken != null) {
                // Follow-up pages are requested with the token alone: the ListShards
                // API does not accept a stream name together with a next token.
                listShardsRequest = new ListShardsRequest();
                listShardsRequest.setNextToken(nextToken);
            }
        } while (nextToken != null);

        return hashKeyList;
    }

    /**
     * Appends the starting hash key of each open shard in {@code shards} to
     * {@code target}.
     */
    private static void collectOpenShardStartingHashKeys(List<Shard> shards, List<String> target) {
        for (Shard shard : shards) {
            // No ending sequence number means the shard has not been closed.
            if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
                target.add(shard.getHashKeyRange().getStartingHashKey());
            }
        }
    }
}