{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Try new Feature Store helper class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "help(fs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ids = [5,6]\n", "features = ['ZipCode'] #['*'] \n", "hist_df = fs.get_historical_offline_feature_values('customers-summit', record_ids=ids, feature_names=features,\n", " verbose=False)\n", "hist_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ids = [5,6]\n", "features = ['*'] \n", "latest_df = fs.get_latest_offline_feature_values('customers-summit', record_ids=ids, feature_names=features,\n", " verbose=False)\n", "latest_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_offline_feature_values_as_of('customers-summit', '2020-02-02T00:00:00Z')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.list_feature_groups()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.list_feature_groups(name_contains='recsys')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.describe_feature_group('customers-10k-demo')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs._wait_for_feature_group_deletion_complete('customers-10k-demo')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.delete_feature_group('customers-summit-sql-sql-v2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "default_bucket = sagemaker.Session().default_bucket()\n", "data_source = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'\n", "\n", "fs.schedule_feature_pipeline(data_source, 'customers-summit')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.update_feature_pipeline(data_source, 'customers-summit', instance_type='ml.m5.large')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.remove_feature_pipeline('customers-summit')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()\n", "\n", "df = pd.read_csv('./customers.csv')\n", "ORIGINAL_RECORD_COUNT = df.shape[0]\n", "df.head()\n", "\n", "tags = {'Environment': 'DEV', \n", " 'CostCenter': 'C20', \n", " 'Maintainer': 'John Smith', \n", " 'DocURL': 'https://www.google.com'}\n", "fs.create_fg_from_df('tmp-fg', df, 'this is my new fg', tags=tags, id_name='Id')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()\n", "\n", "fs.ingest_from_df('tmp-fg', df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values('tmp-fg', [4], features=['ZipCode'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values('tmp-fg', [4])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_latest_feature_values('tmp-fg', [4,2,6])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.delete_record('tmp-fg', 6, '2020-02-01T00:00:00Z')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()\n", "fs_dict = fs.get_latest_featureset_values({'Id': 2},\n", " ['tmp-fg:ZipCode'])\n", "\n", "print(f'Feature set as dictionary: {fs_dict}')\n", "\n", "print(f'Feature set as vector: {list(fs_dict.values())}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.describe_feature_group('tmp-fg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.delete_feature_group('tmp-fg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.create_fg_from_df('tmp-fg-light', df, id_name='Id', event_time_name='UpdateTime')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.describe_feature_group('tmp-fg-light')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.delete_feature_group('tmp-fg-light')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.describe_feature_group('tmp-fg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_tags('tmp-fg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_name = 'housing'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "right_now = datetime.now()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pytz\n", "\n", "created_at = fs.describe_feature_group(fg_name)['CreationTime']\n", "rn_2 = right_now.replace(tzinfo=pytz.UTC)\n", "created_n_days = (rn_2 - created_at).days\n", "print(f'\"{fg_name}\" was created {created_n_days} days ago (\"{created_at}\")')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_name = 'housing'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.sample(fg_name, sample_pct=5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "count = fs.get_historical_record_count(fg_name)\n", "print(f'Found {count:,d} total records in offline store for \"{fg_name}\"')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import os\n", "\n", "notebook_dir = os.getcwd()\n", "package_dir = notebook_dir + '/ml-lineage-helper'\n", "sys.path.append(package_dir)\n", "\n", "from ml_lineage_helper import *\n", "from ml_lineage_helper.query_lineage import *" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_name = 'tmp-fg'\n", "fg_name = 'fscw-orders-08-10-17-21-52'\n", "\n", "def get_models_list(fg_name):\n", " try:\n", " query_lineage = QueryLineage()\n", " fg_arn = fs.describe_feature_group(fg_name)['FeatureGroupArn']\n", " models_df = query_lineage.get_models_from_feature_group(fg_arn)\n", " if models_df is not None:\n", " models_list = models_df['SageMaker Model Name'].values[0:4]\n", " else:\n", " models_list = []\n", " except:\n", " models_list = []\n", " pass\n", " return models_list\n", "\n", "models_string = ', '.join(get_models_list(fg_name))\n", "print(f'Models: {models_string}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_minmax_timestamps('housing')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_name = 'tmp-fg'\n", "fg_name = 'fscw-orders-08-10-17-21-52'\n", "fg_name = 'customers-summit'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pytz\n", "from datetime import datetime\n", "from IPython.core.display import display, HTML, Markdown\n", "import pandas as pd\n", "\n", "def fg_profile_view(fs, fg_name):\n", " fg_desc = fs.describe_feature_group(fg_name)\n", " if 'Description' in fg_desc:\n", " description = fg_desc['Description']\n", " else:\n", " description = ''\n", " online = fg_desc['OnlineStoreConfig']['EnableOnlineStore']\n", " offline = fg_desc['OfflineStoreStatus']['Status'] == 'Active'\n", " if online and not offline:\n", " mode_string = 'Online-only'\n", " elif online and offline:\n", " mode_string = 'Online and offline'\n", " elif offline and not online:\n", " mode_string = 'Offline-only'\n", " \n", " tags_dict = fs.get_tags(fg_name)\n", "\n", " models_list = get_models_list(fg_name)\n", " models_string = ', '.join(models_list)\n", " models_count = len(models_list)\n", "\n", " fg_summary_markdown = \\\n", " f'Name: {fg_name}
' +\\\n", " f'Description: {description}
' +\\\n", " f'Mode: {mode_string}
'\n", " \n", " if len(tags_dict) == 0:\n", " fg_summary_markdown += f'Tags: None
'\n", " else:\n", " fg_summary_markdown += f'Tags: {tags_dict}
'\n", " \n", " if models_count == 0:\n", " fg_summary_markdown += f'Models using this feature group: None
'\n", " else:\n", " fg_summary_markdown += f'Models using this feature group: {models_count}: {models_string}
'\n", "\n", " if offline:\n", " offline_store_url = fs.get_offline_store_url(fg_name)\n", " glue_console_url = fs.get_glue_table_url(fg_name)\n", " athena_url = 'https://console.aws.amazon.com/athena/query-editor'\n", " count = fs.get_historical_record_count(fg_name)\n", "\n", " right_now = datetime.now()\n", "\n", " times_df = fs.get_minmax_timestamps(fg_name)\n", "\n", " most_recent_write = times_df.iloc[0]['max_write_time'] #fs.get_most_recent_write_time(fg_name)\n", " last_write = datetime.fromisoformat(most_recent_write)\n", " last_n_days = (right_now - last_write).days\n", "\n", " oldest_write_time = times_df.iloc[0]['min_write_time'] #fs.get_oldest_write_time(fg_name)\n", " oldest_write = datetime.fromisoformat(oldest_write_time)\n", " oldest_n_days = (right_now - oldest_write).days\n", "\n", " max_event_time = times_df.iloc[0]['max_event_time']\n", " min_event_time = times_df.iloc[0]['min_event_time']\n", "\n", " created_at = fs.describe_feature_group(fg_name)['CreationTime']\n", " rn_2 = right_now.replace(tzinfo=pytz.UTC)\n", " created_n_days = (rn_2 - created_at).days\n", "\n", " fg_summary_markdown += \\\n", " f'Total records: {count:,d}
' +\\\n", " f'Created: {created_n_days} days ago ({created_at})
' +\\\n", " f'Oldest record: {oldest_n_days} days ago ({oldest_write})
' +\\\n", " f'Most recent record: {last_n_days} days ago ({most_recent_write})
' +\\\n", " f'Event time range: {min_event_time} -> to -> {max_event_time}
' +\\\n", " f'Offline store in s3 console: [here]({offline_store_url})
' +\\\n", " f'Glue table in console: [here]({glue_console_url})
' +\\\n", " f'Athena query editor: [here]({athena_url})
'\n", " display(Markdown(fg_summary_markdown))\n", " display(Markdown(f'Sample offline store records:
'))\n", " sample_df = fs.sample(fg_name, 5)\n", " display(sample_df.head())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_profile_view(fs, 'customers-summit')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_profile_view(fs, 'housing')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tmp_filename = fs.download_sample_offline_file('customers-summit')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "p_df = pd.read_parquet(tmp_filename)\n", "p_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs.get_tags('customers-summit')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()\n", "\n", "tmp_df = fs.get_minmax_timestamps(fg_name)\n", "tmp_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tmp_df.iloc[0]['max_event_time']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fg_name = 'fscw-orders-08-10-17-21-52'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ml_lineage = MLLineageHelper()\n", "lineage = ml_lineage.create_ml_lineage('pytorch-hosted-model-2021-10-09-13-32-06-083', \n", " model_name='house-price-estimate',\n", " feature_group_names=[fg_name])\n", "lineage" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('Here are a few sample records:')\n", "fs.sample('housing', 5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "multi_id_events = [['2020-02-01T08:30:00Z', 6, 450],\n", " ['2020-02-02T10:15:30Z', 5, 5000],\n", " ['2020-02-03T13:20:59Z', 1, 1999],\n", " ['2021-01-01T00:00:00Z', 1, 2001]\n", " ]\n", "multi_id_df = pd.DataFrame(multi_id_events, columns=['my_event_time', 'Id', 'HOUSE_ID'])\n", "multi_id_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "fs.get_features(multi_id_df, 'my_event_time', \n", " features=['customers:ZipCode', \n", " 'payments:avg_amount', \n", " 'payments:avg_days_late',\n", " 'housing:SQUARE_FEET',\n", " 'housing:PRICE'],\n", " parallel=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "fs.get_features(multi_id_df, 'my_event_time', \n", " features=['customers:ZipCode', \n", " 'payments:avg_amount', \n", " 'payments:avg_days_late',\n", " 'housing:SQUARE_FEET',\n", " 'housing:PRICE'],\n", " parallel=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "fs.get_features(multi_id_df, 'my_event_time', \n", " features=['customers:ZipCode', \n", " 'payments:avg_amount', \n", " 'payments:avg_days_late',\n", " 'housing:SQUARE_FEET',\n", " 'housing:PRICE'],\n", " parallel=False,\n", " verbose=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "fs.get_features(multi_id_df, 'my_event_time', \n", " features=['customers:ZipCode', \n", " 'payments:avg_amount', \n", " 'payments:avg_days_late',\n", " 'housing:SQUARE_FEET',\n", " 'housing:PRICE'],\n", " parallel=False) #, verbose=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "fs.get_features(multi_id_df, 'my_event_time', \n", " features=['customers:ZipCode', \n", " 'payments:avg_amount', \n", " 'payments:avg_days_late',\n", " 'housing:SQUARE_FEET',\n", " 'housing:PRICE'],\n", " parallel=False) #, verbose=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "fs.get_features(multi_id_df, 'my_event_time', \n", " features=['customers:ZipCode', \n", " 'payments:avg_amount', \n", " 'payments:avg_days_late',\n", " 'housing:SQUARE_FEET',\n", " 'housing:PRICE'],\n", " parallel=True) #, verbose=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2\n", "\n", "import pandas as pd\n", "from sm.feature_store_helper import FeatureStore\n", "fs = FeatureStore()\n", "\n", "extended_order_events = [['2021-07-07T10:01:00Z', 'C1', 'O1', 'P1'],\n", " ['2021-07-07T10:02:00Z', 'C2', 'O2', 'P2'],\n", " ['2021-07-07T10:03:00Z', 'C3', 'O3', 'P3'],\n", " ['2021-07-07T10:04:00Z', 'C4', 'O4', 'P4']\n", " ]\n", "extended_orders_df = pd.DataFrame(extended_order_events,\n", " columns=['my_event_time', 'customer_id', 'order_id', 'product_id'])\n", "fs.get_features(extended_orders_df, 'my_event_time', \n", " features=['fscw-orders-08-10-17-21-52:*'],\n", " verbose=True,\n", " parallel=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "full_df = fs.get_latest_offline_feature_values('tmp-fg', feature_names=['ZipCode','Churn'])\n", "full_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "full_df = fs.get_latest_offline_feature_values_as_of('customers-summit', '2020-02-03T08:30:00Z', feature_names=['ZipCode','Churn'])\n", "full_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }