package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; /** * A basic Kinesis Data Analytics for Java application with Kinesis data * streams as source and sink. */ public class BasicStreamingJob { private static final String region = "us-west-2"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static KinesisStreamsSink createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(AWSConfigConstants.AWS_REGION, region); return KinesisStreamsSink.builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputProperties.getProperty("OUTPUT_STREAM", "ExampleOutputStream")) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } private static KinesisStreamsSink createSinkFromApplicationProperties() throws IOException { return KinesisStreamsSink.builder() .setKinesisClientProperties(KinesisAnalyticsRuntime.getApplicationProperties().get("ProducerConfigProperties")) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } public static void main(String[] args) throws Exception { // Set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* If you would like to use runtime configuration properties, uncomment the lines below * DataStream input = createSourceFromApplicationProperties(env); */ DataStream input = createSourceFromStaticConfig(env); /* If you would like to use runtime configuration properties, uncomment the lines below * input.sinkTo(createSinkFromApplicationProperties()) */ input.sinkTo(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }