package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class Helpers {
    private static final Logger LOG = LoggerFactory.getLogger(Helpers.class);

    public static final String MSKBOOTSTRAP_SERVERS = "MSKBootstrapServers";
    public static final String KAFKA_SOURCE_TOPIC_KEY = "KafkaSourceTopic";
    public static final String KAFKA_CONSUMER_GROUP_ID_KEY = "KafkaConsumerGroupId";
    public static final String S3_BUCKET_REGION_KEY = "S3BucketRegion";
    public static final String KEYSTORE_S3_BUCKET_KEY = "KeystoreS3Bucket";
    public static final String KEYSTORE_S3_PATH_KEY = "KeystoreS3Path";
    public static final String TRUSTSTORE_S3_BUCKET_KEY = "TruststoreS3Bucket";
    public static final String TRUSTSTORE_S3_PATH_KEY = "TruststoreS3Path";
    public static final String KEYSTORE_PASS_SECRET_KEY = "KeystorePassSecret";
    public static final String KEYSTORE_PASS_SECRET_FIELD_KEY = "KeystorePassSecretField";

    public static Properties getAppProperties() throws IOException {
        // note: this won't work when running locally
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

        if(flinkProperties == null) {
            LOG.error("Unable to retrieve FlinkApplicationProperties; please ensure that you've " +
                    "supplied them via application properties.");
            return null;
        }

        if(!flinkProperties.containsKey(MSKBOOTSTRAP_SERVERS)) {
            LOG.error("Unable to retrieve property: " + MSKBOOTSTRAP_SERVERS);
            return null;
        }

        if(!flinkProperties.containsKey(KAFKA_SOURCE_TOPIC_KEY)) {
            LOG.error("Unable to retrieve property: " + KAFKA_SOURCE_TOPIC_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(KAFKA_CONSUMER_GROUP_ID_KEY)) {
            LOG.error("Unable to retrieve property: " + KAFKA_CONSUMER_GROUP_ID_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(S3_BUCKET_REGION_KEY)) {
            LOG.error("Unable to retrieve property: " + S3_BUCKET_REGION_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(KEYSTORE_S3_BUCKET_KEY)) {
            LOG.error("Unable to retrieve property: " + KEYSTORE_S3_BUCKET_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(KEYSTORE_S3_PATH_KEY)) {
            LOG.error("Unable to retrieve property: " + KEYSTORE_S3_PATH_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(TRUSTSTORE_S3_BUCKET_KEY)) {
            LOG.error("Unable to retrieve property: " + TRUSTSTORE_S3_BUCKET_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(TRUSTSTORE_S3_PATH_KEY)) {
            LOG.error("Unable to retrieve property: " + TRUSTSTORE_S3_PATH_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(KEYSTORE_PASS_SECRET_KEY)) {
            LOG.error("Unable to retrieve property: " + KEYSTORE_PASS_SECRET_KEY);
            return null;
        }

        if(!flinkProperties.containsKey(KEYSTORE_PASS_SECRET_FIELD_KEY)) {
            LOG.error("Unable to retrieve property: " + KEYSTORE_PASS_SECRET_FIELD_KEY);
            return null;
        }

        return flinkProperties;
    }
}