In [None]:
!pip install boto3

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-sql-kinesis_2.12-1.2.0_spark-3.0-SNAPSHOT.jar,/home/jovyan/.local/lib/python3.7/site-packages pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
import boto3
import json

In [2]:
# you don't need any AWS cerdentials set when running with EMR on EKS. We will use IRSA feature to control the access.

# creating the Kinesis stream
client = boto3.client('kinesis')
stream_name='pyspark-kinesis'

try:
    print("create a new stream")
    client.create_stream(
            StreamName=stream_name,
            ShardCount=1)
except:
    print("the stream exists")

create a new stream
the stream exists


In [3]:
# creating a couple of messages to send to kinesis
messages = [
    {'message_type': 'message1', 'count': 2},
    {'message_type': 'message2', 'count': 1},
    {'message_type': 'message1', 'count': 2},
    {'message_type': 'message3', 'count': 3},
    {'message_type': 'message1', 'count': 5}
]

In [4]:
for message in messages:
    client.put_record(
        StreamName=stream_name,
        Data=json.dumps(message),
        PartitionKey='part_key')

In [5]:
spark = SparkSession.builder.appName('PySparkKinesis').getOrCreate()

In [6]:
kinesis = spark \
        .readStream \
        .format('kinesis') \
        .option('streamName', stream_name) \
        .option('endpointUrl', 'https://kinesis.us-east-1.amazonaws.com')\
        .option('region', 'us-east-1') \
        .option('startingposition', 'TRIM_HORIZON')\
        .option('awsUseInstanceProfile', 'false') \
        .load()

In [7]:
schema = StructType([
            StructField("message_type", StringType()),
            StructField("count", IntegerType())])

In [8]:
stream = kinesis\
    .selectExpr('CAST(data AS STRING)')\
    .select(from_json('data', schema).alias('data'))\
    .select('data.*')\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .trigger(once=True)\
    .start()
    .awaitTermination()

In [None]:
# delete the kinesis stream
# client.delete_stream(StreamName=stream_name)