---
## <span style="color:orange"> Inside account A (external account) </span>
---
Run this notebook in account A. It demonstrates three scenarios:<br>

* **Scenario 1** - How to CREATE a feature group inside the centralized feature store and WRITE/READ features to and from it.
* **Scenario 2** - How to WRITE features to a feature group already located in the centralized feature store (account B).
* **Scenario 3** - How to READ features from a feature group already located in the centralized feature store (account B).


**Note:** For scenario 1, account A creates the feature group inside account B using the role assumed from account B. For scenarios 2 and 3, the feature group already exists, having been created by account B.

**IMPORTANT:** Run this notebook ONLY after you have executed the notebook [account-b.ipynb](./account-b.ipynb).

### Imports 

In [None]:
import logging
import pandas
import boto3
import json
import time

#### Set up logging

In [None]:
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
logger.info(f'[Using Boto3 version: {boto3.__version__}]')

### Assume role from account B using STS
Here we use the ARN of the feature store access role (`cross-account-assume-role`), which we created in account B during setup, to obtain temporary credentials. This is facilitated by [AWS Security Token Service](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html) (STS) through its `AssumeRole` API call, which returns a set of temporary credentials (an access key ID, a secret access key, and a session token) that you can use to create service clients. Calls made through these clients carry the permissions granted to the assumed role and act as if they come from account B. For more information, see `assume_role` in the AWS SDK for Python (Boto3) documentation.
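Temporary credentials from `AssumeRole` expire, one hour after issue by default (the `DurationSeconds` parameter can change this within the role's maximum session duration). Because later cells wait several minutes for the offline store and for Athena, it can help to check how much time is left. The following is a minimal sketch assuming the `Credentials` dict shape boto3 returns; the helper names and the 5-minute margin are hypothetical choices, and the demo uses a fake dict rather than a real STS call.

```python
from datetime import datetime, timedelta, timezone


def seconds_until_expiry(credentials, now=None):
    """Seconds left before assumed-role credentials expire.

    `credentials` is the 'Credentials' dict of an STS AssumeRole response;
    boto3 returns its 'Expiration' entry as a timezone-aware datetime.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return (credentials['Expiration'] - now).total_seconds()


def needs_refresh(credentials, margin_seconds=300, now=None):
    """True when less than `margin_seconds` remain (the 5-minute margin is arbitrary)."""
    return seconds_until_expiry(credentials, now) < margin_seconds


# Fake credentials dict for illustration; no AWS call is made.
issued_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fake_credentials = {'Expiration': issued_at + timedelta(hours=1)}
print(seconds_until_expiry(fake_credentials, now=issued_at))                   # 3600.0
print(needs_refresh(fake_credentials, now=issued_at + timedelta(minutes=58)))  # True
```

In this notebook, `seconds_until_expiry(response['Credentials'])` could be called right after the `assume_role` cell; if little time remains, rerun that cell and recreate the clients.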

#### Generate temporary credentials

In [None]:
sts = boto3.client('sts')

In [None]:
# Use ARN of the feature store access role created in account B below. 
# E.g., arn:aws:iam::<ACCOUNT B ID>:role/cross-account-assume-role
CROSS_ACCOUNT_ASSUME_ROLE = '<ARN OF CROSS ACCOUNT ROLE CREATED IN ACCOUNT B>'

In [None]:
response = sts.assume_role(RoleArn=CROSS_ACCOUNT_ASSUME_ROLE, 
                            RoleSessionName='FeatureStoreCrossAccountAccessDemo')

In [None]:
access_key_id = response['Credentials']['AccessKeyId']
secret_access_key = response['Credentials']['SecretAccessKey']
session_token = response['Credentials']['SessionToken']

#### Set up clients using the temporary credentials
Create the SageMaker, SageMaker Feature Store Runtime, S3, and Athena clients using the assumed role's temporary credentials.

In [None]:
sagemaker_client = boto3.client('sagemaker', 
                                 aws_access_key_id=access_key_id,
                                 aws_secret_access_key=secret_access_key,
                                 aws_session_token=session_token)

In [None]:
sagemaker_featurestore_runtime_client = boto3.client(service_name='sagemaker-featurestore-runtime', 
                                                     aws_access_key_id=access_key_id,
                                                     aws_secret_access_key=secret_access_key,
                                                     aws_session_token=session_token)

In [None]:
s3_client = boto3.client(service_name='s3',
                         aws_access_key_id=access_key_id,
                         aws_secret_access_key=secret_access_key,
                         aws_session_token=session_token)

In [None]:
athena_client = boto3.client(service_name='athena',
                             aws_access_key_id=access_key_id,
                             aws_secret_access_key=secret_access_key,
                             aws_session_token=session_token)
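The four client cells above repeat the same three credential arguments. A minimal sketch of factoring them out, assuming the `Credentials` dict returned by `assume_role`; `client_kwargs` is a hypothetical helper name, and the values below are placeholders, not real keys.

```python
def client_kwargs(credentials):
    """Map the 'Credentials' dict of an AssumeRole response to boto3 keyword arguments."""
    return {
        'aws_access_key_id': credentials['AccessKeyId'],
        'aws_secret_access_key': credentials['SecretAccessKey'],
        'aws_session_token': credentials['SessionToken'],
    }


# Placeholder values for illustration only.
example_credentials = {'AccessKeyId': 'ASIAEXAMPLE', 'SecretAccessKey': 'secret', 'SessionToken': 'token'}
print(client_kwargs(example_credentials)['aws_session_token'])  # token

# In the notebook, a client cell could then become, for example:
# kwargs = client_kwargs(response['Credentials'])
# athena_client = boto3.client('athena', **kwargs)
# or: session = boto3.Session(**kwargs); s3_client = session.client('s3')
```

The same keyword arguments are accepted by `boto3.Session(...)`, so creating one session and calling `session.client(...)` for each service is an equivalent design.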

### Scenario 1: CREATE a feature group inside the centralized feature store and WRITE/READ features to and from it.
Let us create a **new** feature group in account B (centralized feature store) from here (account A). We can do this by using the service clients we created above. This feature group will hold all the features related to a customer product purchase. <br><br>
After we create the feature group, we will also see how we can write and read features to and from it.

In [None]:
def load_schema(schema):
    feature_definitions = []
    for col in schema['features']:
        feature = {'FeatureName': col['name']}
        if col['type'] == 'double':
            feature['FeatureType'] = 'Fractional'
        elif col['type'] == 'bigint':
            feature['FeatureType'] = 'Integral'
        else:
            feature['FeatureType'] = 'String'
        feature_definitions.append(feature)
    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']
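`schema/purchases.json` is not reproduced here. Judging only from the keys that `load_schema` and the `create_feature_group` cell read, it contains `features` (each with `name` and `type`), `record_identifier_feature_name`, `event_time_feature_name`, `description`, and `tags`. The sketch below uses a hypothetical schema of that shape with a table-driven version of the same type mapping. Note that Feature Store requires the event time feature to be of type String or Fractional, so the hypothetical `created_at` is declared as `double`.

```python
# Table-driven equivalent of load_schema's type mapping; any other type becomes String.
TYPE_MAP = {'double': 'Fractional', 'bigint': 'Integral'}


def to_feature_definitions(schema):
    return [
        {'FeatureName': col['name'], 'FeatureType': TYPE_MAP.get(col['type'], 'String')}
        for col in schema['features']
    ]


# Hypothetical schema: same keys load_schema reads, NOT the real purchases.json.
example_schema = {
    'record_identifier_feature_name': 'pid',
    'event_time_feature_name': 'created_at',
    'features': [
        {'name': 'pid', 'type': 'bigint'},
        {'name': 'product_name', 'type': 'string'},
        {'name': 'purchase_amount', 'type': 'double'},
        {'name': 'created_at', 'type': 'double'},
    ],
}

for definition in to_feature_definitions(example_schema):
    print(definition)
```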

In [None]:
with open('./schema/purchases.json') as schema_file:
    schema = json.load(schema_file)
feature_definitions, record_identifier_feature_name, event_time_feature_name = load_schema(schema)

In [None]:
feature_definitions

In [None]:
# Use the same offline store bucket that you created in account B.
OFFLINE_STORE_BUCKET = '<YOUR OFFLINE STORE S3 BUCKET NAME IN ACCOUNT B>' # e.g., sagemaker-offline-store
OFFLINE_STORE_PREFIX = '<PREFIX WITHIN OFFLINE STORE BUCKET>'  # this is optional, e.g., project-x 
OFFLINE_STORE_LOCATION = f's3://{OFFLINE_STORE_BUCKET}/{OFFLINE_STORE_PREFIX}'
FEATURE_GROUP_NAME = '<YOUR FEATURE GROUP NAME>'  # e.g., purchases

In [None]:
offline_config = {'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': OFFLINE_STORE_LOCATION }}}
# offline_config = {}  # uncomment and use this line if needed to write ONLY to the Online feature store

#### Create a feature group

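Uncomment and run the cell below if the feature group already exists, for example when re-running this notebook. Deletion is asynchronous, so wait until the feature group has been removed before creating it again.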

In [None]:
# sagemaker_client.delete_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)

In [None]:
sagemaker_client.create_feature_group(FeatureGroupName=FEATURE_GROUP_NAME,
                                    RecordIdentifierFeatureName=record_identifier_feature_name,
                                    EventTimeFeatureName=event_time_feature_name,
                                    FeatureDefinitions=feature_definitions,
                                    Description=schema['description'],
                                    Tags=schema['tags'],
                                    OnlineStoreConfig={'EnableOnlineStore': True},
                                    RoleArn=CROSS_ACCOUNT_ASSUME_ROLE,
                                    **offline_config)

In [None]:
sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)
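`create_feature_group` returns as soon as the request is accepted. The group starts in the `Creating` status, and `put_record` calls made before it reaches `Created` are likely to fail. A minimal polling sketch follows; `wait_for_feature_group` is a hypothetical helper, the timeout values are arbitrary, and `describe` is assumed to behave like `sagemaker_client.describe_feature_group`. The demo uses a fake describe function so it runs without AWS.

```python
import time


def wait_for_feature_group(describe, name, poll_seconds=5, timeout_seconds=300, sleep=time.sleep):
    """Poll until the feature group leaves 'Creating'; return 'Created' or raise.

    `describe` should behave like sagemaker_client.describe_feature_group.
    """
    waited = 0
    status = describe(FeatureGroupName=name)['FeatureGroupStatus']
    while status == 'Creating':
        if waited >= timeout_seconds:
            raise TimeoutError(f'{name} still Creating after {waited} seconds')
        sleep(poll_seconds)
        waited += poll_seconds
        status = describe(FeatureGroupName=name)['FeatureGroupStatus']
    if status != 'Created':
        raise RuntimeError(f'{name} ended in status {status}')
    return status


# Offline demo with a fake describe function that reports Creating twice.
fake_statuses = iter(['Creating', 'Creating', 'Created'])
fake_describe = lambda FeatureGroupName: {'FeatureGroupStatus': next(fake_statuses)}
print(wait_for_feature_group(fake_describe, 'purchases', sleep=lambda seconds: None))  # Created

# In the notebook:
# wait_for_feature_group(sagemaker_client.describe_feature_group, FEATURE_GROUP_NAME)
```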

#### Write features to the created feature group `purchases` in account B (centralized feature store)

In [None]:
purchases_df = pandas.read_csv('./data/purchases.csv', header=None)
purchases_df

In [None]:
records = []
for _, row in purchases_df.iterrows():
    pid, cid, product_name, purchase_amount, product_category, purchased_at = row
    record = []
    record.append({'ValueAsString': str(pid), 'FeatureName': 'pid'})
    record.append({'ValueAsString': str(cid), 'FeatureName': 'cid'})
    record.append({'ValueAsString': product_name, 'FeatureName': 'product_name'})
    record.append({'ValueAsString': str(purchase_amount), 'FeatureName': 'purchase_amount'})
    record.append({'ValueAsString': product_category, 'FeatureName': 'product_category'})
    record.append({'ValueAsString': purchased_at, 'FeatureName': 'purchased_at'})
    event_time_feature = {'ValueAsString': str(int(round(time.time()))), 'FeatureName': 'created_at'}
    record.append(event_time_feature)
    records.append(record)
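The loop above hard-codes each feature and passes `product_name`, `product_category`, and `purchased_at` through unconverted, so a missing CSV value (read by pandas as NaN) would not be a string. A minimal sketch of a generic builder, assuming (worth checking against the Feature Store documentation) that features other than the record identifier and event time may simply be omitted from a record; `to_record` is a hypothetical helper.

```python
import math
import time


def to_record(values, event_time_feature_name='created_at', now=None):
    """Build a put_record payload from a {feature_name: value} mapping.

    Values are sent as strings. None and NaN values are left out of the record
    rather than being sent as the literal strings 'None' or 'nan'.
    """
    record = [
        {'FeatureName': name, 'ValueAsString': str(value)}
        for name, value in values.items()
        if value is not None and not (isinstance(value, float) and math.isnan(value))
    ]
    # Event time as integer epoch seconds, matching the notebook's convention.
    event_time = int(round(time.time() if now is None else now))
    record.append({'FeatureName': event_time_feature_name, 'ValueAsString': str(event_time)})
    return record


print(to_record({'pid': 6034, 'purchase_amount': 19.99, 'product_category': None}, now=1700000000.4))

# In the notebook, for example:
# columns = ['pid', 'cid', 'product_name', 'purchase_amount', 'product_category', 'purchased_at']
# records = [to_record(dict(zip(columns, row))) for row in purchases_df.itertuples(index=False)]
```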

In [None]:
for record in records:
    response = sagemaker_featurestore_runtime_client.put_record(FeatureGroupName=FEATURE_GROUP_NAME, 
                                                                Record=record)
    print(response['ResponseMetadata']['HTTPStatusCode'])
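`put_record` writes one record per call, so the loop above makes one round trip per row. For larger DataFrames, the calls can be issued concurrently. A minimal sketch using a thread pool, relying on boto3's documentation that low-level clients are generally thread-safe; `ingest` is a hypothetical helper, the worker count is arbitrary, and the demo uses a fake `put_record`.

```python
from concurrent.futures import ThreadPoolExecutor


def ingest(put_record, feature_group_name, records, max_workers=4):
    """Send records concurrently; return HTTP status codes in input order.

    `put_record` should behave like sagemaker_featurestore_runtime_client.put_record.
    Any exception raised by a call is re-raised when its result is collected.
    """
    def send(record):
        response = put_record(FeatureGroupName=feature_group_name, Record=record)
        return response['ResponseMetadata']['HTTPStatusCode']

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(send, records))


# Offline demo with a fake put_record that always succeeds.
fake_put = lambda FeatureGroupName, Record: {'ResponseMetadata': {'HTTPStatusCode': 200}}
print(ingest(fake_put, 'purchases', [[{}], [{}], [{}]]))  # [200, 200, 200]

# In the notebook:
# ingest(sagemaker_featurestore_runtime_client.put_record, FEATURE_GROUP_NAME, records)
```

`pool.map` keeps results in input order, so each status code lines up with its record.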

#### Verify that we can retrieve features from the feature group in account B

In [None]:
response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME, 
                                                            RecordIdentifierValueAsString='6034')
response
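`get_record` returns the features under `Record` as a list of `FeatureName`/`ValueAsString` pairs and, as far as we know, omits `Record` entirely when the identifier is not found. A minimal sketch that flattens the response into a dict; `record_as_dict` is a hypothetical helper and the example response values are illustrative.

```python
def record_as_dict(get_record_response):
    """Flatten a get_record response into {feature_name: value_as_string}.

    Returns an empty dict when the response has no 'Record' key.
    """
    return {
        feature['FeatureName']: feature['ValueAsString']
        for feature in get_record_response.get('Record', [])
    }


example_response = {'Record': [{'FeatureName': 'pid', 'ValueAsString': '6034'},
                               {'FeatureName': 'cid', 'ValueAsString': '1001'}]}
print(record_as_dict(example_response))  # {'pid': '6034', 'cid': '1001'}

# In the notebook: record_as_dict(response)
```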

#### Get records from account B's offline store (S3 bucket)
Now let's wait for the data to appear in account B's offline store before moving on to creating a dataset. This typically takes about 5 minutes.

In [None]:
ACCOUNT_ID = '<ACCOUNT B ID>'
CROSS_ACCOUNT_REGION = '<REGION OF ACCOUNT B INSIDE WHICH YOUR OFFLINE STORE EXISTS>'

In [None]:
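# The offline store folder may be named <feature group name>-<creation time>, so this prefix
# stops at the feature group name instead of appending '/data' to an exact folder name
feature_group_s3_prefix = f'{OFFLINE_STORE_PREFIX}/{ACCOUNT_ID}/sagemaker/{CROSS_ACCOUNT_REGION}/offline-store/{FEATURE_GROUP_NAME}'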
feature_group_s3_prefix
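Building the prefix by hand, as the cell above does, depends on the offline store's folder layout. `describe_feature_group` also reports the resolved location in `OfflineStoreConfig.S3StorageConfig.ResolvedOutputS3Uri` (in recent API versions). A minimal sketch that splits such a URI into bucket and key prefix; `split_s3_uri` is a hypothetical helper and the example URI is made up.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError(f'not an S3 URI: {uri}')
    return parsed.netloc, parsed.path.lstrip('/')


print(split_s3_uri('s3://sagemaker-offline-store/project-x/123456789012/data'))
# ('sagemaker-offline-store', 'project-x/123456789012/data')

# In the notebook (needs AWS access, so left commented out):
# description = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)
# resolved_uri = description['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']
# bucket, feature_group_s3_prefix = split_s3_uri(resolved_uri)
```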

In [None]:
offline_store_contents = None
while offline_store_contents is None:
    objects = s3_client.list_objects(Bucket=OFFLINE_STORE_BUCKET, Prefix=feature_group_s3_prefix)
    if 'Contents' in objects and len(objects['Contents']) > 1:
        logger.info('[Features are available in Offline Store!]')
        offline_store_contents = objects['Contents']
    else:
        logger.info('[Waiting for data in Offline Store...]')
        time.sleep(60)
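The loop above polls until data appears but never gives up, so a wrong prefix or a failed ingestion would keep it running indefinitely. A minimal sketch of a bounded version; `wait_for_objects` is a hypothetical helper, the 30-minute timeout is arbitrary, and `list_objects` is assumed to behave like `s3_client.list_objects_v2` (or `list_objects`). The loop above waits for more than one object, so `min_count=2` would mirror it.

```python
import time


def wait_for_objects(list_objects, bucket, prefix, min_count=1, poll_seconds=60,
                     timeout_seconds=1800, sleep=time.sleep):
    """Poll an S3 prefix until at least `min_count` objects exist, else raise TimeoutError.

    `list_objects` should behave like s3_client.list_objects_v2 (or list_objects).
    """
    waited = 0
    while True:
        contents = list_objects(Bucket=bucket, Prefix=prefix).get('Contents', [])
        if len(contents) >= min_count:
            return contents
        if waited >= timeout_seconds:
            raise TimeoutError(f'no data under s3://{bucket}/{prefix} after {waited} seconds')
        sleep(poll_seconds)
        waited += poll_seconds


# Offline demo: the fake listing is empty on the first call and has one object afterwards.
responses = iter([{}, {'Contents': [{'Key': 'prefix/part-0.parquet'}]}])
fake_list = lambda Bucket, Prefix: next(responses)
print(wait_for_objects(fake_list, 'bucket', 'prefix', sleep=lambda seconds: None))

# In the notebook:
# offline_store_contents = wait_for_objects(s3_client.list_objects_v2, OFFLINE_STORE_BUCKET,
#                                           feature_group_s3_prefix, min_count=2)
```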

#### Use Athena to query features from the feature group `purchases` in account B here (account A)

In [None]:
feature_group = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)

In [None]:
glue_table_name = feature_group['OfflineStoreConfig']['DataCatalogConfig']['TableName']

In [None]:
query_string = f'SELECT * FROM "{glue_table_name}"'
query_string
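The offline store is append-only: every write of a record identifier becomes its own row, so `SELECT *` can return several versions of the same record. A minimal sketch of an Athena query that keeps only the newest row per identifier with a `row_number()` window; `latest_records_query` is a hypothetical helper. If the event time feature is stored as a string, `ORDER BY` compares lexicographically, which matches numeric order only for epoch-second strings of equal length.

```python
def latest_records_query(table, record_id, event_time):
    """Athena SQL that keeps only the newest row per record identifier.

    row_number() ranks each identifier's rows by event time, newest first.
    """
    return (
        'SELECT * FROM ('
        f'SELECT *, row_number() OVER (PARTITION BY "{record_id}" ORDER BY "{event_time}" DESC) AS row_num '
        f'FROM "{table}"'
        ') AS ranked WHERE row_num = 1'
    )


print(latest_records_query('purchases_1700000000', 'pid', 'created_at'))

# In the notebook:
# query_string = latest_records_query(glue_table_name, record_identifier_feature_name,
#                                     event_time_feature_name)
```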

#### Run Athena query in account B and save results back to a bucket in account A

In [None]:
ATHENA_RESULTS_BUCKET = '<NAME OF ATHENA QUERY RESULTS BUCKET IN ACCOUNT A>'

In [None]:
response = athena_client.start_query_execution(
                QueryString=query_string,
                QueryExecutionContext={
                    'Database': 'sagemaker_featurestore',
                    'Catalog': 'AwsDataCatalog'
                },
                ResultConfiguration={
                    'OutputLocation': f's3://{ATHENA_RESULTS_BUCKET}/query_results/{FEATURE_GROUP_NAME}',
                }
            )
response

In [None]:
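query_execution_id = response['QueryExecutionId']
# start_query_execution is asynchronous; wait until the query leaves the QUEUED/RUNNING states
while athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State'] in ('QUEUED', 'RUNNING'):
    time.sleep(5)
query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id,
                                                MaxResults=100)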

In [None]:
query_results['ResultSet']['Rows']
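Each row above is a `Data` list of cells, and for a `SELECT` the first row holds the column names. A minimal sketch that turns these rows into dicts keyed by column name; `rows_to_dicts` is a hypothetical helper and the example rows are illustrative. NULL cells carry no `VarCharValue` key and become `None`. Results beyond `MaxResults` need `NextToken` pagination, and only the first page starts with the header row, which this sketch does not handle.

```python
def rows_to_dicts(rows):
    """Turn Athena ResultSet['Rows'] into a list of dicts keyed by column name.

    Assumes the first row is the header row (true for the first page of a SELECT).
    """
    if not rows:
        return []
    header = [cell.get('VarCharValue') for cell in rows[0]['Data']]
    return [dict(zip(header, (cell.get('VarCharValue') for cell in row['Data']))) for row in rows[1:]]


example_rows = [
    {'Data': [{'VarCharValue': 'pid'}, {'VarCharValue': 'purchase_amount'}]},
    {'Data': [{'VarCharValue': '6034'}, {}]},
]
print(rows_to_dicts(example_rows))  # [{'pid': '6034', 'purchase_amount': None}]

# In the notebook: rows_to_dicts(query_results['ResultSet']['Rows'])
```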

#### Grant account A access to Athena results bucket (Important)
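The query runs with account B's credentials, so the result objects it writes to the Athena query results bucket (in account A) are owned by account B. To let this notebook, using account A's own credentials, read these objects, we must grant account A permissions on them through an object ACL. This assumes ACLs are enabled on the bucket; if the bucket's S3 Object Ownership setting is *Bucket owner enforced*, ACLs are disabled and account A already owns the objects.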

Get the canonical user ID of account A

In [None]:
# Note: the client below does not use the temporary credentials from the assumed role,
# hence points to this account (account A)
s3 = boto3.client('s3')
can_a = s3.list_buckets()['Owner']['ID']
can_a

Get the canonical user ID of account B

In [None]:
# Note: the client below is the one created at the beginning of this notebook
# using the temporary credentials from the assumed role, 
# hence it points to account B
can_b = s3_client.list_buckets()['Owner']['ID'] 
can_b

In [None]:
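training_set_csv_s3_key = None
# List only this feature group's query results; .get() avoids a KeyError if none exist yet
for s3_object in s3_client.list_objects(Bucket=ATHENA_RESULTS_BUCKET,
                                        Prefix=f'query_results/{FEATURE_GROUP_NAME}').get('Contents', []):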
    key = s3_object['Key']
    if key.startswith(f'query_results/{FEATURE_GROUP_NAME}') and key.endswith('csv'):
        print(f'Bucket = {ATHENA_RESULTS_BUCKET} | Key = {key}')
        training_set_csv_s3_key = key
        response = s3_client.put_object_acl(
            AccessControlPolicy={
                "Grants": [
                    {
                        'Grantee': {
                            'ID': can_a,
                            'Type': 'CanonicalUser'
                        },
                        'Permission': 'FULL_CONTROL'
                    }
                ],
                'Owner': {
                    'ID': can_b
                }
            },
            Bucket=ATHENA_RESULTS_BUCKET,
            Key=key,
        )
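The loop above looks up both canonical IDs to build an explicit grant. A simpler alternative is the S3 canned ACL `bucket-owner-full-control`, which gives both the object owner (account B) and the bucket owner (account A) FULL_CONTROL without any ID lookup. A minimal sketch; `bucket_owner_full_control_kwargs` is a hypothetical helper and the bucket and key are made up. Newer versions of the Athena API also accept `AclConfiguration={'S3AclOption': 'BUCKET_OWNER_FULL_CONTROL'}` inside `ResultConfiguration`, which applies this ACL when the results are written; check that your boto3 version supports it.

```python
def bucket_owner_full_control_kwargs(bucket, key):
    """put_object_acl arguments that apply the 'bucket-owner-full-control' canned ACL."""
    return {'Bucket': bucket, 'Key': key, 'ACL': 'bucket-owner-full-control'}


print(bucket_owner_full_control_kwargs('athena-results-bucket', 'query_results/purchases/abc.csv'))

# In the notebook, inside the loop above:
# s3_client.put_object_acl(**bucket_owner_full_control_kwargs(ATHENA_RESULTS_BUCKET, key))
```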

In [None]:
training_set_s3_path = f's3://{ATHENA_RESULTS_BUCKET}/{training_set_csv_s3_key}'
training_set_s3_path

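#### Load the Athena query result CSV into a pandas DataFrame for model training
Reading an `s3://` path with `pandas.read_csv` requires the `s3fs` package. It uses this notebook's default (account A) credentials, which is why the ACL grant above is needed.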

In [None]:
training_set = pandas.read_csv(training_set_s3_path)
training_set

### Scenario 2: WRITE features to an existing feature group located in the centralized feature store (account B).
Here, let us see how to write features to a feature group that already exists in account B (centralized feature store). <br><br>
In the `account-b` notebook, we created a feature group named `customers` inside the centralized feature store. Let us now write a record into this feature group from here (account A).

In [None]:
FEATURE_GROUP_NAME_IN_ACCOUNT_B = '<NAME OF EXISTING FEATURE GROUP IN ACCOUNT B>'  # e.g., customers

In [None]:
record = [
    {'ValueAsString': '1006', 'FeatureName': 'cid'},
    {'ValueAsString': 'farah', 'FeatureName': 'name'},
    {'ValueAsString': '45', 'FeatureName': 'age'},
    {'ValueAsString': 'married', 'FeatureName': 'marital_status'},
    {'ValueAsString': 'female', 'FeatureName': 'sex'},
    {'ValueAsString': 'houston', 'FeatureName': 'city'},
    {'ValueAsString': 'TX', 'FeatureName': 'state'},
    # Event time: current time in epoch seconds
    {'ValueAsString': str(int(round(time.time()))), 'FeatureName': 'created_at'},
]

In [None]:
response = sagemaker_featurestore_runtime_client.put_record(FeatureGroupName=FEATURE_GROUP_NAME_IN_ACCOUNT_B, 
                                                            Record=record
                                                           )
print(response['ResponseMetadata']['HTTPStatusCode'])

### Scenario 3: READ features from an existing feature group located in the centralized feature store (account B).

First, let us READ the record (row of features) we just put into the `customers` feature group.
Then, we will READ a record that already existed in the `customers` feature group, populated previously by account B in the notebook [account-b.ipynb](./account-b.ipynb).

In [None]:
response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME_IN_ACCOUNT_B, 
                                                            RecordIdentifierValueAsString='1006'
                                                           )
response

READ a record that already exists in the `customers` feature group.

In [None]:
response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME_IN_ACCOUNT_B, 
                                                            RecordIdentifierValueAsString='1001'
                                                           )
response
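The two cells above make one `get_record` call per identifier. The Feature Store runtime also offers `batch_get_record`, which fetches several identifiers in one call and returns `Records`, `Errors`, and `UnprocessedIdentifiers`. It requires a boto3 version that includes this API, and the cross-account role must allow the corresponding action. A minimal sketch of building its `Identifiers` argument; `batch_get_identifiers` is a hypothetical helper.

```python
def batch_get_identifiers(feature_group_name, record_ids, feature_names=None):
    """Build the Identifiers argument for the runtime client's batch_get_record."""
    identifier = {
        'FeatureGroupName': feature_group_name,
        'RecordIdentifiersValueAsString': [str(record_id) for record_id in record_ids],
    }
    if feature_names:
        # Optional: restrict which features are returned
        identifier['FeatureNames'] = list(feature_names)
    return [identifier]


print(batch_get_identifiers('customers', [1001, 1006]))

# In the notebook:
# response = sagemaker_featurestore_runtime_client.batch_get_record(
#     Identifiers=batch_get_identifiers(FEATURE_GROUP_NAME_IN_ACCOUNT_B, ['1001', '1006']))
```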