// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package app.tradematching.ingestion.services;

import app.tradematching.ingestion.exceptions.SettlementMessageParsingException;
import app.tradematching.ingestion.utils.AwsConfig;
import app.tradematching.ingestion.exceptions.DynamoDBConnectionException;
import app.tradematching.ingestion.exceptions.KinesisStreamException;
import app.tradematching.ingestion.pojo.Settlement;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable;
import software.amazon.awssdk.enhanced.dynamodb.Expression;
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteResult;
import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.WriteBatch;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Service
public class SettlementService {
    private AwsConfig awsConfig;

    private final static ObjectMapper JSON = new ObjectMapper();
    static {
        JSON.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        JSON.registerModule(new JavaTimeModule());
    }

    public SettlementService(AwsConfig awsConfig){
        this.awsConfig = awsConfig;
    }

    public Settlement settlementFromBytes(byte[] bytes) throws SettlementMessageParsingException {
        try{
            // parse settlement
            Settlement settlement = JSON.readValue(bytes, Settlement.class);
            // then return new settlement from old with fields needed for storage + tracking
            return Settlement.builder()
                    .id(settlement.getId())
//                    .timestamp(System.currentTimeMillis())
                    .timestamp(settlement.getTimestamp())
                    .currentDate(new SimpleDateFormat("yyyy-MM-dd").format(new Date()))
                    .currentTime(new SimpleDateFormat("HH:mm:ss").format(new Date()))
                    .senderID(settlement.getSenderID())
                    .imID(settlement.getImID())
                    .brokerID(settlement.getBrokerID())
                    .tradeID(settlement.getTradeID())
                    .allocationID(settlement.getAllocationID())
                    .quantity(settlement.getQuantity())
                    .security(settlement.getSecurity())
                    .transactionIndicator(settlement.getTransactionIndicator())
                    .price(settlement.getPrice())
                    .tradeDate(settlement.getTradeDate()).settlementDate(settlement.getSettlementDate())
                    .deliveryInstructions(settlement.getDeliveryInstructions()).status(settlement.getStatus())
                    .account(settlement.getAccount())
                    .build();
        }catch (Exception e) {
            log.error("Cannot parse as Settlement json string");
        }
        throw new SettlementMessageParsingException("Error parsing settlement message");
    }

    public void persistSettlements(List<Settlement> settlements) throws DynamoDBConnectionException {
        DynamoDbTable<Settlement> settlementDynamoDbTable = awsConfig.getSettlementTable();
        for (Settlement s : settlements){
            try {
                Expression putExpression = Expression.builder().expression("attribute_not_exists(id)").build();
                PutItemEnhancedRequest<Settlement> request = PutItemEnhancedRequest.<Settlement>builder(Settlement.class)
                        .conditionExpression(putExpression)
                        .item(s)
                        .build();
                settlementDynamoDbTable.putItem(request);
            } catch (ConditionalCheckFailedException e){
                log.error("Record already exists in table");
            } catch (Exception e)
            {
                log.error("Exception saving settlements to dynamodb", e);
                throw new DynamoDBConnectionException("Error saving settlement to dynamodb", e);
            }

        }
//        DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
//                .dynamoDbClient(awsConfig.getDynamoDbClient())
//                .build();
//        try {
//            WriteBatch.Builder<Settlement> recordBuilder = WriteBatch.builder(Settlement.class).mappedTableResource(settlementDynamoDbTable);
//            for (int i = 0; i < settlements.size(); i++){
//                recordBuilder.addPutItem(settlements.get(i));
//                if (i % 24 == 0 || i == settlements.size() - 1){
//                    WriteBatch.Builder<Settlement> finalRecordBuilder = recordBuilder;
//                    BatchWriteResult result = enhancedClient.batchWriteItem(r -> r.addWriteBatch(finalRecordBuilder.build()));
//                    log.info(result.toString());
//                    recordBuilder = WriteBatch.builder(Settlement.class).mappedTableResource(settlementDynamoDbTable);
//                }
//            }
//        }catch (Exception e){
//            log.error("Exception saving settlements to dynamodb", e);
//            throw new DynamoDBConnectionException("Error saving settlement to dynamodb", e);
//        }
    }

    public void pushSettlementsUpstream(List<Settlement> settlements) throws KinesisStreamException {
        String streamName = awsConfig.awsProperties.getSettlementOutboundStream();
        KinesisAsyncClient client = awsConfig.getKinesisClient();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        log.info("Pushing Settlements to " + streamName);
        try {
            List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
            int index=1;
            for (Settlement s : settlements)
            {
                if (index==500) {
                    PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
                            .streamName(streamName)
                            .records(putRecordsRequestEntryList)
                            .build();

                    CompletableFuture<PutRecordsResponse> putRecordsResult = client.putRecords(putRecordsRequest);
                    putRecordsResult.join();
                    index=1;
                    putRecordsRequestEntryList = new ArrayList<>();
                }
                else
                {
                    try {
                        putRecordsRequestEntryList.add(
                                PutRecordsRequestEntry.builder()
                                        .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                                        .data(SdkBytes.fromByteArray(objectMapper.writeValueAsString(s).getBytes()))
                                        .build());
                        index++;
                    } catch (JsonProcessingException e) {
                        // todo should an exception here throw an error or is logging fine?
                        log.error("Exception pushing settlements to Core: ", e);
                    }
                }
            }
            if (index>1) {
                PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
                        .streamName(streamName)
                        .records(putRecordsRequestEntryList)
                        .build();

                CompletableFuture<PutRecordsResponse> putRecordsResult = client.putRecords(putRecordsRequest);
                putRecordsResult.join();
                log.info("Put Result" + putRecordsResult);
            }
        } catch (Exception e) {
            log.error("Exception pushing settlements to Outbound: ", e);
            throw new KinesisStreamException("Error writing to Kinesis stream", e);
        }
    }

}