{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# PySpark Streaming from Amazon Kinesis\n",
    "\n",
    "Creates a Kinesis stream with `boto3`, publishes a few JSON messages to it, and reads them back with Spark Streaming's `KinesisUtils`.\n",
    "\n",
    "**Prerequisite:** AWS credentials must be configured before running the cells below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install boto3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "\n",
    "# Jars passed to spark-submit for the Kinesis receiver. Set before the\n",
    "# SparkContext is created so the JVM is launched with them.\n",
    "# NOTE(review): the last entry is a directory under a hard-coded home path,\n",
    "# not a jar -- confirm it is still needed.\n",
    "KINESIS_JARS = [\n",
    "    'https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kinesis-asl_2.12/3.1.2/spark-streaming-kinesis-asl_2.12-3.1.2.jar',\n",
    "    'https://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.14.8/amazon-kinesis-client-1.14.8.jar',\n",
    "    'https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-kinesis/1.12.31/aws-java-sdk-kinesis-1.12.31.jar',\n",
    "    '/home/jovyan/.local/lib/python3.7/site-packages',\n",
    "]\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ' + ','.join(KINESIS_JARS) + ' pyspark-shell'\n",
    "\n",
    "import boto3\n",
    "from pyspark import SparkContext\n",
    "from pyspark.streaming import StreamingContext\n",
    "from pyspark.sql.types import StructField, StructType, StringType, IntegerType\n",
    "from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# make sure you have your AWS credentials set before you can run this\n",
    "\n",
    "# creating the Kinesis stream\n",
    "client = boto3.client('kinesis')\n",
    "stream_name = 'pyspark-kinesis'\n",
    "\n",
    "try:\n",
    "    print(\"create a new stream\")\n",
    "    client.create_stream(\n",
    "        StreamName=stream_name,\n",
    "        ShardCount=1)\n",
    "except client.exceptions.ResourceInUseException:\n",
    "    # Only \"already exists\" is expected here; credential, limit and network\n",
    "    # errors propagate instead of being reported as an existing stream.\n",
    "    print(\"the stream exists\")\n",
    "\n",
    "# create_stream returns before the stream is usable; block until it is ACTIVE\n",
    "# so the put_record calls in the next cells do not fail on a fresh run.\n",
    "client.get_waiter('stream_exists').wait(StreamName=stream_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# creating a couple of messages to send to kinesis\n",
    "messages = [\n",
    "    {'message_type': 'message1', 'count': 2},\n",
    "    {'message_type': 'message2', 'count': 1},\n",
    "    {'message_type': 'message1', 'count': 2},\n",
    "    {'message_type': 'message3', 'count': 3},\n",
    "    {'message_type': 'message1', 'count': 5}\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# publish every message as a JSON string under a single partition key\n",
    "for message in messages:\n",
    "    client.put_record(\n",
    "        StreamName=stream_name,\n",
    "        Data=json.dumps(message),\n",
    "        PartitionKey='part_key')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = SparkContext(appName=\"PythonStreamingKinesisWordCountAsl\")\n",
    "ssc = StreamingContext(sc, 1)  # batch interval of 1 second"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read from the same endpoint and region the boto3 client resolved, so Spark\n",
    "# consumes the stream that was created above rather than a hard-coded region.\n",
    "kinesis = KinesisUtils.createStream(\n",
    "    ssc,\n",
    "    stream_name,                # kinesisAppName\n",
    "    stream_name,                # streamName\n",
    "    client.meta.endpoint_url,   # endpointUrl\n",
    "    client.meta.region_name,    # regionName\n",
    "    InitialPositionInStream.TRIM_HORIZON,\n",
    "    2)                          # checkpointInterval"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def format_sample(x):\n",
    "    \"\"\"Parse one JSON record and return it as a ``(key, json_text)`` pair.\n",
    "\n",
    "    For JSON objects (the shape of the messages published above) the key is\n",
    "    the record's ``message_type``; for any other JSON value it is ``data[0]``.\n",
    "    \"\"\"\n",
    "    data = json.loads(x)\n",
    "    # Indexing a dict with 0 raises KeyError, so objects are keyed by field name.\n",
    "    key = data['message_type'] if isinstance(data, dict) else data[0]\n",
    "    return (key, json.dumps(data))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "parsed = kinesis.map(format_sample)\n",
    "parsed.pprint()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ssc.start()\n",
    "# NOTE(review): this only waits up to 5 seconds; nothing in this notebook\n",
    "# stops the streaming context afterwards -- confirm the timeout is long enough\n",
    "# for the Kinesis receiver to deliver its first records.\n",
    "ssc.awaitTerminationOrTimeout(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# cleanup (left commented out so the stream is not deleted by Run All)\n",
    "# client.delete_stream(StreamName=stream_name)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}