{ "metadata": { "name": "kafka-admin", "kernelspec": { "language": "scala", "name": "spark2-scala" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 2, "cells": [ { "cell_type": "code", "execution_count": 0, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ipyflink\n\n%pip install kafka-python\u003d\u003d2.0.2\n%pip install names\n%pip install numpy\n\n# KAFKA \nBROKERS \u003d \"Bootstrap_Brokers_Go_Here\"\nz.put(\"brokers\", BROKERS)" }, { "cell_type": "code", "execution_count": 1, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ipyflink\nfrom kafka.admin import KafkaAdminClient, NewTopic\nadmin_client \u003d KafkaAdminClient(bootstrap_servers\u003dBROKERS, client_id\u003d\u0027test1\u0027)\n\ntopic_list \u003d []\ntopic_list.append(NewTopic(name\u003d\"stock-topic\", num_partitions\u003d3, replication_factor\u003d2))\ntopic_list.append(NewTopic(name\u003d\"transaction-topic\", num_partitions\u003d3, replication_factor\u003d2))\ntopic_list.append(NewTopic(name\u003d\"output\", num_partitions\u003d3, replication_factor\u003d2))\n\n\nadmin_client.create_topics(new_topics\u003dtopic_list, validate_only\u003dFalse)" }, { "cell_type": "code", "execution_count": 2, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ipyflink\nadmin_client.delete_topics([\"stock-topic\", \"transaction-topic\", \"output\"])" }, { "cell_type": "code", "execution_count": 3, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ssql(type\u003dupdate)\nDROP TABLE IF EXISTS stock;\nDROP TABLE IF EXISTS stock_datagen;\nDROP TABLE IF EXISTS transactions;\nDROP TABLE IF EXISTS transactions_datagen;\nDROP TABLE IF EXISTS output;\n\nCREATE TABLE IF NOT EXISTS stock ( \n ticker_symbol STRING NOT NULL PRIMARY KEY,\n price DOUBLE,\n s_timestamp TIMESTAMP_LTZ(3)\n) WITH (\n \u0027connector\u0027 \u003d \u0027upsert-kafka\u0027,\n \u0027topic\u0027 \u003d \u0027stock-topic\u0027,\n \u0027properties.bootstrap.servers\u0027 \u003d \u0027{brokers}\u0027,\n \u0027key.format\u0027 \u003d \u0027raw\u0027,\n \u0027value.format\u0027 \u003d \u0027json\u0027\n);\n\n\nCREATE TABLE IF NOT EXISTS transactions ( \n id STRING,\n transaction_ticker_symbol STRING,\n shares_purchased DOUBLE,\n t_timestamp TIMESTAMP_LTZ(3)\n \n) WITH (\n \u0027connector\u0027 \u003d \u0027kafka\u0027,\n \u0027topic\u0027 \u003d \u0027transaction-topic\u0027,\n \u0027properties.bootstrap.servers\u0027 \u003d \u0027{brokers}\u0027,\n \u0027value.format\u0027 \u003d \u0027json\u0027\n);\n\nCREATE TABLE IF NOT EXISTS stock_datagen\nWITH (\n \u0027connector\u0027 \u003d \u0027datagen\u0027,\n \u0027rows-per-second\u0027 \u003d \u00271\u0027,\n \u0027fields.ticker_symbol.length\u0027 \u003d \u00272\u0027,\n \u0027fields.price.min\u0027 \u003d \u00270.00\u0027,\n \u0027fields.price.max\u0027 \u003d \u002710000.00\u0027\n -- \u0027fields.s_timestamp.max-past\u0027 \u003d \u0027150000\u0027 -- only in 1.15 \u003e \n )\n LIKE stock (EXCLUDING ALL);\n \nCREATE TABLE IF NOT EXISTS transactions_datagen\nWITH (\n \u0027connector\u0027 \u003d \u0027datagen\u0027,\n \u0027rows-per-second\u0027 \u003d \u00271000\u0027,\n \u0027fields.transaction_ticker_symbol.length\u0027 \u003d \u00272\u0027,\n \u0027fields.shares_purchased.min\u0027 \u003d \u00271\u0027,\n \u0027fields.shares_purchased.max\u0027 \u003d \u0027100\u0027\n -- \u0027fields.t_timestamp.max-past\u0027 \u003d \u002730000\u0027 -- only in 1.15 \u003e \n )\n LIKE transactions (EXCLUDING ALL);\n \nCREATE TABLE IF NOT EXISTS output (\n data STRING\n ) WITH (\n \u0027connector\u0027 \u003d \u0027kafka\u0027,\n \u0027topic\u0027 \u003d \u0027output\u0027,\n \u0027properties.bootstrap.servers\u0027 \u003d \u0027{brokers}\u0027,\n \u0027value.format\u0027 \u003d \u0027json\u0027,\n \u0027format\u0027 \u003d \u0027raw\u0027)" }, { "cell_type": "code", "execution_count": 4, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ssql(type\u003dupdate)\nINSERT INTO STOCK SELECT * FROM STOCK_DATAGEN;\n" }, { "cell_type": "code", "execution_count": 5, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ssql(type\u003dupdate)\nINSERT INTO TRANSACTIONS SELECT * FROM TRANSACTIONS_DATAGEN;\n" }, { "cell_type": "code", "execution_count": 6, "metadata": { "autoscroll": "auto" }, "outputs": [], "source": "%flink.ssql(type\u003dupdate)\nSELECT * FROM OUTPUT;" } ] }