{ "paragraphs": [ { "title": "Before running, replace the placeholder on line 8 with your Kafka bootstrap server string", "text": "%flink.ipyflink\n\n%pip install kafka-python==2.0.2\n%pip install names\n%pip install numpy\n\n# KAFKA \nBROKERS = \"Bootstrap_Brokers_Go_Here\"\nz.put(\"brokers\", BROKERS)", "user": "anonymous", "dateUpdated": "2023-02-01T17:38:02+0000", "progress": 0, "config": { "lineNumbers": true, "editorSetting": { "language": "python", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/python", "fontSize": 9, "title": true, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1673991282268_1462592508", "id": "paragraph_1625249214423_249873898", "dateCreated": "2023-01-17T21:34:42+0000", "dateStarted": "2023-02-01T17:34:42+0000", "dateFinished": "2023-02-01T17:34:49+0000", "status": "FINISHED", "focus": true, "$$hashKey": "object:9265" }, { "text": "%flink.ipyflink\nfrom kafka.admin import KafkaAdminClient, NewTopic\nadmin_client = KafkaAdminClient(bootstrap_servers=BROKERS, client_id='test1')\n\ntopic_list = []\ntopic_list.append(NewTopic(name=\"stock-topic\", num_partitions=3, replication_factor=2))\ntopic_list.append(NewTopic(name=\"transaction-topic\", num_partitions=3, replication_factor=2))\ntopic_list.append(NewTopic(name=\"output\", num_partitions=3, replication_factor=2))\n\n\nadmin_client.create_topics(new_topics=topic_list, validate_only=False)", "user": "anonymous", "dateUpdated": "2023-02-01T17:33:37+0000", "progress": 0, "config": { "editorSetting": { "language": "python", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/python", "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": 
"paragraph_1673991282270_248954833", "id": "paragraph_1660054498702_1243498210", "dateCreated": "2023-01-17T21:34:42+0000", "dateStarted": "2023-02-01T17:33:37+0000", "dateFinished": "2023-02-01T17:33:38+0000", "status": "FINISHED", "$$hashKey": "object:9266" }, { "text": "%flink.ipyflink\nadmin_client.delete_topics([\"stock-topic\", \"transaction-topic\", \"output\"])", "user": "anonymous", "dateUpdated": "2023-02-01T17:33:32+0000", "progress": 0, "config": { "editorSetting": { "language": "python", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/python", "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1673991748866_1030549359", "id": "paragraph_1673991748866_1030549359", "dateCreated": "2023-01-17T21:42:28+0000", "dateStarted": "2023-02-01T17:33:32+0000", "dateFinished": "2023-02-01T17:33:33+0000", "status": "FINISHED", "$$hashKey": "object:9267" }, { "text": "%flink.ssql(type=update)\nDROP TABLE IF EXISTS stock;\nDROP TABLE IF EXISTS stock_datagen;\nDROP TABLE IF EXISTS transactions;\nDROP TABLE IF EXISTS transactions_datagen;\nDROP TABLE IF EXISTS output;\n\nCREATE TABLE IF NOT EXISTS stock ( \n ticker_symbol STRING NOT NULL PRIMARY KEY,\n price DOUBLE,\n s_timestamp TIMESTAMP_LTZ(3)\n) WITH (\n 'connector' = 'upsert-kafka',\n 'topic' = 'stock-topic',\n 'properties.bootstrap.servers' = '{brokers}',\n 'key.format' = 'raw',\n 'value.format' = 'json'\n);\n\n\nCREATE TABLE IF NOT EXISTS transactions ( \n id STRING,\n transaction_ticker_symbol STRING,\n shares_purchased DOUBLE,\n t_timestamp TIMESTAMP_LTZ(3)\n \n) WITH (\n 'connector' = 'kafka',\n 'topic' = 'transaction-topic',\n 'properties.bootstrap.servers' = '{brokers}',\n 'value.format' = 'json'\n);\n\nCREATE TABLE IF NOT EXISTS stock_datagen\nWITH (\n 'connector' = 'datagen',\n 'rows-per-second' = '1',\n 
'fields.ticker_symbol.length' = '2',\n 'fields.price.min' = '0.00',\n 'fields.price.max' = '10000.00'\n -- 'fields.s_timestamp.max-past' = '150000' -- requires Flink 1.15 or later\n )\n LIKE stock (EXCLUDING ALL);\n \nCREATE TABLE IF NOT EXISTS transactions_datagen\nWITH (\n 'connector' = 'datagen',\n 'rows-per-second' = '1000',\n 'fields.transaction_ticker_symbol.length' = '2',\n 'fields.shares_purchased.min' = '1',\n 'fields.shares_purchased.max' = '100'\n -- 'fields.t_timestamp.max-past' = '30000' -- requires Flink 1.15 or later\n )\n LIKE transactions (EXCLUDING ALL);\n \nCREATE TABLE IF NOT EXISTS output (\n data STRING\n ) WITH (\n 'connector' = 'kafka',\n 'topic' = 'output',\n 'properties.bootstrap.servers' = '{brokers}',\n -- 'value.format' = 'json', -- disabled: duplicates the value format already set by 'format' below\n 'format' = 'raw')", "user": "anonymous", "dateUpdated": "2023-02-01T17:35:15+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/sql", "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1673991681330_1650742522", "id": "paragraph_1673991681330_1650742522", "dateCreated": "2023-01-17T21:41:21+0000", "dateStarted": "2023-02-01T17:35:16+0000", "dateFinished": "2023-02-01T17:35:28+0000", "status": "FINISHED", "$$hashKey": "object:9268" }, { "text": "%flink.ssql(type=update)\nINSERT INTO STOCK SELECT * FROM STOCK_DATAGEN;\n", "user": "anonymous", "dateUpdated": "2023-01-18T15:42:47+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/sql", "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": { "jobUrl": { "propertyName": "jobUrl", "label": "FLINK JOB", "tooltip": "View in Flink web 
UI", "group": "flink", "values": [ { "jobUrl": "/flinkdashboard/#/job/6cd791ea451fdda26211702af5d223eb", "$$hashKey": "object:9631" } ], "interpreterSettingId": "flink" } }, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1673991778831_124555963", "id": "paragraph_1673991778831_124555963", "dateCreated": "2023-01-17T21:42:58+0000", "dateStarted": "2023-01-18T15:42:47+0000", "dateFinished": "2023-01-18T21:43:32+0000", "status": "ERROR", "$$hashKey": "object:9269" }, { "text": "%flink.ssql(type=update)\nINSERT INTO TRANSACTIONS SELECT * FROM TRANSACTIONS_DATAGEN;\n", "user": "anonymous", "dateUpdated": "2023-01-18T15:42:49+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/sql", "fontSize": 9, "results": {}, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": { "jobUrl": { "propertyName": "jobUrl", "label": "FLINK JOB", "tooltip": "View in Flink web UI", "group": "flink", "values": [ { "jobUrl": "/flinkdashboard/#/job/528e45968185bfff0df1a838c007ac52", "$$hashKey": "object:9679" } ], "interpreterSettingId": "flink" } }, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1674052986415_2014925260", "id": "paragraph_1674052986415_2014925260", "dateCreated": "2023-01-18T14:43:06+0000", "dateStarted": "2023-01-18T15:42:49+0000", "dateFinished": "2023-01-18T21:43:30+0000", "status": "ERROR", "$$hashKey": "object:9270" }, { "text": "%flink.ssql(type=update)\nSELECT * FROM OUTPUT;", "user": "anonymous", "dateUpdated": "2023-01-23T15:57:39+0000", "progress": 0, "config": { "editorSetting": { "language": "sql", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, "colWidth": 12, "editorMode": "ace/mode/sql", "fontSize": 9, "results": { "0": { "graph": { "mode": "table", "height": 300, "optionOpen": false, "setting": { "table": { "tableGridState": {}, 
"tableColumnTypeState": { "names": { "data": "string" }, "updated": false }, "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer for displaying aggregated values\"}]", "tableOptionValue": { "useFilter": false, "showPagination": false, "showAggregationFooter": false }, "updated": false, "initialized": false } }, "commonSetting": {} } } }, "enabled": true }, "settings": { "params": {}, "forms": {} }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, "jobName": "paragraph_1674054273387_118663021", "id": "paragraph_1674054273387_118663021", "dateCreated": "2023-01-18T15:04:33+0000", "dateStarted": "2023-01-18T16:56:50+0000", "dateFinished": "2023-01-23T15:57:27+0000", "status": "ABORT", "$$hashKey": "object:9271" } ], "name": "kafka-admin", "id": "2HP77QBDY", "defaultInterpreterGroup": "flink", "version": "0.9.0", "noteParams": {}, "noteForms": {}, "angularObjects": {}, "config": { "isZeppelinNotebookCronEnable": false, "looknfeel": "default", "personalizedMode": "false" }, "info": {}, "path": "/kafka-admin" }