package com.amazonaws.services.gsr.samples.json.kafka;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Sample command-line Kafka consumer that reads records with {@code String} keys and
 * {@link JsonDataWithSchema} values, using {@link GlueSchemaRegistryKafkaDeserializer}
 * as the value deserializer, and prints every received record to standard output.
 *
 * <p>Usage: {@code RunKafkaConsumer <configPath> <topic>}
 */
public class RunKafkaConsumer {

	/** Upper bound on how long a single {@code poll} call blocks waiting for records. */
	private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

	/**
	 * Entry point. Loads the properties file named by {@code args[0]}, applies the
	 * sample's fixed consumer and schema-registry settings on top of it, subscribes to
	 * the topic named by {@code args[1]}, and then polls in an endless loop, printing
	 * each record. The loop only ends when an exception propagates out of it (or the
	 * JVM is terminated); the consumer is closed on the way out.
	 *
	 * @param args exactly two elements: the path of the properties file and the topic name
	 * @throws IOException if the properties file is missing or cannot be read
	 */
	public static void main(String[] args) throws IOException {
		if (args.length != 2) {
			System.out.println("Please provide command line arguments: configPath topic");
			System.exit(1);
		}
		System.out.println("Hello World!");
		final Properties props = loadConfig(args[0]);
		System.out.println(props);

		final String topic = args[1];

		// These puts run after the file has been loaded, so they override any value
		// the properties file supplies for the same keys.
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
		props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
		props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON");
		props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "GsrBlogRegistry");
		props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "GsrBlogSchema");

		props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-1");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		// try-with-resources closes the consumer even when subscribe() itself fails,
		// not only when the poll loop throws.
		try (Consumer<String, JsonDataWithSchema> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Arrays.asList(topic));

			while (true) {
				// poll(Duration) replaces the deprecated poll(long) overload.
				ConsumerRecords<String, JsonDataWithSchema> records = consumer.poll(POLL_TIMEOUT);
				for (ConsumerRecord<String, JsonDataWithSchema> record : records) {
					String key = record.key();
					JsonDataWithSchema value = record.value();
					System.out.println("Received message: key = " + key + ", value = " + value);
				}
			}
		}
	}

	/**
	 * Reads a Java properties file into a new {@link Properties} object.
	 *
	 * @param configFile path of the properties file to read
	 * @return the properties parsed from {@code configFile}
	 * @throws IOException if {@code configFile} does not exist (message
	 *         {@code "<configFile> not found."}) or cannot be opened or read
	 */
	public static Properties loadConfig(final String configFile) throws IOException {
		final Path configPath = Paths.get(configFile);
		if (!Files.exists(configPath)) {
			throw new IOException(configFile + " not found.");
		}
		final Properties cfg = new Properties();
		try (InputStream inputStream = Files.newInputStream(configPath)) {
			cfg.load(inputStream);
		}
		return cfg;
	}
}