In [None]:
import pandas as pd
import os
import sys
import time
import sagemaker
import random
import boto3
import numpy as np
import math
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime
from generate_synthetic_housing_data import *

In [None]:
# Put the parent of the notebook directory on the import path; presumably this
# is what lets the `feature_store` / `ml_lineage_helper` imports that follow
# resolve — confirm against the repository layout.
notebook_dir = os.getcwd()
# os.path.dirname is the portable way to get the parent directory (the manual
# '/'-split only works with POSIX separators).
package_dir = os.path.dirname(notebook_dir)
# Re-running the cell must not pile up duplicate sys.path entries.
if package_dir not in sys.path:
    sys.path.append(package_dir)

In [None]:
%load_ext autoreload
%autoreload 2
from feature_store import *
from ml_lineage_helper import *
from ml_lineage_helper.query_lineage import *

**Session variables**

In [None]:
role_arn = sagemaker.get_execution_role()
# The role name is the final '/'-separated component of the ARN, regardless of
# how many path segments precede it (e.g. '...:role/<name>' or
# '...:role/service-role/<name>'). Taking the last component avoids guessing
# an index and swallowing errors with a bare `except`.
role_name = role_arn.rsplit('/', 1)[-1]

# Project-local session wrapper; later cells read `.session`, `.role_arn`,
# `.bucket_name` and `.bucket_s3_uri` from it.
sagemaker_session = SageMakerSession()

# Names shared by the rest of the notebook.
feature_group_name = 'synthetic-housing-data-2'
feature_group_description = 'Synthetic housing Feature Group'
s3_prefix = 'ml-lineage-synthetic-housing-2'
model_name = 'pytorch-hosted-model-v9'

### Process data

Create local directory structure.

In [None]:
# Seed Python's `random` module.
# NOTE(review): this does not seed NumPy; verify which generator
# generate_houses actually uses.
random.seed(113)

# Local working directories, all rooted at the notebook's working directory.
cwd = os.getcwd()
data_dir, train_dir, test_dir, raw_dir = (
    os.path.join(cwd, sub_path)
    for sub_path in ('data', 'data/train', 'data/test', 'data/raw')
)
for directory in (data_dir, train_dir, test_dir, raw_dir):
    os.makedirs(directory, exist_ok=True)

Load data locally and upload data to S3.

In [None]:
# Build the synthetic dataset with the project helper
# (presumably 1506 is the number of houses — confirm in
# generate_synthetic_housing_data).
df = generate_houses(1506)

# Feature columns: everything except the PRICE target. Selected by name, not
# by position, so the split stays correct even if PRICE is not the last column
# (the column-naming cell further down also removes 'PRICE' by name).
train_cols = [col for col in df.columns if col != 'PRICE']

# Split data: the first 80% of rows are training data, the rest test data
# (positional split, no shuffling).
training_index = math.floor(0.8 * df.shape[0])
x_train, y_train = df[train_cols][:training_index], df.PRICE[:training_index]
x_test, y_test = df[train_cols][training_index:], df.PRICE[training_index:]

# Scale price
y_train = y_train / 100000
y_test = y_test / 100000

# Save locally: raw features go to data/raw (they are uploaded for
# preprocessing), targets go straight to data/train and data/test.
np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)

# Upload to S3
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sagemaker_session.session.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)

Process data with SageMaker Processing.

In [None]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
 
 input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
 print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
 scaler = StandardScaler()
 for file in input_files:
 raw = np.load(file)
 transformed = scaler.fit_transform(raw)
 if 'train' in file:
 output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
 np.save(output_path, transformed)
 print('SAVED TRANSFORMED TRAINING DATA FILE\n')
 else:
 output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
 np.save(output_path, transformed)
 print('SAVED TRANSFORMED TEST DATA FILE\n')

In [None]:
# Settings for the scikit-learn processing job that runs preprocessing.py.
processor_settings = {
    'framework_version': '0.20.0',
    'role': sagemaker_session.role_arn,
    'instance_type': 'ml.m5.xlarge',
    'instance_count': 2,
}
sklearn_processor = SKLearnProcessor(**processor_settings)

In [None]:
# Job name = S3 prefix + "-preprocessing-" + a day-hour-minute-second UTC stamp.
job_timestamp = strftime("%d-%H-%M-%S", gmtime())
processing_job_name = "{}-preprocessing-{}".format(s3_prefix, job_timestamp)
output_destination = 's3://{}/{}/data'.format(sagemaker_session.bucket_name, s3_prefix)

# Raw feature arrays uploaded earlier, mounted into the container.
raw_input = ProcessingInput(
    source=raw_s3,
    destination='/opt/ml/processing/input',
    s3_data_distribution_type='ShardedByS3Key',
)

# One output channel per directory the script writes to.
train_output = ProcessingOutput(
    output_name='train',
    source='/opt/ml/processing/train',
    destination='{}/train'.format(output_destination),
)
test_output = ProcessingOutput(
    output_name='test',
    source='/opt/ml/processing/test',
    destination='{}/test'.format(output_destination),
)

# `code` may also be an S3 URI pointing at the input script.
sklearn_processor.run(
    code='preprocessing.py',
    job_name=processing_job_name,
    inputs=[raw_input],
    outputs=[train_output, test_output],
)

# Description of the job just launched; passed to the lineage helper later on.
preprocessing_job_description = sklearn_processor.jobs[-1].describe()

### Getting data into a Feature Store

So we've got data that's already been processed. In our case, it's a synthetic housing dataset and it's been standardized.

In [None]:
# S3 locations of the transformed feature arrays written by the processing job.
train_in_s3 = '{}/train/x_train.npy'.format(output_destination)
test_in_s3 = '{}/test/x_test.npy'.format(output_destination)
# Copy them into the local data/train and data/test directories via the AWS CLI
# ({...} interpolates the Python variables into the shell command).
!aws s3 cp {train_in_s3} ./data/train/x_train.npy
!aws s3 cp {test_in_s3} ./data/test/x_test.npy

In [None]:
def build_labelled_frame(features, target, feature_names, is_train):
    """Assemble one Feature Store-ready DataFrame.

    Args:
        features: 2-D array of feature values, one column per entry in
            `feature_names`.
        target: target values, one per row of `features`.
        feature_names: column names for `features`, in order.
        is_train: True for the training split, False for the test split.

    Returns:
        DataFrame with columns ['target', *feature_names, 'train'], where
        'train' is 1 for the training split and 0 otherwise.

    Raises:
        ValueError: if `target` and `features` have different numbers of rows
            or `feature_names` does not match the number of feature columns.
    """
    frame = pd.DataFrame(data=features, columns=feature_names)
    # Assign the target positionally rather than by index label, so a
    # non-zero-based index on the target Series cannot silently produce NaNs.
    frame.insert(0, 'target', np.asarray(target))
    # Train/test indicator variable
    frame['train'] = 1 if is_train else 0
    return frame


x_train = np.load('./data/train/x_train.npy')
x_test = np.load('./data/test/x_test.npy')

# Column names: the original feature names (everything but PRICE), with the
# target first and the train/test indicator last.
feature_names = [name for name in df.columns if name != 'PRICE']
column_names = ['target'] + feature_names + ['train']

# Convert the transformed arrays to labelled pandas DataFrames.
train_df = build_labelled_frame(x_train, y_train, feature_names, is_train=True)
test_df = build_labelled_frame(x_test, y_test, feature_names, is_train=False)

But we want to put this data into a Feature Store so that other data scientists can use this data for building models without having to go through the pre-processing steps again.

So let's get our DataFrame into the Feature Store by creating a new Feature Group and ingesting the data from the DataFrame into that Feature Group. By default, ingestion is turned on, but you can turn it off by passing in the parameter `ingest=False`.

In [None]:
# S3 location passed to the helper: the session bucket plus the feature group
# name with dashes replaced by underscores.
offline_prefix = feature_group_name.replace('-', '_')
offline_store_uri = f'{sagemaker_session.bucket_s3_uri}/{offline_prefix}'

# Create the Feature Group from the training frame (the helper also ingests
# the rows unless told otherwise, per the text above).
feature_store = FeatureStore(feature_group_name, sagemaker_session)
feature_group = feature_store.create_feature_group(
    train_df,
    feature_group_description,
    offline_store_uri,
)

Ingest the test data into the Feature Group as well.

In [None]:
# NOTE(review): this calls create_feature_group a second time for the same
# group; presumably the helper reuses the existing group and only ingests the
# new rows — confirm in feature_store.
test_offline_store_uri = '{}/{}'.format(sagemaker_session.bucket_s3_uri, offline_prefix)
feature_group = feature_store.create_feature_group(
    test_df,
    feature_group_description,
    test_offline_store_uri,
)

### Get training and test data from the Feature Store

Get training data.

In [None]:
# Wait for online FS data to be replicated to offline FS
# (fixed five-minute pause; nothing here checks that replication finished).
replication_wait_seconds = 5 * 60
time.sleep(replication_wait_seconds)

In [None]:
feature_store = FeatureStore(feature_group_name, sagemaker_session)

# SQL selecting the rows flagged as training data from the feature group's table.
train_query_template = """
select *
from "{0}"
where train=1
"""
query = train_query_template.format(feature_store.table_name)

# The helper returns the result frame together with a second value that this
# notebook keeps as `athena_query`.
query_result = feature_store.query_feature_group(query)
train_df, athena_query = query_result
train_df.head()

Get test data.

In [None]:
# Kept under its own name so the training `query` built in the previous cell
# is not overwritten: `query` is later handed to create_ml_lineage, which
# presumably should record the query that produced the *training* set —
# confirm against ml_lineage_helper.
test_query = """
select *
from "{0}"
where train=0
""".format(feature_store.table_name)

test_df, athena_query = feature_store.query_feature_group(test_query)
test_df.head()

In [None]:
# Only select features we care about and ignore the metadata columns
# NOTE(review): the cut is positional (last six columns) and the cell is not
# idempotent — running it twice drops six more columns each time.
trailing_columns_to_drop = 6
train_df = train_df.iloc[:, :-trailing_columns_to_drop]
test_df = test_df.iloc[:, :-trailing_columns_to_drop]

In [None]:
# Destination objects for the training and test sets under the project prefix.
dataset_s3_root = f's3://{sagemaker_session.bucket_name}/{s3_prefix}'
train_s3 = f'{dataset_s3_root}/train.npy'
test_s3 = f'{dataset_s3_root}/test.npy'

# Upload both frames with the project helper (csv=False, matching the .npy keys).
for frame, destination in ((train_df, train_s3), (test_df, test_s3)):
    upload_df_to_s3(frame, destination, sagemaker_session, csv=False)

### ML Lineage Tracking for Training and Deployment

In [None]:
from sagemaker.pytorch import PyTorch

# Input channels for the training job.
inputs = dict(train=train_s3, test=test_s3)
# Alternative using local files:
#   inputs = {'train': 'file://data/train.npy', 'test': 'file://data/test.npy'}

hyperparameters = dict(epochs=30, batch_size=128, learning_rate=0.01)

# Metrics to be captured from logs.
metric_definitions = [
    {'Name': 'loss', 'Regex': ' loss: ([0-9\\.]+)'},
    {'Name': 'val_loss', 'Regex': 'Test MSE: ([0-9\\.]+)'},
]

# `instance_type` is reused when the model is created in a later cell.
instance_type = 'ml.c5.xlarge'
estimator_parameters = dict(
    source_dir='pytorch-model',
    entry_point='train_deploy.py',
    instance_type=instance_type,
    instance_count=1,
    hyperparameters=hyperparameters,
    role=sagemaker_session.role_arn,
    base_job_name='pytorch-hosted-model',
    framework_version='1.5.0',
    py_version='py3',
    metric_definitions=metric_definitions,
)

