/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: MIT-0
 */
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.TimeUnit;


public class AsyncIOExample {


    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env, String region, String inputStreamName) {

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        inputProperties.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
        return env.addSource(new FlinkKinesisConsumer<>(
                inputStreamName,
                new SimpleStringSchema(),
                inputProperties))
                .name("flink_kinesis_consumer_01").setParallelism(1);
    }


    private static FlinkKinesisProducer<String> createSinkFromStaticConfig(String region, String outputStreamName) {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        outputProperties.setProperty("AggregationEnabled", "false");
        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    public static void main(String[] args) throws Exception {

        /**
         * Args to pass in via Kinesis Properties
         */
        final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
        final String WAIT_MODE;
        final String REGION;
        final String INPUT_STREAM_NAME;
        final String OUTPUT_STREAM_NAME;
        int ASYNC_OPERATOR_PARALLELISM;
        final long ASYNC_OPERATOR_TIMEOUT;
        final int ASYNC_OPERATOR_CAPACITY;
        final String POST_REQUEST_URL;

        // obtain execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties applicationProperties;

        /**
         * In order to accurately reflect the Kinesis Data Analytics application, set the
         * parallelism value locally to the parallelism in the KDA application.
         */
        if (env instanceof LocalStreamEnvironment) {
            applicationProperties = new Properties(); // default
            applicationProperties.setProperty("input_stream_name", System.getenv("input_stream_name"));
            applicationProperties.setProperty("output_stream_name", System.getenv("output_stream_name"));
            applicationProperties.setProperty("post_request_url", System.getenv("post_request_url"));
            env.setParallelism(8);
        } else {
            applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties().get("applicationProperties");
        }



        try {
            // check the configuration for the job
            WAIT_MODE = applicationProperties.getProperty("wait_mode", "unordered");
            REGION = applicationProperties.getProperty("region", "us-east-1");
            INPUT_STREAM_NAME = applicationProperties.getProperty("input_stream_name");
            OUTPUT_STREAM_NAME = applicationProperties.getProperty("output_stream_name");
            ASYNC_OPERATOR_PARALLELISM = Integer.parseInt(applicationProperties.getProperty("waitOperatorParallelism", String.valueOf(env.getParallelism())));
            ASYNC_OPERATOR_TIMEOUT = Long.parseLong(applicationProperties.getProperty("timeout", "30000"));
            ASYNC_OPERATOR_CAPACITY = Integer.parseInt(applicationProperties.getProperty("capacity", "2"));
            POST_REQUEST_URL = applicationProperties.getProperty("post_request_url");
        } catch (Exception e) {
            throw e;
        }

        /**
         * Start crafting the streaming pipeline
         */
        DataStream<String> inputStream = createSourceFromStaticConfig(env, REGION, INPUT_STREAM_NAME);

        // Custom metric reporting what the current capacity of the async operator is
        inputStream.map(new NoOpMapperFunction("async-capacity", ASYNC_OPERATOR_CAPACITY)).setParallelism(1);

        // create async function, which will "wait" for a while to simulate the process of async i/o
        AsyncFunction<String, String> customAsyncFunction = new SampleAsyncFunction(POST_REQUEST_URL);


        // add async operator to streaming job
        DataStream<String> result;
        if (WAIT_MODE.equals("ordered")) {
            result = AsyncDataStream.orderedWait(
                    inputStream,
                    customAsyncFunction,
                    ASYNC_OPERATOR_TIMEOUT,
                    TimeUnit.MILLISECONDS,
                    ASYNC_OPERATOR_CAPACITY).setParallelism(ASYNC_OPERATOR_PARALLELISM).disableChaining();
        } else {
            result = AsyncDataStream.unorderedWait(
                    inputStream,
                    customAsyncFunction,
                    ASYNC_OPERATOR_TIMEOUT,
                    TimeUnit.MILLISECONDS,
                    ASYNC_OPERATOR_CAPACITY).setParallelism(ASYNC_OPERATOR_PARALLELISM).disableChaining();
        }

        // comparing async to a map operator doing the same thing
        inputStream.map(new SampleSyncFunction(POST_REQUEST_URL)).setParallelism(ASYNC_OPERATOR_PARALLELISM).disableChaining();


        result.addSink(createSinkFromStaticConfig(REGION, OUTPUT_STREAM_NAME)).name("flink_kinesis_producer_01");

        // execute the program
        env.execute("Async IO Example");
    }
}