# Module 5: Materialize Offline Features to the Online Store
---

**Note:** Please set the kernel to `Python 3 (Data Science)` and the instance type to `ml.t3.medium`.
# Content
1. [Background](#Background)
1. [Setup](#Setup)
1. [Create Feature Group](#Create-Feature-Group)
1. [Ingest Data to the Offline Store](#Ingest-Data-to-the-Offline-Store)
1. [Materialize Latest Features to Online Store](#Materialize-Latest-Features-to-Online-Store)
1. [Cleanup](#Cleanup)


# Background

In this example, we demonstrate how customers can use the [Feature Store Spark Connector](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html) to ingest features directly to the offline store, and incrementally materialize the latest features to the online store.


# Setup

In [None]:
import sagemaker
import boto3

#### Essentials

In [None]:
sm_client = boto3.client('sagemaker')
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
region_name = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker-feature-store'

# Create Feature Group

First, create a feature group with online and offline stores configured.

In [None]:
feature_group_name = 'feature-store-offline-to-online-example'

We highly recommend storing offline features in the Apache Iceberg table format. Combined with Iceberg table maintenance operations such as compaction, this gives faster query performance when working with offline feature groups at scale and, as a result, helps you build training datasets faster.

If you need to use the Glue table format, please update the variable below to `'Glue'`. For more information on offline store formats, please refer to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-offline.html).

In [None]:
table_format = 'Iceberg' # or 'Glue'

In [None]:
sm_client.create_feature_group(
    FeatureGroupName=feature_group_name,
    RecordIdentifierFeatureName='RecordIdentifier',
    EventTimeFeatureName='EventTime',
    OnlineStoreConfig={
        'EnableOnlineStore': True
    },
    OfflineStoreConfig={
        'S3StorageConfig': {
            'S3Uri': f's3://{default_bucket}/{prefix}'
        },
        'TableFormat': table_format
    },
    FeatureDefinitions=[
        {
            'FeatureName': 'RecordIdentifier',
            'FeatureType': 'Integral'
        },
        {
            'FeatureName': 'Measure',
            'FeatureType': 'Fractional'
        },
        {
            'FeatureName': 'EventTime',
            'FeatureType': 'String'
        }
    ],
    RoleArn=role
)
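`create_feature_group` returns as soon as the request is accepted, while the feature group is still in the `Creating` state, so ingesting immediately afterwards is likely to fail. The helper below is a minimal polling sketch; the name `wait_for_feature_group` and its parameters are our own, not part of boto3 or the SageMaker SDK. It takes a zero-argument `describe` callable that returns a `DescribeFeatureGroup`-style response, so it can be driven by `sm_client.describe_feature_group` or by a stub.

```python
import time
from typing import Callable, Dict


def wait_for_feature_group(
    describe: Callable[[], Dict],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
) -> str:
    """Poll until FeatureGroupStatus leaves 'Creating' (hypothetical helper).

    Returns 'Created' on success; raises RuntimeError for any other final
    status (e.g. 'CreateFailed') and TimeoutError if creation takes too long.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = describe()
        status = response["FeatureGroupStatus"]
        if status == "Created":
            return status
        if status != "Creating":
            # FailureReason is only present in the response when creation failed
            reason = response.get("FailureReason", "no reason given")
            raise RuntimeError(f"Feature group status is {status!r}: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError("Timed out waiting for the feature group to be created")
        time.sleep(poll_seconds)
```

In this notebook it could be called as `wait_for_feature_group(lambda: sm_client.describe_feature_group(FeatureGroupName=feature_group_name))` before starting the ingestion job.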

# Ingest Data to the Offline Store

We will create a [SageMaker Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) that uses the Feature Store Spark Connector to ingest a set of features directly into the offline store.

To use the Feature Store Spark Connector in a Processing Job, we recommend extending the prebuilt SageMaker Spark Processing container as shown in the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html#:~:text=Installation%20on%20a%20Amazon%20SageMaker%20Processing%20Job). For this example, we will instead install the Spark Connector into a local directory and submit the required Python modules and Jar file when we run the processing job.

In [None]:
spark_version = '3.1' # MAJOR.MINOR

Install the Spark Connector under `./temp`.

In [None]:
%pip install sagemaker-feature-store-pyspark-{spark_version} -t ./temp --no-binary :all:

Zip up the required Python modules.

In [None]:
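import os
import zipfile

package_dir = './temp/feature_store_pyspark'

# Package the connector's Python modules so the Spark job can import feature_store_pyspark
with zipfile.ZipFile('feature_store_pyspark.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    for f in os.listdir(package_dir):
        if f.endswith('.py'):
            # Store each module under feature_store_pyspark/ inside the archive
            zf.write(os.path.join(package_dir, f), os.path.join('feature_store_pyspark', f))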

Use `feature_store_pyspark.classpath_jars()` to get the absolute path to the Jar file.

In [None]:
from temp import feature_store_pyspark

jar_path = feature_store_pyspark.classpath_jars()[0]
jar_path

Run a processing job using `scripts/ingest_to_offline_only.py` and include the zipped Python modules and Jar file.

In [None]:
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    max_runtime_in_seconds=1200,
    framework_version=spark_version,
)

spark_processor.run(
    submit_app='./scripts/ingest_to_offline_only.py',
    arguments=[
        '--feature_group_name',
        feature_group_name,
        '--region_name',
        region_name
    ],
    logs=False,
    submit_jars=[jar_path],
    submit_py_files=[
        './feature_store_pyspark.zip'
    ]
)
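The values in the `arguments` list passed to `spark_processor.run` reach the script as ordinary command-line flags. The contents of `scripts/ingest_to_offline_only.py` are not shown in this notebook, so the following is only a hypothetical sketch of how such a script could read those flags with `argparse`; the function name `parse_job_args` is illustrative and not taken from the real script.

```python
import argparse
from typing import List, Optional


def parse_job_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the flags passed via PySparkProcessor.run(arguments=[...]).

    Hypothetical: mirrors the flag names used in this notebook, not
    necessarily the real ingest_to_offline_only.py.
    """
    parser = argparse.ArgumentParser(description="Ingest features into the offline store")
    parser.add_argument("--feature_group_name", required=True)
    parser.add_argument("--region_name", required=True)
    # With argv=None, argparse reads sys.argv[1:], as it would inside the job
    return parser.parse_args(argv)
```

Inside the job, `parse_job_args()` would then expose `args.feature_group_name` and `args.region_name`.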

# Materialize Latest Features to Online Store

Now that our features are ingested into the offline store, we can materialize the latest features (for each record identifier) to the online store. To do this, we will run another Spark Processing Job using `scripts/materialize_to_online.py`. Since this task may need to run on a regular cadence, we add the processing job to a SageMaker Pipeline, which can then be scheduled with [Amazon EventBridge](https://docs.aws.amazon.com/sagemaker/latest/dg/pipeline-eventbridge.html).
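"Latest" here means, for each record identifier, the row with the greatest `EventTime`. The actual logic lives in `scripts/materialize_to_online.py`, which is not shown here and presumably runs in Spark; the plain-Python function below only illustrates that selection rule on a handful of rows. It assumes `EventTime` values are ISO-8601 strings such as `2023-01-01T00:00:00Z`, which is an assumption about the data rather than something this notebook states.

```python
from datetime import datetime
from typing import Dict, Iterable, List


def parse_event_time(value: str) -> datetime:
    # Assumes ISO-8601 strings that all include (or all omit) a UTC offset;
    # fromisoformat in older Python versions rejects a trailing 'Z', so map it to +00:00
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def latest_per_record(rows: Iterable[Dict]) -> List[Dict]:
    """Keep only the row with the newest EventTime for each RecordIdentifier."""
    latest: Dict[object, Dict] = {}
    for row in rows:
        key = row["RecordIdentifier"]
        current = latest.get(key)
        # Strict '>' means that on an EventTime tie the first row seen is kept
        if current is None or parse_event_time(row["EventTime"]) > parse_event_time(current["EventTime"]):
            latest[key] = row
    # Sort by identifier so the output order is deterministic
    return sorted(latest.values(), key=lambda r: r["RecordIdentifier"])
```

In Spark, the same rule is typically expressed with a window partitioned by the record identifier and ordered by event time descending, keeping the first row of each partition.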

In [None]:
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

pipeline_name = feature_group_name
pipeline_session = PipelineSession()

spark_processor = PySparkProcessor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    max_runtime_in_seconds=1200,
    sagemaker_session=pipeline_session,
    framework_version=spark_version
)

processor_args = spark_processor.run(
    submit_app='./scripts/materialize_to_online.py',
    logs=False,
    arguments=[
        '--table_format',
        table_format,
        '--feature_group_name',
        feature_group_name,
        '--region_name',
        region_name
    ],
    submit_jars=[
        jar_path
    ],
    submit_py_files=[
        './feature_store_pyspark.zip'
    ]
)

step_process = ProcessingStep(name='MaterializeToOnlineStore', step_args=processor_args)

pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process],
)

pipeline.upsert(role_arn=role)


Manually run the pipeline.

In [None]:
execution = pipeline.start()
execution.wait()
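To see which step failed and why, the SageMaker Python SDK's `execution.list_steps()` returns per-step dictionaries from the `ListPipelineExecutionSteps` API, which include `StepName`, `StepStatus` and, for failed steps, `FailureReason`. The helper below (its name is our own) is a small sketch that relies only on those three keys and reduces the list to the failed steps.

```python
from typing import Dict, List


def failed_steps(steps: List[Dict]) -> List[str]:
    """Return 'StepName: FailureReason' for every step whose StepStatus is 'Failed'."""
    return [
        f"{step['StepName']}: {step.get('FailureReason', 'no reason reported')}"
        for step in steps
        if step.get("StepStatus") == "Failed"
    ]
```

For example, `failed_steps(execution.list_steps())` returns an empty list when the `MaterializeToOnlineStore` step succeeded.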

Verify that the latest features are available in the online store.

In [None]:
fs_client = boto3.client('sagemaker-featurestore-runtime')
fs_client.batch_get_record(
    Identifiers=[
        {
            'FeatureGroupName': feature_group_name,
            'RecordIdentifiersValueAsString': ['1', '2', '3']
        }
    ]
)
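The `batch_get_record` response is nested: each entry in `Records` carries a `RecordIdentifierValueAsString` and a `Record` list of `{'FeatureName': ..., 'ValueAsString': ...}` pairs, while identifiers that could not be read appear under `Errors` and `UnprocessedIdentifiers`. The helper below is our own sketch, not part of boto3; it flattens the response into one dictionary per record. All values come back as strings, so `Measure` would need `float(...)` before numeric use.

```python
from typing import Dict, Optional


def records_by_id(response: Dict) -> Dict[str, Dict[str, Optional[str]]]:
    """Map each record identifier to {feature name: value string}."""
    result: Dict[str, Dict[str, Optional[str]]] = {}
    for item in response.get("Records", []):
        features = {f["FeatureName"]: f.get("ValueAsString") for f in item["Record"]}
        result[item["RecordIdentifierValueAsString"]] = features
    # Surface anything the service could not return instead of silently dropping it
    if response.get("Errors") or response.get("UnprocessedIdentifiers"):
        print("Errors:", response.get("Errors"))
        print("Unprocessed:", response.get("UnprocessedIdentifiers"))
    return result
```

To use it, assign the result of the cell above to a variable such as `response` and call `records_by_id(response)`.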

# Cleanup

In [None]:
pipeline.delete()