// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package com.amazonaws.kinesis.blog.demo;
import java.util.List;
import java.util.Optional;
import com.amazonaws.kinesis.blog.lambda.demo.KinesisShard;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;
import com.google.common.collect.Lists;
/**
*
* This is a utility class with methods to fetch details about a Kinesis Stream.
* The shard details include the following: shard id, starting Hash Key, and
* ending Hash Key.
*
*
* @author Ravi Itha, Amazon Web Service, Inc.
*
*/
public class KinesisStreamUtil {
/**
* This method describes a Kinesis Data Stream, fetches starting Hash Key for
* all the active shards, and creates a list based on those keys.
*
* @param streamName
* @param region
* @return List
*/
public List getHashKeysForOpenShards(AmazonKinesis kinesis, String streamName) {
String nextToken = null;
List hashKeyList = Lists.newArrayList();
// prepare ListShardsRequest
ListShardsRequest listShardsRequest = new ListShardsRequest();
listShardsRequest.setStreamName(streamName);
// get shards
ListShardsResult listShardResult = kinesis.listShards(listShardsRequest);
List shardList = listShardResult.getShards();
for (Shard s : shardList) {
if (s.getSequenceNumberRange().getEndingSequenceNumber() == null) {
hashKeyList.add(s.getHashKeyRange().getStartingHashKey());
}
}
// get 'next token' from ListShardsResult and check its value.
// if it is not null, call listShards until you get a null.
// hint: paginating all shards.
nextToken = listShardResult.getNextToken();
if (Optional.ofNullable(nextToken).isPresent()) {
do {
// creating a new ListShardsRequest using next token alone.
listShardsRequest = new ListShardsRequest();
listShardsRequest.setNextToken(nextToken);
listShardResult = kinesis.listShards(listShardsRequest);
shardList = listShardResult.getShards();
for (Shard s : shardList) {
if (s.getSequenceNumberRange().getEndingSequenceNumber() == null) {
hashKeyList.add(s.getHashKeyRange().getStartingHashKey());
}
}
nextToken = listShardResult.getNextToken();
} while (Optional.ofNullable(nextToken).isPresent());
}
return hashKeyList;
}
/**
* This method wraps shard details to a KinesisShard object and create add
* KinesisShard objects to a List.
*
* @param streamName
* @param region
* @return
*/
public List getOpenShardDetails(AmazonKinesis kinesis, String streamName) {
String nextToken = null;
List openShards = Lists.newArrayList();
// prepare ListShardsRequest
ListShardsRequest listShardsRequest = new ListShardsRequest();
listShardsRequest.setStreamName(streamName);
// get list of shards
ListShardsResult listShardResult = kinesis.listShards(listShardsRequest);
List shardList = listShardResult.getShards();
for (Shard s : shardList) {
if (s.getSequenceNumberRange().getEndingSequenceNumber() == null) {
KinesisShard kShard = new KinesisShard();
kShard.setStreamName(streamName);
kShard.setShardId(s.getShardId().concat("-").concat(streamName));
kShard.setStartingHashKey(s.getHashKeyRange().getStartingHashKey());
kShard.setEndingHashKey(s.getHashKeyRange().getEndingHashKey());
openShards.add(kShard);
}
}
// get 'next token' from ListShardsResult and check its value.
// if it is not null, call listShards until you get a null.
// hint: paginating all shards.
nextToken = listShardResult.getNextToken();
if (Optional.ofNullable(nextToken).isPresent()) {
System.out.println(
"Stream appears to have more shards than the specified max results or the default value of 1000");
do {
// creating a new ListShardsRequest using next token alone. check the API to know more details.
listShardsRequest = new ListShardsRequest();
listShardsRequest.setNextToken(nextToken);
listShardResult = kinesis.listShards(listShardsRequest);
shardList = listShardResult.getShards();
for (Shard s : shardList) {
if (s.getSequenceNumberRange().getEndingSequenceNumber() == null) {
KinesisShard kShard = new KinesisShard();
kShard.setStreamName(streamName);
kShard.setShardId(s.getShardId().concat("-").concat(streamName));
kShard.setStartingHashKey(s.getHashKeyRange().getStartingHashKey());
kShard.setEndingHashKey(s.getHashKeyRange().getEndingHashKey());
openShards.add(kShard);
}
}
nextToken = listShardResult.getNextToken();
// System.out.println("Next token: " + listShardResult.getNextToken());
} while (Optional.ofNullable(nextToken).isPresent());
}
return openShards;
}
}