// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

package services.kinesisanalytics

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants
import services.kinesisanalytics.operators.JsonToTimestreamPayloadFn
import services.kinesisanalytics.operators.OffsetFutureTimestreamPoints
import services.kinesisanalytics.utils.ParameterToolUtils
import services.timestream.TimestreamInitializer
import services.timestream.TimestreamSink
import java.util.*

/**
 * Skeleton for a Flink Streaming Job.
 *
 * For a tutorial on how to write a Flink streaming application, check the
 * tutorials and examples on the [Flink Website](http://flink.apache.org/docs/stable/).
 *
 * To package your application into a JAR file for execution, run
 * './gradlew shadowJar' on the command line.
 *
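 * As an illustration, the packaged job could then be submitted as follows; the
 * parameter names match the ones read in [main], but the JAR file name and the
 * 'flink run' invocation are assumptions, not part of this project:
 *
 * ```
 * flink run -c services.kinesisanalytics.StreamingJob build/libs/<project>-all.jar \
 *   --InputStreamName timeseries-input-stream \
 *   --Region eu-west-1 \
 *   --TimestreamDbName timestreamDB \
 *   --TimestreamTableName timestreamTable \
 *   --TimestreamIngestBatchSize 75
 * ```
 *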
 * If you change the name of the main class (the class with the main() method),
 * change the respective entry in the build.gradle.kts file (simply search for 'javaMainClass').
 */
object StreamingJob {

    private const val DEFAULT_STREAM_NAME = "timeseries-input-stream"
    private const val DEFAULT_REGION_NAME = "eu-west-1"
    private const val DEFAULT_DB_NAME = "timestreamDB"
    private const val DEFAULT_TABLE_NAME = "timestreamTable"

    private fun createKinesisSource(env: StreamExecutionEnvironment, parameter: ParameterTool): DataStream<String> {
        // set Kinesis consumer properties
        val kinesisConsumerConfig = Properties()
        // set the region the Kinesis stream is located in
        kinesisConsumerConfig[AWSConfigConstants.AWS_REGION] = parameter["Region", DEFAULT_REGION_NAME]
        // obtain credentials through the DefaultCredentialsProviderChain, which includes the instance metadata
        kinesisConsumerConfig[AWSConfigConstants.AWS_CREDENTIALS_PROVIDER] = "AUTO"

        val adaptiveReadSettingStr = parameter["SHARD_USE_ADAPTIVE_READS", "false"]
        if (adaptiveReadSettingStr == "true") {
            // let the consumer adapt the GetRecords frequency and batch size itself
            kinesisConsumerConfig[ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS] = "true"
        } else {
            // poll new events from the Kinesis stream once every second
            kinesisConsumerConfig[ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS] =
                parameter["SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"]
            // maximum number of records to fetch in one GetRecords call
            kinesisConsumerConfig[ConsumerConfigConstants.SHARD_GETRECORDS_MAX] =
                parameter["SHARD_GETRECORDS_MAX", "10000"]
        }

        val stream = parameter["InputStreamName", DEFAULT_STREAM_NAME]

        // create Kinesis source
        return env.addSource(
            FlinkKinesisConsumer(
                // read events from the Kinesis stream passed in as a parameter
                stream,
                // deserialize events as plain strings; JSON parsing happens downstream
                SimpleStringSchema(),
                // using the previously defined properties
                kinesisConsumerConfig
            )
        ).name("KinesisSource<${stream}>")
    }

    private fun createDatabaseAndTableIfNotExist(region: String, databaseName: String, tableName: String) {
        val timestreamInitializer = TimestreamInitializer(region)
        timestreamInitializer.createDatabase(databaseName)
        timestreamInitializer.createTable(databaseName, tableName)
    }

    @JvmStatic
    fun main(args: Array<String>) {
        val parameter: ParameterTool = ParameterToolUtils.fromArgsAndApplicationProperties(args)

        // set up the streaming execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        val region = parameter["Region", DEFAULT_REGION_NAME]
        val databaseName = parameter["TimestreamDbName", DEFAULT_DB_NAME]
        val tableName = parameter["TimestreamTableName", DEFAULT_TABLE_NAME]
        val batchSize = parameter["TimestreamIngestBatchSize", "75"].toInt()

        createDatabaseAndTableIfNotExist(region, databaseName, tableName)

        env.streamTimeCharacteristic = TimeCharacteristic.EventTime
        env.config.autoWatermarkInterval = 1000L

        createKinesisSource(env, parameter)
            .map(JsonToTimestreamPayloadFn()).name("MapToTimestreamPayload")
            .process(OffsetFutureTimestreamPoints()).name("OffsetFutureTimestreamPoints")
            .addSink(TimestreamSink(region, databaseName, tableName, batchSize))
            .name("TimestreamSink<$databaseName, $tableName>")

        // execute program
        env.execute("Flink Streaming Java API Skeleton")
    }
}
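
/*
 * For reference: ParameterToolUtils.fromArgsAndApplicationProperties is this
 * project's helper (services.kinesisanalytics.utils) for resolving configuration.
 * Below is a minimal sketch of what such a helper can look like, assuming it
 * merges command-line arguments with the application properties exposed by the
 * Kinesis Data Analytics runtime under a group named "FlinkApplicationProperties".
 * The group name and the merge order are assumptions for illustration, not this
 * repository's actual implementation. It requires
 * com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime on the
 * classpath.
 *
 *   fun fromArgsAndApplicationProperties(args: Array<String>): ParameterTool {
 *       // parameters passed as --key value pairs on the command line
 *       var parameter = ParameterTool.fromArgs(args)
 *       // properties configured on the Kinesis Data Analytics application, if present
 *       val props = KinesisAnalyticsRuntime.getApplicationProperties()["FlinkApplicationProperties"]
 *       if (props != null) {
 *           val map = props.entries.associate { it.key.toString() to it.value.toString() }
 *           parameter = parameter.mergeWith(ParameterTool.fromMap(map))
 *       }
 *       return parameter
 *   }
 */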