{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Inside account A (external account) \n", "---\n", "Run this notebook within account A. In this notebook, we demonstrate the 3 scenarios:
\n", "\n", "* **Scenario 1** - How to CREATE a feature group inside the centralized feature store and WRITE/READ features to and from it.\n", "* **Scenario 2** - How to WRITE features to a feature group already located in the centralized feature store (account B).\n", "* **Scenario 3** - How to READ features from a feature group already located in the centralized feature store (account B).\n", "\n", "\n", "**Note:** For scenario 1, the feature group is created by account A inside account B using the assumed role from account B. For scenarios 2 and 3, the feature groups have already been created by account B. \n", "\n", "**IMPORTANT:** This notebook must ONLY be run after you have executed the notebook [account-b.ipynb](./account-b.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Imports " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import logging\n", "import pandas\n", "import boto3\n", "import json\n", "import time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Set up logging" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger('sagemaker')\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())\n", "logger.info(f'[Using Boto3 version: {boto3.__version__}]')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Assume role from account B using STS\n", "Here, we use the ARN of the feature store access role (`cross-account-assume-role`), created in account B during the setup process, to generate temporary credentials. This is facilitated by [AWS Security Token Service](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html) (STS) via its `AssumeRole` API call. This call returns a set of temporary credentials that you can use to create service clients. 
When using these clients, your function has permissions conferred to it by the assumed role, and acts as if it belongs to account B. For more information, see `assume_role` in the AWS SDK for Python (Boto 3) documentation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Generate temporary credentials" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sts = boto3.client('sts')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Use ARN of the feature store access role created in account B below. \n", "# E.g., arn:aws:iam:::role/cross-account-assume-role\n", "CROSS_ACCOUNT_ASSUME_ROLE = ''" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sts.assume_role(RoleArn=CROSS_ACCOUNT_ASSUME_ROLE, \n", " RoleSessionName='FeatureStoreCrossAccountAccessDemo')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "access_key_id = response['Credentials']['AccessKeyId']\n", "secret_access_key = response['Credentials']['SecretAccessKey']\n", "session_token = response['Credentials']['SessionToken']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Setup sessions and clients using the temporary credentials\n", "Create SageMaker client using the assumed role temporary credentials." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client = boto3.client('sagemaker', \n", " aws_access_key_id=access_key_id,\n", " aws_secret_access_key=secret_access_key,\n", " aws_session_token=session_token)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_featurestore_runtime_client = boto3.client(service_name='sagemaker-featurestore-runtime', \n", " aws_access_key_id=access_key_id,\n", " aws_secret_access_key=secret_access_key,\n", " aws_session_token=session_token)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_client = boto3.client(service_name='s3',\n", " aws_access_key_id=access_key_id,\n", " aws_secret_access_key=secret_access_key,\n", " aws_session_token=session_token)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "athena_client = boto3.client(service_name='athena',\n", " aws_access_key_id=access_key_id,\n", " aws_secret_access_key=secret_access_key,\n", " aws_session_token=session_token)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Scenario 1: CREATE a feature group inside the centralized feature store and WRITE/READ features to and from it.\n", "Let us create a **new** feature group in account B (centralized feature store) from here (account A). We can do this by using the service clients we created above. This feature group will hold all the features related to a customer product purchase.

\n", "After we create the feature group, we will also see how we can write and read features to and from it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def load_schema(schema):\n", "    # Map schema column types to Feature Store feature types; any other type is stored as a String.\n", "    feature_definitions = []\n", "    for col in schema['features']:\n", "        feature = {'FeatureName': col['name']}\n", "        if col['type'] == 'double':\n", "            feature['FeatureType'] = 'Fractional'\n", "        elif col['type'] == 'bigint':\n", "            feature['FeatureType'] = 'Integral'\n", "        else:\n", "            feature['FeatureType'] = 'String'\n", "        feature_definitions.append(feature)\n", "    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with open('./schema/purchases.json') as schema_file:\n", "    schema = json.load(schema_file)\n", "feature_definitions, record_identifier_feature_name, event_time_feature_name = load_schema(schema)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_definitions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Use the same bucket that you created in account B.\n", "OFFLINE_STORE_BUCKET = '' # e.g., sagemaker-offline-store\n", "OFFLINE_STORE_PREFIX = '' # this is optional, e.g., project-x \n", "OFFLINE_STORE_LOCATION = f's3://{OFFLINE_STORE_BUCKET}/{OFFLINE_STORE_PREFIX}'\n", "FEATURE_GROUP_NAME = '' # e.g., purchases" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "offline_config = {'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': OFFLINE_STORE_LOCATION }}}\n", "# offline_config = {} # uncomment and use this line if needed to write ONLY to the Online feature store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create a feature group" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Uncomment and run the cell below if 
the feature group already exists or during re-runs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sagemaker_client.delete_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client.create_feature_group(FeatureGroupName=FEATURE_GROUP_NAME,\n", "                                      RecordIdentifierFeatureName=record_identifier_feature_name,\n", "                                      EventTimeFeatureName=event_time_feature_name,\n", "                                      FeatureDefinitions=feature_definitions,\n", "                                      Description=schema['description'],\n", "                                      Tags=schema['tags'],\n", "                                      OnlineStoreConfig={'EnableOnlineStore': True},\n", "                                      RoleArn=CROSS_ACCOUNT_ASSUME_ROLE,\n", "                                      **offline_config)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Write features to the created feature group `purchases` in account B (centralized feature store)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "purchases_df = pandas.read_csv('./data/purchases.csv', header=None)\n", "purchases_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Build one Feature Store record per CSV row; every value is passed as a string.\n", "records = []\n", "for _, row in purchases_df.iterrows():\n", "    pid, cid, product_name, purchase_amount, product_category, purchased_at = row\n", "    record = []\n", "    record.append({'ValueAsString': str(pid), 'FeatureName': 'pid'})\n", "    record.append({'ValueAsString': str(cid), 'FeatureName': 'cid'})\n", "    record.append({'ValueAsString': product_name, 'FeatureName': 'product_name'})\n", "    record.append({'ValueAsString': str(purchase_amount), 'FeatureName': 'purchase_amount'})\n", "    record.append({'ValueAsString': product_category, 'FeatureName': 'product_category'})\n", 
"    record.append({'ValueAsString': purchased_at, 'FeatureName': 'purchased_at'})\n", "    # The event time feature is the current Unix time in seconds.\n", "    event_time_feature = {'ValueAsString': str(int(round(time.time()))), 'FeatureName': 'created_at'}\n", "    record.append(event_time_feature)\n", "    records.append(record)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for record in records:\n", "    response = sagemaker_featurestore_runtime_client.put_record(FeatureGroupName=FEATURE_GROUP_NAME, \n", "                                                                Record=record)\n", "    print(response['ResponseMetadata']['HTTPStatusCode'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Verify that we can retrieve features from the feature group in account B" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME, \n", "                                                            RecordIdentifierValueAsString='6034')\n", "response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get records from account B's Offline store (S3 bucket)\n", "Now let's wait for the data to appear in our offline store before moving forward to creating a dataset. This will take approximately 5 minutes." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ACCOUNT_ID = ''\n", "CROSS_ACCOUNT_REGION = ''" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group_s3_prefix = f'{OFFLINE_STORE_PREFIX}/{ACCOUNT_ID}/sagemaker/{CROSS_ACCOUNT_REGION}/offline-store/{FEATURE_GROUP_NAME}/data'\n", "feature_group_s3_prefix" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Poll the offline store once a minute until the feature data shows up.\n", "offline_store_contents = None\n", "while offline_store_contents is None:\n", "    objects = s3_client.list_objects(Bucket=OFFLINE_STORE_BUCKET, Prefix=feature_group_s3_prefix)\n", "    if 'Contents' in objects and len(objects['Contents']) > 1:\n", "        logger.info('[Features are available in Offline Store!]')\n", "        offline_store_contents = objects['Contents']\n", "    else:\n", "        logger.info('[Waiting for data in Offline Store...]')\n", "        time.sleep(60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Use Athena to query features from the feature group `purchases` in account B here (account A)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "glue_table_name = feature_group['OfflineStoreConfig']['DataCatalogConfig']['TableName']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_string = f'SELECT * FROM \"{glue_table_name}\"'\n", "query_string" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Athena query in account B and save results back to a bucket in account A" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ATHENA_RESULTS_BUCKET = ''" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, 
"outputs": [], "source": [ "response = athena_client.start_query_execution(\n", "    QueryString=query_string,\n", "    QueryExecutionContext={\n", "        'Database': 'sagemaker_featurestore',\n", "        'Catalog': 'AwsDataCatalog'\n", "    },\n", "    ResultConfiguration={\n", "        'OutputLocation': f's3://{ATHENA_RESULTS_BUCKET}/query_results/{FEATURE_GROUP_NAME}',\n", "    }\n", ")\n", "response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get_query_results fails while the query is still queued or running, so wait for it to finish first.\n", "query_execution_id = response['QueryExecutionId']\n", "while athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State'] in ('QUEUED', 'RUNNING'):\n", "    time.sleep(5)\n", "query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id, MaxResults=100)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_results['ResultSet']['Rows']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Grant account A access to Athena results bucket (Important)\n", "The objects in the Athena query results bucket (account A) are owned by account B, because the query ran under the role assumed from account B. To let this notebook (running as account A) read these objects, we grant account A permissions through an object ACL."
 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get canonical ID of account A" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Note: the client below does not use the temporary credentials from the assumed role,\n", "# hence points to this account (account A)\n", "s3 = boto3.client('s3')\n", "can_a = s3.list_buckets()['Owner']['ID']\n", "can_a" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get canonical ID of account B" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Note: the client below is the one created at the beginning of this notebook\n", "# using the temporary credentials from the assumed role, \n", "# hence it points to account B\n", "can_b = s3_client.list_buckets()['Owner']['ID'] \n", "can_b" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Grant account A full control over each query result CSV written by account B.\n", "training_set_csv_s3_key = None\n", "for s3_object in s3_client.list_objects(Bucket=ATHENA_RESULTS_BUCKET)['Contents']:\n", "    key = s3_object['Key']\n", "    if key.startswith(f'query_results/{FEATURE_GROUP_NAME}') and key.endswith('csv'):\n", "        print(f'Bucket = {ATHENA_RESULTS_BUCKET} | Key = {key}')\n", "        training_set_csv_s3_key = key\n", "        response = s3_client.put_object_acl(\n", "            AccessControlPolicy={\n", "                \"Grants\": [\n", "                    {\n", "                        'Grantee': {\n", "                            'ID': can_a,\n", "                            'Type': 'CanonicalUser'\n", "                        },\n", "                        'Permission': 'FULL_CONTROL'\n", "                    }\n", "                ],\n", "                'Owner': {\n", "                    'ID': can_b\n", "                }\n", "            },\n", "            Bucket=ATHENA_RESULTS_BUCKET,\n", "            Key=key,\n", "        )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_set_s3_path = f's3://{ATHENA_RESULTS_BUCKET}/{training_set_csv_s3_key}'\n", "training_set_s3_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Load Athena query result CSV into a pandas DataFrame for model training" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_set = pandas.read_csv(training_set_s3_path)\n", "training_set" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Scenario 2: WRITE features to an existing feature group located in the centralized feature store (account B).\n", "Here, let us see how to write features to a feature group that already exists in account B (centralized feature store).

\n", "In notebook `account-b`, we created a feature group named `customers` inside the centralized feature store. Let us now write a record into this feature group from here (account A)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "FEATURE_GROUP_NAME_IN_ACCOUNT_B = 'NAME OF EXISTING FEATURE GROUP IN ACCOUNT B' # e.g., customers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "record = [{'ValueAsString': '1006', 'FeatureName': 'cid'},\n", "          {'ValueAsString': 'farah', 'FeatureName': 'name'},\n", "          {'ValueAsString': '45', 'FeatureName': 'age'},\n", "          {'ValueAsString': 'married', 'FeatureName': 'marital_status'},\n", "          {'ValueAsString': 'female', 'FeatureName': 'sex'},\n", "          {'ValueAsString': 'houston', 'FeatureName': 'city'},\n", "          {'ValueAsString': 'TX', 'FeatureName': 'state'},\n", "          {'ValueAsString': str(int(round(time.time()))), 'FeatureName': 'created_at'}]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sagemaker_featurestore_runtime_client.put_record(FeatureGroupName=FEATURE_GROUP_NAME_IN_ACCOUNT_B, \n", "                                                            Record=record\n", "                                                            )\n", "print(response['ResponseMetadata']['HTTPStatusCode'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Scenario 3: READ features from an existing feature group located in the centralized feature store (account B)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, let us READ the record (row of features) we just put into the `customers` feature group. \n", "Then, we will READ a record that already exists in the `customers` feature group. This record was previously populated by account B in the notebook [account-b.ipynb](./account-b.ipynb). 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME_IN_ACCOUNT_B, \n", " RecordIdentifierValueAsString='1006'\n", " )\n", "response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "READ a record that already exists in the `customers` feature group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME_IN_ACCOUNT_B, \n", " RecordIdentifierValueAsString='1001'\n", " )\n", "response" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }