{ "paragraphs": [ { "text": "%md\n## Amazon Kinesis Data Analytics for Apache Flink application\n\nIn this notebook, a new Amazon Kinesis Data Analytics for SQL Application will be initialized and created. The notebook identifies the source/destination MSK topics and defines schema for the respective topics via AWS Glue Schema Registry. In addition, this notebook creates a streaming SQL query to aggregate data from the source MSK topic and store the results in the destination MSK topic. \n\nWith a simple click of a few buttons the process to identify records in the source MSK topic, define logic and push it to the destination topic is accomplished. By using Flink SQL, the logic is understandable in natural language without any need to learn programming languages. Once the data transformation logic is defined in this notebook, you can build your code and export it to Amazon S3. You can promote the code that you wrote in your note to a continuously running stream processing application. After you deploy a note to run in streaming mode, Kinesis Data Analytics creates an application for you that runs continuously, reads data from your sources, writes to your destinations, maintains long-running application state, and autoscales automatically based on the throughput of your source streams. Documentation on this topic is available at https://docs.aws.amazon.com/kinesisanalytics/latest/dev/what-is.html.\n\nPLEASE NOTE WE DO NOT EXCECUTE INDIVIDUAL CELLS IN THIS NOTEBOOK, RATHER WE BUILD AND DEPLOY INTO AN EXECUTABLE APPLICATION.", "user": "anonymous", "dateUpdated": "2023-04-12T22:40:54+0000", "progress": 0, "config": { "tableHide": false, "editorSetting": { "language": "markdown", "editOnDblClick": true, "completionSupport": false }, "editorMode": "ace/mode/markdown", "colWidth": 12, "editorHide": true, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [ { "type": "HTML", "data": "
\n

Amazon Kinesis Data Analytics for Apache Flink application

\n

In this notebook, a new Amazon Kinesis Data Analytics for SQL Application will be initialized and created. The notebook identifies the source/destination MSK topics and defines schema for the respective topics via AWS Glue Schema Registry. In addition, this notebook creates a streaming SQL query to aggregate data from the source MSK topic and store the results in the destination MSK topic.

\n

With a simple click of a few buttons the process to identify records in the source MSK topic, define logic and push it to the destination topic is accomplished. By using Flink SQL, the logic is understandable in natural language without any need to learn programming languages. Once the data transformation logic is defined in this notebook, you can build your code and export it to Amazon S3. You can promote the code that you wrote in your note to a continuously running stream processing application. After you deploy a note to run in streaming mode, Kinesis Data Analytics creates an application for you that runs continuously, reads data from your sources, writes to your destinations, maintains long-running application state, and autoscales automatically based on the throughput of your source streams. Documentation on this topic is available at https://docs.aws.amazon.com/kinesisanalytics/latest/dev/what-is.html.

\n

PLEASE NOTE WE DO NOT EXCECUTE INDIVIDUAL CELLS IN THIS NOTEBOOK, RATHER WE BUILD AND DEPLOY INTO AN EXECUTABLE APPLICATION.

\n\n
" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094607_1880859091", "id": "20230130-203244_191216359", "dateCreated": "2023-04-12T22:38:14+0000", "status": "FINISHED", "focus": true, "$$hashKey": "object:166", "dateFinished": "2023-04-12T22:40:55+0000", "dateStarted": "2023-04-12T22:40:54+0000" }, { "text": "%md\n### Obtain MSK (Kafka) Cluster's Connection String\n\nBefore proceeding, fetch the MSK cluster connection string using below steps -\n\n1. Navigate to MSK in AWS console.\n2. On the left hamburger menu, choose “Clusters”.\n3. Find the cluster pre-created by the CloudFormation template.\n4. Click on the cluster name.\n5. Hit button “View client information”.\n6. Under “Bootstrap servers” section, copy the “Plaintext” private endpoint and use that in topic create commands below.\n7. Replace the obtained connection string for property 'properties.bootstrap.servers'. Just an example is shown below - DO NOT KEEP THE EXAMPLE STRING WHILE EXECUTING", "user": "anonymous", "dateUpdated": "2023-04-12T22:38:14+0000", "progress": 0, "config": { "editorSetting": { "language": "markdown", "editOnDblClick": true }, "editorMode": "ace/mode/markdown", "colWidth": 12, "editorHide": true, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [ { "type": "HTML", "data": "
\n

Obtain MSK (Kafka) Cluster’s Connection String

\n

Before proceeding, fetch the MSK cluster connection string using below steps -

\n
    \n
  1. Navigate to MSK in AWS console.
  2. \n
  3. On the left hamburger menu, choose “Clusters”.
  4. \n
  5. Find the cluster pre-created by the CloudFormation template.
  6. \n
  7. Click on the cluster name.
  8. \n
  9. Hit button “View client information”.
  10. \n
  11. Under “Bootstrap servers” section, copy the “Plaintext” private endpoint and use that in topic create commands below.
  12. \n
  13. Replace the obtained connection string for property ‘properties.bootstrap.servers’. Just an example is shown below - DO NOT KEEP THE EXAMPLE STRING WHILE EXECUTING
  14. \n
\n
" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094607_1610544726", "id": "20230130-203244_1712890654", "dateCreated": "2023-04-12T22:38:14+0000", "status": "READY", "$$hashKey": "object:167" }, { "text": "%md\n### Define schema for source and destination topics\n\nWe are using Flink’s Connector for Apache Kafka to interact with Kafka topics. The Kafka connector “maps” Kafka topics to a created table. When you are selecting the data from the table, the connector acts as topic consumer and reads the data from Kafka topic, when you are inserting data to the table, connector acts as Kafka producer and produces the messages to Kafka topic, the topic is denoted by ‘topic’ metadata provided in create table sql statement. See https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#how-to-create-a-kafka-table.\n\nWhen you create a table in a Kinesis Data Analytics Studio Notebook, an accompanying table in the Glue Data Catalog is also created. AWS Glue Data Catalog is a persistent metadata store that serves as a central repository containing table definitions. You can use the AWS Glue Data Catalog to quickly discover and search across multiple AWS datasets. Kinesis Data Analytics Studio is compatible with the AWS Glue Data Catalog, where you can define the schema for your source and destination tables. The reason for this is so that other applications, be it Apache Flink applications or Batch applications, etc, can reference the same schema that we defined in this notebook instead of creating an entirely different copy of the schema. This ensures consistency between applications working with the same data. ", "user": "anonymous", "dateUpdated": "2023-04-12T22:38:14+0000", "progress": 0, "config": { "tableHide": false, "editorSetting": { "language": "markdown", "editOnDblClick": true, "completionSupport": false }, "editorMode": "ace/mode/markdown", "colWidth": 12, "editorHide": true, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [ { "type": "HTML", "data": "
\n

Define schema for source and destination topics

\n

We are using Flink’s Connector for Apache Kafka to interact with Kafka topics. The Kafka connector “maps” Kafka topics to a created table. When you are selecting the data from the table, the connector acts as topic consumer and reads the data from Kafka topic, when you are inserting data to the table, connector acts as Kafka producer and produces the messages to Kafka topic, the topic is denoted by ‘topic’ metadata provided in create table sql statement. See https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#how-to-create-a-kafka-table.

\n

When you create a table in a Kinesis Data Analytics Studio Notebook, an accompanying table in the Glue Data Catalog is also created. AWS Glue Data Catalog is a persistent metadata store that serves as a central repository containing table definitions. You can use the AWS Glue Data Catalog to quickly discover and search across multiple AWS datasets. Kinesis Data Analytics Studio is compatible with the AWS Glue Data Catalog, where you can define the schema for your source and destination tables. The reason for this is so that other applications, be it Apache Flink applications or Batch applications, etc, can reference the same schema that we defined in this notebook instead of creating an entirely different copy of the schema. This ensures consistency between applications working with the same data.

\n\n
" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094608_289840536", "id": "20230130-203244_1097886842", "dateCreated": "2023-04-12T22:38:14+0000", "status": "READY", "$$hashKey": "object:168" }, { "text": "%flink.ssql\n\nCREATE TABLE IF NOT EXISTS cctopic (\ncc_num BIGINT, --DECIMAL(1,1)\nmerchant VARCHAR(64),\namount DECIMAL(10,2), --REAL\nzip_code VARCHAR(20), --INTEGER\ntrans_ts INT,\nproc_ts AS PROCTIME()\n\n)\nWITH (\n'connector'= 'kafka',\n'topic' = 'cctopic',\n'properties.group.id' = 'testGroup1',\n'properties.bootstrap.servers' = '',\n'format' = 'json',\n'scan.startup.mode'= 'latest-offset',\n'json.timestamp-format.standard'= 'ISO-8601'\n);", "user": "anonymous", "dateUpdated": "2023-04-12T22:38:14+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false }, "editorMode": "ace/mode/sql", "colWidth": 12, "editorHide": false, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094608_800739619", "id": "20230130-203244_1163108927", "dateCreated": "2023-04-12T22:38:14+0000", "status": "READY", "$$hashKey": "object:169" }, { "text": "%flink.ssql\n\nCREATE TABLE IF NOT EXISTS ccdesttopic (\ncc_num BIGINT,\nnum_trans_last_10m BIGINT,\navg_amt_last_10m DECIMAL(38,6)\n)\nWITH (\n'connector'= 'kafka',\n'topic' = 'ccdesttopic',\n'properties.group.id' = 'testGroup1',\n'properties.bootstrap.servers' = '',\n'format' = 'json',\n'scan.startup.mode'= 'latest-offset',\n'json.timestamp-format.standard'= 'ISO-8601'\n);", "user": "anonymous", "dateUpdated": "2023-04-12T22:38:14+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "editorMode": "ace/mode/sql", "colWidth": 12, "editorHide": false, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094608_1844828444", "id": "20230130-203244_1372551065", "dateCreated": "2023-04-12T22:38:14+0000", "status": "READY", "$$hashKey": "object:170" }, { "text": "%md\n### Define Flink SQL to aggregate data from source topic and insert the results into destination topic\n\nWhen executing the 4_streaming_predictions.ipynb from SageMaker Studio, the put_to_topic function streams credit card transactions to the source MSK topic. From there, the InvokeFraudEndpoint Lambda function predicts potential fradulant transactions. In addition, our KDA Flink application would aggregate all transactions in the last 10 minutes and streams it to the destination topic for downstream processing. In the SQL cell below, we define the logic for aggregation by tying the source and destination topics. The SQL logic selects from the credit card topic, aggregates the counts and averages, and writes to the credit card destination topic. To learn more on Flink range aggregate functions, refer to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/#range-definitions.", "user": "anonymous", "dateUpdated": "2023-04-12T22:39:24+0000", "progress": 0, "config": { "tableHide": false, "editorSetting": { "language": "markdown", "editOnDblClick": true, "completionSupport": false }, "editorMode": "ace/mode/markdown", "colWidth": 12, "editorHide": true, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [ { "type": "HTML", "data": "
\n

Define Flink SQL to aggregate data from source topic and insert the results into destination topic

\n

When executing the 4_streaming_predictions.ipynb from SageMaker Studio, the put_to_topic function streams credit card transactions to the source MSK topic. From there, the InvokeFraudEndpoint Lambda function predicts potential fradulant transactions. In addition, our KDA Flink application would aggregate all transactions in the last 10 minutes and streams it to the destination topic for downstream processing. In the SQL cell below, we define the logic for aggregation by tying the source and destination topics. The SQL logic selects from the credit card topic, aggregates the counts and averages, and writes to the credit card destination topic. To learn more on Flink range aggregate functions, refer to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/#range-definitions.

\n\n
" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094608_322657104", "id": "20230130-203244_320782950", "dateCreated": "2023-04-12T22:38:14+0000", "status": "FINISHED", "$$hashKey": "object:171", "dateFinished": "2023-04-12T22:39:24+0000", "dateStarted": "2023-04-12T22:39:24+0000" }, { "text": "%flink.ssql\n\nINSERT INTO ccdesttopic\nSELECT\ncc_num,\nCOUNT(*) OVER LAST_10_MINUTES as cc_count,\nAVG(amount) OVER LAST_10_MINUTES as avg_amount\nFROM cctopic\nWINDOW LAST_10_MINUTES AS (\nPARTITION BY cc_num\nORDER BY proc_ts\nRANGE INTERVAL '10' MINUTE PRECEDING\n);", "user": "anonymous", "dateUpdated": "2023-04-12T22:38:14+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false }, "editorMode": "ace/mode/sql", "colWidth": 12, "editorHide": false, "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1681339094609_1264858694", "id": "20230130-203244_1266997344", "dateCreated": "2023-04-12T22:38:14+0000", "status": "READY", "$$hashKey": "object:172" } ], "name": "kda-msk-flink-note", "id": "2HZTFP147", "defaultInterpreterGroup": "flink", "version": "0.9.0", "noteParams": {}, "noteForms": {}, "angularObjects": {}, "config": { "isZeppelinNotebookCronEnable": false, "looknfeel": "default", "personalizedMode": "false" }, "info": {}, "path": "/kda-msk-flink-note" }