---
## <span style="color:orange"> Inside account B (centralized feature store)</span>
---

<div style="text-align: justify">This notebook must be run within account B. It sets up a centralized feature store in this account. First, you will create a feature group that stores a collection of customer-centric features. Then, you will write some features into this newly created feature group. The features are written to both the Online and Offline stores of the centralized feature store.
Later, you will see 1/ how to read features from the Online store and 2/ how to read features from the Offline store via an Athena query to create a training set for your data science work.</div>

**IMPORTANT:** This notebook must be run BEFORE you execute notebook [account-a.ipynb](./account-a.ipynb).

### Imports 

In [None]:
import sagemaker
import logging
import pandas
import boto3
import json
import time

#### Setup logging

In [None]:
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
logger.info(f'[Using Boto3 version: {boto3.__version__}]')

### Essentials

The Offline store S3 location can be an S3 bucket or an S3 prefix.

In [None]:
OFFLINE_STORE_BUCKET = '<YOUR OFFLINE STORE S3 BUCKET NAME>' # e.g., sagemaker-offline-store
OFFLINE_STORE_PREFIX = '<PREFIX WITHIN OFFLINE STORE BUCKET>'  # this is optional, e.g., project-x 
OFFLINE_STORE_LOCATION = f's3://{OFFLINE_STORE_BUCKET}/{OFFLINE_STORE_PREFIX}'
ACCOUNT_ID = boto3.client('sts').get_caller_identity().get('Account')
REGION = boto3.Session().region_name
FEATURE_GROUP_NAME = '<YOUR FEATURE GROUP NAME>'  # e.g., customers
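
Because the prefix is optional, an empty `OFFLINE_STORE_PREFIX` would leave a trailing slash in the location built above. A minimal sketch of a helper that handles both cases follows; `build_s3_uri` is a hypothetical name and is not part of boto3 or the SageMaker SDK.

```python
def build_s3_uri(bucket, prefix=''):
    """Return s3://bucket or s3://bucket/prefix, ignoring stray slashes."""
    # Hypothetical helper, for illustration only.
    cleaned_prefix = prefix.strip('/')
    if cleaned_prefix:
        return f's3://{bucket}/{cleaned_prefix}'
    return f's3://{bucket}'


# Bucket and prefix names reuse the examples from the comments above.
print(build_s3_uri('sagemaker-offline-store'))
print(build_s3_uri('sagemaker-offline-store', 'project-x/'))
```

With such a helper, `OFFLINE_STORE_LOCATION` could be assigned as `build_s3_uri(OFFLINE_STORE_BUCKET, OFFLINE_STORE_PREFIX)`.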

In [None]:
role_arn = sagemaker.get_execution_role()

In [None]:
sagemaker_client = boto3.client('sagemaker')
sagemaker_featurestore_runtime_client = boto3.client(service_name='sagemaker-featurestore-runtime')
s3_client = boto3.client(service_name='s3')
athena_client = boto3.client(service_name='athena')

In [None]:
offline_config = {'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': OFFLINE_STORE_LOCATION }}}
# offline_config = {}  # uncomment and use this line if needed to write ONLY to the Online feature store

### Load schema

In [None]:
def load_schema(schema):
    feature_definitions = []
    for col in schema['features']:
        feature = {'FeatureName': col['name']}
        if col['type'] == 'double':
            feature['FeatureType'] = 'Fractional'
        elif col['type'] == 'bigint':
            feature['FeatureType'] = 'Integral'
        else:
            feature['FeatureType'] = 'String'
        feature_definitions.append(feature)
    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']

In [None]:
with open('./schema/customers.json') as schema_file:  # close the file handle once parsed
    schema = json.load(schema_file)
feature_definitions, record_identifier_feature_name, event_time_feature_name = load_schema(schema)

In [None]:
feature_definitions
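
To make the type mapping concrete, the sketch below applies the same rules to an in-memory schema. The key names mirror what `load_schema` reads; the sample values are invented for illustration and are not the contents of `./schema/customers.json`.

```python
# Hypothetical schema; only the key names follow what load_schema expects.
sample_schema = {
    'description': 'Example customer features',
    'record_identifier_feature_name': 'cid',
    'event_time_feature_name': 'created_at',
    'features': [
        {'name': 'cid', 'type': 'bigint'},
        {'name': 'name', 'type': 'string'},
        {'name': 'created_at', 'type': 'double'},
    ],
}

# Same mapping as load_schema: double -> Fractional, bigint -> Integral, anything else -> String.
TYPE_MAP = {'double': 'Fractional', 'bigint': 'Integral'}

sample_definitions = [
    {'FeatureName': col['name'], 'FeatureType': TYPE_MAP.get(col['type'], 'String')}
    for col in sample_schema['features']
]
print(sample_definitions)
```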

### Create a feature group

Uncomment and run the cell below if the feature group already exists or during re-runs.

In [None]:
# sagemaker_client.delete_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)

In [None]:
sagemaker_client.create_feature_group(FeatureGroupName=FEATURE_GROUP_NAME,
                                    RecordIdentifierFeatureName=record_identifier_feature_name,
                                    EventTimeFeatureName=event_time_feature_name,
                                    FeatureDefinitions=feature_definitions,
                                    Description=schema['description'],
                                    Tags=schema['tags'],
                                    OnlineStoreConfig={'EnableOnlineStore': True},
                                    RoleArn=role_arn,
                                    **offline_config)

In [None]:
sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)
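
Feature group creation is asynchronous, and records can only be written once `FeatureGroupStatus` reaches `Created`. A minimal polling sketch follows. The name `wait_for_feature_group`, the 5-second interval, and the timeout are assumptions chosen for illustration; the client is passed in as an argument so the helper can be exercised without AWS access.

```python
import time


def wait_for_feature_group(client, feature_group_name, poll_seconds=5, timeout_seconds=600):
    """Poll describe_feature_group until the group is Created, fails, or times out."""
    deadline = time.time() + timeout_seconds
    while True:
        status = client.describe_feature_group(
            FeatureGroupName=feature_group_name)['FeatureGroupStatus']
        if status == 'Created':
            return status
        if status != 'Creating':
            # For example CreateFailed: surface it instead of looping forever.
            raise RuntimeError(f'Feature group {feature_group_name} is in status {status}')
        if time.time() >= deadline:
            raise TimeoutError(f'Timed out waiting for feature group {feature_group_name}')
        time.sleep(poll_seconds)
```

In this notebook it could be called as `wait_for_feature_group(sagemaker_client, FEATURE_GROUP_NAME)` before any records are written.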

### Populate features to the feature group

In [None]:
customers_df = pandas.read_csv('./data/customers.csv', header=None)
customers_df

In [None]:
records = []
for _, row in customers_df.iterrows():
    cid, name, age, marital_status, sex, city, state = row
    record = []
    record.append({'ValueAsString': str(cid), 'FeatureName': 'cid'})
    record.append({'ValueAsString': name, 'FeatureName': 'name'})
    record.append({'ValueAsString': str(age), 'FeatureName': 'age'})
    record.append({'ValueAsString': marital_status, 'FeatureName': 'marital_status'})
    record.append({'ValueAsString': sex, 'FeatureName': 'sex'})
    record.append({'ValueAsString': city, 'FeatureName': 'city'})
    record.append({'ValueAsString': state, 'FeatureName': 'state'})
    event_time_feature = {'ValueAsString': str(int(round(time.time()))), 'FeatureName': 'created_at'}
    record.append(event_time_feature)
    records.append(record)
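
The same record structure can be produced from a mapping of feature names to values, which avoids one `append` per column. This is a sketch only: `to_record` is a hypothetical helper, the sample values are made up, and it assumes every value can be sent as a string, as the loop above does.

```python
import time


def to_record(features, event_time_feature_name='created_at', event_time=None):
    """Convert {feature_name: value} into the list-of-dicts shape passed to put_record."""
    if event_time is None:
        event_time = int(round(time.time()))  # Unix epoch seconds, as in the loop above
    record = [{'FeatureName': name, 'ValueAsString': str(value)}
              for name, value in features.items()]
    record.append({'FeatureName': event_time_feature_name, 'ValueAsString': str(event_time)})
    return record


# Illustrative values only; a fixed event_time keeps the output reproducible.
example_record = to_record({'cid': 1002, 'age': 35}, event_time=1700000000)
print(example_record)
```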

#### Write features to the feature store

In [None]:
for record in records:
    response = sagemaker_featurestore_runtime_client.put_record(FeatureGroupName=FEATURE_GROUP_NAME, 
                                                                Record=record)
    print(response['ResponseMetadata']['HTTPStatusCode'])

#### Verify that you can retrieve features from your feature group using the record identifier
Here, you are reading features from the Online store.

In [None]:
response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME, 
                                                            RecordIdentifierValueAsString='1002')
response
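
`get_record` returns the features as a list of `FeatureName`/`ValueAsString` pairs under the `Record` key. A small sketch for flattening that list into a dictionary follows; `record_to_dict` is a hypothetical helper and the sample response is made up.

```python
def record_to_dict(get_record_response):
    """Flatten a get_record response into {feature_name: value_as_string}."""
    # The 'Record' key may be missing when no record matches the identifier,
    # so fall back to an empty list.
    return {item['FeatureName']: item['ValueAsString']
            for item in get_record_response.get('Record', [])}


# Illustrative response shape with invented values.
sample_response = {'Record': [
    {'FeatureName': 'cid', 'ValueAsString': '1002'},
    {'FeatureName': 'age', 'ValueAsString': '35'},
]}
print(record_to_dict(sample_response))
```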

### Get records from the Offline store (S3 bucket)
Now let us wait for the data to appear in the Offline store (S3 bucket) before moving on to creating a dataset. This typically takes 5 minutes or less.

In [None]:
feature_group_s3_prefix = f'{OFFLINE_STORE_PREFIX}/{ACCOUNT_ID}/sagemaker/{REGION}/offline-store/{FEATURE_GROUP_NAME}/data'
feature_group_s3_prefix
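
The prefix above is assembled by hand from the expected folder layout. As an alternative sketch, the location can be read from the `ResolvedOutputS3Uri` field that `describe_feature_group` reports under `OfflineStoreConfig.S3StorageConfig` once the feature group has been created, which avoids depending on the exact layout. `split_s3_uri` is a hypothetical helper and the sample URI is illustrative only.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split s3://bucket/key/prefix into (bucket, prefix)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError(f'Not an S3 URI: {s3_uri}')
    return parsed.netloc, parsed.path.lstrip('/')


# Illustrative value; in the notebook it would come from
# describe_feature_group(...)['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']
sample_uri = ('s3://sagemaker-offline-store/project-x/123456789012/sagemaker/'
              'us-east-1/offline-store/customers/data')
sample_bucket, sample_prefix = split_s3_uri(sample_uri)
print(sample_bucket, sample_prefix)
```

The returned pair can be passed to `list_objects` as `Bucket` and `Prefix`.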

In [None]:
offline_store_contents = None
while offline_store_contents is None:
    objects = s3_client.list_objects(Bucket=OFFLINE_STORE_BUCKET, Prefix=feature_group_s3_prefix)
    if 'Contents' in objects and len(objects['Contents']) > 1:
        logger.info('[Features are available in Offline Store!]')
        offline_store_contents = objects['Contents']
    else:
        logger.info('[Waiting for data in Offline Store...]')
        time.sleep(60)

### Use Athena to query features from the Offline store and create a training set

In [None]:
feature_group = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)

In [None]:
glue_table_name = feature_group['OfflineStoreConfig']['DataCatalogConfig']['TableName']

In [None]:
query_string = f'SELECT * FROM "{glue_table_name}"'
query_string

#### Run Athena query and save results 

You can save the results of the Athena query to a folder within the Offline store S3 bucket or any other bucket. Here, we store the query results under a prefix within the Offline store S3 bucket.

In [None]:
response = athena_client.start_query_execution(
                QueryString=query_string,
                QueryExecutionContext={
                    'Database': 'sagemaker_featurestore',
                    'Catalog': 'AwsDataCatalog'
                },
                ResultConfiguration={
                    'OutputLocation': f's3://{OFFLINE_STORE_BUCKET}/query_results/{FEATURE_GROUP_NAME}',
                }
            )

In [None]:
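query_execution_id = response['QueryExecutionId']
while athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State'] in ('QUEUED', 'RUNNING'):
    time.sleep(5)  # get_query_results fails while the query is still QUEUED or RUNNING
query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id, MaxResults=100)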

In [None]:
training_set_csv_s3_key = None
for s3_object in s3_client.list_objects(Bucket=OFFLINE_STORE_BUCKET, Prefix=f'query_results/{FEATURE_GROUP_NAME}').get('Contents', []):
    key = s3_object['Key']
    if key.startswith(f'query_results/{FEATURE_GROUP_NAME}') and key.endswith('csv'):
        training_set_csv_s3_key = key
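
The loop above keeps the last matching key, which may belong to an earlier query once the prefix holds several result files. Athena normally names the result file after the query execution ID, so the key can also be derived directly. The sketch below relies on that assumption (workgroup settings can override the output location); `athena_result_key` is a hypothetical helper and the ID shown is a placeholder.

```python
def athena_result_key(output_prefix, query_execution_id):
    """Return the expected S3 key of an Athena result CSV under output_prefix."""
    return f"{output_prefix.strip('/')}/{query_execution_id}.csv"


# Placeholder ID; in the notebook it would be response['QueryExecutionId'].
example_key = athena_result_key('query_results/customers',
                                'abcd1234-ef56-7890-abcd-1234567890ef')
print(example_key)
```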

In [None]:
training_set_s3_path = f's3://{OFFLINE_STORE_BUCKET}/{training_set_csv_s3_key}'
training_set_s3_path

In [None]:
training_set = pandas.read_csv(training_set_s3_path)

In [None]:
training_set