Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission

# Demonstrating aspects of Amazon SageMaker Feature Store performance

In [None]:
from utilities.feature_store_helper import FeatureStore
import boto3
import sagemaker
import json
from datetime import timedelta
from datetime import datetime
import time
import pytz
# pytz UTC tzinfo; used further down to attach a timezone to naive datetimes.
utc=pytz.UTC

# Boto3 client for the 'sagemaker-featurestore-runtime' service.
# NOTE(review): not referenced elsewhere in the visible notebook; the lookups
# below go through fs._featurestore_runtime instead.
smfs_runtime = boto3.session.Session().client(service_name='sagemaker-featurestore-runtime')
# Default S3 bucket of the SageMaker session; used below to build the pipeline input URI.
default_bucket = sagemaker.Session().default_bucket()

In [3]:
# Name of the feature group used by the demos in this notebook.
FG_NAME = 'fs-demo-2022-03-24'

# Project helper imported from utilities.feature_store_helper; every Feature Store
# call in this notebook goes through this instance.
fs = FeatureStore()

# Performance demo 1: Online store, feature lookups

In [4]:
import time
import numpy as np

def print_percentiles(latencies):
    """Print the p50, p90 and p95 of ``latencies``, truncated to integers.

    Args:
        latencies: Sequence of numeric latency samples.

    Prints a header line followed by a NumPy array holding the three
    percentile values. Returns None.
    """
    percentile_values = np.array(
        [int(np.percentile(latencies, q)) for q in (50, 90, 95)]
    )
    print('p50 p90 p95')
    print(percentile_values)

def get_record(fg_name, max_id):
    """Time one online-store ``get_record`` call for a randomly chosen record id.

    Args:
        fg_name: Name of the feature group to read from.
        max_id: Exclusive upper bound for the random record identifier; the
            id is drawn from ``[0, max_id)`` and sent as a string.

    Returns:
        Elapsed time of the call in whole milliseconds (truncated to int).
    """
    # Build the identifier before starting the clock so that only the service
    # call itself is measured.
    record_id = str(np.random.randint(0, max_id))

    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted and would then skew the latency.
    start_time = time.perf_counter()
    fs._featurestore_runtime.get_record(FeatureGroupName=fg_name,
                                        RecordIdentifierValueAsString=record_id)
    elapsed_ms = (time.perf_counter() - start_time) * 1000
    return int(elapsed_ms)

def test_gets(fg_name, max_id=100000, num_iterations=1000):
 latencies = []
 sample = num_iterations // 10
 for request_num in range(num_iterations):
 elapsed_time = get_record(fg_name, max_id)
 latencies.append(elapsed_time)
 return latencies

In [5]:
# Fetch one record by identifier ('6') so the cell output shows what a raw
# get_record response looks like.
fs._featurestore_runtime.get_record(FeatureGroupName=FG_NAME,
 RecordIdentifierValueAsString='6')

{'ResponseMetadata': {'RequestId': '6b60577d-ffb1-4114-b304-87b66c26018b',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '6b60577d-ffb1-4114-b304-87b66c26018b',
 'content-type': 'application/json',
 'content-length': '215',
 'date': 'Tue, 13 Sep 2022 23:12:08 GMT'},
 'RetryAttempts': 0},
 'Record': [{'FeatureName': 'Id', 'ValueAsString': '6'},
 {'FeatureName': 'UpdateTime', 'ValueAsString': '2020-02-03T00:00:00Z'},
 {'FeatureName': 'ZipCode', 'ValueAsString': '33333'},
 {'FeatureName': 'Churn', 'ValueAsString': '0'}]}

In [6]:
# 5,000 sequential lookups with random ids drawn from [0, 6); each latency is
# recorded in whole milliseconds and summarized as p50/p90/p95.
latencies = test_gets(FG_NAME, max_id=6, num_iterations=5000)
print_percentiles(latencies)

p50 p90 p95
[7 8 9]


# Performance demo 2: Feature ingestion

#### Sample input file with 200+ features

In [7]:
import pandas as pd
# Load the sample ingestion input from the local utilities folder.
# NOTE(review): the recorded output of this cell is a ParserError (line 13032
# has 286 fields where 204 were expected), so `df` is never created and the
# ingest cell further down fails with NameError. The CSV or the read options
# need fixing before the notebook can run top to bottom.
df = pd.read_csv('utilities/data-001.csv')
print(f'{df.shape[0]:,d} rows')
df.head()

ParserError: Error tokenizing data. C error: Expected 204 fields in line 13032, saw 286


#### Ingest a single file directly from the notebook

In [10]:
# Re-declares the feature group name with the same value as the configuration
# cell near the top of the notebook.
FG_NAME = 'fs-demo-2022-03-24'

In [11]:
%%time
# Ingest the DataFrame loaded above into the feature group, timed by %%time.
# Parallelism is set through max_processes / max_workers; their exact
# semantics live in the FeatureStore helper — confirm there.
fs.ingest_from_df(FG_NAME, df, max_processes=12, max_workers=4)

NameError: name 'df' is not defined

#### Now experiment with different feature pipeline configurations

In [15]:
def run_once(instance_type, instance_count, input_uri, fg_name=FG_NAME, script='utilities/customer_v2.py'):
    """Reconfigure the feature pipeline, enable it, and block until a new run ends.

    The function calls ``fs.update_feature_pipeline`` and
    ``fs.enable_feature_pipeline`` and then polls the SageMaker pipeline named
    ``sm-pipeline-<fg_name>`` until an execution that started after this call
    reaches a final status. Progress is printed along the way.

    Args:
        instance_type: Processing instance type; must be one of
            'ml.m5.4xlarge', 'ml.c5.9xlarge' or 'ml.c5.18xlarge'.
        instance_count: Number of instances for the pipeline job.
        input_uri: Location of the input data passed to the pipeline.
        fg_name: Target feature group name.
        script: Path of the processing script passed to the pipeline.

    Returns:
        None.

    Raises:
        ValueError: If ``instance_type`` has no known ``max_processes`` setting.
    """
    # Per-instance-type process counts.
    # NOTE(review): values look like the vCPU count of each type — confirm.
    max_processes_by_instance = {
        'ml.m5.4xlarge': 16,
        'ml.c5.9xlarge': 36,
        'ml.c5.18xlarge': 72,
    }
    # Fail fast with a clear message; previously an unknown type left
    # max_processes unbound and surfaced later as an UnboundLocalError.
    if instance_type not in max_processes_by_instance:
        raise ValueError(
            f'Unsupported instance_type {instance_type!r}; '
            f'expected one of {sorted(max_processes_by_instance)}'
        )
    max_processes = max_processes_by_instance[instance_type]

    # Timezone-aware "now" in UTC. A naive local datetime.now() relabelled as
    # UTC is wrong whenever the kernel's local timezone is not UTC.
    before_launch = datetime.now(utc)

    print(f'Launching pipeline with {instance_count} {instance_type} instances ')
    print(f' to feature group: {fg_name}')
    print(f' using data from : {input_uri}...')

    #fs.disable_feature_pipeline(fg_name)
    sm = boto3.client('sagemaker')
    # NOTE(review): the reason for this pause is not evident from the notebook; kept as-is.
    time.sleep(5)
    fs.update_feature_pipeline(input_uri, fg_name, script,
                               instance_type=instance_type, instance_count=instance_count,
                               max_processes=max_processes, max_workers=4)
    fs.enable_feature_pipeline(fg_name)

    pipeline_name = f'sm-pipeline-{fg_name}'

    def _execution_since_launch():
        """Return the newest execution summary started after launch, else None."""
        summaries = sm.list_pipeline_executions(
            PipelineName=pipeline_name,
            # Sort explicitly so that index 0 is guaranteed to be the newest run.
            SortBy='CreationTime',
            SortOrder='Descending',
        )['PipelineExecutionSummaries']
        if summaries and summaries[0]['StartTime'] > before_launch:
            return summaries[0]
        return None

    print(' Waiting for pipeline to start executing...')
    while _execution_since_launch() is None:
        time.sleep(5)

    print(' Pipeline is now executing.')
    print(' Waiting for pipeline to complete executing...')
    while True:
        execution = _execution_since_launch()
        if execution is not None:
            status = execution['PipelineExecutionStatus']
            # 'Stopping' is still in progress; only leave the loop on a final status.
            if status not in ('Executing', 'Stopping'):
                print(f' Final status: {status}')
                break
        time.sleep(30)

    print(' Pipeline completed.\n')

#### Use an S3 location with 20 input files, 2 million feature records by 204 features

In [16]:
# Alternative input prefix, currently disabled:
#s3_base_uri = f's3://{default_bucket}/sagemaker-feature-store/customer-bulk-test/'
# S3 prefix passed to run_once as the pipeline input, under the session's default bucket.
s3_base_uri = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'

# Echo the resolved URI as the cell output.
s3_base_uri

's3://sagemaker-us-east-1-869044399089/sagemaker-feature-store/hello-data/'

#### Ingest using m5.4xl, increasing the number of instances to reduce job runtime

In [17]:
# Same instance type and input for every run; only the instance count changes.
for instance_count in (1, 2, 4, 8):
    run_once('ml.m5.4xlarge', instance_count, s3_base_uri)

Launching pipeline with 1 ml.m5.4xlarge instances 
 to feature group: fs-demo-2022-03-24
 using data from : s3://sagemaker-us-east-1-869044399089/sagemaker-feature-store/hello-data/...
 Waiting for pipeline to start executing...
 Pipeline is now executing.
 Waiting for pipeline to complete executing...
 Final status: Failed
 Pipeline completed.

Launching pipeline with 2 ml.m5.4xlarge instances 
 to feature group: fs-demo-2022-03-24
 using data from : s3://sagemaker-us-east-1-869044399089/sagemaker-feature-store/hello-data/...
 Waiting for pipeline to start executing...
 Pipeline is now executing.
 Waiting for pipeline to complete executing...
 Final status: Failed
 Pipeline completed.

Launching pipeline with 4 ml.m5.4xlarge instances 
 to feature group: fs-demo-2022-03-24
 using data from : s3://sagemaker-us-east-1-869044399089/sagemaker-feature-store/hello-data/...
 Waiting for pipeline to start executing...
 Pipeline is now executing.
 Waiting for pipeline to complete executing...


#### Now ingest using c5.9xl, achieving the same speed with fewer instances

In [18]:
# Same instance type and input for both runs; only the instance count changes.
for instance_count in (2, 3):
    run_once('ml.c5.9xlarge', instance_count, s3_base_uri)

Launching pipeline with 2 ml.c5.9xlarge instances 
 to feature group: fs-demo-2022-03-24
 using data from : s3://sagemaker-us-east-1-869044399089/sagemaker-feature-store/hello-data/...
 Waiting for pipeline to start executing...
 Pipeline is now executing.
 Waiting for pipeline to complete executing...
 Final status: Failed
 Pipeline completed.

Launching pipeline with 3 ml.c5.9xlarge instances 
 to feature group: fs-demo-2022-03-24
 using data from : s3://sagemaker-us-east-1-869044399089/sagemaker-feature-store/hello-data/...
 Waiting for pipeline to start executing...
 Pipeline is now executing.
 Waiting for pipeline to complete executing...
 Final status: Failed
 Pipeline completed.



#### Sample ingestion run times and throughput
![Ingestion performance](ingest.png)

In [None]:
# NOTE(review): `num_customers` is not defined anywhere in the visible notebook
# and this cell has no execution count — define it (or pass a literal) before
# running. Positional arguments map to max_id=num_customers, num_iterations=3000.
latencies = test_gets('customers', num_customers, 3000)
print_percentiles(latencies)

In [None]:
# The second positional argument is max_id, so this draws ids from [0, 3000)
# and uses the default num_iterations (1000).
latencies = test_gets('customers', 3000)
print_percentiles(latencies)