This notebook must be run within account B. Using this notebook, you will be setting up a centralized feature store in this account. First, you will create a feature group that will store a collection of customer-centric features. Then, you will populate some features into this newly created feature group. The features will be written to both the Online and Offline stores of the centralized feature store.\n",
"Later, you will see, 1/ how to read features from the Online store and 2/ how to read features from the Offline store via an Athena query to create a training set for your data science work.
\n",
"\n",
"**IMPORTANT:** This notebook must be run BEFORE you execute notebook [account-a.ipynb](./account-a.ipynb)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Imports "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sagemaker\n",
"import logging\n",
"import pandas\n",
"import boto3\n",
"import json\n",
"import time"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Setup logging"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"logger = logging.getLogger('sagemaker')\n",
"logger.setLevel(logging.INFO)\n",
"# Attach the stream handler only once: without this guard every re-run of the cell adds\n",
"# another handler and each log message is printed one more time\n",
"if not any(type(handler) is logging.StreamHandler for handler in logger.handlers):\n",
"    logger.addHandler(logging.StreamHandler())\n",
"logger.info(f'[Using Boto3 version: {boto3.__version__}]')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Essentials"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Offline store S3 location can be an S3 bucket or an S3 prefix."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# --- Values to fill in before running the notebook ---\n",
"FEATURE_GROUP_NAME = ''  # e.g., customers\n",
"OFFLINE_STORE_BUCKET = ''  # e.g., sagemaker-offline-store\n",
"OFFLINE_STORE_PREFIX = ''  # this is optional, e.g., project-x\n",
"\n",
"# --- Values derived from the settings above and from the current AWS session ---\n",
"OFFLINE_STORE_LOCATION = 's3://' + OFFLINE_STORE_BUCKET + '/' + OFFLINE_STORE_PREFIX\n",
"REGION = boto3.Session().region_name\n",
"ACCOUNT_ID = boto3.client('sts').get_caller_identity().get('Account')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Execution role returned by the SageMaker SDK; passed as RoleArn when the feature group is created\n",
"role_arn = sagemaker.get_execution_role()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# One boto3 client per AWS service that this notebook talks to\n",
"s3_client = boto3.client(service_name='s3')\n",
"athena_client = boto3.client(service_name='athena')\n",
"sagemaker_client = boto3.client(service_name='sagemaker')\n",
"sagemaker_featurestore_runtime_client = boto3.client(service_name='sagemaker-featurestore-runtime')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Keyword arguments that enable the Offline store when unpacked into create_feature_group\n",
"offline_config = {\n",
"    'OfflineStoreConfig': {\n",
"        'S3StorageConfig': {\n",
"            'S3Uri': OFFLINE_STORE_LOCATION,\n",
"        },\n",
"    },\n",
"}\n",
"# offline_config = {} # uncomment and use this line if needed to write ONLY to the Online feature store"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load schema"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def load_schema(schema):\n",
"    '''Convert a schema dictionary into the inputs needed to create a feature group.\n",
"\n",
"    Every entry of schema['features'] supplies a 'name' and a 'type'. The type 'double'\n",
"    becomes 'Fractional', 'bigint' becomes 'Integral' and any other type becomes 'String'.\n",
"\n",
"    Returns a tuple: (list of feature definitions, record identifier feature name,\n",
"    event time feature name).\n",
"    '''\n",
"    feature_type_by_column_type = {'double': 'Fractional', 'bigint': 'Integral'}\n",
"    feature_definitions = [\n",
"        {\n",
"            'FeatureName': col['name'],\n",
"            'FeatureType': feature_type_by_column_type.get(col['type'], 'String'),\n",
"        }\n",
"        for col in schema['features']\n",
"    ]\n",
"    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read the schema through a context manager so that the file handle is always closed\n",
"with open('./schema/customers.json') as schema_file:\n",
"    schema = json.load(schema_file)\n",
"feature_definitions, record_identifier_feature_name, event_time_feature_name = load_schema(schema)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"feature_definitions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a feature group"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Uncomment and run the cell below if the feature group already exists or during re-runs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sagemaker_client.delete_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create the feature group with the Online store enabled; offline_config adds the Offline\n",
"# store settings (or nothing when it is an empty dict)\n",
"sagemaker_client.create_feature_group(\n",
"    FeatureGroupName=FEATURE_GROUP_NAME,\n",
"    RecordIdentifierFeatureName=record_identifier_feature_name,\n",
"    EventTimeFeatureName=event_time_feature_name,\n",
"    FeatureDefinitions=feature_definitions,\n",
"    Description=schema['description'],\n",
"    Tags=schema['tags'],\n",
"    OnlineStoreConfig={'EnableOnlineStore': True},\n",
"    RoleArn=role_arn,\n",
"    **offline_config\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create_feature_group returns before the feature group is usable, so poll until its status\n",
"# is no longer 'Creating'; otherwise the put_record calls further down fail on a fresh run\n",
"feature_group_description = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)\n",
"while feature_group_description['FeatureGroupStatus'] == 'Creating':\n",
"    logger.info('[Waiting for feature group to be created...]')\n",
"    time.sleep(5)\n",
"    feature_group_description = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)\n",
"# Display the final description (it carries the status and, on failure, the reason)\n",
"feature_group_description"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Populate features to the feature group"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# header=None makes pandas treat the first CSV row as data and label the columns with integers\n",
"customers_df = pandas.read_csv('./data/customers.csv', header=None)\n",
"customers_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build one Feature Store record (a list of FeatureName/ValueAsString pairs) per CSV row\n",
"records = []\n",
"for _, customer in customers_df.iterrows():\n",
"    # Each row is expected to hold exactly these seven columns, in this order\n",
"    cid, name, age, marital_status, sex, city, state = customer\n",
"    feature_values = {\n",
"        'cid': str(cid),\n",
"        'name': name,\n",
"        'age': str(age),\n",
"        'marital_status': marital_status,\n",
"        'sex': sex,\n",
"        'city': city,\n",
"        'state': state,\n",
"        # Event time: current Unix time rounded to whole seconds\n",
"        'created_at': str(int(round(time.time()))),\n",
"    }\n",
"    records.append([\n",
"        {'ValueAsString': feature_value, 'FeatureName': feature_name}\n",
"        for feature_name, feature_value in feature_values.items()\n",
"    ])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Write features to the feature store"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Write the records one at a time and print the HTTP status code of every call\n",
"for record in records:\n",
"    response = sagemaker_featurestore_runtime_client.put_record(\n",
"        FeatureGroupName=FEATURE_GROUP_NAME,\n",
"        Record=record,\n",
"    )\n",
"    print(response['ResponseMetadata']['HTTPStatusCode'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Verify if you can retrieve features from your feature group using record identifier\n",
"Here, you are reading features from the Online store."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Look up a single record by its record identifier value\n",
"response = sagemaker_featurestore_runtime_client.get_record(\n",
"    FeatureGroupName=FEATURE_GROUP_NAME,\n",
"    RecordIdentifierValueAsString='1002',\n",
")\n",
"response"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get records from the Offline store (S3 bucket)\n",
"Now let us wait for the data to appear in the Offline store (S3 bucket) before moving forward to creating a dataset. This will take approximately 5 minutes or less."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Ask Feature Store where it writes the offline records instead of rebuilding the path by hand:\n",
"# a hand-built path starts with '/' when OFFLINE_STORE_PREFIX is empty and then matches no S3 key\n",
"resolved_output_s3_uri = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']\n",
"# Drop the leading 's3://<bucket>/' so that the remainder can be used as an S3 key prefix\n",
"feature_group_s3_prefix = resolved_output_s3_uri.split('/', 3)[3]\n",
"feature_group_s3_prefix"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Poll the Offline store prefix once a minute until more than one object is listed under it\n",
"offline_store_contents = None\n",
"while True:\n",
"    objects = s3_client.list_objects(Bucket=OFFLINE_STORE_BUCKET, Prefix=feature_group_s3_prefix)\n",
"    # 'Contents' is absent from the response while nothing matches the prefix\n",
"    if len(objects.get('Contents', [])) > 1:\n",
"        logger.info('[Features are available in Offline Store!]')\n",
"        offline_store_contents = objects['Contents']\n",
"        break\n",
"    logger.info('[Waiting for data in Offline Store...]')\n",
"    time.sleep(60)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use Athena to query features from the Offline store and create a training set"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Fetch the feature group description; its Offline store data catalog settings are read next\n",
"feature_group = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Table name recorded in the Offline store's DataCatalogConfig; used as the table to query\n",
"glue_table_name = feature_group['OfflineStoreConfig']['DataCatalogConfig']['TableName']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Select every column of every row; the table name is wrapped in double quotes inside the SQL\n",
"query_string = f'SELECT * FROM \"{glue_table_name}\"'\n",
"query_string"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run Athena query and save results "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can save the results of the Athena query to a folder within the Offline store S3 bucket or any other bucket. Here, we are storing the query results to a prefix within the Offline store S3 bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Submit the query; the result files are written under the OutputLocation prefix\n",
"athena_query_context = {'Database': 'sagemaker_featurestore', 'Catalog': 'AwsDataCatalog'}\n",
"athena_result_configuration = {'OutputLocation': f's3://{OFFLINE_STORE_BUCKET}/query_results/{FEATURE_GROUP_NAME}'}\n",
"response = athena_client.start_query_execution(\n",
"    QueryString=query_string,\n",
"    QueryExecutionContext=athena_query_context,\n",
"    ResultConfiguration=athena_result_configuration,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Athena runs queries asynchronously: wait for a terminal state before reading the results,\n",
"# otherwise get_query_results fails because the query has not finished yet\n",
"query_execution_id = response['QueryExecutionId']\n",
"while True:\n",
"    query_state = athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']\n",
"    if query_state not in ('QUEUED', 'RUNNING'):\n",
"        break\n",
"    time.sleep(2)\n",
"# FAILED and CANCELLED queries have no results to fetch, so stop with a clear message\n",
"if query_state != 'SUCCEEDED':\n",
"    raise RuntimeError(f'Athena query {query_execution_id} ended in state {query_state}')\n",
"query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id,\n",
"                                                MaxResults=100)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Athena reports the exact S3 object that holds this query's results. Using it avoids listing\n",
"# the whole bucket (a single list_objects call returns at most 1000 keys and has no 'Contents'\n",
"# entry for an empty bucket) and avoids picking up a CSV left behind by an earlier run\n",
"query_execution = athena_client.get_query_execution(QueryExecutionId=response['QueryExecutionId'])\n",
"query_output_location = query_execution['QueryExecution']['ResultConfiguration']['OutputLocation']\n",
"# Drop the leading 's3://<bucket>/' to obtain the object key inside the bucket\n",
"training_set_csv_s3_key = query_output_location.split('/', 3)[3]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Full S3 URI of the query result CSV, built from the bucket name and the object key\n",
"training_set_s3_path = f's3://{OFFLINE_STORE_BUCKET}/{training_set_csv_s3_key}'\n",
"training_set_s3_path"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Load the query result CSV into a DataFrame\n",
"# NOTE(review): pandas presumably needs the s3fs package to read s3:// paths - confirm it is installed\n",
"training_set = pandas.read_csv(training_set_s3_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"training_set"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}