// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.operators.JsonToTimestreamPayloadFn; import com.amazonaws.services.kinesisanalytics.operators.OffsetFutureTimestreamPoints; import com.amazonaws.services.kinesisanalytics.utils.ParameterToolUtils; import com.amazonaws.services.timestream.TimestreamInitializer; import com.amazonaws.services.timestream.TimestreamSink; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.util.Properties; /** * Skeleton for a Flink Streaming Job. * *

For a tutorial on how to write a Flink streaming application, check the * tutorials and examples on the Flink website. * *

To package your application into a JAR file for execution, run * 'mvn clean package' on the command line. * *

If you change the name of the main class (with the public static void main(String[] args)) * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). */ public class StreamingJob { private static final String DEFAULT_STREAM_NAME = "timeseries-input-stream"; private static final String DEFAULT_REGION_NAME = "eu-west-1"; private static final String DEFAULT_DB_NAME = "timestreamDB"; private static final String DEFAULT_TABLE_NAME = "timestreamTable"; public static DataStream createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) { //set Kinesis consumer properties Properties kinesisConsumerConfig = new Properties(); //set the region the Kinesis stream is located in kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, parameter.get("Region", DEFAULT_REGION_NAME)); //obtain credentials through the DefaultCredentialsProviderChain, which includes the instance metadata kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO"); String adaptiveReadSettingStr = parameter.get("SHARD_USE_ADAPTIVE_READS", "false"); if (adaptiveReadSettingStr.equals("true")) { kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true"); } else { //poll new events from the Kinesis stream once every second kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000")); // max records to get in shot kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, parameter.get("SHARD_GETRECORDS_MAX", "10000")); } //create Kinesis source return env.addSource(new FlinkKinesisConsumer<>( //read events from the Kinesis stream passed in as a parameter parameter.get("InputStreamName", DEFAULT_STREAM_NAME), //deserialize events with EventSchema new SimpleStringSchema(), //using the previously defined properties kinesisConsumerConfig )).name("KinesisSource"); } public static void 
main(String[] args) throws Exception { final ParameterTool parameter = ParameterToolUtils.fromArgsAndApplicationProperties(args); // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final String region = parameter.get("Region", DEFAULT_REGION_NAME); final String databaseName = parameter.get("TimestreamDbName", DEFAULT_DB_NAME); final String tableName = parameter.get("TimestreamTableName", DEFAULT_TABLE_NAME); final int batchSize = Integer.parseInt(parameter.get("TimestreamIngestBatchSize", "75")); TimestreamInitializer timestreamInitializer = new TimestreamInitializer(region); timestreamInitializer.createDatabase(databaseName); timestreamInitializer.createTable(databaseName, tableName); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(1000L); createKinesisSource(env, parameter) .map(new JsonToTimestreamPayloadFn()).name("MaptoTimestreamPayload") .process(new OffsetFutureTimestreamPoints()).name("UpdateFutureOffsetedTimestreamPoints") .addSink(new TimestreamSink(region, databaseName, tableName, batchSize)) .name("TimeSeries<" + databaseName + ", " + tableName + ">"); // execute program env.execute("Flink Streaming Java API Skeleton"); } }