# Automating feature transformations with SageMaker Data Wrangler, Pipelines, and Feature Store

This notebook shows you how to create a SageMaker Pipeline and an AWS Lambda function that automate feature transformations and ingestion into Feature Store whenever new data files are uploaded to S3. It assumes that you have already created a Data Wrangler `.flow` file, which is the main output of the first half of the steps described in this blog post.

The notebook has three main sections:
1. General setup
2. Creating a SageMaker Pipeline which:
    - Performs the transformations contained in a Data Wrangler `.flow` file stored in Amazon S3 using a SageMaker Processing Job
    - Stores the transformed features in the Amazon SageMaker Feature Store
3. Creating an AWS Lambda function which:
    - Is triggered whenever any new data is uploaded to S3
    - Updates the `.flow` file to reference the new dataset
    - Triggers the SageMaker Pipeline with the new `.flow` file
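
The Lambda steps listed above can be sketched roughly as follows. This is a minimal illustration, not the code that `pipeline_utils.create_lambda_fcn` generates. The helper names `new_object_uri` and `point_flow_at` are hypothetical, and the layout of the `.flow` JSON (a `nodes` list whose source nodes keep the dataset URI under `parameters.dataset_definition.s3ExecutionContext.s3Uri`) is an assumption that you should check against your own flow file. The S3 event layout follows the standard S3 event notification message structure.

```python
import copy
from urllib.parse import unquote_plus


def new_object_uri(event):
    """Return the s3:// URI of the first object named in an S3 event notification."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # Object keys arrive URL-encoded in S3 event notifications (a space becomes '+').
    key = unquote_plus(record["object"]["key"])
    return f"s3://{bucket}/{key}"


def point_flow_at(flow, data_uri):
    """Return a copy of `flow` whose source nodes read from `data_uri`.

    ASSUMPTION: source nodes have type "SOURCE" and store their input location
    under parameters.dataset_definition.s3ExecutionContext.s3Uri.
    """
    updated = copy.deepcopy(flow)  # leave the caller's flow untouched
    for node in updated.get("nodes", []):
        if node.get("type") != "SOURCE":
            continue
        definition = node["parameters"]["dataset_definition"]
        definition["s3ExecutionContext"]["s3Uri"] = data_uri
    return updated


# Hypothetical event and flow, for illustration only.
example_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-bucket"},
                "object": {"key": "daily_data/mar31_2020.csv"},
            }
        }
    ]
}
example_flow = {
    "nodes": [
        {
            "type": "SOURCE",
            "parameters": {
                "dataset_definition": {
                    "s3ExecutionContext": {"s3Uri": "s3://my-bucket/original.csv"}
                }
            },
        },
        {"type": "TRANSFORM", "parameters": {}},
    ]
}

updated_flow = point_flow_at(example_flow, new_object_uri(example_event))
```

The updated flow would then be serialized with `json.dumps`, written back to S3 under a new key, and that key's URI passed to the pipeline as its input flow.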

First, copy the following variables from the notebook that Data Wrangler generated in the previous step:

In [None]:
feature_group_name = "<FEATURE GROUP NAME>"
output_name = "<OUTPUT NAME>"
flow_uri='<FLOW URI>'

In [None]:
# SageMaker Python SDK version 2.x is required
import sagemaker
import subprocess
import sys
import os
import uuid
import json
import time
import boto3
from zipfile import ZipFile
import inspect

#module containing utility functions for this notebook
import pipeline_utils

original_version = sagemaker.__version__
if sagemaker.__version__ != "2.20.0":
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "sagemaker==2.20.0"]
    )
    import importlib
    importlib.reload(sagemaker)
    
# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
bucket = sess.default_bucket()
sm_client = boto3.client('sagemaker')
iam_role = sagemaker.get_execution_role()
region = sess.boto_region_name

## Query Historical Records in Feature Store
First, let's ensure that the records we processed from the original processing job made it into the Feature Store successfully. Note that once the job is complete, it can still take several minutes for all records to be replicated from the online store to the offline store.

In [None]:
pipeline_utils.get_historical_record_count(feature_group_name)

## Update policy of SageMaker Studio execution role 
As part of the automation in this notebook, you will create IAM roles to assign to AWS Lambda. To do that, you first need to grant some permissions to an IAM execution role. You can provide those permissions by adding the policy shown below as an [inline policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-edit.html#edit-inline-policy-console).

If you are running this notebook in Amazon SageMaker Studio, the IAM role assumed by your Studio user needs permission to create AWS Lambda functions and IAM roles. To provide this permission to the role, do the following.

1. Open the [Amazon SageMaker console](https://console.aws.amazon.com/sagemaker/).
2. Select Amazon SageMaker Studio and choose your user name.
3. Under **User summary**, copy just the name part of the execution role ARN.
4. Go to the [IAM console](https://console.aws.amazon.com/iam) and click on **Roles**.
5. Find the role associated with your SageMaker Studio user.
6. Under the Permissions tab, click **Add inline policy** and enter the following in the JSON tab:
```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "IAMPolicy1",
            "Effect": "Allow",
            "Action": [
                "iam:CreatePolicy",
                "iam:AttachRolePolicy",
                "iam:CreateRole"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "IAMPolicy2",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "*"
            ],
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": [
                        "lambda.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Sid": "LambdaFunction",
            "Effect": "Allow",
            "Action": [
                "lambda:CreateFunction",
                "lambda:UpdateFunctionCode",
                "lambda:AddPermission"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3Notification",
            "Effect": "Allow",
            "Action": [
                "s3:PutBucketNotification"
            ],
            "Resource": "*"
        },
        {
            "Sid": "STSPermission",
            "Effect": "Allow",
            "Action": [
                "sts:GetCallerIdentity"
            ],
            "Resource": "*"
        }
    ]
}
```
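
If you prefer to script the console steps above, the sketch below shows one possible approach. The helper names are hypothetical. `put_role_policy` is the standard IAM API for inline policies, but the call must be made with credentials that are themselves allowed to call `iam:PutRolePolicy` (typically an administrator, not the Studio execution role). The function takes the client as an argument, so you would pass in `boto3.client("iam")` and the policy above loaded as a Python dict.

```python
import json


def role_name_from_arn(role_arn):
    """Return the bare role name, i.e. the text after the last '/' in a role ARN."""
    return role_arn.rsplit("/", 1)[-1]


def attach_inline_policy(iam_client, role_arn, policy_name, policy_document):
    """Attach `policy_document` (a dict) to the role as an inline policy.

    `iam_client` is expected to be a boto3 IAM client, e.g. boto3.client("iam").
    `policy_name` is any name you choose for the inline policy.
    """
    iam_client.put_role_policy(
        RoleName=role_name_from_arn(role_arn),
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy_document),  # the API expects a JSON string
    )
```

For example, `role_name_from_arn(sagemaker.get_execution_role())` yields the "name part" of the ARN mentioned in step 3.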
 

## Create a SageMaker Pipeline from the Data Wrangler Flow
The transformations we defined in Data Wrangler are encapsulated in a `.flow` file. We will parameterize our SageMaker pipeline with the S3 URI of the input flow file, so that a new flow file can be created on the fly and passed in whenever new data becomes available in S3.

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.4xlarge"
)

input_flow= ParameterString(
    name='InputFlow',
    default_value='s3://placeholder-bucket/placeholder.flow'
)

In [None]:
from sagemaker.processing import Processor

container_id = pipeline_utils.get_container(region)

container_uri=f"{container_id}.dkr.ecr.{region}.amazonaws.com/sagemaker-data-wrangler-container:1.x"

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type
)

In [None]:
from sagemaker.processing import FeatureStoreOutput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="DailyFlightDataETL",
    processor=processor,
    inputs=[
        ProcessingInput(input_name='flow', 
                        destination='/opt/ml/processing/flow',
                        source=input_flow,
                        s3_data_type= 'S3Prefix',
                        s3_input_mode= 'File'
                       )
    ],
    outputs=[
        ProcessingOutput(
            output_name=output_name,
            app_managed=True, 
            feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name))
    ]
)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name=f"daily-flight-ETL-pipeline-{time.strftime('%d-%H-%M-%S', time.gmtime())}"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        input_flow
    ],
    steps=[step_process],
    sagemaker_session=sess
)

In [None]:
pipeline.upsert(iam_role)

We now have a Pipeline that is ready to execute whenever it is called with a new input flow file. Next, we'll create a Lambda function that automatically creates a new flow file when new data is uploaded to S3.
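
To make "called with a new input flow file" concrete, here is a minimal sketch of starting one execution and overriding the `InputFlow` parameter defined earlier. The helper name is hypothetical and this is not necessarily how the generated Lambda code does it. `start_pipeline_execution` is the standard SageMaker API call, and `sm_client` is assumed to be a boto3 SageMaker client such as the one created during setup.

```python
def start_pipeline_with_flow(sm_client, pipeline_name, flow_uri):
    """Start one pipeline execution that reads its flow file from `flow_uri`.

    Returns the ARN of the new execution.
    """
    response = sm_client.start_pipeline_execution(
        PipelineName=pipeline_name,
        # Parameter values are passed as strings; any parameter that is not
        # listed here keeps the default value set in the pipeline definition.
        PipelineParameters=[{"Name": "InputFlow", "Value": flow_uri}],
    )
    return response["PipelineExecutionArn"]
```

From the notebook itself, `pipeline.start(parameters={"InputFlow": flow_uri})` in the SageMaker Python SDK achieves the same thing.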

## Creating a Lambda Function triggered off of S3
We have provided a file, `pipeline_utils.py`, which contains helper functions we can use to create a Lambda function containing our custom code.

### Setup IAM Roles
AWS Lambda needs permissions to be able to call other AWS services. These permissions are provided by IAM roles. We first create the IAM role that will be assumed by AWS Lambda and then assign permissions to it.

We now set the variables that will be used to set up the automation. The default placeholder values will work, but you can change them if you wish.

In [None]:
prefix='daily_data'

role_name = f"sm-lambda-role-{time.strftime('%d-%H-%M-%S', time.gmtime())}"
fcn_name = f"sm-lambda-fcn-{time.strftime('%d-%H-%M-%S', time.gmtime())}"

account_num = boto3.client('sts').get_caller_identity()['Account']

#Create IAM role for the Lambda function
lambda_role = pipeline_utils.create_role(role_name)

#Wait for the role to be activated
print('Waiting for 30 seconds for the newly created role to be active.')
time.sleep(30)
print('30 seconds are up; proceeding with rest of the execution.')

Once the Lambda function code has been generated, we zip it into a deployment package ready for upload to AWS Lambda. Once the package is ready, we create the AWS Lambda function using the IAM role created earlier.

In [None]:
#Create code for AWS Lambda function
lambda_code = pipeline_utils.create_lambda_fcn(flow_uri, pipeline_name)

In [None]:
print(lambda_code)

In [None]:
#Zip AWS Lambda function code
#Write the dedented code to a .py file
with open('lambda_function.py', 'w') as f:
    f.write(inspect.cleandoc(lambda_code))
#Compress the file into a zip archive to use as the deployment package
with ZipFile('function.zip','w') as z:
    z.write('lambda_function.py')

#Create AWS Lambda function
with open('function.zip', 'rb') as f:
    fcn_code = f.read()   
lambda_arn = pipeline_utils.create_lambda(fcn_name, fcn_code, lambda_role['arn'])

Lastly, we set up Amazon S3 to trigger AWS Lambda whenever a new CSV file is uploaded to the S3 bucket under the `prefix` specified earlier.

In [None]:
#Add permission for Amazon S3 to trigger AWS Lambda and set up trigger
pipeline_utils.create_s3_trigger(fcn_name, bucket, prefix, account_num, lambda_arn)
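
For readers curious about what such a trigger involves, the sketch below shows one way to express it with standard boto3 calls. It is not necessarily what `pipeline_utils.create_s3_trigger` does internally: the helper names and the `StatementId` value are hypothetical, and the clients are assumed to be `boto3.client("lambda")` and `boto3.client("s3")`.

```python
def lambda_notification_config(lambda_arn, prefix, suffix=".csv"):
    """Build an S3 notification configuration that invokes `lambda_arn`
    for every object created under `prefix` whose key ends in `suffix`."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }


def allow_s3_to_invoke(lambda_client, fcn_name, bucket, account_num):
    """Grant the S3 service permission to invoke the function for this bucket."""
    lambda_client.add_permission(
        FunctionName=fcn_name,
        StatementId="s3-invoke-permission",  # hypothetical identifier
        Action="lambda:InvokeFunction",
        Principal="s3.amazonaws.com",
        SourceArn=f"arn:aws:s3:::{bucket}",
        SourceAccount=account_num,
    )


def set_trigger(s3_client, bucket, config):
    """Apply `config` to the bucket.

    Note: this call replaces the bucket's existing notification configuration.
    """
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config
    )
```

The permission has to be in place before the notification is configured, because S3 validates that it can invoke the destination function.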

We have now set up all the necessary pieces of infrastructure. Next, we will test the setup by uploading a CSV file to your Amazon S3 bucket and monitoring the pipeline execution.

## Validate Feature Pipeline
Now that you have set up the automated pipeline, you can test that everything has been configured correctly. To do that, first download `mar31_2020.csv` from the public S3 bucket using the link provided in the README. Once downloaded, upload it to your Amazon S3 bucket. This will trigger the automated pipeline that you have just set up.

In [None]:
sagemaker.s3.S3Uploader.upload("./mar31_2020.csv", f"s3://{bucket}/{prefix}")
#wait for file to finish uploading 
time.sleep(5)

In [None]:
# check pipeline execution 
latest_execution = sm_client.list_pipeline_executions(PipelineName=pipeline_name).get('PipelineExecutionSummaries')[0].get('PipelineExecutionArn')
sm_client.describe_pipeline_execution(PipelineExecutionArn=latest_execution)
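
A single `describe_pipeline_execution` call only gives a snapshot, and immediately after the upload the Lambda function may not yet have started an execution. If you would rather block until the run finishes, a polling loop along the following lines is one option. This is a sketch with a hypothetical helper name. It relies on the `PipelineExecutionStatus` field returned by `describe_pipeline_execution`, and treats `Succeeded`, `Failed`, and `Stopped` as final states.

```python
import time

# Statuses after which the execution will not change any further.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(sm_client, execution_arn, poll_seconds=30, timeout_seconds=3600):
    """Poll the execution until it reaches a terminal status and return that status.

    Raises TimeoutError if the execution is still running after `timeout_seconds`.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = sm_client.describe_pipeline_execution(
            PipelineExecutionArn=execution_arn
        )["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Execution still {status} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
```

With the variables from the cell above, `wait_for_execution(sm_client, latest_execution)` would return the final status once the processing job has finished.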

You can also monitor the pipeline run inside the Pipelines section of Studio. Once the execution completes, we can check the record in Feature Store for the flight AA1538 from Denver CO to Los Angeles CA from 31 March 2020:

In [None]:
record_id='1538DEN707LAX'
sample_record = sess.boto_session.client('sagemaker-featurestore-runtime', region_name=region).get_record(FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=str(record_id))

In [None]:
sample_record

### Clean-up
To avoid recurring charges you need to: 
* Delete the datasets uploaded to S3.
* Stop any running Data Wrangler and Jupyter Notebook instances within Studio when not in use.
* Delete the feature group.
* Delete the SageMaker pipeline.
* Delete the AWS Lambda function that was created.
* Remove the event notification that was set up on the Amazon S3 bucket to trigger the pipeline when a new file is uploaded.
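
Several of these clean-up steps can be scripted. The sketch below uses standard boto3 calls, with clients assumed to come from `boto3.client("sagemaker")`, `boto3.client("lambda")`, and `boto3.client("s3")`. The helper name is hypothetical. It does not delete the uploaded datasets, the offline store data in S3, or the IAM role, and it does not stop Studio apps.

```python
import uuid


def clean_up(sm_client, lambda_client, s3_client, *,
             feature_group_name, pipeline_name, fcn_name, bucket):
    """Delete the feature group, pipeline, Lambda function and S3 trigger."""
    sm_client.delete_feature_group(FeatureGroupName=feature_group_name)
    sm_client.delete_pipeline(
        PipelineName=pipeline_name,
        ClientRequestToken=str(uuid.uuid4()),  # idempotency token
    )
    lambda_client.delete_function(FunctionName=fcn_name)
    # CAUTION: an empty configuration removes *every* event notification on
    # the bucket, not only the one created in this notebook.
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration={}
    )
```

If the bucket has other notifications that you want to keep, read the current configuration with `get_bucket_notification_configuration`, remove only the entry for this Lambda function, and write the remainder back instead of passing an empty configuration.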
