{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Try new Feature Store helper class"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"help(fs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ids = [5,6]\n",
"features = ['ZipCode'] #['*'] \n",
"hist_df = fs.get_historical_offline_feature_values('customers-summit', record_ids=ids, feature_names=features,\n",
" verbose=False)\n",
"hist_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ids = [5,6]\n",
"features = ['*'] \n",
"latest_df = fs.get_latest_offline_feature_values('customers-summit', record_ids=ids, feature_names=features,\n",
" verbose=False)\n",
"latest_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_latest_offline_feature_values_as_of('customers-summit', '2020-02-02T00:00:00Z')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.list_feature_groups()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.list_feature_groups(name_contains='recsys')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.describe_feature_group('customers-10k-demo')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs._wait_for_feature_group_deletion_complete('customers-10k-demo')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.delete_feature_group('customers-summit-sql-sql-v2')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sagemaker\n",
"default_bucket = sagemaker.Session().default_bucket()\n",
"data_source = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'\n",
"\n",
"fs.schedule_feature_pipeline(data_source, 'customers-summit')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.update_feature_pipeline(data_source, 'customers-summit', instance_type='ml.m5.large')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.remove_feature_pipeline('customers-summit')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# pandas is imported here because this is the first cell that needs it; without it a\n",
"# fresh kernel (Restart & Run All) fails with NameError on pd.read_csv.\n",
"import pandas as pd\n",
"from sm.feature_store_helper import FeatureStore\n",
"\n",
"fs = FeatureStore()\n",
"\n",
"# Load the records used to create the 'tmp-fg' feature group.\n",
"df = pd.read_csv('./customers.csv')\n",
"ORIGINAL_RECORD_COUNT = df.shape[0]\n",
"# A bare df.head() in the middle of a cell renders nothing, so display it explicitly.\n",
"display(df.head())\n",
"\n",
"tags = {\n",
"    'Environment': 'DEV',\n",
"    'CostCenter': 'C20',\n",
"    'Maintainer': 'John Smith',\n",
"    'DocURL': 'https://www.google.com',\n",
"}\n",
"fs.create_fg_from_df('tmp-fg', df, 'this is my new fg', tags=tags, id_name='Id')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()\n",
"\n",
"fs.ingest_from_df('tmp-fg', df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_latest_feature_values('tmp-fg', [4], features=['ZipCode'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_latest_feature_values('tmp-fg', [4])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_latest_feature_values('tmp-fg', [4,2,6])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.delete_record('tmp-fg', 6, '2020-02-01T00:00:00Z')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()\n",
"fs_dict = fs.get_latest_featureset_values({'Id': 2},\n",
" ['tmp-fg:ZipCode'])\n",
"\n",
"print(f'Feature set as dictionary: {fs_dict}')\n",
"\n",
"print(f'Feature set as vector: {list(fs_dict.values())}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.describe_feature_group('tmp-fg')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.delete_feature_group('tmp-fg')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.create_fg_from_df('tmp-fg-light', df, id_name='Id', event_time_name='UpdateTime')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.describe_feature_group('tmp-fg-light')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.delete_feature_group('tmp-fg-light')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.describe_feature_group('tmp-fg')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_tags('tmp-fg')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_name = 'housing'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Import here: no earlier cell imports datetime, so a fresh kernel would raise NameError.\n",
"from datetime import datetime, timezone\n",
"\n",
"# Timezone-aware UTC timestamp, so it can be subtracted from other aware datetimes\n",
"# without first being relabelled from local time.\n",
"right_now = datetime.now(timezone.utc)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pytz\n",
"\n",
"created_at = fs.describe_feature_group(fg_name)['CreationTime']\n",
"# astimezone() converts to UTC (a naive value is interpreted as local time), whereas\n",
"# replace(tzinfo=...) would only relabel the local wall-clock time as UTC and skew the\n",
"# result by the local UTC offset.\n",
"rn_2 = right_now.astimezone(pytz.UTC)\n",
"created_n_days = (rn_2 - created_at).days\n",
"print(f'\"{fg_name}\" was created {created_n_days} days ago (\"{created_at}\")')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_name = 'housing'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.sample(fg_name, sample_pct=5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"count = fs.get_historical_record_count(fg_name)\n",
"print(f'Found {count:,d} total records in offline store for \"{fg_name}\"')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"\n",
"# Make the 'ml-lineage-helper' directory under the current working directory importable.\n",
"notebook_dir = os.getcwd()\n",
"package_dir = os.path.join(notebook_dir, 'ml-lineage-helper')\n",
"# Guard the append so that re-running this cell does not add duplicate sys.path entries.\n",
"if package_dir not in sys.path:\n",
"    sys.path.append(package_dir)\n",
"\n",
"# NOTE(review): wildcard imports hide where names such as MLLineageHelper and QueryLineage\n",
"# come from; prefer explicit imports once the exporting modules are confirmed.\n",
"from ml_lineage_helper import *\n",
"from ml_lineage_helper.query_lineage import *"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_name = 'fscw-orders-08-10-17-21-52'\n",
"\n",
"def get_models_list(fg_name, max_models=4):\n",
"    \"\"\"Return the names of models that ML lineage links to a feature group.\n",
"\n",
"    This is a best-effort lookup: if anything fails, the error is printed and an\n",
"    empty list is returned so that callers can still render their output.\n",
"\n",
"    Args:\n",
"        fg_name: Name of the feature group to look up.\n",
"        max_models: Maximum number of model names to return (defaults to 4).\n",
"\n",
"    Returns:\n",
"        A list with at most `max_models` entries taken from the\n",
"        'SageMaker Model Name' column; empty when no models are found.\n",
"    \"\"\"\n",
"    try:\n",
"        query_lineage = QueryLineage()\n",
"        fg_arn = fs.describe_feature_group(fg_name)['FeatureGroupArn']\n",
"        models_df = query_lineage.get_models_from_feature_group(fg_arn)\n",
"        if models_df is None:\n",
"            return []\n",
"        # tolist() gives callers a plain list instead of a numpy array slice.\n",
"        return models_df['SageMaker Model Name'].values[:max_models].tolist()\n",
"    except Exception as err:\n",
"        # Catch Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate.\n",
"        print(f'Could not look up models for \"{fg_name}\": {err}')\n",
"        return []\n",
"\n",
"models_string = ', '.join(get_models_list(fg_name))\n",
"print(f'Models: {models_string}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_minmax_timestamps('housing')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_name = 'tmp-fg'\n",
"fg_name = 'fscw-orders-08-10-17-21-52'\n",
"fg_name = 'customers-summit'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pytz\n",
"from datetime import datetime\n",
"# IPython.display is the supported import location; importing display from\n",
"# IPython.core.display is deprecated.\n",
"from IPython.display import display, HTML, Markdown\n",
"import pandas as pd\n",
"\n",
"def fg_profile_view(fs, fg_name):\n",
"    \"\"\"Render a Markdown profile of a feature group in the notebook.\n",
"\n",
"    Always shows the name, description, store mode, tags and the models returned\n",
"    by get_models_list(). When the offline store status is 'Active', also shows\n",
"    the record count, record ages, event-time range, console links and a few\n",
"    sample offline records.\n",
"\n",
"    Args:\n",
"        fs: FeatureStore helper instance used for every lookup.\n",
"        fg_name: Name of the feature group to profile.\n",
"    \"\"\"\n",
"    # Markdown hard line break (two trailing spaces + newline), spelled out explicitly\n",
"    # instead of embedding raw line-break characters inside the string literals.\n",
"    line_break = '  \\n'\n",
"\n",
"    def _as_utc(ts):\n",
"        # Naive timestamps are assumed to already be in UTC -- TODO confirm against\n",
"        # the values returned by fs.get_minmax_timestamps().\n",
"        if ts.tzinfo is None:\n",
"            return ts.replace(tzinfo=pytz.UTC)\n",
"        return ts.astimezone(pytz.UTC)\n",
"\n",
"    fg_desc = fs.describe_feature_group(fg_name)\n",
"    description = fg_desc.get('Description', '')\n",
"\n",
"    # NOTE(review): assumes fg_desc is the DescribeFeatureGroup response, in which the\n",
"    # online/offline entries are optional; .get() avoids a KeyError for single-store groups.\n",
"    online = bool(fg_desc.get('OnlineStoreConfig', {}).get('EnableOnlineStore', False))\n",
"    offline = fg_desc.get('OfflineStoreStatus', {}).get('Status') == 'Active'\n",
"    if online and offline:\n",
"        mode_string = 'Online and offline'\n",
"    elif online:\n",
"        mode_string = 'Online-only'\n",
"    elif offline:\n",
"        mode_string = 'Offline-only'\n",
"    else:\n",
"        # Previously left mode_string unbound, which raised UnboundLocalError below.\n",
"        mode_string = 'No active store'\n",
"\n",
"    tags_dict = fs.get_tags(fg_name)\n",
"\n",
"    models_list = get_models_list(fg_name)\n",
"    models_string = ', '.join(models_list)\n",
"    models_count = len(models_list)\n",
"\n",
"    summary_lines = [\n",
"        f'Name: {fg_name}',\n",
"        f'Description: {description}',\n",
"        f'Mode: {mode_string}',\n",
"        f'Tags: {tags_dict}' if tags_dict else 'Tags: None',\n",
"    ]\n",
"\n",
"    if models_count == 0:\n",
"        summary_lines.append('Models using this feature group: None')\n",
"    else:\n",
"        summary_lines.append(f'Models using this feature group: {models_count}: {models_string}')\n",
"\n",
"    if offline:\n",
"        offline_store_url = fs.get_offline_store_url(fg_name)\n",
"        glue_console_url = fs.get_glue_table_url(fg_name)\n",
"        athena_url = 'https://console.aws.amazon.com/athena/query-editor'\n",
"        count = fs.get_historical_record_count(fg_name)\n",
"\n",
"        # Aware UTC 'now' so that every age below is computed between UTC instants.\n",
"        right_now = datetime.now(pytz.UTC)\n",
"\n",
"        times_row = fs.get_minmax_timestamps(fg_name).iloc[0]\n",
"\n",
"        most_recent_write = times_row['max_write_time']\n",
"        last_n_days = (right_now - _as_utc(datetime.fromisoformat(most_recent_write))).days\n",
"\n",
"        oldest_write_time = times_row['min_write_time']\n",
"        oldest_n_days = (right_now - _as_utc(datetime.fromisoformat(oldest_write_time))).days\n",
"\n",
"        max_event_time = times_row['max_event_time']\n",
"        min_event_time = times_row['min_event_time']\n",
"\n",
"        # Reuse the description fetched above instead of calling the service again.\n",
"        created_at = fg_desc['CreationTime']\n",
"        created_n_days = (right_now - created_at).days\n",
"\n",
"        summary_lines += [\n",
"            f'Total records: {count:,d}',\n",
"            f'Created: {created_n_days} days ago ({created_at})',\n",
"            f'Oldest record: {oldest_n_days} days ago ({oldest_write_time})',\n",
"            f'Most recent record: {last_n_days} days ago ({most_recent_write})',\n",
"            f'Event time range: {min_event_time} -> to -> {max_event_time}',\n",
"            f'Offline store in s3 console: [here]({offline_store_url})',\n",
"            f'Glue table in console: [here]({glue_console_url})',\n",
"            f'Athena query editor: [here]({athena_url})',\n",
"        ]\n",
"\n",
"    # The summary is shown for every feature group, including online-only ones.\n",
"    display(Markdown(line_break.join(summary_lines) + line_break))\n",
"\n",
"    if offline:\n",
"        # Sampling is only attempted when the offline store is active.\n",
"        display(Markdown('Sample offline store records:' + line_break))\n",
"        sample_df = fs.sample(fg_name, 5)\n",
"        display(sample_df.head())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_profile_view(fs, 'customers-summit')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_profile_view(fs, 'housing')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tmp_filename = fs.download_sample_offline_file('customers-summit')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"p_df = pd.read_parquet(tmp_filename)\n",
"p_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.get_tags('customers-summit')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()\n",
"\n",
"tmp_df = fs.get_minmax_timestamps(fg_name)\n",
"tmp_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tmp_df.iloc[0]['max_event_time']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fg_name = 'fscw-orders-08-10-17-21-52'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ml_lineage = MLLineageHelper()\n",
"lineage = ml_lineage.create_ml_lineage('pytorch-hosted-model-2021-10-09-13-32-06-083', \n",
" model_name='house-price-estimate',\n",
" feature_group_names=[fg_name])\n",
"lineage"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Here are a few sample records:')\n",
"fs.sample('housing', 5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"multi_id_events = [['2020-02-01T08:30:00Z', 6, 450],\n",
" ['2020-02-02T10:15:30Z', 5, 5000],\n",
" ['2020-02-03T13:20:59Z', 1, 1999],\n",
" ['2021-01-01T00:00:00Z', 1, 2001]\n",
" ]\n",
"multi_id_df = pd.DataFrame(multi_id_events, columns=['my_event_time', 'Id', 'HOUSE_ID'])\n",
"multi_id_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"fs.get_features(multi_id_df, 'my_event_time', \n",
" features=['customers:ZipCode', \n",
" 'payments:avg_amount', \n",
" 'payments:avg_days_late',\n",
" 'housing:SQUARE_FEET',\n",
" 'housing:PRICE'],\n",
" parallel=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"fs.get_features(multi_id_df, 'my_event_time', \n",
" features=['customers:ZipCode', \n",
" 'payments:avg_amount', \n",
" 'payments:avg_days_late',\n",
" 'housing:SQUARE_FEET',\n",
" 'housing:PRICE'],\n",
" parallel=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"fs.get_features(multi_id_df, 'my_event_time', \n",
" features=['customers:ZipCode', \n",
" 'payments:avg_amount', \n",
" 'payments:avg_days_late',\n",
" 'housing:SQUARE_FEET',\n",
" 'housing:PRICE'],\n",
" parallel=False,\n",
" verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"fs.get_features(multi_id_df, 'my_event_time', \n",
" features=['customers:ZipCode', \n",
" 'payments:avg_amount', \n",
" 'payments:avg_days_late',\n",
" 'housing:SQUARE_FEET',\n",
" 'housing:PRICE'],\n",
" parallel=False) #, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"fs.get_features(multi_id_df, 'my_event_time', \n",
" features=['customers:ZipCode', \n",
" 'payments:avg_amount', \n",
" 'payments:avg_days_late',\n",
" 'housing:SQUARE_FEET',\n",
" 'housing:PRICE'],\n",
" parallel=False) #, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"fs.get_features(multi_id_df, 'my_event_time', \n",
" features=['customers:ZipCode', \n",
" 'payments:avg_amount', \n",
" 'payments:avg_days_late',\n",
" 'housing:SQUARE_FEET',\n",
" 'housing:PRICE'],\n",
" parallel=True) #, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"\n",
"import pandas as pd\n",
"from sm.feature_store_helper import FeatureStore\n",
"fs = FeatureStore()\n",
"\n",
"extended_order_events = [['2021-07-07T10:01:00Z', 'C1', 'O1', 'P1'],\n",
" ['2021-07-07T10:02:00Z', 'C2', 'O2', 'P2'],\n",
" ['2021-07-07T10:03:00Z', 'C3', 'O3', 'P3'],\n",
" ['2021-07-07T10:04:00Z', 'C4', 'O4', 'P4']\n",
" ]\n",
"extended_orders_df = pd.DataFrame(extended_order_events,\n",
" columns=['my_event_time', 'customer_id', 'order_id', 'product_id'])\n",
"fs.get_features(extended_orders_df, 'my_event_time', \n",
" features=['fscw-orders-08-10-17-21-52:*'],\n",
" verbose=True,\n",
" parallel=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"full_df = fs.get_latest_offline_feature_values('tmp-fg', feature_names=['ZipCode','Churn'])\n",
"full_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"full_df = fs.get_latest_offline_feature_values_as_of('customers-summit', '2020-02-03T08:30:00Z', feature_names=['ZipCode','Churn'])\n",
"full_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3 (Data Science)",
"language": "python",
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}