/*
 * Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

package com.amazon.verticles;

import com.amazon.exceptions.KinesisException;
import com.amazon.proto.TrackingEventProtos;

import com.amazon.vo.TrackingMessage;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.json.Json;
import io.vertx.mutiny.core.eventbus.EventBus;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

import static com.amazon.util.Constants.KINESIS_EVENTBUS_ADDRESS;
import static com.amazon.util.Constants.STREAM_NAME;

@ApplicationScoped
public class KinesisVerticle extends AbstractVerticle {

    private static final Logger LOGGER = Logger.getLogger(KinesisVerticle.class.getName());
    private KinesisAsyncClient kinesisAsyncClient;
    private String eventStream = "EventStream";

    @Override
    public void start() {

        EventBus eb = vertx.eventBus();

        kinesisAsyncClient = createClient();
        eventStream = System.getenv(STREAM_NAME) == null ? "EventStream" : System.getenv(STREAM_NAME);

        eb
                .<String>consumer(KINESIS_EVENTBUS_ADDRESS)
                .handler(message -> {
                    try {
                        TrackingMessage trackingMessage = Json.decodeValue(message.body(), TrackingMessage.class);
                        String partitionKey = trackingMessage.getMessageId();

                        byte [] byteMessage = createMessage(trackingMessage);

                        sendMessageToKinesis(byteMessage, partitionKey);

                        // Now send back reply
                        message.reply("OK");
                    }
                    catch (KinesisException exc) {
                        LOGGER.severe(exc.getMessage());
                    }
                });
    }

    @Override
    public void stop() {
        if (kinesisAsyncClient != null) {
            kinesisAsyncClient.close();
        }
    }

    private void sendMessageToKinesis(byte [] byteMessage, String partitionKey) throws KinesisException {
        if (null == kinesisAsyncClient) {
            throw new KinesisException("AmazonKinesisAsync is not initialized");
        }

        SdkBytes payload = SdkBytes.fromByteArray(byteMessage);
        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .partitionKey(partitionKey)
                .streamName(eventStream)
                .data(payload)
                .build();

        LOGGER.info("Writing to streamName " + eventStream + " using partitionkey " + partitionKey);

        try {
            CompletableFuture<PutRecordResponse> future = kinesisAsyncClient.putRecord(putRecordRequest);

            future.whenComplete((result, e) -> vertx.getDelegate().runOnContext(none -> {
                if (e != null) {
                    LOGGER.severe("Something happened ... 1");
                    LOGGER.severe(e.getMessage());
                    e.printStackTrace();
                } else {
                    String sequenceNumber = result.sequenceNumber();
                    LOGGER.fine("Message sequence number: " + sequenceNumber);
                }
            }));
        }
        catch (Exception exc) {
            LOGGER.severe("Something happened ... 2");
            exc.printStackTrace();
            LOGGER.severe(exc.getMessage());
        }
    }

    private byte[] createMessage(TrackingMessage trackingMessage) {

        TrackingEventProtos.TrackingEvent.Builder trackingBuilder = TrackingEventProtos.TrackingEvent.newBuilder();
        trackingBuilder.setChecksum(trackingMessage.getChecksum());
        trackingBuilder.setCustomerId(trackingMessage.getCustomerId());
        trackingBuilder.setProgramid(trackingMessage.getProgramId());
        trackingBuilder.setUserAgent(trackingMessage.getUserAgent());
        trackingBuilder.setCustomerId(trackingMessage.getCustomerId());
        trackingBuilder.setCustomerName(trackingMessage.getCustomerName());
        trackingBuilder.setMessageId(trackingMessage.getMessageId());
        trackingBuilder.setProgramName(trackingMessage.getProgramName());

        TrackingEventProtos.TrackingEvent trackingEvent = trackingBuilder.build();
        return trackingEvent.toByteArray();
    }

    private KinesisAsyncClient createClient() {

        ClientAsyncConfiguration clientConfiguration = ClientAsyncConfiguration.builder().build();

        // Reading credentials from ENV-variables
        AwsCredentialsProvider awsCredentialsProvider = DefaultCredentialsProvider.builder().build();

        // Configuring Kinesis-client with configuration
        String tmp = System.getenv("REGION");

        Region myRegion;
        if (tmp == null || tmp.trim().length() == 0) {
            myRegion = Region.US_EAST_1;
            LOGGER.info("Using default region");
        } else {
            myRegion = Region.of(tmp);
        }

        LOGGER.info("Deploying in Region " + myRegion.toString());

        return KinesisAsyncClient.builder()
                .asyncConfiguration(clientConfiguration)
                .credentialsProvider(awsCredentialsProvider)
                .region(myRegion)
                .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                        .maxConcurrency(10)
                        .maxPendingConnectionAcquires(1000))
                .build();
    }
}