package com.amazonaws.services.kinesisanalytics; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.aws.config.AWSConfigConstants; import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.util.Properties; /** * A Kinesis Data Analytics for Java application that calculates minimum stock * price for all stock symbols in a given Kinesis stream over a sliding window * and overrides the operator level parallelism in the flink application. *

* Note that the maximum parallelism in the Flink code cannot be greater than * provisioned parallelism (default is 1). To get this application to work, * use following AWS CLI commands to set the parallelism configuration of the * Kinesis Data Analytics for Java application. *

* 1. Fetch the current application version Id using following command: * aws kinesisanalyticsv2 describe-application --application-name * 2. Update the parallelism configuration of the application using version Id: * aws kinesisanalyticsv2 update-application * --application-name * --current-application-version-id * --application-configuration-update "{\"FlinkApplicationConfigurationUpdate\": { \"ParallelismConfigurationUpdate\": {\"ParallelismUpdate\": 5, \"ConfigurationTypeUpdate\": \"CUSTOM\" }}}" */ public class SlidingWindowStreamingJobWithParallelism { private static final String region = "us-west-2"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static KinesisStreamsSink createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(AWSConfigConstants.AWS_REGION, region); return KinesisStreamsSink.builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream input = createSourceFromStaticConfig(env); ObjectMapper jsonParser = new ObjectMapper(); input.map(value -> { // Parse the JSON JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class); return new Tuple2<>(jsonNode.get("ticker").toString(), jsonNode.get("price").asDouble()); }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE)) .keyBy(v -> v.f0) // Logically partition the stream per stock symbol .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .min(1) // Calculate minimum price per stock over the window .setParallelism(3) // Set parallelism for the min operator .map(value -> value.f0 + String.format(",%.2f", value.f1) + "\n") .sinkTo(createSinkFromStaticConfig()); env.execute("Min Stock Price"); } }