{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# PySpark Structured Streaming with Amazon Kinesis\n",
    "\n",
    "This notebook creates a Kinesis stream with boto3, publishes a few JSON messages to it, and reads them back with Spark Structured Streaming (`kinesis` source), printing the parsed rows to the console sink."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install boto3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "# Set before the SparkSession is created so the Kinesis connector jar is passed to Spark via --jars.\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-sql-kinesis_2.12-1.2.0_spark-3.0-SNAPSHOT.jar,/home/jovyan/.local/lib/python3.7/site-packages pyspark-shell'\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import from_json, col\n",
    "from pyspark.sql.types import StructField, StructType, StringType, IntegerType\n",
    "import boto3\n",
    "import json"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "create a new stream\n",
      "the stream exists\n"
     ]
    }
   ],
   "source": [
    "# You don't need any AWS credentials set when running with EMR on EKS. We will use the IRSA feature to control the access.\n",
    "\n",
    "# creating the Kinesis stream\n",
    "client = boto3.client('kinesis')\n",
    "stream_name = 'pyspark-kinesis'\n",
    "\n",
    "try:\n",
    "    print(\"create a new stream\")\n",
    "    client.create_stream(\n",
    "        StreamName=stream_name,\n",
    "        ShardCount=1)\n",
    "except client.exceptions.ResourceInUseException:\n",
    "    # Only an already-existing stream is expected here; any other error\n",
    "    # (permissions, limits, networking) should surface instead of being hidden.\n",
    "    print(\"the stream exists\")\n",
    "\n",
    "# create_stream returns before the stream is usable; block until it is ACTIVE\n",
    "# so the put_record calls below do not hit a stream that is still being created.\n",
    "client.get_waiter('stream_exists').wait(StreamName=stream_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# creating a couple of messages to send to kinesis\n",
    "messages = [\n",
    "    {'message_type': 'message1', 'count': 2},\n",
    "    {'message_type': 'message2', 'count': 1},\n",
    "    {'message_type': 'message1', 'count': 2},\n",
    "    {'message_type': 'message3', 'count': 3},\n",
    "    {'message_type': 'message1', 'count': 5}\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# publish each message as a JSON document; all records share one partition key\n",
    "for message in messages:\n",
    "    client.put_record(\n",
    "        StreamName=stream_name,\n",
    "        Data=json.dumps(message),\n",
    "        PartitionKey='part_key')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession.builder.appName('PySparkKinesis').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read from the same region/endpoint the boto3 client created the stream in,\n",
    "# instead of hard-coding one that may differ from the client's configuration.\n",
    "kinesis = (\n",
    "    spark\n",
    "    .readStream\n",
    "    .format('kinesis')\n",
    "    .option('streamName', stream_name)\n",
    "    .option('endpointUrl', client.meta.endpoint_url)\n",
    "    .option('region', client.meta.region_name)\n",
    "    .option('startingposition', 'TRIM_HORIZON')\n",
    "    .option('awsUseInstanceProfile', 'false')\n",
    "    .load()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# schema of the JSON documents published above\n",
    "schema = StructType([\n",
    "    StructField(\"message_type\", StringType()),\n",
    "    StructField(\"count\", IntegerType())])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Parse the JSON payload of each record and write the rows to the console sink.\n",
    "# The chain is parenthesized so every method call is part of one expression.\n",
    "stream = (\n",
    "    kinesis\n",
    "    .selectExpr('CAST(data AS STRING)')\n",
    "    .select(from_json('data', schema).alias('data'))\n",
    "    .select('data.*')\n",
    "    .writeStream\n",
    "    .outputMode('append')\n",
    "    .format('console')\n",
    "    .trigger(once=True)\n",
    "    .start()\n",
    ")\n",
    "# block until the one-shot trigger has finished its run\n",
    "stream.awaitTermination()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# delete the kinesis stream\n",
    "# client.delete_stream(StreamName=stream_name)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}