{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Using Python feature pipelines with Amazon SageMaker Feature Store\n", "This notebook provides a demo of setting up a scheduled feature pipeline for \n", "transformation of raw data and ingestion into SageMaker Feature Store. Customers have\n", "many ways to get this done, and this example takes the following approach:\n", "\n", "- Uses a single Python function provided by the data scientist for feature transformation\n", "- Uses Amazon Event Bridge for scheduling\n", "- Uses Amazon SageMaker Pipelines for execution of the feature pipeline\n", "- Uses an Amazon SageMaker Processing job to do the core feature transformation and ingestion work within the pipeline\n", "\n", "The notebook assumes that the feature group already exists." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A few imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from utilities.feature_store_helper import FeatureStore\n", "\n", "from IPython.core.display import display, HTML, Markdown\n", "import pandas as pd\n", "import time\n", "from sklearn.ensemble import RandomForestClassifier\n", "\n", "FG_NAME = 'fs-demo-2022-03-24'\n", "\n", "fs = FeatureStore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read some raw customer data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.read_csv('utilities/customers.csv')\n", "ORIGINAL_RECORD_COUNT = df.shape[0]\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a scheduled feature pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First define any transformations needed from raw data to final features\n", "For this first example, we have no feature transformations. We are simply ingesting the latest fully featurized data on a scheduled basis to be available in both our online and online store." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Ensure the raw data is available as input for the new pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "default_bucket = sagemaker.Session().default_bucket()\n", "data_source = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'\n", "\n", "!aws s3 cp utilities/customers.csv $data_source" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now, schedule the new feature pipeline to run daily, with the first execution starting a few seconds from now" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "pipeline_start_time = str(datetime.now())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.schedule_feature_pipeline(data_source, FG_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import json\n", "\n", "sm = boto3.client('sagemaker')\n", "no_executions = True\n", "while no_executions:\n", " summs = sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{FG_NAME}')['PipelineExecutionSummaries']\n", " if len(summs) > 0:\n", " print(json.dumps(summs, indent=4, default=str))\n", " break\n", " time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Once the pipeline execution has completed, should see one more version of the features in history\n", "**NOTE:** you won't see the new updates in the offline store until they are replicated. Takes a few minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while True:\n", " hist_df = fs.get_historical_offline_feature_values(FG_NAME, record_ids=[2])\n", " rec_count = hist_df.shape[0]\n", " if rec_count > 0:\n", " sorted_df = hist_df.sort_values(by=['write_time'], ascending=[False])\n", " latest_write = sorted_df.iloc[0]['write_time']\n", " if latest_write > pipeline_start_time:\n", " break\n", " else:\n", " time.sleep(60)\n", " else:\n", " time.sleep(60)\n", "hist_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And we can see that the online store still has the value with the newest event time (updatetime in our feature group)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values(FG_NAME, [2])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Make a new version of the feature group, adding new features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First we'll disable the pipeline for the original feature group. In practice, you may keep it around for some period of time and then deprecate it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.disable_feature_pipeline(FG_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### We'll auto-define the new schema, adding two features to the original dataframe\n", "We update the Pandas dataframe, indicating both the **name** and the **type** of each new feature." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df['Persona'] = int(0)\n", "df['NewFeature1'] = float(0.0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now we create a new feature group with a new version, based on the updated schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "new_fg_name = FG_NAME + '-v2'\n", "fs.create_fg_from_df(new_fg_name, df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.describe_feature_group(new_fg_name)['FeatureDefinitions']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Use a new Python transform function that creates the two new features based on the raw data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile utilities/customer_v2.py\n", "\n", "import pandas as pd\n", "import numpy as np\n", "\n", "def choose_persona(row):\n", " if row['Id'] > 3:\n", " return 0\n", " else:\n", " return 1\n", "\n", "def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:\n", " df['Persona'] = df.apply(lambda row : choose_persona(row), axis=1) \n", " df['NewFeature1'] = df['Persona'] * np.random.rand() + (2* df['ZipCode'])\n", " return df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Locally test out the new transforms" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import utilities.customer_v2\n", "df = utilities.customer_v2.apply_transforms(df)\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Schedule the new pipeline hourly" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.schedule_feature_pipeline(data_source, new_fg_name, \n", " 'utilities/customer_v2.py', schedule='rate(1 hour)')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Show that the new features are available in the online store following the pipeline execution\n", "The online store feature values will be available once the pipeline is completed." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_values = []\n", "iterations = 0\n", "while True:\n", " feature_values = fs.get_latest_feature_values(new_fg_name, [2])\n", " if len(feature_values):\n", " break\n", " else:\n", " if iterations == 0:\n", " print('Waiting for record to be available in online store..')\n", " iterations += 1\n", " time.sleep(60)\n", "feature_values" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.disable_feature_pipeline(new_fg_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update the pipeline to use a new implementation of the feature transformation script, to improve the features or fix bugs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile utilities/customer_v1_bugfix.py\n", "\n", "import pandas as pd\n", "import numpy as np\n", "\n", "def choose_persona(row):\n", " if row['Id'] > 3:\n", " return 0\n", " else:\n", " return 1\n", "\n", "def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:\n", " df['Persona'] = df.apply(lambda row : choose_persona(row), axis=1) \n", " df['NewFeature1'] = df['Persona'] * np.random.rand() + df['ZipCode']\n", " return df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Try out the updated transforms locally" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import utilities.customer_v1_bugfix\n", "df = utilities.customer_v1_bugfix.apply_transforms(df)\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now, update the pipeline to have the new transforms" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.update_feature_pipeline(data_source, new_fg_name, \n", " 'utilities/customer_v1_bugfix.py', instance_count=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Perform a full backfill from raw data through the updated transforms\n", "This demonstrates that the bug fix worked, and that any subsequent use of this feature group for training or for inference will get the fresh and correct feature values, using a consistent feature implementation.\n", "\n", "Once the pipeline has completed executing, you should see that records returned from the online store have values in `NewFeature1` that no longer look like a multiple of the `ZipCode`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_values = []\n", "iterations = 0\n", "while True:\n", " feature_values = fs.get_latest_feature_values(new_fg_name, [2])\n", " if len(feature_values):\n", " if (feature_values[0]['NewFeature1'] <= 22222.0):\n", " break\n", " else:\n", " if iterations == 0:\n", " iterations += 1\n", " print('Waiting for pipeline to complete...')\n", " time.sleep(60)\n", " else:\n", " if iterations == 0:\n", " iterations += 1\n", " print('Waiting for pipeline to complete...')\n", " time.sleep(60)\n", "feature_values" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Show that we have multiple versions of the feature records in the offline store, including the original values and now the fixed values\n", "Be sure to give it a few minutes for the offline store to be updated. Once you see 2 different records being returned for this record ID, you know it is complete. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Show that the latest offline store feature values have the corrected values" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_offline_feature_values(new_fg_name, record_ids=[2])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up\n", "Remove the pipeline for the first feature group, and the one for the new feature group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.remove_feature_pipeline(FG_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.remove_feature_pipeline(new_fg_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Delete the feature groups (these cells are left commented out to avoid accidental deletion)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# fs.delete_feature_group(FG_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# fs.delete_feature_group(new_fg_name)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }