{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Module 5: Materialize Offline features to Online Store\n", "---\n", "\n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`\n", "# Content\n", "1. [Background](#Background)\n", "1. [Setup](#Setup)\n", "1. [Create Feature Group](#Create-Feature-Group)\n", "1. [Ingest Data to the Offline Store](#Ingest-Data-to-the-Offline-Store)\n", "1. [Materialize Latest Features to Online Store](#Materialize-Latest-Features-to-Online-Store)\n", "\n", "\n", "# Background\n", "\n", "In this example, we demonstrate how customers can use the [Feature Store Spark Connector](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html) to ingest features directly to the offline store, and incrementally materialize the latest features to the online store.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Feature Group\n", "\n", "First, create a feature group with online and offline stores configured." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import sagemaker\n", "import boto3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Essentials" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_client = boto3.client('sagemaker')\n", "sagemaker_session = sagemaker.session.Session()\n", "role = sagemaker.get_execution_role()\n", "region_name = sagemaker_session.boto_region_name\n", "default_bucket = sagemaker_session.default_bucket()\n", "prefix = 'sagemaker-feature-store'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create Feature Group\n", "\n", "First, create a feature group with online and offline stores configured." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group_name = 'feature-store-offline-to-online-example'" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We highly recommend storing offline features using the Apache Iceberg table format. By combining Iceberg and its table maintenance operations such as compaction, customer will benefit from faster query performance when working with offline feature groups at scale and as a result help customer build training dataset faster. \n", "\n", "If you need to use the Glue table format, please update the variable below to `'Glue'`. For more information on offline store formats, please refer to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-offline.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "table_format = 'Iceberg' # or 'Glue'" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "sm_client.create_feature_group(\n", " FeatureGroupName=feature_group_name,\n", " RecordIdentifierFeatureName='RecordIdentifier',\n", " EventTimeFeatureName='EventTime',\n", " OnlineStoreConfig={\n", " 'EnableOnlineStore': True\n", " },\n", " OfflineStoreConfig={\n", " 'S3StorageConfig': {\n", " 'S3Uri': f's3://{default_bucket}/{prefix}'\n", " },\n", " 'TableFormat': table_format\n", " },\n", " FeatureDefinitions=[\n", " {\n", " 'FeatureName': 'RecordIdentifier',\n", " 'FeatureType': 'Integral'\n", " },\n", " {\n", " 'FeatureName': 'Measure',\n", " 'FeatureType': 'Fractional'\n", " },\n", " {\n", " 'FeatureName': 'EventTime',\n", " 'FeatureType': 'String'\n", " }\n", " ],\n", " RoleArn=role\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Ingest Data to the Offline Store\n", "\n", "We will create a [SageMaker Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) which uses the Feature Store Spark Connector to ingests a set of features directly into the offline store.\n", "\n", "To use the Feature Store Spark Connector in a Processing Job, we recommend extending the prebuilt SageMaker Spark Processing container as shown in the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html#:~:text=Installation%20on%20a%20Amazon%20SageMaker%20Processing%20Job\n", "). For this example, we will install the Spark Connector to a local directory and submit the required modules and Jar file when we run the processing job." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "spark_version = '3.1' # MAJOR.MINOR" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install the Spark Connector under `./temp`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%pip install sagemaker-feature-store-pyspark-{spark_version} -t ./temp --no-binary :all:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Zip up the required Python modules." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import zipfile\n", "import os\n", "\n", "zf = zipfile.ZipFile('feature_store_pyspark.zip', 'w', zipfile.ZIP_DEFLATED)\n", "\n", "for f in os.listdir('./temp/feature_store_pyspark'):\n", " if f.endswith('.py'):\n", " zf.write(os.path.join('./temp/feature_store_pyspark', f), os.path.join('feature_store_pyspark', f))\n", "\n", "zf.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `feature_store_pyspark.classpath_jars()` to get the absolute path to the Jar file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from temp import feature_store_pyspark\n", "\n", "jar_path = feature_store_pyspark.classpath_jars()[0]\n", "jar_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run a processing job using `scripts/ingest_to_offline_only.py` and include the zipped Python modules and Jar file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.spark.processing import PySparkProcessor\n", "\n", "spark_processor = PySparkProcessor(\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m5.large',\n", " max_runtime_in_seconds=1200,\n", " framework_version=spark_version,\n", ")\n", "\n", "spark_processor.run(\n", " submit_app='./scripts/ingest_to_offline_only.py',\n", " arguments=[\n", " '--feature_group_name',\n", " feature_group_name,\n", " '--region_name',\n", " region_name\n", " ],\n", " logs=False,\n", " submit_jars=[jar_path],\n", " submit_py_files=[\n", " './feature_store_pyspark.zip'\n", " ]\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Materialize Latest Features to Online Store\n", "\n", "Now that our features are ingested to offline store, we can materialize the latest features (for each record identifier) to the online store. To do this, we we will run another Spark Processing Job using `scripts/materialize_to_online.py`. Since the task may need to run on a regular cadence, we can add the processing job to a SageMaker Pipeline. This pipeline can then be scheduled with [Amazon EventBridge](https://docs.aws.amazon.com/sagemaker/latest/dg/pipeline-eventbridge.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline_context import PipelineSession\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.steps import ProcessingStep\n", "\n", "pipeline_name = feature_group_name\n", "pipeline_session = PipelineSession()\n", "\n", "spark_processor = PySparkProcessor(\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m5.large',\n", " max_runtime_in_seconds=1200,\n", " sagemaker_session=pipeline_session,\n", " framework_version=spark_version\n", ")\n", "\n", "processor_args = spark_processor.run(\n", " submit_app='./scripts/materialize_to_online.py',\n", " logs=False,\n", " arguments = [\n", " '--table_format',\n", " table_format,\n", " '--feature_group_name',\n", " feature_group_name,\n", " '--region_name',\n", " region_name\n", " ],\n", " submit_jars=[\n", " jar_path\n", " ],\n", " submit_py_files=[\n", " './feature_store_pyspark.zip'\n", " ]\n", ")\n", "\n", "step_process = ProcessingStep(name='MaterializeToOnlineStore', step_args=processor_args)\n", "\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " steps=[step_process],\n", ")\n", "\n", "pipeline.upsert(role_arn=role)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Manually run the pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start()\n", "execution.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify that the latest features are available in the online store." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fs_client = boto3.client('sagemaker-featurestore-runtime')\n", "fs_client.batch_get_record(\n", " Identifiers=[\n", " {\n", " 'FeatureGroupName': feature_group_name,\n", " 'RecordIdentifiersValueAsString': ['1', '2', '3']\n", " }\n", " ]\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Cleanup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.delete()" ] } ], "metadata": { "availableInstances": [ { "_defaultOrder": 0, "_isFastLaunch": true, "category": "General purpose", "gpuNum": 0, "memoryGiB": 4, "name": "ml.t3.medium", "vcpuNum": 2 }, { "_defaultOrder": 1, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 8, "name": "ml.t3.large", "vcpuNum": 2 }, { "_defaultOrder": 2, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 16, "name": "ml.t3.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 3, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 32, "name": "ml.t3.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 4, "_isFastLaunch": true, "category": "General purpose", "gpuNum": 0, "memoryGiB": 8, "name": "ml.m5.large", "vcpuNum": 2 }, { "_defaultOrder": 5, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 16, "name": "ml.m5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 6, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 32, "name": "ml.m5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 7, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 64, "name": "ml.m5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 8, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 128, "name": "ml.m5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 9, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 192, "name": "ml.m5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 10, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 256, "name": "ml.m5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 11, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 384, "name": "ml.m5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 12, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 8, "name": "ml.m5d.large", "vcpuNum": 2 }, { "_defaultOrder": 13, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 16, "name": "ml.m5d.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 14, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 32, "name": "ml.m5d.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 15, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 64, "name": "ml.m5d.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 16, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 128, "name": "ml.m5d.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 17, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 192, "name": "ml.m5d.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 18, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 256, "name": "ml.m5d.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 19, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 384, "name": "ml.m5d.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 20, "_isFastLaunch": true, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 4, "name": "ml.c5.large", "vcpuNum": 2 }, { "_defaultOrder": 21, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 8, "name": "ml.c5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 22, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 16, "name": "ml.c5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 23, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 32, "name": "ml.c5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 24, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 72, "name": "ml.c5.9xlarge", "vcpuNum": 36 }, { "_defaultOrder": 25, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 96, "name": "ml.c5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 26, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 144, "name": "ml.c5.18xlarge", "vcpuNum": 72 }, { "_defaultOrder": 27, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 192, "name": "ml.c5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 28, "_isFastLaunch": true, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 16, "name": "ml.g4dn.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 29, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 32, "name": "ml.g4dn.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 30, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 64, "name": "ml.g4dn.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 31, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 128, "name": "ml.g4dn.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 32, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 192, "name": "ml.g4dn.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 33, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 256, "name": "ml.g4dn.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 34, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 61, "name": "ml.p3.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 35, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 244, "name": "ml.p3.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 36, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "memoryGiB": 488, "name": "ml.p3.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 37, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "memoryGiB": 768, "name": "ml.p3dn.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 38, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 16, "name": "ml.r5.large", "vcpuNum": 2 }, { "_defaultOrder": 39, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 32, "name": "ml.r5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 40, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 64, "name": "ml.r5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 41, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 128, "name": "ml.r5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 42, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 256, "name": "ml.r5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 43, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 384, "name": "ml.r5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 44, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 512, "name": "ml.r5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 45, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 768, "name": "ml.r5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 46, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 16, "name": "ml.g5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 47, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 32, "name": "ml.g5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 48, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 64, "name": "ml.g5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 49, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 128, "name": "ml.g5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 50, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 256, "name": "ml.g5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 51, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 192, "name": "ml.g5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 52, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 384, "name": "ml.g5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 53, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "memoryGiB": 768, "name": "ml.g5.48xlarge", "vcpuNum": 192 } ], "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "base", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 4 }