/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file has been extended from the Apache Flink project skeleton.
*/
package basic.application;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;

/**
 * Skeleton for a Hudi Flink Streaming Job.
 *
 * <p>For a tutorial on how to write a Flink streaming application, check the
 * tutorials and examples on the Flink website.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args)
 * method), change the respective entry in the pom.xml file (simply search for 'mainClass').
 *
 * <p>Disclaimer: This code is not production ready.
 */
public class StreamingJob {

    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);
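
    /**
     * Job entry point: resolves configuration from the Kinesis Analytics runtime
     * properties (falling back to command-line arguments), then builds and
     * submits the Kafka-to-Hudi streaming pipeline.
     */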
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool parameter = ParameterTool.fromArgs(args);

        // read the parameters from the Kinesis Analytics environment
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties flinkProperties = null;
        String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic");
        String brokers = parameter.get("brokers", "");
        String s3Path = parameter.get("s3Path", "");

        if (applicationProperties != null) {
            flinkProperties = applicationProperties.get("FlinkApplicationProperties");
        }
        if (flinkProperties != null) {
            // runtime properties override the command-line values; fall back to
            // the existing defaults if a key is missing, to avoid NullPointerExceptions
            kafkaTopic = flinkProperties.getProperty("kafka-topic", kafkaTopic);
            brokers = flinkProperties.getProperty("brokers", brokers);
            s3Path = flinkProperties.getProperty("s3Path", s3Path);
        }

        LOG.info("kafkaTopic is: {}", kafkaTopic);
        LOG.info("brokers is: {}", brokers);
        LOG.info("s3Path is: {}", s3Path);

        // create Properties object for the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", brokers);
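
        // if the cluster requires authentication, the relevant Kafka client
        // security settings would go here as well, for example (a sketch; the
        // exact properties depend on your MSK configuration):
        // kafkaProps.setProperty("security.protocol", "SASL_SSL");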

        // process the stream using the Table/SQL API
        KafkaHudiSqlExample.createAndDeployJob(env, kafkaTopic, s3Path, kafkaProps);
    }
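
    /**
     * Kafka-to-Hudi pipeline built with the Flink Table/SQL API: a Kafka source
     * table in Debezium format and a Hudi sink table on Amazon S3.
     */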
    public static class KafkaHudiSqlExample {

        public static void createAndDeployJob(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties) {
            StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().build());

            Configuration configuration = streamTableEnvironment.getConfig().getConfiguration();
            configuration.setString("execution.checkpointing.interval", "1 min");
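
            // Hudi commits newly written data when a Flink checkpoint completes,
            // so the interval above also controls how often records become
            // visible in the Hudi table.

            // source table: Debezium CDC events read from Kafka; the METADATA
            // columns expose fields of the Debezium envelope as virtual columns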
            final String createTableStmt = "CREATE TABLE IF NOT EXISTS CustomerTable (\n" +
                    "  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format\n" +
                    "  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format\n" +
                    "  `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n" +
                    "  `CUST_ID` BIGINT,\n" +
                    "  `NAME` STRING,\n" +
                    "  `MKTSEGMENT` STRING,\n" +
                    "  WATERMARK FOR event_time AS event_time\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = '" + kafkaTopic + "',\n" +
                    "  'properties.bootstrap.servers' = '" + kafkaProperties.get("bootstrap.servers") + "',\n" +
                    "  'properties.group.id' = 'kdaConsumerGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'value.format' = 'debezium-json'\n" +
                    ")";
            final String s3Sink = "CREATE TABLE CustomerHudi (\n" +
                    "  `event_time` TIMESTAMP(3),\n" +
                    "  `origin_table` STRING, -- from Debezium format\n" +
                    "  `record_time` TIMESTAMP(3),\n" +
                    "  `customer_id` BIGINT,\n" +
                    "  `name` STRING,\n" +
                    "  `mktsegment` STRING,\n" +
                    "  PRIMARY KEY (`customer_id`) NOT ENFORCED\n" +
                    ") PARTITIONED BY (`mktsegment`)\n" +
                    "WITH (\n" +
                    "  'connector' = 'hudi',\n" +
                    "  'compaction.tasks' = '2',\n" +
                    "  'changelog.enabled' = 'true',\n" +
                    "  'read.streaming.enabled' = 'true',\n" +
                    "  'read.streaming.skip_compaction' = 'true',\n" +
                    "  'write.task.max.size' = '4096',\n" +
                    "  'write.bucket_assign.tasks' = '1',\n" +
                    "  'compaction.delta_seconds' = '120',\n" +
                    "  'compaction.delta_commits' = '2',\n" +
                    "  'compaction.trigger.strategy' = 'num_or_time',\n" +
                    "  'compaction.max_memory' = '2048',\n" +
                    "  'write.merge.max_memory' = '1024',\n" +
                    "  'write.tasks' = '4',\n" +
                    "  'hive_sync.enable' = 'true',\n" +
                    "  'hive_sync.db' = 'hudi',\n" +
                    "  'hive_sync.table' = 'customer_hudi_auto',\n" +
                    "  'hive_sync.mode' = 'glue',\n" +
                    "  'hive_sync.partition_fields' = 'mktsegment',\n" +
                    "  'hive_sync.use_jdbc' = 'false',\n" +
                    "  'path' = '" + s3Path + "',\n" +
                    "  'table.type' = 'MERGE_ON_READ' -- default is COPY_ON_WRITE\n" +
                    ")";
            streamTableEnvironment.executeSql(createTableStmt);
            streamTableEnvironment.executeSql(s3Sink);
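
            // the INSERT statement launches the continuous streaming job;
            // executeSql() submits it and returns without blocking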
            final String insertSql = "INSERT INTO CustomerHudi SELECT * FROM CustomerTable";
            streamTableEnvironment.executeSql(insertSql);
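
            // to inspect results during development, one could additionally run a
            // streaming read against the Hudi table (a sketch, not part of this job):
            // streamTableEnvironment.executeSql("SELECT * FROM CustomerHudi").print();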
        }
    }
}