# Walkthrough MLOps Demo

This notebook walks you through all the steps of an MLOps workflow with SageMaker.

- [1.Prepare the environment](#envpreparation)
- [2.Data preparation](#datapreparation)
- [3.Feature ingestion](#featureingestion)
- [4.Model building](#modelbuilding)
- [5.Asynchronous inference](#asyncinfer)
- [6.Real-time inference](#realtimeinfer)
- [7.Cleanup](#cleanup)

<a id="envpreparation"></a>
## 1. Prepare the environment

In [None]:
!pip install pandas==1.1.5
!pip install awswrangler

In [None]:
import json
import logging
import boto3
import io
import glob
import os
import re
from time import strftime,gmtime
from botocore.exceptions import ClientError
import urllib
import sys
import pandas as pd
import awswrangler as wr
import time

import sagemaker
from sagemaker import get_execution_role

In [None]:
# Named logger for this walkthrough.
logger = logging.getLogger(name='project')
# SageMaker SDK session and the boto3 session it wraps.
sagemaker_session = sagemaker.Session()
boto_session = sagemaker_session.boto_session
sagemaker_client = boto_session.client('sagemaker')
# NOTE(review): this client is built from a fresh default boto3 session rather
# than from `boto_session`; presumably both resolve to the same credentials and
# region — confirm.
sm_runtime = boto3.Session().client('sagemaker-runtime')
region = sagemaker_session.boto_region_name

# Execution role returned by the SageMaker SDK; passed to the processing job below.
role = get_execution_role()

# Account ID of the current caller, looked up through STS.
client = boto3.client('sts')
account = client.get_caller_identity()['Account']

# Default bucket of the SageMaker session; later used as the upload target for sample data.
bucket = sagemaker_session.default_bucket()

In [None]:
project_name = ""  # <--- fill here

# Fail fast with a clear message: every pipeline, endpoint and stack name below
# is derived from this prefix, so an empty value would only surface later as a
# confusing "resource not found" error.
if not project_name:
    raise ValueError("Set `project_name` in this cell before running the rest of the notebook.")

<a id="datapreparation"></a>
## 2. Data preparation

Download the data from the [Stanford AI Lab](https://ai.stanford.edu/~amaas/data/sentiment/). We stage the data with a SageMaker Processing job.

In [None]:
%%writefile ./processing/data_preparation.py

import string
import os
import glob
import re
import pandas as pd
import time
import subprocess
import argparse

punc_list = string.punctuation  # characters to strip; swap in a custom set if needed

def remove_punctuation(text):
    """
    Delete every character listed in ``punc_list`` from the given text.
    Input(string): raw text that may contain characters from ``punc_list``
    Output(string): the same text with those characters removed
    """
    # A translation table mapping a character to None deletes it.
    deletions = str.maketrans(dict.fromkeys(punc_list))
    return text.translate(deletions)

def staging_data(data_dir, output_dir="/opt/ml/processing/output/raw"):
    """
    Flatten the extracted review folders into one CSV file per data split.

    For each split ("train", "test"), every ``*.txt`` file under
    ``<data_dir>/<split>/neg`` and ``<data_dir>/<split>/pos`` is read, stripped
    of punctuation, whitespace-normalised and collected. The result is written
    to ``<output_dir>/<split>.csv`` with the columns
    ``index, text, label, event_time, data_type``.

    Input(string): data_dir, root folder of the extracted archive
    Input(string): output_dir, folder receiving the CSV files; defaults to the
        processing job's output path and is created if missing
    Output: None, the CSV files are the only result
    """
    os.makedirs(output_dir, exist_ok=True)
    for data_type in ["train", "test"]:
        data_list = []
        for label in ["neg", "pos"]:
            data_path = os.path.join(data_dir, data_type, label)
            # Sorted so the row order is reproducible between runs.
            for file_path in sorted(glob.glob(os.path.join(data_path, "*.txt"))):
                data_id = os.path.splitext(os.path.basename(file_path))[0]
                # Explicit encoding: the container's default locale may not be UTF-8.
                with open(file_path, "r", encoding="utf-8") as f:
                    # Read the whole file so a multi-line review is not truncated
                    # to its first line; line breaks are collapsed just below.
                    text = f.read()
                text = remove_punctuation(text)
                text = re.sub(r"\s+", " ", text)
                data_list.append([data_id, text, label])

        data_df = pd.DataFrame(data_list, columns=["index", "text", "label"])
        # One shared timestamp per split, taken when the frame is built.
        data_df["event_time"] = time.time()
        data_df["data_type"] = data_type
        data_df.to_csv(os.path.join(output_dir, f"{data_type}.csv"), index=False)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw-data-url", type=str, required=True)
    args, _ = parser.parse_known_args()

    archive = "aclImdb_v1.tar.gz"
    # Argument lists (no shell) keep the URL from being interpreted by a shell,
    # and check=True aborts the job when the download or extraction fails
    # instead of silently staging empty CSV files.
    subprocess.run(["wget", args.raw_data_url, "-O", archive], check=True)
    subprocess.run(["tar", "--no-same-owner", "-xzf", archive], check=True)
    os.remove(archive)

    # The archive unpacks into an `aclImdb` folder in the working directory.
    data_dir = os.path.join(os.getcwd(), "aclImdb")
    staging_data(data_dir)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

# Processor that runs the staging script on a single ml.m5.xlarge instance.
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0", role=role, instance_type="ml.m5.xlarge", instance_count=1
)

# Run the script written above. It downloads the archive from the given URL and
# writes its CSV files to /opt/ml/processing/output/raw, which is declared here
# as the job output named "raw_data".
sklearn_processor.run(
    code='processing/data_preparation.py',
    arguments = ['--raw-data-url', 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'],
    outputs=[ProcessingOutput(output_name="raw_data", source='/opt/ml/processing/output/raw')]
)

In [None]:
# Describe the most recent processing job and pull the S3 URI of its first
# declared output (the job above declares exactly one).
latest_job = sklearn_processor.jobs[-1]
preprocessing_job_description = latest_job.describe()

output_config = preprocessing_job_description["ProcessingOutputConfig"]
first_output = output_config["Outputs"][0]
raw_data_dir = first_output["S3Output"]["S3Uri"]
raw_data_dir

In [None]:
# List the staged objects; the processing script writes train.csv and test.csv.
!aws s3 ls $raw_data_dir/

In [None]:
# Read the staged training split straight from S3 and preview the first rows.
train_df = wr.s3.read_csv(path=f"{raw_data_dir}/train.csv")
train_df.head()

In [None]:
# Save the first 50 review texts (no header, no index column) as a local sample
# file that the inference sections below reuse.
sample_data_dir = "./data"
os.makedirs(sample_data_dir, exist_ok=True)
sample_texts = train_df["text"].iloc[:50]
sample_texts.to_csv(f"{sample_data_dir}/sample_imdb.csv", header=False, index=False)

<a id="featureingestion"></a>
## 3. Feature ingestion

### 3.1 Launch feature ingestion pipeline.

In [None]:
# Name of the ingestion pipeline, derived from the project prefix.
imdb_pipeline_name = f'{project_name}-imdb-preprocessing'

# Start one execution, pointing the pipeline's two input parameters at the
# train/test CSV files staged by the processing job.
imdb_pipeline_execution = sagemaker_client.start_pipeline_execution(
    PipelineName=imdb_pipeline_name,
    PipelineExecutionDisplayName="ManualExecution",
    PipelineParameters=[
        {"Name": "InputDataUrl_train", "Value": f'{raw_data_dir}/train.csv'},
        {"Name": "InputDataUrl_test", "Value": f'{raw_data_dir}/test.csv'},
    ],
)

### 3.2 Verify feature ingestion

In [None]:
# Runtime client used to read individual records back from the feature store.
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

In [None]:
# Fetch one record by its identifier to confirm that ingestion worked.
# NOTE(review): the group name "imdb" and the identifier "3142_1" are hard-coded;
# presumably the identifier is the file-name based `index` value produced during
# staging — confirm against the ingestion pipeline.
feature_group_name = "imdb"
response = featurestore_runtime.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString="3142_1",
    )

In [None]:
# Turn the returned entries (each carries a `FeatureName` field) into a one-row
# frame keyed by feature name, then show the stored review text.
# NOTE(review): "Record" is presumably missing from the response while the
# pipeline started in 3.1 is still running — confirm, and re-run after it completes.
record = response["Record"]
df = pd.DataFrame(record).set_index('FeatureName').transpose()
df["text"].tolist()[0]

<a id="modelbuilding"></a>
## 4. Model Building
With data in the feature store, you can now start the model building pipeline. You can leave the default parameter values.

In [None]:
# Name of the model building pipeline, derived from the project prefix.
build_pipeline_name = f'{project_name}-build-byoc'

# Start one execution; the only parameter overridden here is TokenizerModelS3URI,
# which is passed as the literal string "None".
build_pipeline_execution = sagemaker_client.start_pipeline_execution(
    PipelineName=build_pipeline_name,
    PipelineExecutionDisplayName="ManualExecution",
    PipelineParameters=[
        {"Name": "TokenizerModelS3URI", "Value": "None"},
    ],
)

If the build pipeline registers the model with `ModelApprovalStatus` set to `PendingManualApproval`, you must manually set the model status to `Approved`, as shown below.

In [None]:
sagemaker_client = boto_session.client('sagemaker')

model_package_group_name = f"{project_name}-imdb"

# Request newest-first ordering explicitly so that index 0 is guaranteed to be
# the most recently registered model version.
model_list = sagemaker_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    SortBy="CreationTime",
    SortOrder="Descending",
)["ModelPackageSummaryList"]

# The build pipeline runs asynchronously; give a clear message instead of a bare
# IndexError when no model version has been registered yet.
if not model_list:
    raise RuntimeError(
        f"No model package found in group '{model_package_group_name}'. "
        "Wait for the model building pipeline to finish, then re-run this cell."
    )

model_package_arn = model_list[0]["ModelPackageArn"]
model_package_arn

In [None]:
# Mark the selected model package version as approved.
model_package_update_input_dict = {
    "ModelPackageArn" : model_package_arn,
    "ModelApprovalStatus" : "Approved"
}
model_package_update_response = sagemaker_client.update_model_package(**model_package_update_input_dict)

Changing the model's approval status triggers endpoint deployment. You can check the deployment status in the [AWS CodePipeline console](https://us-east-1.console.aws.amazon.com/codesuite/codepipeline/pipelines?). This solution deploys endpoints for both real-time inference and [asynchronous inference](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html).

<a id="asyncinfer"></a>
## 5. Asynchronous inference

### 5.1 Testing batch inference with asynchronous inference

In [None]:
# S3 location that the asynchronous endpoint will read its input from.
input_s3_location = f"s3://{bucket}/{project_name}/sample_data/sample_imdb.csv"

# Copy the local sample file written in section 2 to that location.
!aws s3 cp ./data/sample_imdb.csv $input_s3_location

In [None]:
async_endpoint_name = f"{project_name}-byoc-asynchronous-async"

# Queue a single asynchronous request against the uploaded sample file; the
# response carries the S3 location where the result is expected to appear.
response = sm_runtime.invoke_endpoint_async(
    InputLocation=input_s3_location,
    EndpointName=async_endpoint_name,
)
output_location = response['OutputLocation']
print(f"OutputLocation: {output_location}")

In [None]:
def get_output(output_location, timeout=None, poll_interval=2):
    """
    Poll S3 until the asynchronous inference result exists, then return it.

    Input(string): output_location, S3 URI of the expected result object
    Input(number or None): timeout, maximum number of seconds to wait;
        None (the default) waits indefinitely, as before
    Input(number): poll_interval, seconds to sleep between attempts
    Output: content of the result object as returned by
        ``sagemaker_session.read_s3_file``
    Raises: TimeoutError when ``timeout`` elapses before the object appears;
        any ClientError other than NoSuchKey is re-raised unchanged
    """
    # Imported here because a bare `import urllib` does not guarantee that the
    # `urllib.parse` submodule is loaded.
    from urllib.parse import urlparse

    output_url = urlparse(output_location)
    # Distinct local names: the original shadowed the notebook-level `bucket`
    # with a local that was never used.
    output_bucket = output_url.netloc
    output_key = output_url.path[1:]  # drop the leading "/"
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        try:
            return sagemaker_session.read_s3_file(bucket=output_bucket, key_prefix=output_key)
        except ClientError as e:
            if e.response['Error']['Code'] != 'NoSuchKey':
                raise
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"No output at {output_location} after {timeout} seconds"
                ) from e
            print("waiting for output...")
            time.sleep(poll_interval)

In [None]:
output = get_output(output_location)
# Measure the encoded payload: sys.getsizeof would report the in-memory size of
# the Python object (including interpreter overhead), not the size of the data.
print(f"Output size in bytes: {len(output.encode('utf-8'))}")

In [None]:
# Local path for a copy of the asynchronous inference result.
async_infer_res = "./data/async_res.json"

# Download the result object, then parse it as JSON for inspection.
!aws s3 cp $output_location $async_infer_res

with open(async_infer_res, 'r') as f:
    async_res = json.load(f)
async_res

### 5.2 Testing auto-scaling with multiple invocations (coming soon!)
We enable auto scaling by monitoring the metric `ApproximateBacklogSizePerInstance`. You can find more details about [Asynchronous Inference Endpoint Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-monitor.html). Jump to [CloudWatch](https://console.aws.amazon.com/cloudwatch/home), search endpoint name in search box of `Metrics` console, select `SageMaker>EndpointName` in `AWS namespaces`, you can find `ApproximateBacklogSizePerInstance`.

<div align="center"><img width=800 src="images/async_metrics.png"><figcaption>Check asynchronous inference endpoint Metrics</figcaption></div>

For the auto scaling setting, you can refer to [Run computer vision inference on large videos with Amazon SageMaker asynchronous endpoints](https://aws.amazon.com/blogs/machine-learning/run-computer-vision-inference-on-large-videos-with-amazon-sagemaker-asynchronous-endpoints/).

In [None]:
def upload_file(input_location, content_type="text/csv"):
    """
    Upload a local file under ``<project_name>/input`` in the session bucket.

    Input(string): input_location, local path of the file to upload
    Input(string): content_type, value stored as the object's ContentType;
        defaults to "text/csv" because the sample payload in this notebook is a
        CSV file (the previous hard-coded "text/libsvm" did not describe it)
    Output: whatever ``sagemaker_session.upload_data`` returns for the upload
    """
    prefix = f"{project_name}/input"
    return sagemaker_session.upload_data(
        input_location,
        bucket=bucket,
        key_prefix=prefix,
        extra_args={"ContentType": content_type},
    )

In [None]:
inferences = []

# Derive the endpoint name from the project prefix, as in section 5.1; the
# previously hard-coded name only matched one specific deployment.
async_endpoint_name = f"{project_name}-byoc-asynchronous-async"

# Local name of the sample file, used only as a label in the printed results;
# the endpoint itself reads the copy at `input_s3_location`.
input_file = "./data/sample_imdb.csv"
for _ in range(100):
    response = sm_runtime.invoke_endpoint_async(
        EndpointName=async_endpoint_name, InputLocation=input_s3_location
    )
    output_location = response["OutputLocation"]
    inferences.append((input_file, output_location))
    # Short pause so the 100 requests are spread over time rather than sent at once.
    time.sleep(0.5)

# Collect the results in submission order; get_output blocks until each one exists.
for input_file, output_location in inferences:
    output = get_output(output_location)
    print(f"Input File: {input_file}, Output: {output}")

<a id="realtimeinfer"></a>
## 6. Real-time inference

### 6.1 Testing real-time inference endpoint

In [None]:
# The sample file has no header row, so the single column is named on load.
sample_df = pd.read_csv('./data/sample_imdb.csv', header=None, names=["text"])
sample_df

A real-time request fails when its payload is too large; in that case, use asynchronous inference instead.

In [None]:
# Keep only the first five reviews for the real-time request.
sample_list = sample_df["text"].head(5).tolist()

In [None]:
# Serialise the sample texts as headerless CSV, one review per row.
df_record = pd.DataFrame({"inputs": sample_list})
payload_as_csv = df_record.to_csv(header=False, index=False)

endpoint_name = f"{project_name}-byoc"

response = sm_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="text/csv",
    Body=payload_as_csv,
)

# Decode the response body as UTF-8 and parse it as JSON.
body = response["Body"].read()
data = json.loads(body.decode("utf-8"))
data

### 6.2 Request API Gateway
Jump to https://console.aws.amazon.com/apigateway/main/apis to find the name(`{project_name}_api`) of API Gateway. Then select `prod->GET->get-{project_name}-byoc->GET` in `Stages` console. You will find the invoke URL like below:

```
https://tkga9zza0a.execute-api.{region}.amazonaws.com/prod/get-{project_name}-byoc
```

In [None]:
!curl -X GET https://<your api url>/prod/get-<project name>-byoc?index=3142_1

<a id="cleanup"></a>
## 7. Cleanup

Jump to https://console.aws.amazon.com/cloudformation/home to delete the stacks created in this project, or run the following cell to delete all stacks in this project. All resources built in this project will be deleted by deleting stacks.

In [None]:
# Request deletion of the three project stacks (feature store, model build, serving).
# NOTE(review): the three requests are issued back to back; confirm that the
# stacks have no dependencies on each other that require a specific order.
!aws cloudformation delete-stack --stack-name $project_name-FeatureStore
!aws cloudformation delete-stack --stack-name $project_name-BuildModelStack
!aws cloudformation delete-stack --stack-name $project_name-ServingStack