## Amazon SageMaker Feature Store


In [None]:
import io
import pandas as pd
import time
import sagemaker
import boto3
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role

# Execution role, SageMaker session and default bucket used throughout the notebook.
role = get_execution_role()
sess = sagemaker.Session()
bucket = sess.default_bucket()
# Read the region from the session we already have instead of constructing a
# second sagemaker.Session() just for this lookup.
region = sess.boto_region_name

# Read the prepared data from S3. Set `source` to the S3 URI of the CSV file, e.g.
# s3://sagemaker-<region>-<account-id>/data-preparation-using-amazon-sagemaker-and-glue-databrew/Results/DataWrangler/output_<id>/part-00000-<uuid>-c000.csv
source = '{S3 URI of csv file}'

# Split "s3://bucket/key" into bucket and key. Only the leading scheme is
# stripped (str.replace would remove "s3://" anywhere in the string), and a
# malformed URI -- including the unedited placeholder above -- fails fast with
# a clear message instead of producing a mangled bucket name.
s3_path = source[len('s3://'):] if source.startswith('s3://') else ''
s3Bucket, _, s3Key = s3_path.partition('/')
if not (s3Bucket and s3Key):
    raise ValueError(f"Expected an S3 URI of the form s3://bucket/key, got {source!r}")

s3Client = boto3.client('s3')
s3Obj = s3Client.get_object(Bucket=s3Bucket, Key=s3Key)
# Buffer the whole object body in memory and let pandas parse it as CSV.
df = pd.read_csv(io.BytesIO(s3Obj['Body'].read()))


When creating a feature group, you can also create metadata for it, such as a short description, the storage configuration, the features that identify each record and its event time, and tags that store information such as the author, data source, and version. Every feature group requires a record identifier feature and an event time feature. Since our dataset has no such columns, we add two extra columns: `Fraud_ID` (a unique record identifier) and `Fraud_time` (the event time). We also drop the `_c0` column.

In [None]:
# Add the record identifier and event time columns Feature Store requires, and
# drop `_c0`. `errors='ignore'` keeps the cell re-runnable: on a second run
# `_c0` is already gone and the two key columns are simply overwritten.
current_time_sec = int(round(time.time()))
df = df.drop(columns=['_c0'], errors='ignore').assign(
    # df comes from read_csv without index_col, so its index is 0..n-1 and
    # these IDs are 1000..1000+n-1.
    Fraud_ID=lambda frame: frame.index + 1000,
    # One shared event time for every row, stored as float64 seconds.
    Fraud_time=float(current_time_sec),
)
assert df['Fraud_ID'].is_unique, "Fraud_ID must uniquely identify each record"
df.head()

In [None]:
# Make `region` the default for boto3 clients created from here on, and build
# the S3 client that later polls the offline store.
boto3.setup_default_session(region_name=region)
s3_client = boto3.client(service_name="s3", region_name=region)

### Configure the feature group
The data type of each feature is inferred from the corresponding column of the dataframe passed to `load_feature_definitions`. Feature data types can also be set via a config variable, but they must match the corresponding Python data types in the pandas dataframe when it is ingested into the feature group.

In [None]:
# Configure the feature group: its feature definitions (names and types) are
# inferred from the columns of `df`. To start over from scratch, call
# fraud_feature_group.delete() before re-running the creation cell.
fraud_fg_name = "auto-fraud"
fraud_feature_group = FeatureGroup(name=fraud_fg_name, sagemaker_session=sess)
fraud_feature_group.load_feature_definitions(data_frame=df)

### Create the feature group
You must tell the feature group which columns in the dataframe correspond to the required record identifier and event time features.

In [None]:
# Columns of `df` that hold the required record identifier and event time.
record_identifier_feature_name = "Fraud_ID"
event_time_feature_name = "Fraud_time"
fraud_feature_group_s3_prefix = "data-preparation-using-amazon-sagemaker-and-glue-databrew/FeatureStore"

try:
    fraud_feature_group.create(
        s3_uri=f"s3://{bucket}/{fraud_feature_group_s3_prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True,
    )
    print('Create "fraud" feature group: SUCCESS')
except Exception as e:
    # Only AWS client errors carry a `response` dict with an error code. For any
    # other exception, re-raise the original instead of failing with an
    # AttributeError that hides it.
    error_code = (getattr(e, "response", None) or {}).get("Error", {}).get("Code")
    if error_code != "ResourceInUse":
        raise
    # The group already exists (e.g. the notebook is being re-run): reuse it.
    print(f"Using existing feature group: {fraud_fg_name}")

### Wait until feature group creation has fully completed

In [None]:
def wait_for_feature_group_creation_complete(
    feature_group, poll_interval_seconds=5, timeout_seconds=None
):
    """Block until ``feature_group`` has finished being created.

    Polls ``feature_group.describe()`` for as long as its
    ``FeatureGroupStatus`` is ``"Creating"``.

    Args:
        feature_group: a SageMaker ``FeatureGroup`` (any object with a
            ``name`` attribute and a ``describe()`` method returning a dict).
        poll_interval_seconds: seconds to sleep between ``describe()`` calls.
        timeout_seconds: stop waiting after this many seconds while the status
            is still ``"Creating"``. ``None`` (the default) waits indefinitely.

    Raises:
        RuntimeError: the status after polling is anything other than
            ``"Created"``. The message includes that status and, when
            ``describe()`` reports one, the ``FailureReason``.
        TimeoutError: ``timeout_seconds`` elapsed while still ``"Creating"``.
    """
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature group {feature_group.name} still creating after "
                f"{timeout_seconds} seconds"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval_seconds)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        message = f"Failed to create feature group {feature_group.name} (status: {status})"
        failure_reason = description.get("FailureReason")
        if failure_reason:
            message += f": {failure_reason}"
        raise RuntimeError(message)
    print(f"FeatureGroup {feature_group.name} successfully created.")


# Block until the fraud feature group reports "Created"; raises if it ends in any other status.
wait_for_feature_group_creation_complete(fraud_feature_group)

### Ingest records into the feature group
After the feature group has been created, we can put data into it by using the PutRecord API. This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to Amazon S3 in chunks. The files will be written to the offline store within a few minutes of ingestion.

In [None]:
# Ingest every row of `df` into the feature group, using up to 3 workers.
fraud_feature_group.ingest(wait=True, max_workers=3, data_frame=df)

### Wait for offline store data to become available
This usually takes 5–8 minutes.

In [None]:
# The FeatureGroup contains an OnlineStoreConfig and an OfflineStoreConfig
# controlling where the data is stored. Look up the S3 URI that Feature Store
# resolved for the offline store and convert it into a key prefix relative to
# `bucket`, which the next cell lists to detect when data has arrived.
fraud_feature_group_resolved_output_s3_uri = (
    fraud_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

# Strip only the leading "s3://<bucket>/" (str.replace would also remove any
# later occurrence). If the offline store is not inside `bucket`, listing
# `bucket` with the resulting prefix could never find anything, so fail here.
bucket_s3_uri = f"s3://{bucket}/"
if not fraud_feature_group_resolved_output_s3_uri.startswith(bucket_s3_uri):
    raise ValueError(
        f"Offline store URI {fraud_feature_group_resolved_output_s3_uri!r} "
        f"is not inside bucket {bucket!r}"
    )
fraud_feature_group_s3_prefix = fraud_feature_group_resolved_output_s3_uri[
    len(bucket_s3_uri):
]

In [None]:
# Poll the offline store prefix until data has been written there. Ingestion is
# buffered, so this usually takes several minutes; give up after
# OFFLINE_STORE_TIMEOUT_SECONDS rather than looping forever.
OFFLINE_STORE_POLL_SECONDS = 60
OFFLINE_STORE_TIMEOUT_SECONDS = 30 * 60

offline_store_contents = None
offline_store_deadline = time.monotonic() + OFFLINE_STORE_TIMEOUT_SECONDS
while offline_store_contents is None:
    # ListObjectsV2 is the recommended revision of ListObjects; matching keys
    # are returned under "Contents", which is absent when nothing matches.
    objects_in_bucket = s3_client.list_objects_v2(
        Bucket=bucket, Prefix=fraud_feature_group_s3_prefix
    )
    contents = objects_in_bucket.get("Contents", [])
    # NOTE(review): more than one object is required before the data counts as
    # available -- confirm why a single object under the prefix is not enough.
    if len(contents) > 1:
        offline_store_contents = contents
    elif time.monotonic() >= offline_store_deadline:
        raise TimeoutError(
            f"No data appeared under s3://{bucket}/{fraud_feature_group_s3_prefix} "
            f"after {OFFLINE_STORE_TIMEOUT_SECONDS} seconds"
        )
    else:
        print("Waiting for data in offline store...")
        time.sleep(OFFLINE_STORE_POLL_SECONDS)

print("\nData is available now.")