/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * This file has been extended from the Apache Flink project skeleton.
 */

package basic.application;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * <p>For a tutorial on how to write a Flink streaming application, check the
 * tutorials and examples on the Flink website.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args)
 * method), change the respective entry in the POM.xml file (simply search for 'mainClass').
 *
 * <p>Disclaimer: This code is not production ready.
 */
public class StreamingJob {

    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env,
                                                                   String region, String inputStreamName) {
        // Event-time watermarks: tolerate records arriving up to 20 seconds out of order and
        // mark the source idle after one minute without data, so watermarks keep advancing.
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withIdleness(Duration.ofMinutes(1));

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        inputProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties))
                .assignTimestampsAndWatermarks(watermarkStrategy);
    }

    private static StreamingFileSink<String> createS3SinkFromStaticConfig(String s3SinkPath) {
        return StreamingFileSink
                .forRowFormat(new org.apache.flink.core.fs.Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(15))
                                .withMaxPartSize(10 * 1024) // roll small (10 KB) part files
                                .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final ParameterTool parameter = ParameterTool.fromArgs(args);

        // Read the parameters from the Kinesis Analytics environment.
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties flinkProperties = null;

        String s3SinkPath = parameter.get("s3SinkPath", "");
        String region = parameter.get("region", "us-east-1");
        String inputStreamName = parameter.get("inputStreamName", "");
        String userJarBucket = parameter.get("userJarBucket", "{AWS BUCKET NAME}");
        String userJarKey = parameter.get("userJarKey", "{USER JAR KEY PREFIX (i.e. prefix/myfile.jar)}");

        if (applicationProperties != null) {
            flinkProperties = applicationProperties.get("FlinkApplicationProperties");
        }

        // Properties configured on the Kinesis Analytics application override the command line arguments.
        if (flinkProperties != null) {
            s3SinkPath = flinkProperties.get("s3SinkPath").toString();
            region = flinkProperties.get("region").toString();
            inputStreamName = flinkProperties.get("inputStreamName").toString();
            userJarBucket = flinkProperties.get("userJarBucket").toString();
            userJarKey = flinkProperties.get("userJarKey").toString();
        }

        final String userJarFileName = userJarKey.substring(userJarKey.lastIndexOf("/") + 1);
        final String userJarFileURI = "/tmp/" + userJarFileName;

        LOG.info("s3SinkPath is: " + s3SinkPath);
        LOG.info("region is: " + region);
        LOG.info("inputStreamName is: " + inputStreamName);
        LOG.info("userJarBucket is: " + userJarBucket);
        LOG.info("userJarKey is: " + userJarKey);

        downloadUserJars(region, userJarKey, userJarFileURI, userJarBucket);

        LOG.info("Starting the application...");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // If the referenced JAR is needed to run as part of the Flink operators, it is usually best
        // practice to download the JAR file and register it in the distributed cache. This way Task
        // Managers can access it with no delay.
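        // registerCachedFile(filePath, name) ships the local file to every Task Manager and makes it
        // available under the given name; operators retrieve it through
        // getRuntimeContext().getDistributedCache().getFile(name), as ProcessTokenizer#open does below.
        // Note that the S3 key (userJarKey) doubles as the cache name here.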
        env.registerCachedFile(userJarFileURI, userJarKey);

        env.getCheckpointConfig().setCheckpointInterval(Time.minutes(1).toMilliseconds());

        DataStream<String> input = createSourceFromStaticConfig(env, region, inputStreamName);

        SingleOutputStreamOperator<Tuple2<String, Integer>> tokenizedStream = input
                .process(new ProcessTokenizer(userJarKey));

        DataStream<String> inputStream = tokenizedStream
                .keyBy(event -> event.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .sum(1)
                .map(value -> value.f0 + " Count: " + value.f1 + "\n");

        inputStream.addSink(createS3SinkFromStaticConfig(s3SinkPath));

        env.execute("Flink Streaming Java API Skeleton");
    }

    /**
     * @param region         specifies the AWS region
     * @param userJarKey     is the key of the JAR object in Amazon S3
     * @param userJarFileURI is the local location to which the JAR is downloaded
     * @param userJarBucket  is the name of the bucket from which the JAR file should be downloaded
     * @throws IOException
     */
    private static void downloadUserJars(String region, String userJarKey, String userJarFileURI,
                                         String userJarBucket) throws IOException {
        Region awsRegion = Region.of(region);
        S3Client s3 = S3Client.builder()
                .region(awsRegion)
                .build();

        // Download the object to the local file system.
        LOG.info("Downloading " + userJarKey);
        Path myFile = Paths.get(userJarFileURI);

        GetObjectRequest request = GetObjectRequest.builder()
                .bucket(userJarBucket)
                .key(userJarKey)
                .build();

        s3.getObject(request, myFile);
        s3.close();
    }

    // You don't need this process function if the JAR you're downloading only needs to be referenced
    // in the main method and not within the operators.
    public static final class ProcessTokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {

        private final String userJarFileName;

        public ProcessTokenizer(String userJarFileName) {
            this.userJarFileName = userJarFileName;
        }

        /** The state that is maintained by this process function. */
        private URLClassLoader userCodeClassLoader;
        private Class<?> clazz;

        // This method will be called once each time the job starts or restarts.
        // You can use the URLClassLoader directly in the main method if you don't need to reference
        // the downloaded JAR in the stream processing operators.
        @Override
        public void open(Configuration parameters) throws Exception {
            File jar = getRuntimeContext().getDistributedCache().getFile(userJarFileName);
            ClassLoader original = getRuntimeContext().getUserCodeClassLoader();
            userCodeClassLoader = new URLClassLoader(new URL[]{jar.toURI().toURL()}, original);

            // Get the reference to the class.
            clazz = userCodeClassLoader.loadClass("com.myapp.core.utils.EventData");
        }

        @Override
        public void processElement(String s, ProcessFunction<String, Tuple2<String, Integer>>.Context context,
                                   Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token : tokens) {
                // Create an instance of the dynamically loaded class.
                Object instance = clazz.getConstructor(String.class, Date.class, int.class, String.class).newInstance(
                        s, new Date(System.currentTimeMillis()), tokens.length, token);

                // Invoke a method from the loaded class ("methodName" is a placeholder).
                Method m = clazz.getMethod("methodName");
                String result = (String) m.invoke(instance);

                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token + result, 1));
                }
            }
        }

        @Override
        public void close() throws Exception {
            userCodeClassLoader.close();
        }
    }
}
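/*
 * A minimal sketch of how this job could be submitted locally with command line parameters.
 * All values below are placeholders, and the application JAR name (target/my-app.jar), bucket,
 * stream, and key are assumptions for illustration only:
 *
 *   flink run -c basic.application.StreamingJob target/my-app.jar \
 *       --region us-east-1 \
 *       --inputStreamName my-input-stream \
 *       --s3SinkPath s3://my-bucket/output \
 *       --userJarBucket my-bucket \
 *       --userJarKey prefix/myfile.jar
 *
 * When deployed as a Kinesis Analytics application, the same values are read from the
 * "FlinkApplicationProperties" property group instead and take precedence over the arguments.
 */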