// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 package app.settlement.ingestion; import app.settlement.ingestion.pojo.RawMessage; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; import software.amazon.kinesis.common.KinesisClientUtil; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class RawMessage2SettlementE2E { private static String streamName = "settlement-ingress-us-east-1-kinesis-stream"; private KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(Region.of("us-east-1"))); private static final Logger log = LoggerFactory.getLogger(SettlementIngestionApplication.class); private final static ObjectMapper JSON = new ObjectMapper(); static { JSON.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @Test void testRawMessageToKinesis() throws InterruptedException { for (int j = 0; j < 1; j++) { ArrayList list = new ArrayList<>(); for (int i = 0; i < 20; i++) { list.add(getSampleRawMessage(i)); // list.add(getSampleBadMessage(i)); } publishRecord(list); } } private RawMessage getSampleRawMessage(int i) { return RawMessage.builder().id("e4381798-2f61-4d53-ade0-b56079a9b3e3" + i) .timestamp(1643663158148L) // TODO add mock settlement json string .rawMessage("{\"tradeID\": \"3500017\", \"price\": 5.0, \"settlementDate\": 1644278400.0, \"tradeDate\": 1644278400.0, \"brokerID\": \"QABRO010\", \"quantity\": 1, \"imID\": \"QAINS012\", \"senderID\": \"VOLUPTAS\", \"deliveryInstructions\": \"Et deleniti rer\", \"security\": \"AAPL\", \"status\": \"MATCHED\", \"id\": \"89697e14-6e09-4bde-a1ef-d8c4c9f01715\", \"transactionIndicator\": \"S\", \"allocationID\": 3, \"account\": \"6727534070\"\n}") .build(); } private RawMessage getSampleBadMessage(int i) { return RawMessage.builder().id("e4381798-2f61-4d53-ade0-b56079a9b3e3" + i) .timestamp(1643663158148L) // TODO add BAD mock settlement json string .rawMessage("") .build(); } private void publishRecord(ArrayList messages) throws InterruptedException { List putRecordsRequestEntryList = new ArrayList<>(); messages.forEach(m -> putRecordsRequestEntryList.add( PutRecordsRequestEntry.builder() .partitionKey(RandomStringUtils.randomAlphabetic(2, 20)) .data(SdkBytes.fromByteArray(toJsonAsBytes(m))) .build())); PutRecordsRequest request = PutRecordsRequest.builder() .streamName(streamName) .records(putRecordsRequestEntryList) .build(); try { PutRecordsResponse putRecordsResponse = kinesisClient.putRecords(request).get(); log.info("put records finished"); log.info(putRecordsResponse.toString()); } catch (Exception e) { log.error("issue putting mock data", e); } } public byte[] toJsonAsBytes(RawMessage message) { try { return JSON.writeValueAsBytes(message); } catch (IOException e) { return null; } } }