{ "paragraphs": [ { "title": "#1 : Import dependencies and configure Kinesis Producer", "text": "%flink(parallelism=1)\n\nimport java.util.Properties\nimport java.util.UUID\nimport org.apache.flink.streaming.api.scala._\nimport org.apache.flink.api.common.serialization.SimpleStringSchema\nimport org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer\nimport org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants\nimport org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner\nimport scala.util.Random\nimport org.apache.flink.util.Collector;\n\nval producerConfig = new Properties()\nproducerConfig.put(AWSConfigConstants.AWS_REGION, \"us-east-1\")\n\nval kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)\nkinesis.setFailOnError(false)\nkinesis.setDefaultStream(\"amazon_reviews_raw_stream_91ff95a0\")\n\n// spray data across all available shards\nkinesis.setCustomPartitioner(new KinesisPartitioner[String]() {\n override def getPartitionId(s: String): String = {\n // we don't care about shard affinity in this app\n UUID.randomUUID().toString()\n }\n})", "user": "anonymous", "dateUpdated": "2021-10-27T06:47:56+0000", "progress": 0, "config": { "lineNumbers": true, "editorSetting": { "language": "scala", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/scala", "fontSize": 9, "title": true, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "code": "SUCCESS", "msg": [ { "type": "TEXT", "data": "import java.util.Properties\nimport java.util.UUID\nimport org.apache.flink.streaming.api.scala._\nimport org.apache.flink.api.common.serialization.SimpleStringSchema\nimport org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer\nimport org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants\nimport org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner\nimport 
scala.util.Random\nimport org.apache.flink.util.Collector\n\u001b[1m\u001b[34mproducerConfig\u001b[0m: \u001b[1m\u001b[32mjava.util.Properties\u001b[0m = {}\n\u001b[1m\u001b[34mres0\u001b[0m: \u001b[1m\u001b[32mObject\u001b[0m = null\n\u001b[1m\u001b[34mkinesis\u001b[0m: \u001b[1m\u001b[32msoftware.amazon.kinesis.connectors.flink.FlinkKinesisProducer[String]\u001b[0m = software.amazon.kinesis.connectors.flink.FlinkKinesisProducer@57a820dc\n" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1635317211669_859624107", "id": "paragraph_1634157776488_2087810889", "dateCreated": "2021-10-27T06:46:51+0000", "status": "FINISHED", "focus": true, "$$hashKey": "object:30", "dateFinished": "2021-10-27T06:48:50+0000", "dateStarted": "2021-10-27T06:47:56+0000" }, { "title": "#2 : Load trimmed down version of Amazon Customer Reviews Dataset ", "text": "%flink(parallelism=1)\n\n/*\n Apache Zeppelin provides access to table environment resources using environment variables. 
You access the Scala stream execution environment through the 'senv' variable (StreamExecutionEnvironment)\n For more details, refer to: https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-zeppelin-interactive.html\n \n Load a trimmed-down version of the Amazon Customer Reviews Dataset: https://s3.amazonaws.com/amazon-reviews-pds/readme.html\n*/\n\nval personalCareData_DataStream: DataStream[String] = senv.readTextFile(\"s3://amazon-reviews-bucket-91ff95a0/artifacts/amazon_reviews_us_Personal_Care_Appliances_trimmed.tsv\")\nval personalCareData = personalCareData_DataStream\n .flatMap(new FlatMapFunction[String, String]() {\n def flatMap(value: String, out: Collector[String]){\n if (value.length() > 0) {\n out.collect(value.replaceAll(\"[^\\\\x00-\\\\x7F]\", \" \"));\n }\n } \n })\n \n\nval groceryData_DataStream: DataStream[String] = senv.readTextFile(\"s3://amazon-reviews-bucket-91ff95a0/artifacts/amazon_reviews_us_Grocery_trimmed.tsv\")\nval groceryData = groceryData_DataStream\n .flatMap(new FlatMapFunction[String, String]() {\n def flatMap(value: String, out: Collector[String]){\n if (value.length() > 0) {\n out.collect(value.replaceAll(\"[^\\\\x00-\\\\x7F]\", \" \"));\n }\n } \n })\n \n// Union them into a single stream\nval data = groceryData.union(personalCareData)\n\n// Since metadata fields are not available in 1.11, we have to come up with a way to generate timestamps that we can use for watermarking. 
\n// Add some random delay to event time to make it a bit less aggressive and uniform \nval dataWithIngestTime = data.map(v => {v + \"\\t\" + (System.currentTimeMillis() + Random.nextInt(5000)) })\n\ndataWithIngestTime.addSink(kinesis)\n\nsenv.execute()", "user": "anonymous", "dateUpdated": "2021-10-27T06:49:17+0000", "progress": 0, "config": { "lineNumbers": true, "editorSetting": { "language": "scala", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/scala", "fontSize": 9, "title": true, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "results": { "msg": [ { "data": "", "type": "TEXT" } ] }, "apps": [], "runtimeInfos": { "jobUrl": { "propertyName": "jobUrl", "label": "FLINK JOB", "tooltip": "View in Flink web UI", "group": "flink", "values": [ { "jobUrl": "/flinkdashboard/#/job/35d46c98d408c20044a780e768650128", "$$hashKey": "object:252" } ], "interpreterSettingId": "flink" } }, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1635317211671_1655702509", "id": "paragraph_1634894613349_1620312504", "dateCreated": "2021-10-27T06:46:51+0000", "status": "RUNNING", "$$hashKey": "object:31", "dateStarted": "2021-10-27T06:49:17+0000" }, { "text": "%flink\n", "user": "anonymous", "dateUpdated": "2021-10-27T06:46:51+0000", "progress": 0, "config": { "editorSetting": { "language": "scala", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/scala", "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1635317211674_375305396", "id": "paragraph_1634894893504_804597303", "dateCreated": "2021-10-27T06:46:51+0000", "status": "READY", "$$hashKey": "object:32" } ], "name": "1-data-load-notebook", "id": "2GNQRZ6UV", "defaultInterpreterGroup": "flink", "version": "0.9.0", "noteParams": {}, 
"noteForms": {}, "angularObjects": {}, "config": { "isZeppelinNotebookCronEnable": false, "looknfeel": "default", "personalizedMode": "false" }, "info": {}, "path": "/1-data-load-notebook" }