{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Hello World, Amazon SageMaker Feature Store\n", "This notebook demonstrates the basics of Amazon SageMaker Feature Store. It relies on a small set of utility functions that wrap the Feature Store API so that common operations stay simple for a data scientist working in Python." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A few imports" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To install or upgrade the dependencies, run the following commands in a code cell:\n", "\n", "    %pip install sagemaker --upgrade\n", "    %pip install pandas --upgrade\n", "    %pip uninstall -y boto3\n", "    %pip uninstall -y botocore\n", "    %pip uninstall -y aiobotocore\n", "    %pip install boto3 --upgrade\n", "    %pip install botocore --upgrade\n", "    %pip install s3fs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from utilities.feature_store_helper import FeatureStore\n", "\n", "from IPython.display import display, HTML, Markdown\n", "import pandas as pd\n", "import time\n", "import json\n", "import os\n", "from sklearn.ensemble import RandomForestClassifier\n", "\n", "FG_NAME = 'fs-demo-2022-03-24'\n", "\n", "fs = FeatureStore()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "claims_features = [[0, 8019, 0, 0, 1, 4000], \n", " [1, 0, 0, 0, 0, 8000], \n", " [2, 540, 1, 2, 10, 2000], \n", " [3, 955, 0, 1, 3, 3500], \n", " [4, 1200, 0, 0, 5, 10544], \n", " [5, 600, 0, 1, 2, 7843]]\n", "c_df = pd.DataFrame(claims_features, columns=['Id', 'avg_claim_amount', 'num_claims_last_7d', 'num_claims_last_1y', \n", " 'num_claims_lifetime', 'total_premiums'])\n", "c_df['update_time'] = '2020-02-01T00:00:00Z'\n", "display(c_df.head())\n", 
"fs.create_fg_from_df('claims', c_df, id_name='Id', event_time_name='update_time')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.ingest_from_df('claims', c_df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "payment_features = [[0, 1000, 0], [1, 1100, 5], [2, 1200, 2], [3, 1300, 3], [4, 1400, 4], [5, 1500, 5], [6, 1600, 6]]\n", "p_df = pd.DataFrame(payment_features, columns=['Id', 'avg_amount', 'avg_days_late'])\n", "p_df['update_time'] = '2020-02-01T00:00:00Z'\n", "display(p_df.head())\n", "fs.create_fg_from_df('payments', p_df, id_name='Id', event_time_name='update_time')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.ingest_from_df('payments', p_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load sample customer data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.read_csv('utilities/customers.csv')\n", "ORIGINAL_RECORD_COUNT = df.shape[0]\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a new feature group, with schema inferred directly from my dataframe" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tags = {'Environment': 'DEV', \n", " 'CostCenter': 'C20', \n", " 'Maintainer': 'John Smith', \n", " 'DocURL': 'https://www.google.com'}\n", "fs.create_fg_from_df(FG_NAME, df, tags=tags, id_name='Id')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ingest features from my dataframe into my new feature group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.ingest_from_df(FG_NAME, df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Show that we can look up the latest feature values\n", "Notice that the feature values come back in the proper datatype, as 
defined in the feature definitions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values(FG_NAME, [4], features=['ZipCode'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### We can request selected features or, as in this case, all features" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values(FG_NAME, [4,2,6])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Show that we can get the history of feature values\n", "The offline store is append-only: each ingestion adds new records rather than overwriting existing ones." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now, ingest some new data with later event timestamps\n", "We'll put in two new sets of records, each with the event timestamp advanced by one day and the zip code changed. We should end up with three total sets of records:\n", "\n", "1. Original, event timestamp Feb 1, zip code 11111\n", "2. New set, with event timestamp Feb 2, zip code 22222\n", "3. Final set, with event timestamp Feb 3, zip code 33333" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df['UpdateTime'] = '2020-02-02T00:00:00Z'\n", "df['ZipCode'] = 22222\n", "fs.ingest_from_df(FG_NAME, df)\n", "\n", "df['UpdateTime'] = '2020-02-03T00:00:00Z'\n", "df['ZipCode'] = 33333\n", "fs.ingest_from_df(FG_NAME, df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Look up the full history for a few IDs\n", "It takes a few minutes (up to 15) for the data to be available in the offline store. We'll wait until we see 3 copies of each of the records, since we've ingested each record 3 times so far." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ids = [5,6]\n", "features = ['*'] \n", "\n", "# Poll the offline store once a minute until all three versions of each record are visible\n", "mins = 0\n", "while True:\n", "    hist_df = fs.get_historical_offline_feature_values(FG_NAME, record_ids=ids, feature_names=features,\n", "                                                       verbose=False)\n", "    rec_count = hist_df.shape[0]\n", "    if rec_count < (3 * len(ids)):\n", "        if mins == 0:\n", "            print('Waiting for offline store data...')\n", "        time.sleep(60)\n", "        mins += 1\n", "    else:\n", "        break\n", "\n", "print(f'\\nData is available, {rec_count} records. Waited {mins} minutes\\n')\n", "hist_df.sort_values(by=['id','zipcode']).head(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now show the latest offline features" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_offline_feature_values(FG_NAME, record_ids=[5,6])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Browse the set of offline store files in the S3 console" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_console_url = fs.get_offline_store_url(FG_NAME)\n", "display(Markdown(f'Review offline store partitioned data files here: [{s3_console_url}]({s3_console_url})'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### See the Glue table that can be used for Athena queries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "glue_console_url = fs.get_glue_table_url(FG_NAME)\n", "display(Markdown(f'To see the Glue table that was created for you, go here: [{glue_console_url}]({glue_console_url})'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now let's see what the online store thinks are the latest values" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values(FG_NAME, [4,2,6])" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "## Train a simple model with features extracted from the feature store\n", "For our example, the dataset we want to train on will have the latest values for specific features for each record id." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "full_df = fs.get_latest_offline_feature_values(FG_NAME, feature_names=['ZipCode','Churn'])\n", "full_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Use the first 70% of the records for training and the remainder for testing\n", "train_rec_count = int(full_df.shape[0] * 0.70)\n", "\n", "train_df = full_df[0:train_rec_count]\n", "test_df = full_df[train_rec_count:]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train the model and make predictions on the test set" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clf = RandomForestClassifier(max_depth=2, random_state=0)\n", "clf.fit(train_df[['ZipCode']], train_df[['Churn']].values.ravel())\n", "clf.predict(test_df[['ZipCode']])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Make predictions using features from the online store" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_id = 1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_features = fs.get_latest_feature_values(FG_NAME, [customer_id], features=['ZipCode'])[0]['ZipCode']\n", "preds = clf.predict([[customer_features]])\n", "churn_pred = 'will' if preds[0] == 1 else 'will NOT'\n", "print(f'Customer {customer_id} {churn_pred} churn.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Explore feature groups and metadata" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Find feature groups" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.list_feature_groups('demo')" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "### Describe a feature group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "descr = fs.describe_feature_group(FG_NAME)\n", "print(json.dumps(descr, indent=4, sort_keys=True, default=str))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get feature group tags" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "doc_url = fs.get_tags(FG_NAME)['DocURL']\n", "display(Markdown(f'Docs for feature group \"**{FG_NAME}**\" are [here]({doc_url})'))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }