/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package software.amazon.flink.example;

import software.amazon.flink.example.model.InputData;
import software.amazon.flink.example.model.OutputData;
import software.amazon.flink.example.operator.RandomCutForestOperator;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

/**
 * Entry point of the example job: reads {@link InputData} records from a Kinesis
 * stream, passes them through a {@link RandomCutForestOperator}, and writes the
 * resulting {@link OutputData} records to another Kinesis stream.
 *
 * <p>Region and stream names are read from the {@value #PROPERTY_GROUP_ID} runtime
 * property group; each setting has a built-in default.
 */
public class FlinkStreamingJob {

    /** Runtime property group that holds this job's settings. */
    private static final String PROPERTY_GROUP_ID = "RcfExampleEnvironment";

    /** Default AWS region used when the {@code region} property is not set. */
    private static final String DEFAULT_REGION = "us-west-2";

    /** Default source stream used when the {@code inputStreamName} property is not set. */
    private static final String DEFAULT_INPUT_STREAM = "ExampleInputStream-RCF";

    /** Default target stream used when the {@code outputStreamName} property is not set. */
    private static final String DEFAULT_OUTPUT_STREAM = "ExampleOutputStream-RCF";

    /** Number of dimensions handed to the forest; the input mapper emits one value per record. */
    private static final int RCF_DIMENSIONS = 1;

    /** Shingle size handed to the forest. */
    private static final int RCF_SHINGLE_SIZE = 1;

    /** Sample size handed to the forest. */
    private static final int RCF_SAMPLE_SIZE = 628;

    /**
     * Attaches a Kinesis consumer for the given stream to the environment.
     *
     * @param env             execution environment the source is added to
     * @param inputStreamName name of the Kinesis stream to read from
     * @param region          AWS region set on the consumer configuration
     * @return the stream of records produced by the consumer
     */
    private static DataStream<InputData> createSource(
            final StreamExecutionEnvironment env,
            final String inputStreamName,
            final String region) {
        final Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        return env.addSource(
                new FlinkKinesisConsumer<>(inputStreamName, new DataSerializationSchema(), inputProperties));
    }

    /**
     * Builds the Kinesis sink that receives the operator's output.
     *
     * @param outputStreamName name of the Kinesis stream to write to
     * @param region           AWS region set on the Kinesis client configuration
     * @return a sink configured for {@code outputStreamName}
     */
    private static KinesisStreamsSink<OutputData> createSink(final String outputStreamName, final String region) {
        final Properties outputProperties = new Properties();
        outputProperties.setProperty(AWSConfigConstants.AWS_REGION, region);

        return KinesisStreamsSink.<OutputData>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new DataSerializationSchema())
                // The client properties only ever contain the region, so the stream
                // name is taken straight from the argument.
                .setStreamName(outputStreamName)
                // Partition key is derived from the element's hash code.
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }

    /**
     * Wires source, Random Cut Forest operator and sink together and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if loading the runtime properties or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties applicationProperties =
                KinesisAnalyticsRuntime.getApplicationProperties().get(PROPERTY_GROUP_ID);
        if (applicationProperties == null) {
            // The property group is not configured: fall back to an empty set so the
            // defaults below apply instead of failing with a NullPointerException.
            applicationProperties = new Properties();
        }

        String region = applicationProperties.getProperty("region", DEFAULT_REGION);
        String inputStreamName = applicationProperties.getProperty("inputStreamName", DEFAULT_INPUT_STREAM);
        String outputStreamName = applicationProperties.getProperty("outputStreamName", DEFAULT_OUTPUT_STREAM);

        DataStream<InputData> source = createSource(env, inputStreamName, region);

        RandomCutForestOperator<InputData, OutputData> randomCutForestOperator =
                RandomCutForestOperator.<InputData, OutputData>builder()
                        .setDimensions(RCF_DIMENSIONS)
                        .setShingleSize(RCF_SHINGLE_SIZE)
                        .setSampleSize(RCF_SAMPLE_SIZE)
                        // Each record contributes a single value to the forest.
                        .setInputDataMapper((inputData) -> new float[]{inputData.getValue()})
                        // Carry the original time and value through, together with the score.
                        .setResultMapper(((inputData, score) ->
                                new OutputData(inputData.getTime(), inputData.getValue(), score)))
                        .build();

        source
                .process(randomCutForestOperator, TypeInformation.of(OutputData.class)).setParallelism(1)
                .sinkTo(createSink(outputStreamName, region));

        env.execute();
    }
}