{ "cells": [ { "cell_type": "markdown", "id": "8c51a9fc", "metadata": {}, "source": [ "# Amazon SageMaker Feature Store Update Feature Group\n", "\n", "This notebook demonstrates how a Feature Group in Amazon SageMaker Feature Store can be updated to add a new feature using the new UpdateFeatureGroup API." ] }, { "cell_type": "code", "execution_count": null, "id": "edb2de5f", "metadata": {}, "outputs": [], "source": [ "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from time import gmtime, strftime, sleep\n", "from random import randint\n", "import boto3\n", "import sagemaker\n", "import pandas as pd\n", "import numpy as np\n", "import logging\n", "import random\n", "import time\n", "import subprocess\n", "import sys\n", "import importlib" ] }, { "cell_type": "code", "execution_count": null, "id": "acf1ecc3", "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger('__name__')\n", "logger.setLevel(logging.DEBUG)\n", "logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "markdown", "id": "4d61fce9", "metadata": {}, "source": [ "We ensure latest versions of the libraries are used." 
] }, { "cell_type": "code", "execution_count": null, "id": "4015f85a", "metadata": {}, "outputs": [], "source": [ "if sagemaker.__version__ < '2.48.1':\n", " subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker==2.48.1'])\n", " importlib.reload(sagemaker)" ] }, { "cell_type": "code", "execution_count": null, "id": "760fca8c", "metadata": {}, "outputs": [], "source": [ "if boto3.__version__ < '1.24.23':\n", " subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'boto3==1.24.23'])\n", " importlib.reload(boto3)" ] }, { "cell_type": "code", "execution_count": null, "id": "c679602d", "metadata": {}, "outputs": [], "source": [ "logger.info(f'Using SageMaker version: {sagemaker.__version__}')\n", "logger.info(f'Using Pandas version: {pd.__version__}')\n", "logger.info(f'Using boto3 version: {boto3.__version__}')" ] }, { "cell_type": "code", "execution_count": null, "id": "13edd6a8", "metadata": {}, "outputs": [], "source": [ "import pprint\n", "pretty_printer = pprint.PrettyPrinter(indent=4)" ] }, { "cell_type": "code", "execution_count": null, "id": "7cda7e84", "metadata": {}, "outputs": [], "source": [ "sagemaker_session = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "default_bucket = sagemaker_session.default_bucket()\n", "logger.info(f'Default S3 bucket = {default_bucket}')\n", "prefix = 'sagemaker-feature-store'\n", "region = sagemaker_session.boto_region_name" ] }, { "cell_type": "code", "execution_count": null, "id": "ee797045", "metadata": {}, "outputs": [], "source": [ "boto_session = boto3.Session(region_name=region)\n", "sagemaker_runtime = boto_session.client(service_name='sagemaker', region_name=region)\n", "featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)\n", "s3 = boto_session.resource('s3')" ] }, { "cell_type": "markdown", "id": "7b10dafb", "metadata": {}, "source": [ "Read the customers csv data into a dataframe" ] }, { "cell_type": "code", 
"execution_count": null, "id": "b72d92fa", "metadata": {}, "outputs": [], "source": [ "customers_df = pd.read_csv('data/customers.csv')\n", "customers_df.head(5)" ] }, { "cell_type": "code", "execution_count": null, "id": "9ff155e0", "metadata": {}, "outputs": [], "source": [ "customers_df.dtypes" ] }, { "cell_type": "code", "execution_count": null, "id": "eb4121f9", "metadata": {}, "outputs": [], "source": [ "current_time_sec = int(round(time.time()))" ] }, { "cell_type": "markdown", "id": "ba95d38e", "metadata": {}, "source": [ "Add event_time timestamp. This is a point in time when a new event occurs that corresponds to the creation or update of a record in a feature group. All records in the feature group must have a corresponding event_time" ] }, { "cell_type": "code", "execution_count": null, "id": "bf402f2a", "metadata": {}, "outputs": [], "source": [ "customers_df['event_time'] = pd.Series([current_time_sec] * len(customers_df), dtype=\"float64\")\n", "customers_df.head(5)" ] }, { "cell_type": "code", "execution_count": null, "id": "4dfec07e", "metadata": {}, "outputs": [], "source": [ "customers_df['customer_id'] = customers_df['customer_id'].astype('string')" ] }, { "cell_type": "code", "execution_count": null, "id": "929c1281", "metadata": {}, "outputs": [], "source": [ "current_timestamp = strftime('%m-%d-%H-%M', gmtime())\n", "customers_feature_group_name = f'fs-customers-{current_timestamp}'\n", "logger.info(f'Feature group name = {customers_feature_group_name}')" ] }, { "cell_type": "code", "execution_count": null, "id": "0ac5fbdb", "metadata": {}, "outputs": [], "source": [ "customers_feature_group = FeatureGroup(name=customers_feature_group_name, \n", " sagemaker_session=sagemaker_session)\n", "customers_feature_group.load_feature_definitions(data_frame=customers_df)" ] }, { "cell_type": "code", "execution_count": null, "id": "9a293b58", "metadata": {}, "outputs": [], "source": [ "def wait_for_feature_group_creation_complete(feature_group):\n", " 
status = feature_group.describe().get('FeatureGroupStatus')\n", " print(f'Initial status: {status}')\n", " while status == 'Creating':\n", " logger.info(f'Waiting for feature group: {feature_group.name} to be created ...')\n", " time.sleep(5)\n", " status = feature_group.describe().get('FeatureGroupStatus')\n", " if status != 'Created':\n", " raise SystemExit(f'Failed to create feature group {feature_group.name}: {status}')\n", " logger.info(f'FeatureGroup {feature_group.name} was successfully created.')" ] }, { "cell_type": "markdown", "id": "4f3b99b9", "metadata": {}, "source": [ "Create customer feature group" ] }, { "cell_type": "code", "execution_count": null, "id": "c55ada4a", "metadata": {}, "outputs": [], "source": [ "customers_feature_group.create(s3_uri=f's3://{default_bucket}/{prefix}', \n", " record_identifier_name='customer_id', \n", " event_time_feature_name='event_time', \n", " role_arn=role, \n", " enable_online_store=True)" ] }, { "cell_type": "code", "execution_count": null, "id": "565e223a", "metadata": {}, "outputs": [], "source": [ "wait_for_feature_group_creation_complete(customers_feature_group)" ] }, { "cell_type": "code", "execution_count": null, "id": "652705f0", "metadata": {}, "outputs": [], "source": [ "describe_feature_group_result = sagemaker_runtime.describe_feature_group(\n", " FeatureGroupName=customers_feature_group_name\n", ")\n", "pretty_printer.pprint(describe_feature_group_result)" ] }, { "cell_type": "markdown", "id": "d0cad816", "metadata": {}, "source": [ "Ingest the initial data frame into the Feature Group. The ingest may take some time since data is buffered, batched, and written into the offline store, i.e. Amazon S3, within 15 minutes." 
] }, { "cell_type": "code", "execution_count": null, "id": "57e089b4", "metadata": {}, "outputs": [], "source": [ "%%time\n", "logger.info(f'Ingesting data into feature group: {customers_feature_group.name} ')\n", "customers_feature_group.ingest(data_frame=customers_df, \n", " max_workers=3, \n", " wait=True)" ] }, { "cell_type": "code", "execution_count": null, "id": "0425d618", "metadata": {}, "outputs": [], "source": [ "customer_id = f'C{randint(1, 500)}'\n", "logger.info(f'customer_id={customer_id}') " ] }, { "cell_type": "markdown", "id": "b01ad60a", "metadata": {}, "source": [ "Verify a record in the online feature store" ] }, { "cell_type": "code", "execution_count": null, "id": "0e56312d", "metadata": {}, "outputs": [], "source": [ "feature_record = featurestore_runtime.get_record(FeatureGroupName=customers_feature_group_name, \n", " RecordIdentifierValueAsString=customer_id)\n", "feature_record" ] }, { "cell_type": "markdown", "id": "447bf4f2", "metadata": {}, "source": [ "Let us run some Athena queries to verify the offline feature store data." 
] }, { "cell_type": "code", "execution_count": null, "id": "1b6428e7", "metadata": {}, "outputs": [], "source": [ "customers_query = customers_feature_group.athena_query()\n", "customers_table = customers_query.table_name" ] }, { "cell_type": "code", "execution_count": null, "id": "092ced4f", "metadata": {}, "outputs": [], "source": [ "output_location = f's3://{default_bucket}/{prefix}/query_results/'" ] }, { "cell_type": "code", "execution_count": null, "id": "08d2335e", "metadata": {}, "outputs": [], "source": [ "query_string = f'SELECT * FROM \"{customers_table}\" limit 10'" ] }, { "cell_type": "markdown", "id": "a6420114", "metadata": {}, "source": [ "Now we run the query to load the results into a dataframe and explore them " ] }, { "cell_type": "code", "execution_count": null, "id": "e4d6029a", "metadata": {}, "outputs": [], "source": [ "customers_query.run(query_string=query_string,output_location=output_location)\n", "customers_query.wait()\n", "athena_df = customers_query.as_dataframe()\n", "athena_df.head()" ] }, { "cell_type": "markdown", "id": "38a4532e", "metadata": {}, "source": [ "#### Query in Athena console\n", "\n", "If this is the first time we are launching Athena in the AWS console, we need to click the `Get Started` button and then, before we run the first query, set up a query results location in Amazon S3. \n", "\n", "After setting the query results location, on the left panel we need to select the `AwsDataCatalog` as Data source and the `sagemaker_featurestore` as Database.\n", "\n", "We can now run a query for the offline feature store data in Athena. To select the entries from the customers feature group we use the following SQL query. 
You will need to replace the customers table name with the corresponding value from the one created here in the notebook.\n", "\n", "```sql\n", "select * from \"<customers_table_name>\"\n", "limit 10\n", "```\n", "\n", "![Athena Query](./images/athena-query.png \"Athena Query\")" ] }, { "cell_type": "markdown", "id": "30a6b222", "metadata": {}, "source": [ "#### Do not proceed until all 500 records are ingested into the feature store. As mentioned before, the ingest may take some time since data is buffered, batched, and written into the offline store, i.e. Amazon S3, within 15 minutes." ] }, { "cell_type": "markdown", "id": "6fc10fc4", "metadata": {}, "source": [ "The sample product set that we have is spread out across different categories - baby products, candies, cleaning products, etc. So let us assume that a customer *“having kids or not”* is definitely an indicator of them buying baby and kids products. Let's go ahead and modify the customer feature group to add this new feature. " ] }, { "cell_type": "code", "execution_count": null, "id": "1fd47f66", "metadata": {}, "outputs": [], "source": [ "sagemaker_runtime.update_feature_group(\n", " FeatureGroupName=customers_feature_group_name,\n", " FeatureAdditions=[\n", " {\"FeatureName\": \"has_kids\", \"FeatureType\": \"Integral\"}\n", " ])" ] }, { "cell_type": "markdown", "id": "dee9417a", "metadata": {}, "source": [ "When the update_feature_group API is invoked, the control plane will reflect the schema change instantaneously, but the data plane will take at most 5 minutes to update its feature group schema. We must ensure that enough time is given for the update operation before proceeding to data ingestion." 
] }, { "cell_type": "code", "execution_count": null, "id": "d8370578", "metadata": {}, "outputs": [], "source": [ "time.sleep(60)" ] }, { "cell_type": "code", "execution_count": null, "id": "28892af4", "metadata": {}, "outputs": [], "source": [ "describe_feature_group_result = sagemaker_runtime.describe_feature_group(\n", " FeatureGroupName=customers_feature_group_name\n", ")\n", "pretty_printer.pprint(describe_feature_group_result)" ] }, { "cell_type": "code", "execution_count": null, "id": "851bd9d4", "metadata": {}, "outputs": [], "source": [ "customers_query.run(query_string=query_string,output_location=output_location)\n", "customers_query.wait()\n", "athena_df_update = customers_query.as_dataframe()\n", "athena_df_update.head()" ] }, { "cell_type": "code", "execution_count": null, "id": "e9edd16a", "metadata": {}, "outputs": [], "source": [ "customers_df.drop(['event_time'],axis=1)" ] }, { "cell_type": "markdown", "id": "914c02be", "metadata": {}, "source": [ "We randomly generate 0 or 1 for the \"has_kids\" feature and ingest it into the feature group" ] }, { "cell_type": "code", "execution_count": null, "id": "cc195f83", "metadata": {}, "outputs": [], "source": [ "customers_df['has_kids'] =np.random.randint(0, 2, customers_df.shape[0])\n", "customers_df.dtypes" ] }, { "cell_type": "code", "execution_count": null, "id": "52029370", "metadata": {}, "outputs": [], "source": [ "customers_df['event_time'] = pd.Series([current_time_sec] * len(customers_df), dtype=\"float64\")\n", "customers_df.head(10)" ] }, { "cell_type": "markdown", "id": "d853dfa2", "metadata": {}, "source": [ "Ingest the updated data into the feature group. If the ingest operation throws errors about the feature not being present in the Feature Group, wait for the previously executed update operation to finish and run the ingest again." 
] }, { "cell_type": "code", "execution_count": null, "id": "8a39ef1e", "metadata": {}, "outputs": [], "source": [ "%%time\n", "logger.info(f'Ingesting data into feature group: {customers_feature_group.name} ...')\n", "customers_feature_group.ingest(data_frame=customers_df, max_workers=3, wait=True)\n", "logger.info(f'{len(customers_df)} customer records ingested into feature group: {customers_feature_group.name}')" ] }, { "cell_type": "code", "execution_count": null, "id": "c4b17ff8", "metadata": {}, "outputs": [], "source": [ "get_record_result = featurestore_runtime.get_record(\n", " FeatureGroupName=customers_feature_group_name,\n", " RecordIdentifierValueAsString=customer_id\n", ")\n", "pretty_printer.pprint(get_record_result)" ] }, { "cell_type": "markdown", "id": "2afd1902", "metadata": {}, "source": [ "Let us re run the Athena query to verify data for this new feature. The ingest will take time since data is buffered, batched, and written into offline store i.e. Amazon S3 within 15 minutes." ] }, { "cell_type": "code", "execution_count": null, "id": "17450a11", "metadata": {}, "outputs": [], "source": [ "customers_query.run(query_string=query_string,output_location=output_location)\n", "customers_query.wait()\n", "athena_df_update = customers_query.as_dataframe()\n", "athena_df_update.head()" ] }, { "cell_type": "markdown", "id": "06c093db", "metadata": {}, "source": [ "Verify via Athena console that data has been added. You will see two rows for each customer record, one that has no value for \"has_kids\" and one that has a value." 
] }, { "cell_type": "markdown", "id": "efe9e0a4", "metadata": {}, "source": [ "![Athena Query Final](./images/athena-query-final.png \"Athena Query Final\")" ] }, { "cell_type": "markdown", "id": "200bf969", "metadata": {}, "source": [ "### Cleanup" ] }, { "cell_type": "markdown", "id": "5a4fd11f", "metadata": {}, "source": [ "Now that we have seen how features can be added to feature groups, it is time to delete unwanted resources to avoid incurring charges." ] }, { "cell_type": "markdown", "id": "581f2d8d", "metadata": {}, "source": [ "Delete the S3 artifacts for the offline store" ] }, { "cell_type": "code", "execution_count": null, "id": "fbcf30d1", "metadata": {}, "outputs": [], "source": [ "describe_feature_group_result = sagemaker_runtime.describe_feature_group(\n", " FeatureGroupName=customers_feature_group_name\n", ")\n", "pretty_printer.pprint(describe_feature_group_result)" ] }, { "cell_type": "code", "execution_count": null, "id": "bdb6b012", "metadata": {}, "outputs": [], "source": [ "s3_config = describe_feature_group_result['OfflineStoreConfig']['S3StorageConfig']\n", "s3_uri = s3_config['ResolvedOutputS3Uri']\n", "full_prefix = '/'.join(s3_uri.split('/')[3:])\n", "logger.info(full_prefix)" ] }, { "cell_type": "code", "execution_count": null, "id": "7588a381", "metadata": {}, "outputs": [], "source": [ "bucket = s3.Bucket(default_bucket)\n", "offline_objects = bucket.objects.filter(Prefix=full_prefix)\n", "offline_objects.delete()" ] }, { "cell_type": "markdown", "id": "29f17451", "metadata": {}, "source": [ "Delete the feature group" ] }, { "cell_type": "code", "execution_count": null, "id": "ee5419d0", "metadata": {}, "outputs": [], "source": [ "customers_feature_group.delete()" ] }, { "cell_type": "code", "execution_count": null, "id": "4511ebf6", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": 
"ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 5 }