{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup\n", "\n", "**NOTE:** Before running this notebook, be sure to set the stack name in the first code cell to match the name of the CloudFormation stack you used to create this notebook instance. If you used the default stack name, you should not need to make any updates.\n", "\n", "This notebook performs the following setup actions for this example use of Amazon SageMaker Feature Store:\n", "\n", "1. Create online-only feature groups\n", "2. Create an Amazon Kinesis data stream\n", "3. Create an Amazon Kinesis Data Analytics (KDA) application" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get ARNs of Lambda functions from CloudFormation stack outputs\n", "1. `IngestLambdaFunctionARN`\n", "2. `PredictLambdaFunctionARN`\n", "3. `PredictLambdaFunctionName`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "STACK_NAME = 'sm-fs-streaming-agg-stack' # if you're not using the default stack name, replace this\n", "%store STACK_NAME" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import boto3\n", "\n", "cf_client = boto3.client('cloudformation')\n", "\n", "try:\n", "    # Read the Lambda ARNs and function name from the stack outputs\n", "    outputs = cf_client.describe_stacks(StackName=STACK_NAME)['Stacks'][0]['Outputs']\n", "    for o in outputs:\n", "        if o['OutputKey'] == 'IngestLambdaFunctionARN':\n", "            lambda_to_fs_arn = o['OutputValue']\n", "        if o['OutputKey'] == 'PredictLambdaFunctionARN':\n", "            lambda_to_model_arn = o['OutputValue']\n", "        if o['OutputKey'] == 'PredictLambdaFunctionName':\n", "            predict_lambda_name = o['OutputValue']\n", "\n", "except Exception:\n", "    msg = f'CloudFormation stack {STACK_NAME} was not found. Please set the STACK_NAME properly and re-run this cell'\n", "    sys.exit(ValueError(msg))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f'lambda_to_model_arn: {lambda_to_model_arn}')\n", "print(f'lambda_to_fs_arn: {lambda_to_fs_arn}')\n", "print(f'predict_lambda_name: {predict_lambda_name}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store lambda_to_model_arn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store predict_lambda_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# to get the latest sagemaker python sdk\n", "#!pip install -U sagemaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.display import display_html\n", "\n", "def restartkernel():\n", "    # Restart the kernel from the browser (classic Jupyter Notebook front end)\n", "    display_html(\"<script>Jupyter.notebook.kernel.restart()</script>\", raw=True)\n", "\n", "#restartkernel()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Imports and other setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker import get_execution_role\n", "import sagemaker\n", "import boto3\n", "import json\n", "\n", "role = get_execution_role()\n", "sm = boto3.Session().client(service_name='sagemaker')\n", "smfs_runtime = boto3.Session().client(service_name='sagemaker-featurestore-runtime')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create online-only feature groups\n", "When using Amazon SageMaker Feature Store, a core design decision is the definition of feature groups. For our credit card fraud detection use case, we have decided to use two of them:\n", "\n", "1. `cc-agg-fg` - holds aggregate features that will be updated in near real time (streaming ingestion)\n", "2. 
`cc-agg-batch-fg` - holds aggregate features that will be updated in batch\n", "\n", "Establishing a feature group is a one-time step and is done using the `CreateFeatureGroup` API. \n", "\n", "Feature groups can be created as **online-only**, **offline-only**, or both **online and offline**, which replicates updates from an online store to an offline store in Amazon S3. Since our focus in this example is on demonstrating the use of the feature store for online inference and streaming aggregation of features, we make each of our feature groups online-only.\n", "\n", "In addition to a feature group name, we provide metadata about each feature in the group. We are using a json file to define the schema, but this is not a requirement. We use a schema file to demonstrate how you might capture the feature group definitions, enabling you to recreate them consistently as you move from a development environment to a test or production environment. In our schema file, we also highlight the record identifier and the event timestamp. All feature groups must have these two features, but you get to decide how to name them.\n", "\n", "Here is a visual summary of the feature groups we will create below.\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### cc-agg-fg schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pygmentize schema/cc-agg-fg-schema.json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### cc-agg-batch-fg schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pygmentize schema/cc-agg-batch-fg-schema.json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Utility functions to simplify creation of feature groups\n", "`schema_to_defs` takes our schema file and returns feature definitions, and the names of the record identifier and event timestamp feature." 
 ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def schema_to_defs(filename):\n", "    # Load the JSON schema file\n", "    with open(filename) as f:\n", "        schema = json.load(f)\n", "\n", "    feature_definitions = []\n", "\n", "    # Map each schema column type to a Feature Store feature type\n", "    for col in schema['features']:\n", "        feature = {'FeatureName': col['name']}\n", "        if col['type'] == 'double':\n", "            feature['FeatureType'] = 'Fractional'\n", "        elif col['type'] == 'bigint':\n", "            feature['FeatureType'] = 'Integral'\n", "        else:\n", "            feature['FeatureType'] = 'String'\n", "        feature_definitions.append(feature)\n", "\n", "    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`create_feature_group_from_schema` creates a feature group from a schema file. If no S3 URI is passed, an online-only feature group is created." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_feature_group_from_schema(filename, fg_name, role_arn=None, s3_uri=None):\n", "    # Load the JSON schema file\n", "    with open(filename) as f:\n", "        schema = json.load(f)\n", "\n", "    feature_defs = []\n", "\n", "    # Map each schema column type to a Feature Store feature type\n", "    for col in schema['features']:\n", "        feature = {'FeatureName': col['name']}\n", "        if col['type'] == 'double':\n", "            feature['FeatureType'] = 'Fractional'\n", "        elif col['type'] == 'bigint':\n", "            feature['FeatureType'] = 'Integral'\n", "        else:\n", "            feature['FeatureType'] = 'String'\n", "        feature_defs.append(feature)\n", "\n", "    record_identifier_name = schema['record_identifier_feature_name']\n", "    event_time_name = schema['event_time_feature_name']\n", "\n", "    if role_arn is None:\n", "        role_arn = get_execution_role()\n", "\n", "    # Without an S3 URI no offline store is configured, so the group is online-only\n", "    if s3_uri is None:\n", "        offline_config = {}\n", "    else:\n", "        offline_config = {'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': s3_uri}}}\n", "\n", "    sm.create_feature_group(\n", "        FeatureGroupName = fg_name,\n", "        RecordIdentifierFeatureName = record_identifier_name,\n", "        EventTimeFeatureName = event_time_name,\n", "        FeatureDefinitions = feature_defs,\n", "        Description = schema['description'],\n", "        Tags = schema['tags'],\n", "        OnlineStoreConfig = {'EnableOnlineStore': True},\n", "        RoleArn = role_arn,\n", "        **offline_config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create the two feature groups" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_feature_group_from_schema('schema/cc-agg-fg-schema.json', 'cc-agg-fg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_feature_group_from_schema('schema/cc-agg-batch-fg-schema.json', 'cc-agg-batch-fg')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Show that the feature store is aware of the new feature groups" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm.list_feature_groups()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Describe each feature group\n", "Note that each feature group gets its own ARN, allowing you to manage IAM policies that control access to individual feature groups. The feature names and types are displayed, and the record identifier and event time features are called out specifically. Notice that there is only an `OnlineStoreConfig` and no `OfflineStoreConfig`, as we have decided not to replicate features offline for these groups."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm.describe_feature_group(FeatureGroupName='cc-agg-fg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm.describe_feature_group(FeatureGroupName='cc-agg-batch-fg')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create an Amazon Kinesis Data Stream" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kinesis_client = boto3.client('kinesis')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kinesis_client.create_stream(StreamName='cc-stream', ShardCount=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kinesis_client.list_streams()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kinesis_client.describe_stream(StreamName='cc-stream')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "active_stream = False\n", "while not active_stream:\n", "    status = kinesis_client.describe_stream(StreamName='cc-stream')['StreamDescription']['StreamStatus']\n", "    if status == 'ACTIVE':\n", "        active_stream = True\n", "        print('ACTIVE')\n", "    else:\n", "        # Sleep on any non-ACTIVE status (e.g. CREATING) so the loop never polls without a pause\n", "        print(f'Waiting for the Kinesis stream to become active (status: {status})...')\n", "        time.sleep(20)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stream_arn = kinesis_client.describe_stream(StreamName='cc-stream')['StreamDescription']['StreamARN']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stream_arn" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Map the Kinesis stream as an event source for Lambda fraud detection" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, 
"outputs": [], "source": [ "lambda_client = boto3.client('lambda')\n", "\n", "lambda_client.create_event_source_mapping(\n", "    EventSourceArn=stream_arn,\n", "    FunctionName=lambda_to_model_arn,\n", "    StartingPosition='LATEST',\n", "    Enabled=True,\n", "    MaximumRecordAgeInSeconds=60*10  # DestinationConfig would handle discarded records\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create an Amazon Kinesis Data Analytics (KDA) application" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_client = boto3.client('kinesisanalytics')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sql_code = 'CREATE OR REPLACE STREAM \"DESTINATION_SQL_STREAM\" (\\n' + \\\n", "           '\"cc_num\" BIGINT,\\n' + \\\n", "           '\"num_trans_last_10m\" SMALLINT,\\n' + \\\n", "           '\"avg_amt_last_10m\" REAL\\n);\\n\\n' + \\\n", "           'CREATE OR REPLACE PUMP \"STREAM_PUMP\" AS\\n' + \\\n", "           'INSERT INTO \"DESTINATION_SQL_STREAM\"\\n' + \\\n", "           'SELECT STREAM \"cc_num\", \\n' + \\\n", "           'COUNT(*) OVER LAST_10_MINUTES, \\n' + \\\n", "           'AVG(\"amount\") OVER LAST_10_MINUTES\\n' + \\\n", "           'FROM \"SOURCE_SQL_STREAM_001\"\\n' + \\\n", "           'WINDOW LAST_10_MINUTES AS (\\n' + \\\n", "           'PARTITION BY \"cc_num\"\\n' + \\\n", "           'RANGE INTERVAL \\'10\\' MINUTE PRECEDING);\\n'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_inputs = [{\n", "    'NamePrefix': 'SOURCE_SQL_STREAM',\n", "    'KinesisStreamsInput': {\n", "        'ResourceARN': stream_arn,\n", "        'RoleARN': role\n", "    },\n", "    'InputSchema': {\n", "        'RecordFormat': {\n", "            'RecordFormatType': 'JSON',\n", "            'MappingParameters': {\n", "                'JSONMappingParameters': {\n", "                    'RecordRowPath': '$'\n", "                }\n", "            },\n", "        },\n", "        'RecordEncoding': 'UTF-8',\n", "        'RecordColumns': [\n", "            {'Name': 'cc_num', 'Mapping': '$.cc_num', 'SqlType': 'DECIMAL(1,1)'},\n", "            {'Name': 'merchant', 'Mapping': '$.merchant', 'SqlType': 'VARCHAR(64)'},\n", "            {'Name': 'amount', 'Mapping': '$.amount', 'SqlType': 'REAL'},\n", "            {'Name': 'zip_code', 'Mapping': '$.zip_code', 'SqlType': 'INTEGER'}\n", "        ]\n", "    }\n", "}]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Kinesis Data Analytics Application\n", "\n", "Using the Lambda ARNs retrieved earlier from the CloudFormation stack outputs, we create a Kinesis Data Analytics application that connects its output to the streaming Lambda function. This Lambda function ingests the aggregated records and writes them to the SageMaker feature group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_outputs = [{'LambdaOutput': {'ResourceARN': lambda_to_fs_arn, 'RoleARN': role},\n", "                'Name': 'DESTINATION_SQL_STREAM',\n", "                'DestinationSchema': {'RecordFormatType': 'JSON'}}]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_outputs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_client.create_application(\n", "    ApplicationName='cc-agg-app',\n", "    Inputs=kda_inputs,\n", "    Outputs=kda_outputs,\n", "    ApplicationCode=sql_code)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_client.describe_application(ApplicationName='cc-agg-app')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kda_client.start_application(\n", "    ApplicationName='cc-agg-app',\n", "    InputConfigurations=[{'Id': '1.1',\n", "                          'InputStartingPositionConfiguration':\n", "                              {'InputStartingPosition': 'NOW'}}])" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }