# Notebook 3: Click Stream Kinesis

Specify "Python 3" Kernel and "Data Science" Image.

### Background

As the user interacts with the e-commerce website, we need a way to capture their activity in the form of click stream events. In this notebook, we'll simulate user activity, capture the resulting click stream events with [Amazon Kinesis Data Streams (KDS)](https://docs.aws.amazon.com/streams/latest/dev/introduction.html), aggregate them with [Amazon Kinesis Data Analytics (KDA)](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/what-is.html), and then ingest the aggregated events into SageMaker Feature Store.

A producer emits click stream events (simulating user activity) to the Kinesis data stream, and Kinesis Data Analytics aggregates the click stream data over the last 2 minutes of activity.

Finally, a Lambda function takes the aggregated data from Kinesis Data Analytics and ingests it into SageMaker Feature Store (specifically the `click_stream` Feature Group).

### Imports

In [None]:
import pandas as pd
import numpy as np
import random
import datetime
import time
import boto3
import json
from sagemaker import get_execution_role
import sagemaker
from sagemaker.lambda_helper import Lambda
import sys
from utils import *
from parameter_store import ParameterStore

### Session variables

In [None]:
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
s3_client = boto3.client("s3", region_name=region)
featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime',
 region_name=region)
ps = ParameterStore(verbose=False)
ps.set_namespace('feature-store-workshop')

Define the click stream related variables.

In [None]:
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
kinesis_stream_name = f'fs-click-stream-activity-{current_time}'
kinesis_analytics_application_name = f'fs-click-stream-application-{current_time}'
lambda_name = f'click-stream-aggregator-lambda-{current_time}'
n_range = 6

Load variables from the previous notebook.

In [None]:
parameters = ps.read()
click_stream_feature_group_name = parameters['click_stream_feature_group_name']
inference_customer_id = parameters['inference_customer_id']
products_table = parameters['products_table']
products_feature_group_name = parameters['products_feature_group_name']

In [None]:
ps.add({'kinesis_stream_name': kinesis_stream_name,
 'kinesis_analytics_application_name': kinesis_analytics_application_name})
ps.store()

### Create an Amazon Kinesis Data Stream

In this section, we create the stream that will receive simulated customer click stream activity from a web application, such as saving products to the cart or liking products. For this, we use Amazon Kinesis Data Streams, a scalable real-time streaming service.

In [None]:
kinesis_client = boto3.client('kinesis')
kinesis_client.create_stream(StreamName=kinesis_stream_name, ShardCount=1)

active_stream = False
while not active_stream:
    status = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamStatus']
    if (status == 'CREATING'):
        print('Waiting for the Kinesis stream to become active...')
        time.sleep(20)
    elif (status == 'ACTIVE'):
        active_stream = True
        print('ACTIVE')
 
stream_arn = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamARN']
print(f'Amazon kinesis stream arn: {stream_arn}')

### Create a Kinesis Data Analytics application

The Ranking model will recommend ranked products to a customer based on the customer's last 2 minutes of activity on the e-commerce website. To aggregate the streaming information over a window of the last 2 minutes, we will use Kinesis Data Analytics (KDA) and create a KDA application. KDA can process data with sub-second latency from Amazon Kinesis Data Streams using SQL transformations.

In the cells below, we will create a KDA application and transform the data coming from the Kinesis Data Stream with the SQL query string stored in the `sql_code` variable.

In [None]:
kda_client = boto3.client('kinesisanalytics')

In [None]:
sql_code = '''
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
 customer_id VARCHAR(8), 
 sum_activity_weight_last_2m INTEGER, 
 avg_product_health_index_last_2m DOUBLE
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
SELECT 
 STREAM CUSTOMER_ID, 
 SUM(ACTIVITY_WEIGHT) AS sum_activity_weight_last_2m, 
 AVG(PRODUCT_HEALTH_INDEX) AS avg_product_health_index_last_2m
FROM 
 "SOURCE_SQL_STREAM_001" 
WINDOWED BY STAGGER (
 PARTITION BY CUSTOMER_ID RANGE INTERVAL \'2\' MINUTE);
'''
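
To make the query's arithmetic concrete, here is a minimal pure-Python sketch of what it computes for a single 2-minute window: per customer, the sum of activity weights and the mean product health index. It does not model stagger-window timing or the KDA runtime. The helper name `aggregate_window` and the sample events are hypothetical.

```python
from collections import defaultdict


def aggregate_window(events):
    """Aggregate one window of click stream events per customer.

    Mirrors the SELECT in sql_code: SUM(ACTIVITY_WEIGHT) and
    AVG(PRODUCT_HEALTH_INDEX), grouped by customer_id.
    Window timing is NOT modeled here.
    """
    grouped = defaultdict(list)
    for event in events:
        grouped[event['customer_id']].append(event)

    results = {}
    for customer_id, rows in grouped.items():
        results[customer_id] = {
            'sum_activity_weight_last_2m': sum(r['activity_weight'] for r in rows),
            'avg_product_health_index_last_2m':
                sum(r['product_health_index'] for r in rows) / len(rows),
        }
    return results


# Hypothetical sample events, all assumed to fall inside the same window
sample_events = [
    {'customer_id': 'C1', 'activity_weight': 1, 'product_health_index': 0.1},
    {'customer_id': 'C1', 'activity_weight': 2, 'product_health_index': 0.3},
    {'customer_id': 'C2', 'activity_weight': 2, 'product_health_index': 0.9},
]
window_result = aggregate_window(sample_events)
print(window_result)
```

Each customer in the window yields one output row, which is the shape the Lambda function later receives.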

The below input schema defines how data from the Kinesis Data Stream is made available to SQL queries in the KDA application.

In [None]:
kda_input_schema = [{
    'NamePrefix': 'SOURCE_SQL_STREAM',
    'KinesisStreamsInput': {
        'ResourceARN': stream_arn,
        'RoleARN': role
    },
    'InputSchema': {
        'RecordFormat': {
            'RecordFormatType': 'JSON',
            'MappingParameters': {
                'JSONMappingParameters': {
                    'RecordRowPath': '$'
                }
            },
        },
        'RecordEncoding': 'UTF-8',
        'RecordColumns': [
            {'Name': 'EVENT_TIME', 'Mapping': '$.event_time', 'SqlType': 'TIMESTAMP'},
            {'Name': 'CUSTOMER_ID', 'Mapping': '$.customer_id', 'SqlType': 'VARCHAR(8)'},
            {'Name': 'PRODUCT_ID', 'Mapping': '$.product_id', 'SqlType': 'VARCHAR(8)'},
            {'Name': 'PRODUCT_CATEGORY', 'Mapping': '$.product_category', 'SqlType': 'VARCHAR(20)'},
            {'Name': 'HEALTH_CATEGORY', 'Mapping': '$.health_category', 'SqlType': 'VARCHAR(10)'},
            {'Name': 'ACTIVITY_TYPE', 'Mapping': '$.activity_type', 'SqlType': 'VARCHAR(10)'},
            {'Name': 'ACTIVITY_WEIGHT', 'Mapping': '$.activity_weight', 'SqlType': 'INTEGER'},
            {'Name': 'PRODUCT_HEALTH_INDEX', 'Mapping': '$.product_health_index', 'SqlType': 'DOUBLE'}
        ]
    }
}]
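
Each `Mapping` is a JSONPath into the incoming record, so a producer record needs a matching top-level key for every column it wants populated. The following sketch checks a record against a list of columns before it is sent. It assumes every mapping has the simple form `$.key`; the helper name `missing_columns` and the sample record are hypothetical.

```python
def missing_columns(record, record_columns):
    """Return the names of columns whose '$.key' mapping has no key in the record."""
    missing = []
    for column in record_columns:
        # Assumption: mappings look like '$.some_key' (no nesting)
        key = column['Mapping'].split('.', 1)[1]
        if key not in record:
            missing.append(column['Name'])
    return missing


# A subset of the RecordColumns defined above
sample_columns = [
    {'Name': 'CUSTOMER_ID', 'Mapping': '$.customer_id', 'SqlType': 'VARCHAR(8)'},
    {'Name': 'HEALTH_CATEGORY', 'Mapping': '$.health_category', 'SqlType': 'VARCHAR(10)'},
    {'Name': 'ACTIVITY_WEIGHT', 'Mapping': '$.activity_weight', 'SqlType': 'INTEGER'},
]

# Hypothetical producer record that omits health_category
sample_record = {'customer_id': 'C1', 'activity_weight': 2}
print(missing_columns(sample_record, sample_columns))
```

A check like this is useful when the producer and the schema evolve separately, since a column without a matching key has no value to map.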

#### Create a Lambda function and associate it with the KDA application

Now we need to create a Lambda function that takes the output from our KDA application and ingests that data into SageMaker Feature Store. Specifically, we'll be ingesting that data into our `click_stream` Feature Group.

See the following Lambda Python code for more details on how we do this:

In [None]:
!pygmentize ./scripts/lambda-stream.py
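
The script itself is not reproduced in this notebook. As a rough sketch of the shape such a handler usually takes (this is not the workshop's actual code): Kinesis Data Analytics delivers a batch of base64-encoded records, and the function returns a delivery result for each `recordId`. The feature names mirror the `DESTINATION_SQL_STREAM` columns; the `event_time` feature, the helper `build_feature_record`, the key normalization, and the injectable `client` argument are assumptions made for illustration.

```python
import base64
import json
import os
import time


def build_feature_record(payload, event_time):
    """Convert one aggregated row into the Feature Store put_record format."""
    return [
        {'FeatureName': 'customer_id', 'ValueAsString': str(payload['customer_id'])},
        {'FeatureName': 'sum_activity_weight_last_2m',
         'ValueAsString': str(payload['sum_activity_weight_last_2m'])},
        {'FeatureName': 'avg_product_health_index_last_2m',
         'ValueAsString': str(payload['avg_product_health_index_last_2m'])},
        # Assumption: the Feature Group has an event-time feature named 'event_time'
        {'FeatureName': 'event_time', 'ValueAsString': str(event_time)},
    ]


def lambda_handler(event, context, client=None):
    """Ingest each aggregated record and report a per-record delivery result."""
    if client is None:
        import boto3  # imported lazily so the sketch can be exercised without AWS
        client = boto3.client('sagemaker-featurestore-runtime')
    feature_group_name = os.environ['click_stream_feature_group_name']

    results = []
    for rec in event['records']:
        try:
            payload = json.loads(base64.b64decode(rec['data']))
            # Assumption: normalize column names, which may arrive in upper case
            payload = {key.lower(): value for key, value in payload.items()}
            client.put_record(
                FeatureGroupName=feature_group_name,
                Record=build_feature_record(payload, int(time.time())),
            )
            results.append({'recordId': rec['recordId'], 'result': 'Ok'})
        except Exception:
            # Report the failure for this record instead of failing the batch
            results.append({'recordId': rec['recordId'], 'result': 'DeliveryFailed'})
    return {'records': results}
```

Passing a stub object as `client` makes it possible to exercise the handler locally before deploying it.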

In [None]:
lambda_function = Lambda(
 function_name=lambda_name,
 execution_role_arn=role,
 script="./scripts/lambda-stream.py",
 handler="lambda-stream.lambda_handler",
 timeout=600,
 memory_size=10240,
)

lambda_function_response = lambda_function.create()
lambda_function_arn = lambda_function_response['FunctionArn']

print(f'Lambda function arn: {lambda_function_arn}')

Update the Lambda function so that it receives the `click_stream` Feature Group name as an environment variable.

In [None]:
lambda_client = boto3.client('lambda')
lambda_client.update_function_configuration(FunctionName=lambda_name,
 Environment={
 'Variables': {
 'click_stream_feature_group_name': click_stream_feature_group_name
 }
 })

Define a KDA output schema which will contain the Lambda ARN and destination schema.

In [None]:
kda_output_schema = [{'LambdaOutput': {'ResourceARN': lambda_function_arn, 'RoleARN': role},
 'Name': 'DESTINATION_SQL_STREAM',
 'DestinationSchema': {'RecordFormatType': 'JSON'}}]
print(f'KDA output schema: {kda_output_schema}')

Finally, create the Kinesis Data Analytics application that will aggregate the incoming streaming data from KDS using the SQL provided above.

In [None]:
# Create the application once; calling create_application again for the
# same name would fail because the application already exists.
kda_client.create_application(ApplicationName=kinesis_analytics_application_name,
                              Inputs=kda_input_schema,
                              Outputs=kda_output_schema,
                              ApplicationCode=sql_code)

# Poll the application status until it is READY
app_ready = False
while not app_ready:
    status = kda_client.describe_application(ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']['ApplicationStatus']
    if (status != 'READY'):
        print('Waiting for the Kinesis Analytics Application to be in READY state...')
        time.sleep(20)
    else:
        app_ready = True
        print('READY')

Start the Kinesis Data Analytics application.

In [None]:
kda_client.start_application(ApplicationName=kinesis_analytics_application_name,
 InputConfigurations=[{'Id': '1.1',
 'InputStartingPositionConfiguration': 
 {'InputStartingPosition':'NOW'}}])

Wait for the KDA application to reach the RUNNING state.

In [None]:
running_app = False
while not running_app:
    status = kda_client.describe_application(ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']['ApplicationStatus']
    if (status != 'RUNNING'):
        print('Waiting for the Kinesis Application to be in RUNNING state...')
        time.sleep(20)
    elif (status == 'RUNNING'):
        running_app = True
        print('RUNNING')
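
The polling loops in this notebook repeat one pattern: read a status, sleep, retry. A small helper with a timeout avoids looping forever if a resource lands in an unexpected state. This is only a sketch; `wait_for_status` and its parameters are hypothetical names, not part of boto3.

```python
import time


def wait_for_status(get_status, target, poll_seconds=20, timeout_seconds=600,
                    sleep=time.sleep):
    """Poll get_status() until it returns target, or raise TimeoutError."""
    waited = 0
    while True:
        status = get_status()
        if status == target:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f'Status is still {status!r} after {waited} seconds')
        print(f'Waiting for status {target} (currently {status})...')
        sleep(poll_seconds)
        waited += poll_seconds
```

For example, the RUNNING check could be expressed as `wait_for_status(lambda: kda_client.describe_application(ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']['ApplicationStatus'], 'RUNNING')`.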

### Simulate click stream events and ingest into the Kinesis Data Stream

In [None]:
def generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high):
    # Let's get some random product categories to help us generate click stream data
    query = f'''
    select product_category,
           product_health_index,
           product_id
    from "{products_table}"
    where product_health_index between {product_health_index_low} and {product_health_index_high}
    order by random()
    limit 1
    '''

    event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
    random_products_df, query = query_offline_store(products_feature_group_name, query,
                                                    sagemaker_session)
    # Pick a random activity type and look up its activity weight
    activities = ['liked', 'added_to_cart', 'added_to_wish_list', 'saved_for_later']
    activity_weights_dict = {'liked': 1, 'added_to_cart': 2,
                             'added_to_wish_list': 1, 'saved_for_later': 2}
    random_activity_type = random.choice(activities)
    random_activity_weight = activity_weights_dict[random_activity_type]

    data = {
        'event_time': event_time.isoformat(),
        'customer_id': customer_id,
        'product_id': random_products_df.product_id.values[0],
        'product_category': random_products_df.product_category.values[0],
        'activity_type': random_activity_type,
        'activity_weight': random_activity_weight,
        'product_health_index': random_products_df.product_health_index.values[0]
    }
    return data


def put_records_in_kinesis_stream(customer_id, product_health_index_low, product_health_index_high):
    # Emit n_range simulated click stream events, one put_record call per event
    for i in range(n_range):
        data = generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high)
        print(data)

        kinesis_client = boto3.client('kinesis')
        response = kinesis_client.put_record(
            StreamName=kinesis_stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
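
The function above sends every record with the same partition key, which is fine for a single-shard stream. If the stream had more shards, one option (an alternative, not what this notebook does) would be to key each record by customer so that a customer's events land on the same shard, and to send the batch with a single `put_records` call, which accepts up to 500 records per request. The helper name `build_put_records_entries` and the sample events are hypothetical.

```python
import json


def build_put_records_entries(events):
    """Build the Records list for kinesis_client.put_records, keyed by customer."""
    return [
        {
            'Data': json.dumps(event).encode('utf-8'),
            'PartitionKey': str(event['customer_id']),
        }
        for event in events
    ]


# Hypothetical events shaped like the output of generate_click_stream_data
entries = build_put_records_entries([
    {'customer_id': 'C1', 'activity_type': 'liked', 'activity_weight': 1},
    {'customer_id': 'C1', 'activity_type': 'added_to_cart', 'activity_weight': 2},
])
print(entries[0]['PartitionKey'], len(entries))

# With AWS access, the batch could then be sent in one call:
# kinesis_client.put_records(StreamName=kinesis_stream_name, Records=entries)
```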

Now let's ingest our click stream data into SageMaker Feature Store via KDS and KDA. For `inference_customer_id`, we simulate a customer browsing pattern for unhealthy products like cookies, ice cream, and candy by using a low health index range of 0.1 to 0.3.

We'll produce 6 records, which are ingested into the Kinesis Data Stream and aggregated by Kinesis Data Analytics into a single record, which is then ingested into the `click_stream` Feature Group in SageMaker Feature Store. This process should take about 2 minutes.

Note: You can change the range to 0.7 and 0.9 to simulate a customer browsing pattern for healthy products like oatmeal and vitamin supplements.

In [None]:
put_records_in_kinesis_stream(inference_customer_id, 0.1, 0.3)
# It takes 2 minutes for KDA to call lambda to update feature store 
# because we are capturing 2 minute interval of customer activity 
time.sleep(120)

Verify that the ingested record is in the Feature Group.

In [None]:
record = featurestore_runtime.get_record(FeatureGroupName=click_stream_feature_group_name,
 RecordIdentifierValueAsString=inference_customer_id)
print(f'Online feature store data for customer id {inference_customer_id}')
print(f'Record: {record}')
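
The `get_record` response holds the features as a list of name/value pairs, which is awkward to read. A small helper can flatten it into a dictionary. This is a sketch: `record_to_dict` is a hypothetical name, the sample response uses made-up values, and the helper assumes the `Record` key is absent when no record is found.

```python
def record_to_dict(response):
    """Flatten a get_record response into {feature_name: value_as_string}."""
    return {
        feature['FeatureName']: feature['ValueAsString']
        for feature in response.get('Record', [])
    }


# Hypothetical response with made-up values, shaped like get_record output
sample_response = {
    'Record': [
        {'FeatureName': 'customer_id', 'ValueAsString': 'C1'},
        {'FeatureName': 'sum_activity_weight_last_2m', 'ValueAsString': '9'},
    ]
}
features = record_to_dict(sample_response)
print(features)
```

An empty dictionary from `record_to_dict(record)` would suggest that the aggregated record has not reached the online store yet.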

Go back to Workshop Studio and click on "Next".