{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Using SQL feature pipelines with Amazon SageMaker Feature Store\n", "This notebook provides a demo of setting up a SQL-based scheduled feature pipeline for \n", "transformation of raw data and ingestion into SageMaker Feature Store. Customers have\n", "many ways to get this done, and this example takes the following approach:\n", "\n", "- Uses a single SQL function provided by the data scientist for feature transformation\n", "- Uses Amazon Event Bridge for scheduling\n", "- Uses Amazon SageMaker Pipelines for execution of the feature pipeline\n", "- Uses an Amazon SageMaker Processing job to do the core feature transformation and ingestion work within the pipeline\n", "\n", "The notebook assumes that the feature group already exists." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A few imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from utilities.feature_store_helper import FeatureStore\n", "\n", "import boto3\n", "import json\n", "\n", "from IPython.core.display import display, HTML, Markdown\n", "import time\n", "import pandas as pd\n", "\n", "\n", "FG_NAME = 'fs-demo-2022-03-24-sql'\n", "\n", "fs = FeatureStore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read some raw customer data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.read_csv('utilities/customers.csv')\n", "ORIGINAL_RECORD_COUNT = df.shape[0]\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.create_fg_from_df(FG_NAME, df, id_name='Id', event_time_name='UpdateTime')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a scheduled SQL-based feature pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First define any transformations needed from raw data to final features\n", "For this first example, we have no feature transformations. We are simply ingesting the latest fully featurized data on a scheduled basis to be available in both our online and online store." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Ensure the raw data is available as input for the new pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "default_bucket = sagemaker.Session().default_bucket()\n", "data_source = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'\n", "\n", "!aws s3 cp utilities/customers.csv $data_source" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!head utilities/customers.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now, schedule the new feature pipeline to run daily, with the first execution starting a few seconds from now" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "pipeline_start_time = str(datetime.now())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.schedule_feature_pipeline(data_source, FG_NAME, script_type='pyspark_sql')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm = boto3.client('sagemaker')\n", "no_executions = True\n", "while no_executions:\n", " summs = sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{FG_NAME}')['PipelineExecutionSummaries']\n", " if len(summs) > 0:\n", " print(json.dumps(summs, indent=4, default=str))\n", " break\n", " time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Once the pipeline execution has completed, should see one more version of the features in history\n", "**NOTE:** you won't see the new updates in the offline store until they are replicated. Takes a few minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while True:\n", " hist_df = fs.get_historical_offline_feature_values(FG_NAME, record_ids=[2])\n", " rec_count = hist_df.shape[0]\n", " if rec_count > 0:\n", " sorted_df = hist_df.sort_values(by=['write_time'], ascending=[False])\n", " latest_write = sorted_df.iloc[0]['write_time']\n", " if latest_write > pipeline_start_time:\n", " break\n", " else:\n", " time.sleep(60)\n", " else:\n", " time.sleep(60)\n", "hist_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And we can see that the online store still has the value with the newest event time (updatetime in our feature group)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values(FG_NAME, [2])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Make a new version of the feature group, adding new features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First we'll disable the pipeline for the original feature group. In practice, you may keep it around for some period of time and then deprecate it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.disable_feature_pipeline(FG_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### We'll auto-define the new schema, adding two features to the original dataframe\n", "We update the Pandas dataframe, indicating both the **name** and the **type** of each new feature." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df['Persona'] = int(0)\n", "df['NewFeature1'] = float(0.0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now we create a new feature group with a new version, based on the updated schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "new_fg_name = FG_NAME + '-v2'\n", "fs.create_fg_from_df(new_fg_name, df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.describe_feature_group(new_fg_name)['FeatureDefinitions']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Use a new transform function that creates the two new features based on the raw data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile utilities/customer_sql_v2.py\n", "\n", "def transform_query(fg_name: str) -> str:\n", " return f'''\n", " SELECT *, \n", " IF (Id > 3, 0, 1) as Persona, \n", " (zipcode * 2) + RAND() as NewFeature1 \n", " FROM {fg_name} \n", " '''\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Locally test out the new transforms" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import utilities.customer_sql_v2\n", "print(utilities.customer_sql_v2.transform_query(new_fg_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Schedule the new pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.schedule_feature_pipeline(data_source, new_fg_name, \n", " 'utilities/customer_sql_v2.py', \n", " script_type='pyspark_sql', schedule='rate(1 day)',\n", " instance_type='ml.m5.4xlarge', instance_count=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm = boto3.client('sagemaker')\n", "no_executions = True\n", "while no_executions:\n", " summs = sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{new_fg_name}')['PipelineExecutionSummaries']\n", " if len(summs) > 0:\n", " print(json.dumps(summs, indent=4, default=str))\n", " break\n", " time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Here's an example transform query for creating time windowed aggregate features" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile utilities/credit_card_sql_agg.py\n", "\n", "def transform_query(fg_name: str) -> str:\n", " return f'''\n", " SELECT cc_num,\n", " COUNT(*) OVER 30day_w as trans_count_30d,\n", " COUNT(*) OVER day_w as trans_count_1d,\n", " AVG(amount) OVER 30day_w as amt_avg_30d,\n", " AVG(amount) OVER day_w as amt_avg_1d,\n", " SUM(amount) OVER 30day_w as amt_sum_30d,\n", " SUM(amount) OVER day_w as amt_sum_1d,\n", " date_format(datetime, \"yyyy-MM-dd'T'HH:mm:ss.SS'Z'\") as event_time\n", " FROM {fg_name}\n", " WINDOW\n", " 30day_w AS (PARTITION BY cc_num order by cast(datetime AS timestamp) \n", " RANGE INTERVAL 30 DAY PRECEDING), \n", " day_w AS (PARTITION BY cc_num order by cast(datetime AS timestamp) \n", " RANGE INTERVAL 1 DAY PRECEDING)\n", " '''\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Show that the new features are available in the online store following the pipeline execution\n", "The online store feature values will be 
, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_values = []\n", "iterations = 0\n", "\n", "# Poll the online store until the new feature record is available\n", "while True:\n", "    feature_values = fs.get_latest_feature_values(new_fg_name, [2])\n", "    if len(feature_values):\n", "        break\n", "    else:\n", "        if iterations == 0:\n", "            print('Waiting for pipeline to complete...')\n", "        iterations += 1\n", "        time.sleep(60)\n", "feature_values" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.disable_feature_pipeline(new_fg_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up\n", "Remove the feature pipelines for both the original feature group and the new one." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.disable_feature_pipeline(FG_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.remove_feature_pipeline(FG_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.remove_feature_pipeline(new_fg_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Delete the feature groups. The delete for the original feature group is left commented out; uncomment it if you also want to remove that group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# fs.delete_feature_group(FG_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.delete_feature_group(new_fg_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }