{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Automated data transformation and ingestion from an Amazon S3 bucket to SageMaker Feature Store"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Architecture Overview\n",
"This notebook shows you how to use [AWS Service Catalog](https://aws.amazon.com/servicecatalog), [SageMaker Projects](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-whatis.html), and [Pipelines](https://aws.amazon.com/sagemaker/pipelines/) to create re-usable and portable components in SageMaker Studio.\n",
"\n",
"This project automates feature transformations and ingestion into [SageMaker Feature Store](https://aws.amazon.com/sagemaker/feature-store/), triggered off of new data files that are uploaded to an S3 bucket. The SageMaker project creates all necessary components, sets up all permissions and links between resources.\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"The following resources must be created before you can proceed with deployment of the SageMaker project:\n",
"- A Data Wrangler `.flow` file which contains an output node. The `.flow` file must be uploaded to a designated S3 prefix\n",
"- A Feature group to store features extracted from the data \n",
"- SageMaker project portfolio -> done with [intial setup](../README.md#deploy-sagemaker-project-portfolio)\n",
"- S3 bucket where new data files will be uploaded\n",
"\n",
"All these tasks are done in the [`00-setup` notebook](00-setup.ipynb). Please make sure you run through the setup notebook before running this one."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sagemaker\n",
"import boto3\n",
"import time\n",
"import json\n",
"import os\n",
"from time import gmtime, strftime\n",
"from sagemaker.workflow.pipeline import Pipeline\n",
"from sagemaker.feature_store.feature_group import FeatureGroup\n",
"from sagemaker.session import Session\n",
"\n",
"print(sagemaker.__version__)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# load environment variables from %store\n",
"%store -r "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%store"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" data_bucket\n",
" dw_flow_file_url\n",
" dw_output_name\n",
" feature_group_name\n",
" s3_fs_query_output_prefix\n",
" s3_data_prefix\n",
" s3_flow_prefix\n",
" abalone_dataset_local_url\n",
"except NameError:\n",
" print(\"+++++++++++++++++++++++++++++++++++++++++++++++\")\n",
" print(\"[ERROR] YOU HAVE TO RUN 00-setup.ipynb notebook\")\n",
" print(\"+++++++++++++++++++++++++++++++++++++++++++++++\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set the string literals\n",
"s3_input_data_prefix = f\"{data_bucket}/feature-store-ingestion-pipeline/landing-zone/\"\n",
"pipeline_name_prefix = \"s3-fs-ingest-pipeline\"\n",
"\n",
"%store s3_input_data_prefix"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Project parameters:\")\n",
"print(f\"S3 data prefix to monitor: {s3_input_data_prefix}\")\n",
"print(f\"Data Wrangler flow URL: {dw_flow_file_url}\")\n",
"print(f\"Data Wrangler output name: {dw_output_name}\")\n",
"print(f\"Feature group name: {feature_group_name}\")"
]
},
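{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional sanity check, you can confirm that the prerequisite feature group created in the setup notebook exists and is in the `Created` state. This is a minimal check using the `describe_feature_group` API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: verify that the prerequisite feature group exists and is ready\n",
"fg_status = boto3.client(\"sagemaker\").describe_feature_group(\n",
"    FeatureGroupName=feature_group_name\n",
")[\"FeatureGroupStatus\"]\n",
"\n",
"print(f\"Feature group '{feature_group_name}' status: {fg_status}\")"
]
},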
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create data load project\n",
"⭐ You can create a project in Studio IDE (Option 1) or programmatically directly in this notebook (Option 2). Option 2 is recommended as it requires no manual input. Option 1 is given to demostrate the UX for project parameters."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Option 1: Create a project in Studio\n",
"\n",
"1. Select **Projects** from **SageMaker resources** widget:\n",
"\n",
"\n",
"\n",
"2. Navigate to **Organization templates** and select a project template for automated transformation and ingestion pipeline. Click on **Select project template**:\n",
"\n",
"\n",
"\n",
"3. Enter the project parameters\n",
"\n",
"\n",
"The parameters are:\n",
"- **Project name and description**: provide your project name and description\n",
"- **Pipeline name prefix**: provide a prefix for the pipeline name or leave default\n",
"- **Pipeline description**: provide a description for your pipeline or leave default\n",
"- **S3 prefix**: set to the value of `s3_input_data_prefix` variable\n",
"- **Data Wrangler flow S3 url**: set to the value of `dw_flow_file_url` variable\n",
"- **Data Wrangler output name**: set to the value of `dw_output_name` variable\n",
"- **Feature group name**: set to the value of `feature_group_name` variable\n",
"- **Lambda execution role**: provide your own IAM role for the lambda function or leave at `Auto` to automatically create a new one\n",
"\n",
"Click on **Create project**\n",
"\n",
"
💡 Wait until project creation is completed \n",
"
\n",
"The banner \"Creating project...\":\n",
"\n",
"\n",
"\n",
"will change to the project details page:\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Get the name and id of the created project\n",
"\n",
"
💡 Run the following cells only if you use Option 1 - create a project in Studio IDE "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the latest created project\n",
"sm = boto3.client(\"sagemaker\")\n",
"r = sm.list_projects(SortBy=\"CreationTime\", SortOrder=\"Descending\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if r.get(\"ProjectSummaryList\") is None or len(r.get(\"ProjectSummaryList\")) == 0:\n",
" raise Exception(\"[ERROR]: cannot retrieve the project list!\")\n",
" \n",
"if r[\"ProjectSummaryList\"][0][\"ProjectStatus\"] not in (\"CreateCompleted\"):\n",
" raise Exception(\"[ERROR]: wait until project creation is completed!\")\n",
"else:\n",
" project_name = r[\"ProjectSummaryList\"][0][\"ProjectName\"]\n",
" project_id = r[\"ProjectSummaryList\"][0][\"ProjectId\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### End of Option 1 section\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Option 2: Create project in code - recommended\n",
"
💡 Skip this section if you created a project via Studio IDE "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can use [boto3 Python SDK](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_project) to create a new project from the notebook or any Python code.\n",
"First, get the `ProvisioningArtifactIds` and `ProductId` from service catalog CloudFormation template:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cf = boto3.client(\"cloudformation\")\n",
"\n",
"r = cf.describe_stacks(StackName=\"sm-project-sc-portfolio\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set parameters for the SageMaker project:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sm = boto3.client(\"sagemaker\")\n",
"\n",
"provisioning_artifact_ids = [v for v in r[\"Stacks\"][0][\"Outputs\"] if v[\"OutputKey\"] == \"ProvisioningArtifactIds\"][0][\"OutputValue\"]\n",
"product_id = [v for v in r[\"Stacks\"][0][\"Outputs\"] if v[\"OutputKey\"] == \"ProductId\"][0][\"OutputValue\"]\n",
"project_name = f\"s3-fs-ingest-{strftime('%d-%H-%M-%S', gmtime())}\"\n",
"project_parameters = [\n",
" {\n",
" 'Key': 'PipelineDescription',\n",
" 'Value': 'Feature Store ingestion pipeline'\n",
" },\n",
" {\n",
" 'Key': 'DataWranglerFlowUrl',\n",
" 'Value': dw_flow_file_url\n",
" },\n",
" {\n",
" 'Key': 'DataWranglerOutputName',\n",
" 'Value': dw_output_name\n",
" },\n",
" {\n",
" 'Key': 'S3DataPrefix',\n",
" 'Value': s3_input_data_prefix\n",
" },\n",
" {\n",
" 'Key': 'FeatureGroupName',\n",
" 'Value': feature_group_name\n",
" },\n",
" {\n",
" 'Key': 'PipelineNamePrefix',\n",
" 'Value': pipeline_name_prefix\n",
" },\n",
" ]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, create a SageMaker project from the service catalog product template:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create SageMaker project\n",
"r = sm.create_project(\n",
" ProjectName=project_name,\n",
" ProjectDescription=\"Feature Store ingestion from S3\",\n",
" ServiceCatalogProvisioningDetails={\n",
" 'ProductId': product_id,\n",
" 'ProvisioningArtifactId': provisioning_artifact_ids,\n",
" 'ProvisioningParameters': project_parameters\n",
" },\n",
")\n",
"\n",
"print(r)\n",
"project_id = r[\"ProjectId\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"
💡 Wait until project creation is completed \n",
"
"
]
},
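{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of waiting manually, you can poll the project status with a small loop. This is a minimal sketch using the `describe_project` API and the standard SageMaker project status values."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Poll the project status until creation finishes\n",
"while True:\n",
"    status = sm.describe_project(ProjectName=project_name)[\"ProjectStatus\"]\n",
"    print(f\"Project status: {status}\")\n",
"    if status in (\"CreateCompleted\", \"CreateFailed\"):\n",
"        break\n",
"    time.sleep(30)"
]
},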
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### End of Option 2 section\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Working with data ingestion project"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Project resources\n",
"The project template creates all necessary resources for an automated data transformation and ingestion:\n",
"- [EventBridge rule](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html) for launching an AWS Lambda function whenever any new data is uploaded to the specified S3 prefix\n",
"- AWS Lambda function which launches the SageMaker pipeline\n",
"- SageMaker pipeline which runs a processing job using a DataWrangler processor\n",
"- DataWrangler processor which uses an uploaded `.flow` file with data transformation workflow\n",
"\n",
"### CodeCommit repository with seed code\n",
"All source code for pipeline creation and pipeline parameter configuration is delivered as an [AWS CodeCommit](https://aws.amazon.com/codecommit/) repository. The code is fully functional and works out-of-the-box. You own this code and can change any configuration or parameters of the pipeline according to your requirements.\n",
"\n",
"To start working with the code you must clone the repository into Studio user's home directory:\n",
"\n",
"\n",
"\n",
"You can make your changes to the source code and push it to the CodeCommit repository. The project also delivers an [AWS CodePipeline](https://aws.amazon.com/codepipeline/) CI/CD pipeline which launches an [AWS CodeBuild](https://aws.amazon.com/codebuild/) stage whenever there is a new commit in the repository. The build pulls the code from the repository and calls `create_pipeline` function (file `build.py`). You can change the existing or provide your own code in the `pipeline.create_pipeline` in the file `pipeline.py`. The default code configures a SageMaker pipeline with Data Wrangler processor and upserts the pipeline."
]
},
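{
"cell_type": "markdown",
"metadata": {},
"source": [
"To illustrate how these pieces fit together, the Lambda function essentially maps the S3 event to a pipeline execution. The following is a minimal sketch, not the exact seed code: the environment variable name, the event field access, and the parameter name are assumptions and may differ in the delivered function.\n",
"\n",
"```python\n",
"import os\n",
"import boto3\n",
"\n",
"sm_client = boto3.client(\"sagemaker\")\n",
"\n",
"def lambda_handler(event, context):\n",
"    # Extract the uploaded object location from the S3 API event delivered by EventBridge\n",
"    bucket = event[\"detail\"][\"requestParameters\"][\"bucketName\"]\n",
"    key = event[\"detail\"][\"requestParameters\"][\"key\"]\n",
"\n",
"    # Start the ingestion pipeline, overriding the input data location\n",
"    response = sm_client.start_pipeline_execution(\n",
"        PipelineName=os.environ[\"PIPELINE_NAME\"],  # assumed environment variable\n",
"        PipelineParameters=[\n",
"            {\"Name\": \"InputDataUrl\", \"Value\": f\"s3://{bucket}/{key}\"},\n",
"        ],\n",
"    )\n",
"    return {\"PipelineExecutionArn\": response[\"PipelineExecutionArn\"]}\n",
"```"
]
},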
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SageMaker pipeline\n",
"The project delivers a SageMaker pipeline consisting of one processing step with Data Wrangler processor. The pipeline performs the transformation contained in a specified Data Wrangler `.flow` file and ingests the transformed features in a specified feature group in Feature Store.\n",
"This pipeline is launched by a Lambda function whenever there is a new file uploaded to the specified S3 location. The pipeline is linked to the project and available in the **Pipeline** tab of the project details page:\n",
"\n",
"\n",
"\n",
"From there you can see the pipeline graph, parameters, settings, and the execution history:\n",
"\n",
"\n",
"\n",
"You can also start a new execution manually from Studio by clicking on **Start an execution** and provide pipeline parameters:\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test the automation pipeline\n",
"\n",
"To test the deployed data transformation and feature store ingestion pipeline, perform the following steps:\n",
"1. Upload a data file to the monitored S3 prefix location - this will launch the data transformation and ingestion via our data pipeline\n",
"1. Monitor the pipeline execution\n",
"1. Check the loaded data in the feature group"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Upload data to S3 bucket"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"⭐ The EventBridge rule monitors two S3 events: `PutObject` and `CompleteMultipartUpload`. If you copy an object between two S3 buckets, the EventBrige rule won't be launched."
]
},
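{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, an event pattern of this kind typically looks like the sketch below; it relies on CloudTrail data events being enabled for the monitored bucket, and the exact pattern provisioned by the project template may differ.\n",
"\n",
"```python\n",
"# Illustrative EventBridge event pattern matching S3 object-level API calls\n",
"# recorded by CloudTrail for the monitored bucket (not the exact provisioned rule)\n",
"event_pattern = {\n",
"    \"source\": [\"aws.s3\"],\n",
"    \"detail-type\": [\"AWS API Call via CloudTrail\"],\n",
"    \"detail\": {\n",
"        \"eventSource\": [\"s3.amazonaws.com\"],\n",
"        \"eventName\": [\"PutObject\", \"CompleteMultipartUpload\"],\n",
"        \"requestParameters\": {\"bucketName\": [data_bucket]},\n",
"    },\n",
"}\n",
"```"
]
},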
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following s3 `PUT` event will launch the Lambda function, which will start a new pipeline execution:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"file_name = f\"abalone-{strftime('%d-%H-%M-%S', gmtime())}.csv\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!aws s3 cp {abalone_dataset_local_url} s3://{s3_input_data_prefix}{file_name}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Monitor pipeline execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" project_id\n",
" project_name\n",
"except NameError:\n",
" raise Exception(\"[ERROR]: project_id or project_name variables are not set\")\n",
" \n",
"if project_id is None or project_name is None:\n",
" raise Exception(\"[ERROR]: project_id or project_name variables are not set\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the the project data\n",
"r = sm.describe_project(ProjectName=project_name)\n",
"\n",
"# Get the pipeline prefix from the project parameters\n",
"pipeline_name_prefix = [p for p in r[\"ServiceCatalogProvisioningDetails\"][\"ProvisioningParameters\"] if p[\"Key\"] == \"PipelineNamePrefix\"][0][\"Value\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_name_prefix"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# set the pipeline name\n",
"s3_to_fs_pipeline_name = f\"{pipeline_name_prefix}-{project_id}\"\n",
"\n",
"%store s3_to_fs_pipeline_name"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# check pipeline execution \n",
"summaries = sm.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')\n",
"summaries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"latest_execution = sm.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')[0].get('PipelineExecutionArn')\n",
"print (latest_execution)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Wait for pipeline execution to complete 'Executing' status\n",
"while sm.describe_pipeline_execution(PipelineExecutionArn=latest_execution)[\"PipelineExecutionStatus\"] == \"Executing\":\n",
" print('Pipeline is in Executing status...')\n",
" time.sleep(30)\n",
" \n",
"print('Pipeline is done Executing')\n",
"print(sm.describe_pipeline_execution(PipelineExecutionArn=latest_execution))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, you can monitor the pipeline execution inside the Pipeline widget of Studio:\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Check the loaded data\n",
"Once the execution completes, we can check that the data is loaded into the feature group."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a feature group object:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"feature_store_session = Session()\n",
"\n",
"feature_group = FeatureGroup(\n",
" name=feature_group_name, \n",
" sagemaker_session=feature_store_session\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build SQL query to features group\n",
"fs_query = feature_group.athena_query()\n",
"\n",
"query_string = f'SELECT * FROM \"{fs_query.table_name}\"'\n",
"print(f'Prepared query {query_string}')\n",
"print(fs_query)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Run Athena query. The output is loaded to a Pandas dataframe.\n",
"fs_query.run(\n",
" query_string=query_string, \n",
" output_location=f\"s3://{s3_fs_query_output_prefix}\"\n",
")\n",
"\n",
"fs_query.wait()\n",
"data_df = fs_query.as_dataframe()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `DataFrame` contains now all features from the feature group:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_df"
]
},
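{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Athena query above reads from the feature group's offline store. If your feature group also has an online store enabled, you can retrieve a single record with the `sagemaker-featurestore-runtime` `get_record` API. The record identifier value below is a placeholder; replace it with an identifier that exists in your feature group.\n",
"\n",
"```python\n",
"featurestore_runtime = boto3.client(\"sagemaker-featurestore-runtime\")\n",
"\n",
"# Look up a single record by its record identifier (placeholder value)\n",
"record = featurestore_runtime.get_record(\n",
"    FeatureGroupName=feature_group_name,\n",
"    RecordIdentifierValueAsString=\"1\",  # replace with a real identifier value\n",
")\n",
"print(record.get(\"Record\"))\n",
"```"
]
},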
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start pipeline run via SDK\n",
"You can start the data transformation and ingestion pipeline on demand using [SageMaker SDK](https://sagemaker.readthedocs.io/en/v2.57.0/workflows/pipelines/index.html). `pipeline.start` function allows you to provide parameter values to override the default value for the pipeline execution. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# get Pipeline object\n",
"pipeline = Pipeline(name=s3_to_fs_pipeline_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# start execution with the specified parameters\n",
"execution = pipeline.start(\n",
" parameters=dict(\n",
" InputDataUrl=f\"s3://{s3_input_data_prefix}{abalone_dataset_file_name}\",\n",
" InputFlowUrl=dw_flow_file_url,\n",
" FlowOutputName=dw_output_name,\n",
" FeatureGroupName=feature_group_name\n",
" )\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"execution.wait()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"execution.list_steps()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Change the default values for pipeline parameters\n",
"To change the default values for the parameters, you can edit `pipeline.py` file with pipeline and parameter definition code:\n",
"```python\n",
" # setup pipeline parameters\n",
" p_processing_instance_count = ParameterInteger(\n",
" name=\"ProcessingInstanceCount\",\n",
" default_value=1\n",
" )\n",
" p_processing_instance_type = ParameterString(\n",
" name=\"ProcessingInstanceType\",\n",
" default_value=\"ml.m5.4xlarge\"\n",
" )\n",
" p_processing_volume_size = ParameterInteger(\n",
" name=\"ProcessingVolumeSize\",\n",
" default_value=50\n",
" )\n",
" p_flow_output_name = ParameterString(\n",
" name='FlowOutputName',\n",
" default_value=flow_output_name\n",
" )\n",
" p_input_flow = ParameterString(\n",
" name='InputFlowUrl',\n",
" default_value=data_wrangler_flow_s3_url\n",
" )\n",
" p_input_data = ParameterString(\n",
" name=\"InputDataUrl\",\n",
" default_value=input_data_s3_url\n",
" )\n",
" p_feature_group_name = ParameterString(\n",
" name=\"FeatureGroupName\",\n",
" default_value=feature_group_name\n",
" )\n",
"```\n",
"\n",
"The CI/CD CodePipeline pipeline will be automatically started after you commit and push the changes into the project's source code repository."
]
},
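{
"cell_type": "markdown",
"metadata": {},
"source": [
"These parameters are wired into a processing step and a pipeline definition by `create_pipeline`. The following is an illustrative sketch of how that assembly could look, not the exact seed code: `execution_role`, `data_wrangler_image_uri`, `pipeline_name`, `flow_output_name`, the input names, and the container paths are assumptions, and the delivered code may wire the output name and feature group name through the processing job configuration instead.\n",
"\n",
"```python\n",
"from sagemaker.processing import (\n",
"    FeatureStoreOutput,\n",
"    ProcessingInput,\n",
"    ProcessingOutput,\n",
"    Processor,\n",
")\n",
"from sagemaker.workflow.pipeline import Pipeline\n",
"from sagemaker.workflow.steps import ProcessingStep\n",
"\n",
"# Data Wrangler processor; execution_role and data_wrangler_image_uri are assumed to be defined\n",
"processor = Processor(\n",
"    role=execution_role,\n",
"    image_uri=data_wrangler_image_uri,\n",
"    instance_count=p_processing_instance_count,\n",
"    instance_type=p_processing_instance_type,\n",
"    volume_size_in_gb=p_processing_volume_size,\n",
")\n",
"\n",
"step_process = ProcessingStep(\n",
"    name=\"DataWranglerProcessingStep\",\n",
"    processor=processor,\n",
"    inputs=[\n",
"        # The .flow file and the raw data are mounted into the processing container\n",
"        ProcessingInput(input_name=\"flow\", source=p_input_flow, destination=\"/opt/ml/processing/flow\"),\n",
"        ProcessingInput(input_name=\"data\", source=p_input_data, destination=\"/opt/ml/processing/data\"),\n",
"    ],\n",
"    outputs=[\n",
"        # The named flow output is ingested directly into the feature group\n",
"        ProcessingOutput(\n",
"            output_name=flow_output_name,  # assumed: output node name from the .flow file\n",
"            app_managed=True,\n",
"            feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name),\n",
"        ),\n",
"    ],\n",
")\n",
"\n",
"pipeline = Pipeline(\n",
"    name=pipeline_name,\n",
"    parameters=[\n",
"        p_processing_instance_count,\n",
"        p_processing_instance_type,\n",
"        p_processing_volume_size,\n",
"        p_flow_output_name,\n",
"        p_input_flow,\n",
"        p_input_data,\n",
"        p_feature_group_name,\n",
"    ],\n",
"    steps=[step_process],\n",
")\n",
"pipeline.upsert(role_arn=execution_role)\n",
"```"
]
},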
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Release resources"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%html\n",
"\n",
"
Shutting down your kernel for this notebook to release resources.