{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Automating feature transformations with SageMaker Data Wrangler, Pipelines, and Feature Store\n", "\n", "This notebook shows you how to create a SageMaker Pipeline along with an AWS Lambda function to automate feature transformations and ingestion into Feature Store, triggered off of new data files that are uploaded to S3. It assumes that you already have already created a Data Wrangler `.flow` file, which is done within module 05 (`05-module-scalable-batch-ingestion`) of this training workshop.\n", "\n", "The notebook has three main sections:\n", "1. General setup\n", "2.\tCreating a SageMaker Pipeline which:\n", " - Performs the transformations contained in a Data Wrangler `.flow` file stored in Amazon S3 using a SageMaker Processing Job \n", " - Stores the transformed features in the Amazon SageMaker Feature Store\n", "3.\tCreating an AWS Lambda function which:\n", " - Is triggered whenever any new data is uploaded to S3\n", " - Updates the `.flow` file to reference the new dataset\n", " - Triggers the SageMaker Pipeline with the new `.flow` file\n", " \n", "**Note:** Please set kernel to `Python 3 (Data Science)` and select instance to `ml.t3.medium`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we need to copy these variables from the Data Wrangler generated notebook from the previous step: " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# SageMaker Python SDK version 2.x is required\n", "import sagemaker\n", "import subprocess\n", "import sys\n", "import os\n", "import uuid\n", "import json\n", "import time\n", "import boto3\n", "from zipfile import ZipFile\n", "import inspect\n", "\n", "#module containing utility functions for this notebook\n", "import pipeline_utils\n", "\n", "original_version = sagemaker.__version__\n", "if sagemaker.__version__ < \"2.48.1\":\n", " subprocess.check_call(\n", " [sys.executable, \"-m\", \"pip\", \"install\", \"sagemaker==2.48.1\"]\n", " )\n", " import importlib\n", " importlib.reload(sagemaker)\n", " \n", "# S3 bucket for saving processing job outputs\n", "# Feel free to specify a different bucket here if you wish.\n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket()\n", "sm_client = boto3.client('sagemaker')\n", "iam_role = sagemaker.get_execution_role()\n", "region = sess.boto_region_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('SageMaker version: ' + sagemaker.__version__)\n", "print(f'Default bucket: {bucket}')\n", "print(f'IAM Role: {iam_role}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's define bunch of useful prefix vars for Feature Store and Data Wrangler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Location of data files \n", "workshop_prefix = 'fscw'\n", "s3_feature_store_workshop_prefix = f'sagemaker-feature-store/{workshop_prefix}'\n", "\n", "s3_data_wrangler_flow_files_prefix = f'{s3_feature_store_workshop_prefix}/data_wrangler_flows'\n", "data_wrangler_flow_file = 'DWF-Orders.flow'\n", "\n", "# Flow URI is taken from DataWrangler Flow UI previously created\n", "s3_data_wrangler_flow_uri=f's3://{bucket}/{s3_data_wrangler_flow_files_prefix}/{data_wrangler_flow_file}'\n", "print(f's3_data_wrangler_flow_uri: {s3_data_wrangler_flow_uri}')\n", "\n", "s3_data_landing_zone_prefix = f'{s3_feature_store_workshop_prefix}/data_landing_zone'\n", 
"print(f's3_data_landing_zone_prefix: {s3_data_landing_zone_prefix}')\n", "\n", "s3_feature_store_data_files_prefix = f'{s3_feature_store_workshop_prefix}/data'\n", "fs_orders_file = 'orders.csv'\n", "\n", "s3_feature_store_orders_file = f'{s3_feature_store_data_files_prefix}/{fs_orders_file}'\n", "print(f's3_feature_store_orders_file: {s3_feature_store_orders_file}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### NOTE: The \"%store -r\" command reads variable values set by the previous notebook outputs\n", "\n", "This method ensures that we get the correct current value for critical variables like the Feature Group name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Read the name of the Feature Group created in previous notebook\n", "# For example: 'fscw-orders-07-06-19-19'\n", "\n", "%store -r feature_group_name\n", "print(feature_group_name)\n", "\n", "# DataWrangler Flow file output name is taken from the auto-generated value in previous notebook\n", "\n", "%store -r output_name\n", "print(output_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Locate Feature Groups created in previous notebooks\n", "\n", "Several feature groups were created by the Jupyter notebook in module-5 \n", "notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Now let's lookup feature group names that start with our special prefix\n", "filter_subword = workshop_prefix\n", "fg_summaries = sm_client.list_feature_groups(NameContains=filter_subword)['FeatureGroupSummaries']\n", "\n", "for fg in fg_summaries:\n", " print(fg['FeatureGroupName']) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's lookup the schema for the FeatureGroups we want to load.\n", "\n", "We can extract this information from the Feature Store via the `describe_feature_group` method which returns a list of `FeatureDefinitions` that contains the column name and data type. In our case, we want to load the `orders` feature group which we reference by name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "order_feature_defs = sm_client.describe_feature_group(\n", " FeatureGroupName=feature_group_name)['FeatureDefinitions']\n", "\n", "order_feature_defs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Collect list of column names from Feature Definition\n", "order_column_names = []\n", "for feature_def in order_feature_defs:\n", " order_column_names.append(feature_def['FeatureName'])\n", "\n", "order_column_names" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query_columns = \", \".join(order_column_names)\n", "print(query_columns)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query Historical Records in Feature Store\n", "First, let's ensure that the records we processed from the original processing job made it into the Feature Store successfully. Note that once the job is complete, it can still take several minutes for all records to be replicated from the online store to the offline store." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_resolved_s3uri_base(fg_name):\n", " fg_config = sm_client.describe_feature_group(FeatureGroupName=fg_name)\n", " s3_uri_full = fg_config['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']\n", " s3_uri_base, last = os.path.split(s3_uri_full)\n", " print (f'Found Resolved S3 Uri Base: {s3_uri_base}')\n", " return s3_uri_base\n", "\n", "orders_fg_s3_path = get_resolved_s3uri_base(feature_group_name)\n", "print(f'Orders Feature Group S3 path to Offline data: \\n {orders_fg_s3_path}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pipeline_utils.get_offline_store_data(feature_group_name, \n", " s3_uri=orders_fg_s3_path, column_list=query_columns)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(df.shape)\n", "df.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update policy of SageMaker Studio execution role \n", "As part of automation in this notebook, you will create IAM roles to assign to AWS Lambda. To do that, you first need to give some permission to am IAM execution role. You can provide those permissions by adding the following as an [inline policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-edit.html#edit-inline-policy-console).\n", "\n", "If you are running this notebook in Amazon SageMaker Studio, the IAM role assumed by your Studio user needs permission to create AWS Lambda functions and IAM roles. To provide this permission to the role, do the following.\n", "\n", "1. Open the [Amazon SageMaker console](https://console.aws.amazon.com/sagemaker/).\n", "2. Select Amazon SageMaker Studio and choose your user name.\n", "3. Under **User summary**, copy just the name part of the execution role ARN \n", "5. Go to the [IAM console](https://console.aws.amazon.com/iam) and click on **Roles**. \n", "6. Find the role associated with your SageMaker Studio user\n", "7. 
Under the Permissions tab, click **Add inline policy** and enter the following in the JSON tab:\n", "```\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Sid\": \"IAMPolicy1\",\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"iam:CreatePolicy\",\n", " \"iam:AttachRolePolicy\",\n", " \"iam:CreateRole\"\n", " ],\n", " \"Resource\": [\n", " \"*\"\n", " ]\n", " },\n", " {\n", " \"Sid\": \"IAMPolicy2\",\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"iam:PassRole\"\n", " ],\n", " \"Resource\": [\n", " \"*\"\n", " ],\n", " \"Condition\": {\n", " \"StringEquals\": {\n", " \"iam:PassedToService\": [\n", " \"lambda.amazonaws.com\"\n", " ]\n", " }\n", " }\n", " },\n", " {\n", " \"Sid\": \"LambdaFunction\",\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"lambda:CreateFunction\",\n", " \"lambda:UpdateFunctionCode\",\n", " \"lambda:AddPermission\"\n", " ],\n", " \"Resource\": \"*\"\n", " },\n", " {\n", " \"Sid\": \"S3Notification\",\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"s3:PutBucketNotification\"\n", " ],\n", " \"Resource\": \"*\"\n", " },\n", " {\n", " \"Sid\": \"STSPermission\",\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"sts:GetCallerIdentity\"\n", " ],\n", " \"Resource\": \"*\"\n", " }\n", " ]\n", "}\n", "```\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a SageMaker Pipeline from the Data Wrangler Flow\n", "The transformations we defined in Data Wrangler are encapsulated in a `.flow` file. We will parameterize our SageMaker pipeline with the S3 URI of a new input flow file we will create on the fly once new data is made available in S3. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "processing_instance_count = ParameterInteger(\n", " name=\"ProcessingInstanceCount\",\n", " default_value=1\n", ")\n", "processing_instance_type = ParameterString(\n", " name=\"ProcessingInstanceType\",\n", " default_value=\"ml.m5.4xlarge\"\n", ")\n", "\n", "# Note: this parameter will be replaced by the Lambda function\n", "input_flow= ParameterString(\n", " name='InputFlow',\n", " default_value='s3://placeholder-bucket/placeholder.flow'\n", ")\n", "\n", "# Output configuration if any passed as processing job container arguments \n", "output_config = {}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import Processor\n", "\n", "container_id = pipeline_utils.get_container(region)\n", "\n", "container_uri=f\"{container_id}.dkr.ecr.{region}.amazonaws.com/sagemaker-data-wrangler-container:1.x\"\n", "print(container_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Processor instance\n", "\n", "processor = Processor(\n", " role=iam_role,\n", " image_uri=container_uri,\n", " instance_count=processing_instance_count,\n", " instance_type=processing_instance_type\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import FeatureStoreOutput\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", "\n", "step_process = ProcessingStep(\n", " name=\"fscw-Automated-DataWrangler-Processing\",\n", " processor=processor,\n", " inputs=[\n", " 
ProcessingInput(input_name='flow', \n", " destination='/opt/ml/processing/flow',\n", " source=input_flow,\n", " s3_data_type= 'S3Prefix',\n", " s3_input_mode= 'File'\n", " )\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name=output_name,\n", " app_managed=True, \n", " feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name))\n", " ],\n", " job_arguments=[f\"--output-config '{json.dumps(output_config)}'\"]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "pipeline_basename = \"featurestore-ingest-pipeline\"\n", "\n", "pipeline_name=f\"{pipeline_basename}-{time.strftime('%d-%H-%M-%S', time.gmtime())}\"\n", "\n", "print(pipeline_name)\n", "\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_type, \n", " processing_instance_count,\n", " input_flow\n", " ],\n", " steps=[step_process],\n", " sagemaker_session=sess\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.upsert(iam_role)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a Pipeline set up and ready to execute whenever it is called with a new input flow file. Next, we'll create a Lambda function that will automatically create a new flow file when new data is uploaded to S3. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a Lambda Function triggered by S3\n", "We have provided a file, `pipeline_utils.py`, which contains some helper functions we can use to create a Lambda function containing our custom code. \n", "\n", "### Set up IAM Roles\n", "AWS Lambda needs permissions to be able to call other AWS services. These permissions are provided by IAM roles. We first create the IAM role that will be assumed by AWS Lambda and then assign permissions to it.\n", "\n", "We now set variables that will be used to set up the automation. The default placeholder values will work, but you can update them if you wish." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create IAM role for Lambda function\n", "\n", "role_name = f\"sm-lambda-role-{time.strftime('%d-%H-%M-%S', time.gmtime())}\"\n", "fcn_name = f\"sm-lambda-fcn-{time.strftime('%d-%H-%M-%S', time.gmtime())}\"\n", "\n", "account_num = boto3.client('sts').get_caller_identity()['Account']\n", "\n", "#Create IAM role for the Lambda function\n", "lambda_role = pipeline_utils.create_role(role_name)\n", "\n", "#Wait for the role to be activated\n", "print('Waiting for 30 seconds for the newly created role to be active.')\n", "time.sleep(30)\n", "print('30 seconds are up; proceeding with the rest of the execution.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the Lambda function code is generated, we zip it into a deployment package ready to upload to AWS Lambda. Once the package is ready, we create the AWS Lambda function using the IAM role created earlier."
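, "\n", "\n", "For reference, the handler that `pipeline_utils.create_lambda_fcn` generates (and that we print in a cell below) has roughly the following shape. This is only a simplified sketch with placeholder values, not the exact generated code; in particular, how the flow's dataset definition is rewritten to point at the newly uploaded file depends on your flow, and the real code is produced for you by the helper:\n", "\n", "```python\n", "import json\n", "import boto3\n", "from urllib.parse import unquote_plus\n", "\n", "s3 = boto3.client('s3')\n", "sm = boto3.client('sagemaker')\n", "\n", "# Baked into the generated function by create_lambda_fcn (placeholders here)\n", "FLOW_URI = 's3://placeholder-bucket/data_wrangler_flows/DWF-Orders.flow'\n", "PIPELINE_NAME = 'featurestore-ingest-pipeline-placeholder'\n", "\n", "def lambda_handler(event, context):\n", "    # 1. Identify the newly uploaded data file from the S3 event notification\n", "    record = event['Records'][0]['s3']\n", "    new_data_uri = f\"s3://{record['bucket']['name']}/{unquote_plus(record['object']['key'])}\"\n", "    print(f'New data file: {new_data_uri}')\n", "\n", "    # 2. Load the template flow and save an updated copy under a unique key\n", "    #    (the real generated code also rewrites the flow's dataset definition\n", "    #    so that it points at new_data_uri)\n", "    flow_bucket, flow_key = FLOW_URI.replace('s3://', '').split('/', 1)\n", "    flow = json.loads(s3.get_object(Bucket=flow_bucket, Key=flow_key)['Body'].read())\n", "    new_flow_key = f'data_wrangler_flows/flow-{context.aws_request_id}.flow'\n", "    s3.put_object(Bucket=flow_bucket, Key=new_flow_key, Body=json.dumps(flow))\n", "    new_flow_uri = f's3://{flow_bucket}/{new_flow_key}'\n", "\n", "    # 3. Start the SageMaker Pipeline, passing the new flow URI as the InputFlow parameter\n", "    sm.start_pipeline_execution(\n", "        PipelineName=PIPELINE_NAME,\n", "        PipelineParameters=[{'Name': 'InputFlow', 'Value': new_flow_uri}],\n", "    )\n", "    return {'statusCode': 200}\n", "```"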
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Create code for AWS Lambda function\n", "print(f'Data Wrangler Flow Uri passed to Lambda: {s3_data_wrangler_flow_uri}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lambda_code = pipeline_utils.create_lambda_fcn(s3_data_wrangler_flow_uri, pipeline_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(lambda_code)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Zip AWS Lambda function code\n", "#Write code to a .py file\n", "with open('lambda_function.py', 'w') as f:\n", " f.write(inspect.cleandoc(lambda_code))\n", "#Compress file into a zip\n", "with ZipFile('function.zip','w') as z:\n", " z.write('lambda_function.py')\n", "#Use zipped code as AWS Lambda function code\n", "with open('lambda_function.py', 'w') as f:\n", " f.write(lambda_code)\n", "\n", "#Create AWS Lambda function\n", "with open('function.zip', 'rb') as f:\n", " fcn_code = f.read() \n", "\n", "lambda_arn = pipeline_utils.create_lambda(fcn_name, fcn_code, lambda_role['arn'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lastly, we setup Amazon S3 to trigger AWS Lambda whenever a new CSV file is uploaded into the S3 bucket under the `prefix` specified earlier. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create and attach trigger for Amazon S3 event to kick-off AWS Lambda function\n", "print(f'Data landing zone prefix for S3 trigger: {s3_data_landing_zone_prefix}')\n", "pipeline_utils.create_s3_trigger(fcn_name, bucket, s3_data_landing_zone_prefix, account_num, lambda_arn)\n", "\n", "#Wait for the trigger to be created\n", "print('Waiting for 5 seconds for the newly created trigger to be active.')\n", "time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have now successfully setup up all the necessary pieces of infrastructure. Now we will try and test the setup by uploading a CSV file into your Amazon S3 Bucket and monitor the pipeline execution. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Validate Feature Pipeline\n", "\n", "Now that you have configured the automated pipeline, you can test that everything has been setup correctly. To do that, we will leverage a small CSV file that contains orders data under our partitions directory (`../data/partitions/2021-5/partition.csv`). In the cell below, we will upload this file to a designated location in our Amazon S3 bucket. This will trigger our Lambda function which will kick-off the automated pipeline that you have just setup." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This file contains a small sample of orders data\n", "sagemaker.s3.S3Uploader.upload(\"../05-module-scalable-batch-ingestion/orders.csv\", \n", " f\"s3://{bucket}/{s3_data_landing_zone_prefix}\")\n", "#wait for file to finish uploading \n", "time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### S3 file upload will trigger SageMaker Pipelines execution\n", "\n", "The uploading of a file to the designated S3 location above will trigger a Lambda function to execute. This Lambda function will then execute the reference Pipeline. 
You can use SageMaker Studio to navigate to the Pipeline, which should appear similar to the image below.\n", "\n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check pipeline executions \n", "summaries = sm_client.list_pipeline_executions(PipelineName=pipeline_name).get('PipelineExecutionSummaries')\n", "summaries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "latest_execution = sm_client.list_pipeline_executions(PipelineName=pipeline_name).get('PipelineExecutionSummaries')[0].get('PipelineExecutionArn')\n", "print(latest_execution)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for the pipeline execution to leave the 'Executing' status\n", "\n", "response = sm_client.describe_pipeline_execution(PipelineExecutionArn=latest_execution)\n", "print('Pipeline Execution status: ' + response['PipelineExecutionStatus'])\n", "\n", "while response['PipelineExecutionStatus'] == 'Executing':\n", " print('Pipeline is still in Executing status...')\n", " time.sleep(60)\n", " response = sm_client.describe_pipeline_execution(PipelineExecutionArn=latest_execution)\n", " print('Pipeline Execution status: ' + response['PipelineExecutionStatus'])\n", "\n", "\n", "print('Pipeline is done Executing')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can monitor the pipeline run inside the Pipelines section of Studio. Once the execution completes, we can test retrieval of a record from the Feature Store. This will confirm that the Data Wrangler job executed by the Pipeline ran successfully." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Look up a valid record ID for the get_record call\n", "record_id = 'O2235'\n", "\n", "feature_store_runtime = sess.boto_session.client('sagemaker-featurestore-runtime', region_name=region)\n", "sample_record = feature_store_runtime.get_record(FeatureGroupName=feature_group_name, \n", " RecordIdentifierValueAsString=str(record_id))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_record" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean-up\n", "To avoid recurring charges, you need to: \n", "* Delete the datasets uploaded to S3.\n", "* Stop any running Data Wrangler and Jupyter Notebook instances within Studio when not in use.\n", "* Delete the feature group.\n", "* Delete the SageMaker Pipelines pipeline.\n", "* Delete the AWS Lambda function that was created.\n", "* Remove the event notification that was set up on the Amazon S3 bucket to trigger the pipeline on upload of a new file.\n" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }