# MNIST classification pipeline using SageMaker

This sample runs a pipeline that trains a classification model using K-Means on the MNIST dataset with SageMaker.

All required steps are included here. For other details, such as how to get the source data, see the [documentation](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker).


This sample is based on the [Train a Model with a Built-in Algorithm and Deploy it](https://docs.aws.amazon.com/sagemaker/latest/dg/ex1.html).

The sample performs hyperparameter optimization, trains a model, deploys a live endpoint, and performs batch inference on the [MNIST dataset](http://www.deeplearning.net/tutorial/gettingstarted.html). The pipeline takes approximately 35 minutes to run end to end.



## Prerequisite

1. Install the SageMaker, kfp, and boto SDKs.

> Note: Be sure to use the KFP SDK version specified in this notebook. The notebook has been tested with the kfp v0.1.29 release.

In [None]:
!pip install sagemaker https://storage.googleapis.com/ml-pipeline/release/0.1.29/kfp.tar.gz --upgrade 

2. Create an S3 bucket to store pipeline data

> Note: Be sure to change the HASH variable to a random hash and set AWS_REGION before running the next cell.

> Note: If you use us-east-1, use the command `!aws s3 mb s3://$S3_BUCKET --region $AWS_REGION --endpoint-url https://s3.us-east-1.amazonaws.com` instead.

In [None]:
import random, string
HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(16)] + [random.choice(string.digits) for n in range(16)])
AWS_REGION = 'us-west-2'
S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)
!aws s3 mb s3://$S3_BUCKET --region $AWS_REGION
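
S3 bucket names must be globally unique, 3 to 63 characters long, and made up of lowercase letters, digits, hyphens, and dots. The following is a minimal sketch that generates a name in the same style as the cell above and checks it against a simplified version of those rules before you call `aws s3 mb`. The helper names `make_bucket_name` and `is_valid_bucket_name` are illustrative and not part of the sample.

```python
import random
import re
import string

# Simplified S3 naming rule: 3-63 characters, lowercase letters, digits,
# hyphens and dots, starting and ending with a letter or digit. The full
# rules add further restrictions (for example, no IP-address-like names)
# that this sketch does not check.
_BUCKET_RE = re.compile(r'^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$')


def make_bucket_name(prefix_len=16, suffix='kubeflow-pipeline-data'):
    """Build '<random prefix>-<suffix>' from lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    prefix = ''.join(random.choice(alphabet) for _ in range(prefix_len))
    return '{}-{}'.format(prefix, suffix)


def is_valid_bucket_name(name):
    """Return True if `name` passes the simplified check above."""
    return bool(_BUCKET_RE.match(name))


bucket_name = make_bucket_name()
print(bucket_name, is_valid_bucket_name(bucket_name))
```

A name that passes this check can still be rejected if another account already owns it, so treat the check as a quick local sanity test only.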

3. Prepare dataset

> Download the MNIST dataset, convert the training and test sets to the format required by KMeans, save the validation set as `valid_data.csv`, and upload everything to your S3 bucket.

In [None]:
import pickle, gzip, numpy, urllib.request, json
from urllib.parse import urlparse

# Load the dataset
urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", "mnist.pkl.gz")
with gzip.open('mnist.pkl.gz', 'rb') as f:
    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')


# Upload dataset to S3
from sagemaker.amazon.common import write_numpy_to_dense_tensor
import io
import boto3

###################################################################
# This is the only thing that you need to change to run this code 
# Give the name of your S3 bucket 
bucket = S3_BUCKET 

# If you are going to use the default values of the pipeline, 
# give the name of a bucket that is in the us-west-2 region 
###################################################################

train_data_key = 'mnist_kmeans_example/train_data'
test_data_key = 'mnist_kmeans_example/test_data'
train_data_location = 's3://{}/{}'.format(bucket, train_data_key)
test_data_location = 's3://{}/{}'.format(bucket, test_data_key)
print('Training data will be uploaded to: {}'.format(train_data_location))
print('Test data will be uploaded to: {}'.format(test_data_location))

# Convert the training data into the format required by the SageMaker KMeans algorithm
buf = io.BytesIO()
write_numpy_to_dense_tensor(buf, train_set[0], train_set[1])
buf.seek(0)

boto3.resource('s3').Bucket(bucket).Object(train_data_key).upload_fileobj(buf)

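# Convert the test data into the format required by the SageMaker KMeans algorithm.
# Use a fresh buffer so the training records written above are not uploaded again.
buf = io.BytesIO()
write_numpy_to_dense_tensor(buf, test_set[0], test_set[1])
buf.seek(0)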

boto3.resource('s3').Bucket(bucket).Object(test_data_key).upload_fileobj(buf)

# Save the validation data as CSV; it is the input for the batch transform step
numpy.savetxt('valid-data.csv', valid_set[0], delimiter=',', fmt='%g')
s3_client = boto3.client('s3')
input_key = "{}/valid_data.csv".format("mnist_kmeans_example/input")
s3_client.upload_file('valid-data.csv', bucket, input_key)
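
The batch transform step reads `valid_data.csv` as plain CSV, with one flattened image (784 pixel values) per line. As a rough, standard-library-only illustration of what `numpy.savetxt(..., delimiter=',', fmt='%g')` writes for each row, the hypothetical helper `row_to_csv` below applies the same `%g` format to a tiny made-up row.

```python
def row_to_csv(row):
    """Format one row of floats the way fmt='%g' with delimiter=',' would."""
    return ','.join('%g' % value for value in row)


# Made-up three-value "image"; real MNIST rows have 784 values.
sample_row = [0.0, 0.5, 1.0]
print(row_to_csv(sample_row))  # prints 0,0.5,1
```

The `%g` format drops trailing zeros, which keeps the uploaded file noticeably smaller than a fixed-precision format would.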

4. Grant SageMaker permission

> In a production environment, you would typically assign fine-grained permissions depending on the actions you take, and use tools like [IAM Role for Service Account](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html) to secure access to AWS resources. For simplicity, this sample assigns the AmazonSageMakerFullAccess and AmazonS3FullAccess IAM policies. You can read more about granular policies [here](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html).

> To run this pipeline, we need two levels of IAM permissions:

> a) Create a Kubernetes secret named **aws-secret** that holds credentials with the SageMaker and S3 policies attached. Make sure to create `aws-secret` in the `kubeflow` namespace.

```yaml
apiVersion: v1
kind: Secret
metadata:
 name: aws-secret
 namespace: kubeflow
type: Opaque
data:
 AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY
 AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS
```
> Note: To get the base64 string, run `echo -n $AWS_ACCESS_KEY_ID | base64`.

> b) Create an IAM execution role for SageMaker and S3 so that the job can assume this role to perform SageMaker and S3 actions. Make a note of this role's ARN, as you will need it when you create the pipeline.
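
The `data` values in the Secret manifest above must be base64-encoded. As a sketch of what `echo -n ... | base64` produces, the snippet below encodes placeholder strings with Python's standard library. The values shown are dummy placeholders rather than real credentials, and `encode_secret_value` is an illustrative helper name.

```python
import base64


def encode_secret_value(value):
    """Base64-encode a string without a trailing newline (like `echo -n`)."""
    return base64.b64encode(value.encode('utf-8')).decode('ascii')


# Dummy placeholders only; never hard-code real credentials in a notebook.
print(encode_secret_value('EXAMPLE_ACCESS_KEY_ID'))
print(encode_secret_value('example-secret-access-key'))
```

Base64 is an encoding, not encryption, so anyone who can read the Secret can recover the original keys.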


5. Install Kubeflow Pipelines SDK
> You can skip this step if it is already installed. You can check whether the SDK is installed by running `!pip show kfp`. The notebook has been tested with the kfp v0.1.29 release.
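
As an alternative to `!pip show kfp`, the installed version can be checked from Python. This is a minimal sketch that assumes Python 3.8 or newer (for `importlib.metadata`); `installed_version` is an illustrative helper, not part of the kfp SDK.

```python
from importlib import metadata  # standard library in Python 3.8+


def installed_version(package):
    """Return the installed version string of `package`, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version('kfp')
if version is None:
    print('kfp is not installed')
elif version != '0.1.29':
    print('kfp {} is installed; this notebook was tested with 0.1.29'.format(version))
else:
    print('kfp 0.1.29 is installed')
```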

## Build pipeline

1. Run the following cell to load the Kubeflow Pipelines SDK

In [None]:
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

2. Load the reusable SageMaker components.

In [None]:
sagemaker_train_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/train/component.yaml')
sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/model/component.yaml')
sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/deploy/component.yaml')
sagemaker_batch_transform_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/batch_transform/component.yaml')
sagemaker_hpo_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker/hyperparameter_tuning/component.yaml')

3. Create pipeline. 

The pipeline first runs a hyperparameter tuning job, then a training job that uses the best hyperparameters found. Once the training job is done, it persists the trained model to S3.

Next, a job creates a `Model` in SageMaker from the trained artifact.

With this model, a batch transform job can run predictions on other datasets, and a prediction service can create an endpoint from it.
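
Judging from the pipeline code later in this notebook, each step consumes outputs of an earlier one. The sketch below records those dependencies in a plain dictionary (the keys reuse the variable names from the pipeline function) and derives one valid execution order. It assumes Python 3.9 or newer for `graphlib` and only illustrates the ordering; Kubeflow Pipelines works out the real order itself.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    'hpo': set(),
    'training': {'hpo'},               # uses hpo.outputs['best_hyperparameters']
    'create_model': {'training'},      # uses the training job's model artifact
    'prediction': {'create_model'},    # deploys an endpoint for the model
    'batch_transform': {'create_model'},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The endpoint deployment and the batch transform both depend only on the model, so they do not have to wait for each other.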


> Note: Remember to pass your **role_arn** to successfully run the job.

> Note: If you use a different region, please replace `us-west-2` with your region. 

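> Note: ECR image registries for the k-means algorithm, by region. The default `image` parameter in the pipeline below is the us-west-2 registry followed by `/kmeans:1`.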

|Region| ECR Image|
|------|----------|
|us-west-1|632365934929.dkr.ecr.us-west-1.amazonaws.com|
|us-west-2|174872318107.dkr.ecr.us-west-2.amazonaws.com|
|us-east-1|382416733822.dkr.ecr.us-east-1.amazonaws.com|
|us-east-2|404615174143.dkr.ecr.us-east-2.amazonaws.com|
|us-gov-west-1|226302683700.dkr.ecr.us-gov-west-1.amazonaws.com|
|ap-east-1|286214385809.dkr.ecr.ap-east-1.amazonaws.com|
|ap-northeast-1|351501993468.dkr.ecr.ap-northeast-1.amazonaws.com|
|ap-northeast-2|835164637446.dkr.ecr.ap-northeast-2.amazonaws.com|
|ap-south-1|991648021394.dkr.ecr.ap-south-1.amazonaws.com|
|ap-southeast-1|475088953585.dkr.ecr.ap-southeast-1.amazonaws.com|
|ap-southeast-2|712309505854.dkr.ecr.ap-southeast-2.amazonaws.com|
|ca-central-1|469771592824.dkr.ecr.ca-central-1.amazonaws.com|
|eu-central-1|664544806723.dkr.ecr.eu-central-1.amazonaws.com|
|eu-north-1|669576153137.dkr.ecr.eu-north-1.amazonaws.com|
|eu-west-1|438346466558.dkr.ecr.eu-west-1.amazonaws.com|
|eu-west-2|644912444149.dkr.ecr.eu-west-2.amazonaws.com|
|eu-west-3|749696950732.dkr.ecr.eu-west-3.amazonaws.com|
|me-south-1|249704162688.dkr.ecr.me-south-1.amazonaws.com|
|sa-east-1|855470959533.dkr.ecr.sa-east-1.amazonaws.com|
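
If you switch regions often, a small lookup can build the `image` value for you. This is a sketch under two assumptions: the registry hosts are copied from a few rows of the table above, and the `/kmeans:1` suffix follows the pattern of the pipeline's default `image` parameter. `KMEANS_REGISTRIES` and `kmeans_image` are illustrative names.

```python
# Registry hosts copied from a few rows of the table above; add more as needed.
KMEANS_REGISTRIES = {
    'us-west-1': '632365934929.dkr.ecr.us-west-1.amazonaws.com',
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com',
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com',
    'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com',
}


def kmeans_image(region, tag='1'):
    """Return '<registry>/kmeans:<tag>' for a region listed above."""
    try:
        registry = KMEANS_REGISTRIES[region]
    except KeyError:
        raise ValueError('No k-means registry listed for region: ' + region)
    return '{}/kmeans:{}'.format(registry, tag)


print(kmeans_image('us-west-2'))
# prints 174872318107.dkr.ecr.us-west-2.amazonaws.com/kmeans:1
```

The result can then be passed as the `image` argument together with a matching `region` when you run the pipeline.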

In [None]:
# Configure your s3 bucket.
S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)
S3_PIPELINE_PATH='s3://{}/mnist_kmeans_example'.format(S3_BUCKET)

# Configure your Sagemaker execution role.
SAGEMAKER_ROLE_ARN=''


@dsl.pipeline(
 name='MNIST Classification pipeline',
 description='MNIST Classification using KMEANS in SageMaker'
)
def mnist_classification(region='us-west-2',
 image='174872318107.dkr.ecr.us-west-2.amazonaws.com/kmeans:1',
 training_input_mode='File',
 hpo_strategy='Bayesian',
 hpo_metric_name='test:msd',
 hpo_metric_type='Minimize',
 hpo_early_stopping_type='Off',
 hpo_static_parameters='{"k": "10", "feature_dim": "784"}',
 hpo_integer_parameters='[{"Name": "mini_batch_size", "MinValue": "500", "MaxValue": "600"}, {"Name": "extra_center_factor", "MinValue": "10", "MaxValue": "20"}]',
 hpo_continuous_parameters='[]',
 hpo_categorical_parameters='[{"Name": "init_method", "Values": ["random", "kmeans++"]}]',
 hpo_channels='[{"ChannelName": "train", \
 "DataSource": { \
 "S3DataSource": { \
 "S3Uri": "' + S3_PIPELINE_PATH + '/train_data", \
 "S3DataType": "S3Prefix", \
 "S3DataDistributionType": "FullyReplicated" \
 } \
 }, \
 "ContentType": "", \
 "CompressionType": "None", \
 "RecordWrapperType": "None", \
 "InputMode": "File"}, \
 {"ChannelName": "test", \
 "DataSource": { \
 "S3DataSource": { \
 "S3Uri": "' + S3_PIPELINE_PATH + '/test_data", \
 "S3DataType": "S3Prefix", \
 "S3DataDistributionType": "FullyReplicated" \
 } \
 }, \
 "ContentType": "", \
 "CompressionType": "None", \
 "RecordWrapperType": "None", \
 "InputMode": "File"}]',
 hpo_spot_instance='False',
 hpo_max_wait_time='3600',
 hpo_checkpoint_config='{}',
 output_location=S3_PIPELINE_PATH + '/output',
 output_encryption_key='',
 instance_type='ml.p3.2xlarge',
 instance_count='1',
 volume_size='50',
 hpo_max_num_jobs='9',
 hpo_max_parallel_jobs='2',
 max_run_time='3600',
 endpoint_url='',
 network_isolation='True',
 traffic_encryption='False',
 train_channels='[{"ChannelName": "train", \
 "DataSource": { \
 "S3DataSource": { \
 "S3Uri": "' + S3_PIPELINE_PATH + '/train_data", \
 "S3DataType": "S3Prefix", \
 "S3DataDistributionType": "FullyReplicated" \
 } \
 }, \
 "ContentType": "", \
 "CompressionType": "None", \
 "RecordWrapperType": "None", \
 "InputMode": "File"}]',
 train_spot_instance='False',
 train_max_wait_time='3600',
 train_checkpoint_config='{}',
 batch_transform_instance_type='ml.m4.xlarge',
 batch_transform_input=S3_PIPELINE_PATH + '/input',
 batch_transform_data_type='S3Prefix',
 batch_transform_content_type='text/csv',
 batch_transform_compression_type='None',
 batch_transform_ouput=S3_PIPELINE_PATH + '/output',
 batch_transform_max_concurrent='4',
 batch_transform_max_payload='6',
 batch_strategy='MultiRecord',
 batch_transform_split_type='Line',
 role_arn=SAGEMAKER_ROLE_ARN
 ):

 hpo = sagemaker_hpo_op(
 region=region,
 endpoint_url=endpoint_url,
 image=image,
 training_input_mode=training_input_mode,
 strategy=hpo_strategy,
 metric_name=hpo_metric_name,
 metric_type=hpo_metric_type,
 early_stopping_type=hpo_early_stopping_type,
 static_parameters=hpo_static_parameters,
 integer_parameters=hpo_integer_parameters,
 continuous_parameters=hpo_continuous_parameters,
 categorical_parameters=hpo_categorical_parameters,
 channels=hpo_channels,
 output_location=output_location,
 output_encryption_key=output_encryption_key,
 instance_type=instance_type,
 instance_count=instance_count,
 volume_size=volume_size,
 max_num_jobs=hpo_max_num_jobs,
 max_parallel_jobs=hpo_max_parallel_jobs,
 max_run_time=max_run_time,
 network_isolation=network_isolation,
 traffic_encryption=traffic_encryption,
 spot_instance=hpo_spot_instance,
 max_wait_time=hpo_max_wait_time,
 checkpoint_config=hpo_checkpoint_config,
 role=role_arn,
 ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

 training = sagemaker_train_op(
 region=region,
 endpoint_url=endpoint_url,
 image=image,
 training_input_mode=training_input_mode,
 hyperparameters=hpo.outputs['best_hyperparameters'],
 channels=train_channels,
 instance_type=instance_type,
 instance_count=instance_count,
 volume_size=volume_size,
 max_run_time=max_run_time,
 model_artifact_path=output_location,
 output_encryption_key=output_encryption_key,
 network_isolation=network_isolation,
 traffic_encryption=traffic_encryption,
 spot_instance=train_spot_instance,
 max_wait_time=train_max_wait_time,
 checkpoint_config=train_checkpoint_config,
 role=role_arn,
 ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

 create_model = sagemaker_model_op(
 region=region,
 endpoint_url=endpoint_url,
 model_name=training.outputs['job_name'],
 image=training.outputs['training_image'],
 model_artifact_url=training.outputs['model_artifact_url'],
 network_isolation=network_isolation,
 role=role_arn
 ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

 prediction = sagemaker_deploy_op(
 region=region,
 endpoint_url=endpoint_url,
 model_name_1=create_model.output,
 ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

 batch_transform = sagemaker_batch_transform_op(
 region=region,
 endpoint_url=endpoint_url,
 model_name=create_model.output,
 instance_type=batch_transform_instance_type,
 instance_count=instance_count,
 max_concurrent=batch_transform_max_concurrent,
 max_payload=batch_transform_max_payload,
 batch_strategy=batch_strategy,
 input_location=batch_transform_input,
 data_type=batch_transform_data_type,
 content_type=batch_transform_content_type,
 split_type=batch_transform_split_type,
 compression_type=batch_transform_compression_type,
 output_location=batch_transform_ouput
 ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))
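
The channel definitions above are written as concatenated string literals, which are easy to break with a missing quote or comma. As an alternative sketch, the same JSON strings can be built from Python dictionaries with `json.dumps`. The bucket name below is a placeholder and `make_channel` is an illustrative helper, not part of the SageMaker components.

```python
import json

# Placeholder path; in the notebook this value is built from S3_BUCKET.
S3_PIPELINE_PATH = 's3://my-example-bucket/mnist_kmeans_example'


def make_channel(name, s3_uri):
    """Return one channel definition with the same fields used in the pipeline."""
    return {
        'ChannelName': name,
        'DataSource': {
            'S3DataSource': {
                'S3Uri': s3_uri,
                'S3DataType': 'S3Prefix',
                'S3DataDistributionType': 'FullyReplicated',
            }
        },
        'ContentType': '',
        'CompressionType': 'None',
        'RecordWrapperType': 'None',
        'InputMode': 'File',
    }


# The training job uses one channel; the tuning job uses train and test.
train_channels = json.dumps([make_channel('train', S3_PIPELINE_PATH + '/train_data')])
hpo_channels = json.dumps([
    make_channel('train', S3_PIPELINE_PATH + '/train_data'),
    make_channel('test', S3_PIPELINE_PATH + '/test_data'),
])
print(train_channels)
```

The resulting strings could be used as the defaults for `train_channels` and `hpo_channels`, and `json.dumps` guarantees they are valid JSON.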

4. Compile your pipeline

In [None]:
kfp.compiler.Compiler().compile(mnist_classification, 'mnist-classification-pipeline.zip')

5. Deploy your pipeline

In [None]:
client = kfp.Client()
aws_experiment = client.create_experiment(name='aws')
my_run = client.run_pipeline(aws_experiment.id, 'mnist-classification-pipeline', 
 'mnist-classification-pipeline.zip')

## Prediction

Once your pipeline is done, find the endpoint name in the SageMaker console and replace the `ENDPOINT_NAME` value below with your newly created endpoint name. The prediction code reuses `train_set` from the dataset preparation step, so run that cell first.


> Note: Make sure to attach the `sagemaker:InvokeEndpoint` permission to the IAM role of the worker node group that runs this Jupyter notebook.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:InvokeEndpoint"
            ],
            "Resource": "*"
        }
    ]
}
```


## Find your Endpoint name in AWS Console

Open the AWS console, go to the SageMaker service, and find the endpoint name as shown in the following picture.

![download-pipeline](./images/sm-endpoint.jpg)
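
The endpoint name can also be looked up programmatically. The sketch below wraps the boto3 SageMaker client's `list_endpoints` call; `latest_endpoint_name` is an illustrative helper, and it assumes the endpoint created by this pipeline is the most recent one in the region, which may not hold in a shared account.

```python
def latest_endpoint_name(sagemaker_client):
    """Return the name of the most recently created endpoint, or None."""
    response = sagemaker_client.list_endpoints(
        SortBy='CreationTime',
        SortOrder='Descending',
        MaxResults=1,
    )
    endpoints = response.get('Endpoints', [])
    return endpoints[0]['EndpointName'] if endpoints else None


# Usage (requires AWS credentials that allow sagemaker:ListEndpoints):
#   import boto3
#   client = boto3.client('sagemaker', region_name='us-west-2')
#   ENDPOINT_NAME = latest_endpoint_name(client)
```

Passing the client in as an argument keeps the helper easy to try against a stub object before pointing it at a real account.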

In [None]:
import io
import json

import boto3
import numpy

# Replace the endpoint name with yours.
ENDPOINT_NAME='Endpoint-20190916223205-Y635'

# We will use the same dataset that was downloaded at the beginning of the notebook.

# Simple function to create a csv from our numpy array
def np2csv(arr):
    csv = io.BytesIO()
    numpy.savetxt(csv, arr, delimiter=',', fmt='%g')
    return csv.getvalue().decode().rstrip()

runtime = boto3.Session(region_name='us-west-2').client('sagemaker-runtime')

payload = np2csv(train_set[0][30:31])

response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                   ContentType='text/csv',
                                   Body=payload)
result = json.loads(response['Body'].read().decode())
print(result)
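
To make the printed result easier to read, the response can be reduced to a list of cluster assignments. The sketch below is an assumption-heavy illustration: the sample body uses made-up numbers, the field names `closest_cluster` and `distance_to_cluster` are assumed from the k-means JSON response format, and `summarize_predictions` is a hypothetical helper.

```python
import json

# Illustrative response body only: the numbers are made up, and the field
# names are an assumption about the k-means JSON response format.
sample_body = '{"predictions": [{"closest_cluster": 3.0, "distance_to_cluster": 6.5}]}'


def summarize_predictions(body):
    """Return (cluster_id, distance) tuples from a k-means JSON response body."""
    result = json.loads(body)
    return [
        (int(p['closest_cluster']), float(p['distance_to_cluster']))
        for p in result.get('predictions', [])
    ]


print(summarize_predictions(sample_body))  # prints [(3, 6.5)]
```

K-means is unsupervised, so a cluster ID is not a digit label: cluster 3 does not necessarily contain images of the digit 3.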

## Clean up

Go to the SageMaker console and delete the `endpoint` and `model` created by this pipeline.
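
The same cleanup can be scripted with boto3. This is a sketch, not the sample's own method: `delete_endpoint_and_model` is an illustrative helper, and it also removes the endpoint configuration, which goes beyond the two resources mentioned above.

```python
def delete_endpoint_and_model(sagemaker_client, endpoint_name, model_name):
    """Delete an endpoint, its endpoint configuration, and a model."""
    # Look up the configuration name before the endpoint is deleted.
    description = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    config_name = description['EndpointConfigName']

    sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
    sagemaker_client.delete_endpoint_config(EndpointConfigName=config_name)
    sagemaker_client.delete_model(ModelName=model_name)
    return config_name


# Usage (requires AWS credentials with the matching SageMaker permissions):
#   import boto3
#   client = boto3.client('sagemaker', region_name='us-west-2')
#   delete_endpoint_and_model(client, ENDPOINT_NAME, 'your-model-name')
```

Deleting the endpoint is the important part for cost, since a live endpoint is billed for as long as it runs.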

### Clean up S3 bucket
Delete the S3 bucket that was created for this exercise.

In [None]:
!aws s3 rb s3://$S3_BUCKET --force