{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Notebook 3: Click Stream Kinesis\n", "\n", "Specify \"Python 3\" Kernel and \"Data Science\" Image." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Background\n", "\n", "As the user interacts with the e-commerce website, we need a way to capture their activity in the form of click stream events. In this notebook, we'll be simulating user activity and capturing these click stream events with Amazon [Kinesis Data Streams (KDA)](https://docs.aws.amazon.com/streams/latest/dev/introduction.html), aggregating them with [Amazon Kinesis Data Analytics (KDA)](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/what-is.html), and then ingesting these events into SageMaker Feature Store.\n", "\n", "There will be a Producer that will emit click stream events (simulating user activity) to the Kinesis Data Stream and we'll use Kinesis Data Analytics to aggregate the click stream data for the last 2 minutes of activity.\n", "\n", "Finally, a Lambda function will take the data from Kinesis Data Analytics and ingest into SageMaker Feature Store (specifically the `click_stream` Feature Group).\n", "\n", "\"Kinesis\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import random\n", "import datetime\n", "import time\n", "import boto3\n", "import json\n", "from sagemaker import get_execution_role\n", "import sagemaker\n", "from sagemaker.lambda_helper import Lambda\n", "import sys\n", "from utils import *\n", "from parameter_store import ParameterStore" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Session variables" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role = sagemaker.get_execution_role()\n", "sagemaker_session = sagemaker.Session()\n", "default_bucket = sagemaker_session.default_bucket()\n", "region = sagemaker_session.boto_region_name\n", "s3_client = boto3.client(\"s3\", region_name=region)\n", "featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime',\n", " region_name=region)\n", "ps = ParameterStore(verbose=False)\n", "ps.set_namespace('feature-store-workshop')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Defining click stream related variables" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "current_time = time.strftime(\"%m-%d-%H-%M-%S\", time.localtime())\n", "kinesis_stream_name = f'fs-click-stream-activity-{current_time}'\n", "kinesis_analytics_application_name = f'fs-click-stream-application-{current_time}'\n", "lambda_name = f'click-stream-aggregator-lambda{current_time}'\n", "n_range = 6" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load variables from the previous notebook." 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Load variables from the previous notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parameters = ps.read()\n", "click_stream_feature_group_name = parameters['click_stream_feature_group_name']\n", "inference_customer_id = parameters['inference_customer_id']\n", "products_table = parameters['products_table']\n", "products_feature_group_name = parameters['products_feature_group_name']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ps.add({'kinesis_stream_name': kinesis_stream_name,\n", "        'kinesis_analytics_application_name': kinesis_analytics_application_name})\n", "ps.store()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an Amazon Kinesis Data Stream" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section, we will simulate customer click stream activity on a web application, such as saving products to a cart, liking products, and so on. For this, we will use Amazon Kinesis Data Streams, a scalable real-time streaming service." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kinesis_client = boto3.client('kinesis')\n", "kinesis_client.create_stream(StreamName=kinesis_stream_name, ShardCount=1)\n", "\n", "active_stream = False\n", "while not active_stream:\n", "    status = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamStatus']\n", "    if status == 'ACTIVE':\n", "        active_stream = True\n", "        print('ACTIVE')\n", "    else:\n", "        print('Waiting for the Kinesis stream to become active...')\n", "        time.sleep(20)\n", "\n", "stream_arn = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamARN']\n", "print(f'Amazon Kinesis stream ARN: {stream_arn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Kinesis Data Analytics application\n", "\n", "The Ranking model will recommend ranked products to a customer based on that customer's last 2 minutes of activity on the e-commerce website. To aggregate the streaming information over a window of the last 2 minutes, we will use Kinesis Data Analytics (KDA) and create a KDA application. KDA can process data with sub-second latency from Amazon Kinesis Data Streams using SQL transformations.\n", "\n", "In the cells below, we will create a KDA application and transform the data coming from the Kinesis Data Stream with the SQL query string stored in the `sql_code` variable." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_client = boto3.client('kinesisanalytics')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sql_code = '''\n", "CREATE OR REPLACE STREAM \"DESTINATION_SQL_STREAM\" (\n", "    customer_id VARCHAR(8),\n", "    sum_activity_weight_last_2m INTEGER,\n", "    avg_product_health_index_last_2m DOUBLE\n", ");\n", "CREATE OR REPLACE PUMP \"STREAM_PUMP\" AS INSERT INTO \"DESTINATION_SQL_STREAM\"\n", "SELECT\n", "    STREAM CUSTOMER_ID,\n", "    SUM(ACTIVITY_WEIGHT) AS sum_activity_weight_last_2m,\n", "    AVG(PRODUCT_HEALTH_INDEX) AS avg_product_health_index_last_2m\n", "FROM\n", "    \"SOURCE_SQL_STREAM_001\"\n", "WINDOWED BY STAGGER (\n", "    PARTITION BY CUSTOMER_ID RANGE INTERVAL '2' MINUTE);\n", "'''" ] },
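{ "cell_type": "markdown", "metadata": {}, "source": [ "The STAGGER window partitions events by `CUSTOMER_ID` and emits one aggregated row per customer for each 2-minute window. As a rough illustration (hypothetical values, and note that KDA may normalize unquoted column names to upper case), the output row delivered to the destination stream for one customer would look like this:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustrative aggregated output row (hypothetical values) produced by the\n", "# DESTINATION_SQL_STREAM for a single customer's 2-minute window.\n", "example_aggregated_row = {\n", "    'CUSTOMER_ID': 'C1234567',\n", "    'SUM_ACTIVITY_WEIGHT_LAST_2M': 8,         # SUM(ACTIVITY_WEIGHT) over the window\n", "    'AVG_PRODUCT_HEALTH_INDEX_LAST_2M': 0.22  # AVG(PRODUCT_HEALTH_INDEX) over the window\n", "}\n", "print(example_aggregated_row)" ] },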
{ "cell_type": "markdown", "metadata": {}, "source": [ "The input schema below defines how data from the Kinesis Data Stream is made available to SQL queries in the KDA application." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_input_schema = [{\n", "    'NamePrefix': 'SOURCE_SQL_STREAM',\n", "    'KinesisStreamsInput': {\n", "        'ResourceARN': stream_arn,\n", "        'RoleARN': role\n", "    },\n", "    'InputSchema': {\n", "        'RecordFormat': {\n", "            'RecordFormatType': 'JSON',\n", "            'MappingParameters': {\n", "                'JSONMappingParameters': {\n", "                    'RecordRowPath': '$'\n", "                }\n", "            },\n", "        },\n", "        'RecordEncoding': 'UTF-8',\n", "        'RecordColumns': [\n", "            {'Name': 'EVENT_TIME', 'Mapping': '$.event_time', 'SqlType': 'TIMESTAMP'},\n", "            {'Name': 'CUSTOMER_ID', 'Mapping': '$.customer_id', 'SqlType': 'VARCHAR(8)'},\n", "            {'Name': 'PRODUCT_ID', 'Mapping': '$.product_id', 'SqlType': 'VARCHAR(8)'},\n", "            {'Name': 'PRODUCT_CATEGORY', 'Mapping': '$.product_category', 'SqlType': 'VARCHAR(20)'},\n", "            {'Name': 'HEALTH_CATEGORY', 'Mapping': '$.health_category', 'SqlType': 'VARCHAR(10)'},\n", "            {'Name': 'ACTIVITY_TYPE', 'Mapping': '$.activity_type', 'SqlType': 'VARCHAR(10)'},\n", "            {'Name': 'ACTIVITY_WEIGHT', 'Mapping': '$.activity_weight', 'SqlType': 'INTEGER'},\n", "            {'Name': 'PRODUCT_HEALTH_INDEX', 'Mapping': '$.product_health_index', 'SqlType': 'DOUBLE'}\n", "        ]\n", "    }\n", "}]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create a Lambda function and associate it with the KDA application" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll create a Lambda function to take the output from our KDA application and ingest that data into SageMaker Feature Store. Specifically, we'll be ingesting that data into our `click_stream` Feature Group.\n", "\n", "See the following Lambda Python code for more details on how we do this:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pygmentize ./scripts/lambda-stream.py" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lambda_function = Lambda(\n", "    function_name=lambda_name,\n", "    execution_role_arn=role,\n", "    script=\"./scripts/lambda-stream.py\",\n", "    handler=\"lambda-stream.lambda_handler\",\n", "    timeout=600,\n", "    memory_size=10240,\n", ")\n", "\n", "lambda_function_response = lambda_function.create()\n", "lambda_function_arn = lambda_function_response['FunctionArn']\n", "\n", "print(f'Lambda function ARN: {lambda_function_arn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Update the Lambda function to accept the `click_stream` Feature Group name as an environment variable." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lambda_client = boto3.client('lambda')\n", "lambda_client.update_function_configuration(FunctionName=lambda_name,\n", "                                            Environment={\n", "                                                'Variables': {\n", "                                                    'click_stream_feature_group_name': click_stream_feature_group_name\n", "                                                }\n", "                                            })" ] },
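{ "cell_type": "markdown", "metadata": {}, "source": [ "For orientation, a Lambda function used as a KDA output destination receives batches of base64-encoded records and must acknowledge each one with a result of `Ok` or `DeliveryFailed`. Below is a minimal, illustrative sketch of that pattern, assuming the environment variable we just set; it is not the deployed code, and `./scripts/lambda-stream.py` remains the source of truth (for example, it may also set an event-time feature before ingesting)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustrative sketch only -- the deployed handler lives in ./scripts/lambda-stream.py.\n", "import base64\n", "import os\n", "\n", "def example_lambda_handler(event, context):\n", "    fg_name = os.environ['click_stream_feature_group_name']\n", "    fs_client = boto3.client('sagemaker-featurestore-runtime')\n", "    output = []\n", "    for rec in event['records']:\n", "        # Each KDA output record arrives base64-encoded.\n", "        payload = json.loads(base64.b64decode(rec['data']))\n", "        # Map the aggregated columns onto the click_stream Feature Group\n", "        # (feature names assumed lower case here).\n", "        fs_client.put_record(\n", "            FeatureGroupName=fg_name,\n", "            Record=[{'FeatureName': k.lower(), 'ValueAsString': str(v)}\n", "                    for k, v in payload.items()])\n", "        # Acknowledge the record so KDA does not retry it.\n", "        output.append({'recordId': rec['recordId'], 'result': 'Ok'})\n", "    return {'records': output}" ] },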
{ "cell_type": "markdown", "metadata": {}, "source": [ "Define a KDA output schema which will contain the Lambda ARN and destination schema." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_output_schema = [{'LambdaOutput': {'ResourceARN': lambda_function_arn, 'RoleARN': role},\n", "                      'Name': 'DESTINATION_SQL_STREAM',\n", "                      'DestinationSchema': {'RecordFormatType': 'JSON'}}]\n", "print(f'KDA output schema: {kda_output_schema}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, create the Kinesis Data Analytics application that will aggregate the incoming streaming data from KDS using the SQL provided above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the application once, then poll until it reaches the READY state.\n", "kda_client.create_application(ApplicationName=kinesis_analytics_application_name,\n", "                              Inputs=kda_input_schema,\n", "                              Outputs=kda_output_schema,\n", "                              ApplicationCode=sql_code)\n", "\n", "ready_app = False\n", "while not ready_app:\n", "    status = kda_client.describe_application(ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']['ApplicationStatus']\n", "    if status == 'READY':\n", "        ready_app = True\n", "        print('READY')\n", "    else:\n", "        print('Waiting for the Kinesis Analytics Application to be in READY state...')\n", "        time.sleep(20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start the Kinesis Data Analytics application." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_client.start_application(ApplicationName=kinesis_analytics_application_name,\n", "                             InputConfigurations=[{'Id': '1.1',\n", "                                                   'InputStartingPositionConfiguration':\n", "                                                   {'InputStartingPosition': 'NOW'}}])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wait for the KDA application to spin up." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "running_app = False\n", "while not running_app:\n", "    status = kda_client.describe_application(ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']['ApplicationStatus']\n", "    if status == 'RUNNING':\n", "        running_app = True\n", "        print('RUNNING')\n", "    else:\n", "        print('Waiting for the Kinesis Analytics Application to be in RUNNING state...')\n", "        time.sleep(20)" ] },
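{ "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, you can sanity-check the application wiring with `describe_application`; the input `Id` we hardcoded as `'1.1'` above can also be read from this response instead. A minimal sketch using the same `kinesisanalytics` client:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional sanity check: confirm the KDA application's input and output wiring.\n", "app_detail = kda_client.describe_application(\n", "    ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']\n", "input_desc = app_detail['InputDescriptions'][0]\n", "output_desc = app_detail['OutputDescriptions'][0]\n", "print(f\"Input Id: {input_desc['InputId']}\")\n", "print(f\"Source stream ARN: {input_desc['KinesisStreamsInputDescription']['ResourceARN']}\")\n", "print(f\"Output Lambda ARN: {output_desc['LambdaOutputDescription']['ResourceARN']}\")" ] },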
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Simulate click stream events and ingest into the Kinesis Data Stream" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high):\n", "    # Pick a random product within the given health index range\n", "    # to help us generate click stream data\n", "    query = f'''\n", "    select product_category,\n", "           product_health_index,\n", "           product_id\n", "    from \"{products_table}\"\n", "    where product_health_index between {product_health_index_low} and {product_health_index_high}\n", "    order by random()\n", "    limit 1\n", "    '''\n", "\n", "    event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)\n", "    random_products_df, query = query_offline_store(products_feature_group_name, query,\n", "                                                    sagemaker_session)\n", "    # Pick a random activity type and its activity weight\n", "    activities = ['liked', 'added_to_cart', 'added_to_wish_list', 'saved_for_later']\n", "    activity_weights_dict = {'liked': 1, 'added_to_cart': 2,\n", "                             'added_to_wish_list': 1, 'saved_for_later': 2}\n", "    random_activity_type = random.choice(activities)\n", "    random_activity_weight = activity_weights_dict[random_activity_type]\n", "\n", "    data = {\n", "        'event_time': event_time.isoformat(),\n", "        'customer_id': customer_id,\n", "        'product_id': random_products_df.product_id.values[0],\n", "        'product_category': random_products_df.product_category.values[0],\n", "        'activity_type': random_activity_type,\n", "        'activity_weight': random_activity_weight,\n", "        'product_health_index': random_products_df.product_health_index.values[0]\n", "    }\n", "    return data\n", "\n", "def put_records_in_kinesis_stream(customer_id, product_health_index_low, product_health_index_high):\n", "    # Create the client once, outside the loop\n", "    kinesis_client = boto3.client('kinesis')\n", "    for i in range(n_range):\n", "        data = generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high)\n", "        print(data)\n", "\n", "        kinesis_client.put_record(\n", "            StreamName=kinesis_stream_name,\n", "            Data=json.dumps(data),\n", "            PartitionKey='partitionkey')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's ingest our click stream data into SageMaker Feature Store via KDS and KDA. For `inference_customer_id`, we simulate a customer browsing pattern for unhealthy products like cookies, ice cream, and candy by using a lower health index range of 0.1 to 0.3." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll produce 6 records that will be ingested into the Kinesis Data Stream, aggregated by Kinesis Data Analytics into a single record, and then ingested into the `click_stream` Feature Group in SageMaker Feature Store. This process takes about 2 minutes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: You can change the range to 0.7 and 0.9 to simulate a customer browsing pattern for healthy products like oatmeal, vitamin supplements, and so on." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "put_records_in_kinesis_stream(inference_customer_id, 0.1, 0.3)\n", "# It takes about 2 minutes for KDA to invoke the Lambda that updates the feature store\n", "# because we are aggregating over a 2-minute window of customer activity\n", "time.sleep(120)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify that the aggregated record has been ingested into the feature group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "record = featurestore_runtime.get_record(FeatureGroupName=click_stream_feature_group_name,\n", "                                         RecordIdentifierValueAsString=inference_customer_id)\n", "print(f'Online feature store data for customer id {inference_customer_id}')\n", "print(f'Record: {record}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Go back to Workshop Studio and click on \"Next\"." ] } ], "metadata": { "instance_type": "ml.t3.medium", "interpreter": { "hash": "fea7262dfaa662dc7ea8f1b256cf975fd886d2f868152164ef15877318a1e322" }, "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }