// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package app.tradematching.outbound;

import app.tradematching.outbound.pojo.ResponseMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.kinesis.common.KinesisClientUtil;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * End-to-end check that publishes a batch of sample {@link ResponseMessage} records to the
 * outbound-gateway Kinesis stream.
 *
 * <p>The test uses a real {@link KinesisAsyncClient} pointed at {@code us-east-1}; it presumably
 * needs valid AWS credentials and an existing stream to pass (confirm against the project's
 * test setup). Any serialization error, transport error or per-record rejection fails the test
 * instead of being logged and ignored.
 */
public class Egress2OutboundE2E {
    /** Name of the Kinesis stream the sample records are written to. */
    private static final String STREAM_NAME = "trade-matching-out-gateway-trade-us-east-1-kinesis-stream";

    /** Region the Kinesis client is bound to. */
    private static final String REGION = "us-east-1";

    /** Number of PutRecords requests issued by the test. */
    private static final int BATCH_COUNT = 1;

    /** Number of records placed in each PutRecords request. */
    private static final int BATCH_SIZE = 20;

    private static final Logger log = LoggerFactory.getLogger(Egress2OutboundE2E.class);

    /** Shared mapper; building one per publish call is unnecessary work. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    private final KinesisAsyncClient kinesisClient =
            KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(Region.of(REGION)));

    /**
     * Publishes {@link #BATCH_COUNT} batches of {@link #BATCH_SIZE} sample messages to the stream.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for Kinesis to respond
     */
    @Test
    void testRawMessageToKinesis() throws InterruptedException {
        try {
            for (int batch = 0; batch < BATCH_COUNT; batch++) {
                List<ResponseMessage> messages = new ArrayList<>(BATCH_SIZE);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    messages.add(getSampleRawMessage(i));
                }
                publishRecord(messages);
            }
        } finally {
            // The client is created with this test instance; release its resources once done.
            kinesisClient.close();
        }
    }

    /**
     * Builds a fixed sample message.
     *
     * @param i index of the message within its batch; currently unused, every message is identical
     * @return a populated sample {@link ResponseMessage}
     */
    private ResponseMessage getSampleRawMessage(int i) {
        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setId("e4381798-2f61-4d53-ade0-b56079a9b3e3");
        responseMessage.setDescription("Test description");
        responseMessage.setStatus("ACK");
        // To exercise more than one destination, alternate on the index instead,
        // e.g. i % 2 == 0 ? "destination" : "SETTLEMENT".
        responseMessage.setDestination("destination");
        responseMessage.setMessage("{\"brokerID\":\"5062661010\",\"quantity\":3313,\"tradeDate\":\"2022-01-25T01:31:14Z\",\"settlementDate\":\"2022-01-31T21:05:54.597506Z\",\"transactionIndicator\":\"B\",\"imID\":\"2545908364\",\"deliveryInstructions\":\"Aut eius ut.Ten\",\"senderID\":\"6644623370\",\"security\":\"EUM\",\"allocations\":[\"{\\\"quantity\\\":100,\\\"allocationID\\\":1,\\\"account\\\":\\\"1538552570\\\",\\\"status\\\":\\\"Settled\\\"}\"],\"price\":4091.823533894876,\"tradeID\":\"19426\",\"status\":\"Cancelled\"}");
        return responseMessage;
    }

    /**
     * Serializes the messages to JSON and sends them to the stream in a single PutRecords request.
     *
     * @param messages messages to publish; each becomes one record with a random partition key
     * @throws InterruptedException  if the thread is interrupted while waiting for the response
     * @throws IllegalStateException if a message cannot be serialized, the request fails, or
     *                               Kinesis reports that one or more records were rejected
     */
    private void publishRecord(List<ResponseMessage> messages) throws InterruptedException {
        List<PutRecordsRequestEntry> entries = new ArrayList<>(messages.size());
        for (ResponseMessage message : messages) {
            entries.add(toRequestEntry(message));
        }

        PutRecordsRequest request = PutRecordsRequest.builder()
                .streamName(STREAM_NAME)
                .records(entries)
                .build();

        PutRecordsResponse response;
        try {
            // InterruptedException is deliberately not caught here so the interrupt reaches the caller.
            response = kinesisClient.putRecords(request).get();
        } catch (ExecutionException e) {
            throw new IllegalStateException("issue putting mock data", e);
        }

        log.info("put records finished");
        log.info("{}", response);

        // PutRecords is not all-or-nothing: the call can succeed while individual records fail.
        Integer failedRecordCount = response.failedRecordCount();
        if (failedRecordCount != null && failedRecordCount > 0) {
            throw new IllegalStateException(
                    failedRecordCount + " of " + entries.size() + " records were rejected by stream " + STREAM_NAME);
        }
    }

    /** Converts one message into a Kinesis request entry carrying its UTF-8 encoded JSON form. */
    private static PutRecordsRequestEntry toRequestEntry(ResponseMessage message) {
        final String json;
        try {
            json = OBJECT_MAPPER.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("fail to convert object to json ", e);
        }
        return PutRecordsRequestEntry.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(2, 20))
                .data(SdkBytes.fromByteArray(json.getBytes(StandardCharsets.UTF_8)))
                .build();
    }
}