# -*- coding: utf-8 -*-

"""
getting-started.py
~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Inserts the source table data into the sink table
"""

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
import os
import json

# 1. Creates a Table Environment
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda

# Set the IS_LOCAL environment variable when running outside Kinesis Data Analytics
is_local = bool(os.environ.get("IS_LOCAL"))

if is_local:
    # Only for local runs: read the properties file from the working directory and
    # load the Kinesis SQL connector jar (pass multiple jars delimited by a semicolon)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis_2.11-1.13.2.jar",
    )


def get_application_properties():
    """Reads the runtime properties file and returns its parsed JSON contents."""
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    """Returns the PropertyMap of the property group with the given id, if any."""
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
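
# For reference, a minimal sketch of the application_properties.json layout that
# get_application_properties/property_map expect, using the group and key names read
# in main() below. The stream names, region, and starting position are hypothetical
# placeholders, not values prescribed by this module:
#
# [
#   {
#     "PropertyGroupId": "consumer.config.0",
#     "PropertyMap": {
#       "input.stream.name": "ExampleInputStream",
#       "aws.region": "us-east-1",
#       "flink.stream.initpos": "LATEST"
#     }
#   },
#   {
#     "PropertyGroupId": "producer.config.0",
#     "PropertyMap": {
#       "output.stream.name": "ExampleOutputStream",
#       "aws.region": "us-east-1"
#     }
#   }
# ]
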
def create_table(table_name, stream_name, region, stream_initpos):
    """Renders the CREATE TABLE DDL for a Kinesis-backed source or sink table."""
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql(
        "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name)
    )

    if is_local:
        # Locally the job runs in an embedded mini-cluster; block until it finishes
        # so the process does not exit while the job is still running
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()
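
# Local usage, a sketch assuming the paths and file names used above:
#   1. Place flink-sql-connector-kinesis_2.11-1.13.2.jar under ./lib/
#   2. Create application_properties.json next to this script
#      (see the commented sample layout above create_table)
#   3. Run: IS_LOCAL=true python getting-started.py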