{
  "cells": [
    {
      "cell_type": "raw",
      "metadata": {
        "format": "text/plain"
      },
      "source": [
        "%flink\n",
        "\n",
        "import java.util.Collections\n",
        "import java.util.Properties\n",
        "\n",
        "import org.apache.kafka.clients.admin.AdminClient\n",
        "import org.apache.kafka.clients.admin.AdminClientConfig\n",
        "\n",
        "// Build a Kafka AdminClient that connects over SASL_SSL using the AWS_MSK_IAM mechanism.\n",
        "// java.util.Collections is imported here because the create/delete topic paragraphs call Collections.singleton.\n",
        "// replace with your brokers, etc...\n",
        "val bootstrapServers: String = \"boot-jh3g3srn.c3.kafka-serverless.us-east-2.amazonaws.com:9098\"\n",
        "val config = new Properties()\n",
        "config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)\n",
        "config.put(\"security.protocol\", \"SASL_SSL\")\n",
        "config.put(\"sasl.mechanism\", \"AWS_MSK_IAM\")\n",
        "config.put(\"sasl.jaas.config\", \"software.amazon.msk.auth.iam.IAMLoginModule required;\")\n",
        "config.put(\"sasl.client.callback.handler.class\", \"software.amazon.msk.auth.iam.IAMClientCallbackHandler\")\n",
        "val admin = AdminClient.create(config)\n"
      ]
    },
    {
      "cell_type": "raw",
      "metadata": {
        "format": "text/plain"
      },
      "source": [
        "%flink\n",
        "\n",
        "// list topics (uses the `admin` client created in the first paragraph)\n",
        "val topicListing = admin.listTopics().listings().get()"
      ]
    },
    {
      "cell_type": "raw",
      "metadata": {
        "format": "text/plain"
      },
      "source": [
        "%flink\n",
        "\n",
        "import org.apache.kafka.clients.admin.NewTopic\n",
        "\n",
        "// create the topic written to by the orders_msk table:\n",
        "// 3 partitions and replication factor of 1\n",
        "val newTopic = new NewTopic(\"MyOrdersTopic\", 3, 1.toShort)\n",
        "admin.createTopics(Collections.singleton(newTopic))"
      ]
    },
    {
      "cell_type": "raw",
      "metadata": {
        "format": "text/plain"
      },
      "source": [
        "%flink\n",
        "\n",
        "// delete a topic by name (replace with the topic you want to remove)\n",
        "admin.deleteTopics(Collections.singleton(\"DatagenJsonTopic2\"))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 4,
      "metadata": {
        "autoscroll": "auto"
      },
      "outputs": [],
      "source": [
        "%flink.ssql\n",
        "\n",
        "-- Source table of generated orders, backed by the datagen connector.\n",
        "DROP TABLE IF EXISTS orders_datagen_source;\n",
        "\n",
        "CREATE TABLE orders_datagen_source (\n",
        "  product_id BIGINT,\n",
        "  order_number BIGINT,\n",
        "  quantity INT,\n",
        "  price_int INT,\n",
        "  -- computed column: price_int divided by 100.0, cast to DECIMAL(32, 2)\n",
        "  price AS CAST(price_int/100.0 AS DECIMAL(32, 2)),\n",
        "  buyer STRING,\n",
        "  order_time TIMESTAMP(3)\n",
        ")\n",
        "WITH (\n",
        "  'connector'= 'datagen',\n",
        "  'fields.product_id.min' = '1',\n",
        "  'fields.product_id.max' = '99999',\n",
        "  'fields.quantity.min' = '1',\n",
        "  'fields.quantity.max' = '25',\n",
        "  'fields.price_int.min' = '29',\n",
        "  'fields.price_int.max' = '99999999',\n",
        "  'fields.order_number.min' = '1',\n",
        "  'fields.order_number.max' = '9999999999',\n",
        "  'fields.buyer.length' = '15'\n",
        ");\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 5,
      "metadata": {
        "autoscroll": "auto"
      },
      "outputs": [],
      "source": [
        "%flink.ssql\n",
        "\n",
        "-- Kafka-backed table over the MyOrdersTopic topic, JSON encoded.\n",
        "-- Replace 'properties.bootstrap.servers' with your own brokers.\n",
        "DROP TABLE IF EXISTS orders_msk;\n",
        "\n",
        "CREATE TABLE orders_msk (\n",
        "  product_id BIGINT,\n",
        "  order_number BIGINT,\n",
        "  quantity INT,\n",
        "  price DECIMAL(32, 2),\n",
        "  buyer STRING,\n",
        "  order_time TIMESTAMP(3)\n",
        ")\n",
        "WITH (\n",
        "  'connector'= 'kafka',\n",
        "  'topic' = 'MyOrdersTopic',\n",
        "  'format' = 'json',\n",
        "  'scan.startup.mode' = 'earliest-offset',\n",
        "  'properties.bootstrap.servers' = 'boot-jh3g3srn.c3.kafka-serverless.us-east-2.amazonaws.com:9098',\n",
        "  'properties.security.protocol' = 'SASL_SSL',\n",
        "  'properties.sasl.mechanism' = 'AWS_MSK_IAM',\n",
        "  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',\n",
        "  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler'\n",
        ");\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 6,
      "metadata": {
        "autoscroll": "auto"
      },
      "outputs": [],
      "source": [
        "%flink.pyflink\n",
        "\n",
        "# Disable operator chaining on the stream execution environment.\n",
        "s_env.disable_operator_chaining()\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 7,
      "metadata": {
        "autoscroll": "auto"
      },
      "outputs": [],
      "source": [
        "%flink.ssql(parallelism=2)\n",
        "\n",
        "-- Copy the generated orders into the Kafka-backed table.\n",
        "INSERT INTO orders_msk\n",
        "SELECT\n",
        "  product_id,\n",
        "  order_number,\n",
        "  quantity,\n",
        "  price,\n",
        "  buyer,\n",
        "  order_time\n",
        "FROM orders_datagen_source;"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 8,
      "metadata": {
        "autoscroll": "auto"
      },
      "outputs": [],
      "source": [
        "%flink.ssql(type=update, parallelism=2)\n",
        "\n",
        "-- Read the orders back from the Kafka-backed table.\n",
        "SELECT *\n",
        "FROM orders_msk;"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Spark 2.0.0",
      "language": "python",
      "name": "spark2"
    },
    "language_info": {
      "codemirror_mode": "text/python",
      "file_extension": ".py",
      "mimetype": "text/python",
      "name": "scala",
      "pygments_lexer": "python",
      "version": "3.6"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}