estimator = PyTorch(**estimator_parameters)

# Launch the training job on the two channels defined above.
estimator.fit(inputs=inputs)

In [None]:
# Create model if you haven't already used it to deploy a real-time endpoint
# or do a Batch Transform job
from sagemaker.pytorch import PyTorchModel

# Same entry point, source directory and framework version as the estimator,
# pointed at the trained model artifact.
model_settings = dict(
    entry_point='train_deploy.py',
    source_dir='pytorch-model',
    model_data=estimator.model_data,
    role=sagemaker_session.role_arn,
    framework_version='1.5.0',
    py_version='py3',
    name=model_name,
    sagemaker_session=sagemaker.Session(),
)
model = PyTorchModel(**model_settings)
# NOTE(review): `_create_sagemaker_model` is a private SDK method (leading
# underscore); check whether the installed SDK offers a public equivalent.
model._create_sagemaker_model(instance_type=instance_type)

In [None]:
# Get repo links to processing and training code.
# The processing script written and run earlier in this notebook is
# `preprocessing.py`, so that is the file the link must point at.
processing_code_repo_url = get_repo_link(os.getcwd(), 'preprocessing.py')
training_code_repo_url = get_repo_link(os.getcwd(), 'pytorch-model/train_deploy.py', processing_code=False)
repo_links = [processing_code_repo_url, training_code_repo_url]

# Tie together the training job, the registered model, the Feature Store
# query, the processing job and the code links.
ml_lineage = MLLineageHelper()
lineage = ml_lineage.create_ml_lineage(estimator, model_name=model_name,
                                       query=query,
                                       sagemaker_processing_job_description=preprocessing_job_description,
                                       feature_group_names=[feature_group_name],
                                       repo_links=repo_links)
lineage

In [None]:
# Call the helper's graph() on the lineage created in the previous cell.
ml_lineage.graph()

If you want to get the lineage of any SageMaker model, you can use the following snippet of code.

In [None]:
# Build a helper from the model name alone. Note that this rebinds `lineage`
# to the helper object; the cleanup cell reads `lineage.df` from it.
lineage = MLLineageHelper(sagemaker_model_name_or_model_s3_uri=model_name)
lineage.df

If you have a data source, you can find associated Feature Groups by providing the data source's S3 URI or Artifact ARN:

In [None]:
# `query_lineage` is reused by the lookup cells that follow.
query_lineage = QueryLineage()
# Look up Feature Groups starting from the training data's S3 URI.
query_lineage.get_feature_groups_from_data_source(train_s3)

You can also start with a Feature Group, and find associated data sources:

In [None]:
# The Feature Group is identified by the ARN taken from its describe() response.
# NOTE(review): max_depth presumably bounds how far the lineage graph is
# traversed — confirm in query_lineage.
query_lineage.get_data_sources_from_feature_group(feature_group.describe()['FeatureGroupArn'],
 max_depth=3)

Given a Feature Group, you can also find associated models:

In [None]:
# Same ARN lookup as above, this time asking for associated models.
query_lineage.get_models_from_feature_group(feature_group.describe()['FeatureGroupArn'])

Given a SageMaker model name or artifact ARN, you can find associated Feature Groups.

In [None]:
# Reverse lookup: start from the model name and find its Feature Groups.
query_lineage.get_feature_groups_from_model(model_name)

### Cleanup

In [None]:
# Delete the lineage associations for every destination ARN listed in the
# lineage table.
# NOTE(review): only associations are removed here; the model, Feature Group
# and S3 objects created above are not deleted by this cell.
destination_column = lineage.df['Artifact Destination ARN']
dest_arns = destination_column.values
for destination_arn in dest_arns:
    ml_lineage.delete_associations(destination_arn)