# Notebook - Distributed ML Training on Amazon SageMaker with S3 Bucket

This notebook creates a Kubeflow pipeline that runs distributed ML training on Amazon SageMaker. The pipeline is built from the SageMaker components for Kubeflow Pipelines. Training data is stored in an S3 bucket.

![image.png](attachment:99e085ae-3de6-4366-a45e-dcb637764c40.png)

Please make sure that the following two scripts are placed correctly:
- `code/cifar10-distributed-gpu-final.py`: Python script for distributed ML training with PyTorch Distributed Data Parallel (DDP).
- `code/inference.py`: Python script for inference.

The architecture of the convolutional neural network (CNN) is defined in the training script.

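A quick check that both scripts are in place can save a failed pipeline run later. The sketch below is only an illustration: `missing_scripts` is a hypothetical helper, and the training script name follows the `sagemaker_program` value used later in the pipeline.

```python
from pathlib import Path

# Script names as used by the pipeline; the helper itself is hypothetical.
REQUIRED_SCRIPTS = ("cifar10-distributed-gpu-final.py", "inference.py")


def missing_scripts(code_dir="code", required=REQUIRED_SCRIPTS):
    """Return the required script names that are not present in code_dir."""
    base = Path(code_dir)
    return [name for name in required if not (base / name).is_file()]


# Typical use in a notebook cell:
#   missing = missing_scripts()
#   if missing:
#       raise FileNotFoundError(f"missing scripts in code/: {missing}")
```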
## Preparation and setting

In [None]:
!pip install sagemaker

In [None]:
# import necessary libraries 

import os
import time
import boto3
import torchvision

# import Kubeflow-specific libraries
import kfp
from kfp import components
from kfp import dsl
from kfp import compiler
from kubeflow.training.utils import utils

# import SageMaker-specific libraries
import sagemaker

### NOTE ###
# if you encounter sagemaker import error, then please do 
# 1. !pip install sagemaker
# 2. restart the Kernel of the notebook (Go to 'Kernel' Menu -> Click 'Restart Kernel...')

In [None]:
# NOTE: we will use us-west-2 region throughout this notebook
!aws configure set default.region 'us-west-2'


In [None]:
# initialize global variables 
user_namespace = utils.get_default_target_namespace() # namespace of this notebook instance
ROOT_DIR = os.path.abspath('/home/jovyan') # root directory of this notebook instance

print(f'namespace: {user_namespace}')
print(f'root directory: {ROOT_DIR}')


## Create SageMaker session and default bucket

In [None]:
# get role that will be assigned to SageMaker to call S3 APIs 
sagemakerrole=!(aws iam get-role --role-name sagemakerrole --output text --query 'Role.Arn')
role = sagemakerrole[0]

print(f'role ARN for SageMaker: {role}')

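If the `aws iam get-role` call fails, the captured output is an error message rather than an ARN, and the failure only surfaces later inside the pipeline. A minimal sketch of a sanity check follows; `is_role_arn` is a hypothetical helper, and the pattern assumes the standard `aws` partition.

```python
import re

# Assumption: standard 'aws' partition, 12-digit account ID, optional role path.
ROLE_ARN_PATTERN = re.compile(r"^arn:aws:iam::\d{12}:role/[\w+=,.@/-]+$")


def is_role_arn(value):
    """Return True if value looks like an IAM role ARN."""
    return bool(ROLE_ARN_PATTERN.match(value.strip()))


# Typical use after the cell above:
#   assert is_role_arn(role), f"unexpected output from aws iam get-role: {role}"
```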

In [None]:
# initialize SageMaker session
sess = boto3.Session()
region = sess.region_name
sagemaker_session = sagemaker.Session(boto_session=sess)

print(f'region: {region}')


In [None]:
# default bucket for SageMaker
bucket_name = sagemaker_session.default_bucket()
job_folder = 'jobs'
dataset_folder = 'datasets'
local_dataset = 'cifar10'
pytorchjob_name = f'pytorch-dist-gpu-{time.strftime("%Y-%m-%d-%H-%M-%S-%j", time.gmtime())}'

print(f'bucket name: {bucket_name}')

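The job name above combines a fixed prefix with a UTC timestamp so that every run gets a unique name. SageMaker training job names are limited to 63 characters made of alphanumerics and hyphens, so it can be worth validating a name before submitting. The helpers below are a hypothetical sketch of that idea, not part of the notebook.

```python
import re
import time

# SageMaker job names: alphanumerics and hyphens, must start and end with an alphanumeric.
NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")
MAX_NAME_LENGTH = 63


def make_job_name(prefix, when=None):
    """Build '<prefix>-<timestamp>' using the same strftime format as the notebook."""
    when = time.gmtime() if when is None else when
    return f"{prefix}-{time.strftime('%Y-%m-%d-%H-%M-%S-%j', when)}"


def is_valid_job_name(name):
    """Check the length limit and the allowed character set."""
    return len(name) <= MAX_NAME_LENGTH and bool(NAME_PATTERN.match(name))


print(make_job_name("pytorch-dist-gpu"))
```

The timestamp adds 24 characters including the joining hyphen, which leaves 39 characters for the prefix.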

## Deep Learning Containers for ML training and inference with PyTorch

In [None]:
# reference: https://github.com/aws/deep-learning-containers/blob/master/available_images.md
aws_dlc_sagemaker_train_image=\
 f'763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-training:1.8.0-gpu-py3'
aws_dlc_sagemaker_serving_image=\
 f'763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-inference:1.6.0-cpu-py3'

print(f'training image: {aws_dlc_sagemaker_train_image}')
print(f'serving image: {aws_dlc_sagemaker_serving_image}')

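Both image URIs follow the same pattern: registry account, region, repository, and tag. The sketch below factors that pattern into a small function so the region or tag can be changed in one place. `dlc_image_uri` is a hypothetical helper; the account ID, repositories, and tags are the ones used in the cell above.

```python
DLC_ACCOUNT = "763104351884"  # registry account used in the cell above


def dlc_image_uri(region, repository, tag, account=DLC_ACCOUNT):
    """Compose an ECR image URI for an AWS Deep Learning Container."""
    return f"{account}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


print(dlc_image_uri("us-west-2", "pytorch-training", "1.8.0-gpu-py3"))
print(dlc_image_uri("us-west-2", "pytorch-inference", "1.6.0-cpu-py3"))
```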

## Upload CIFAR-10 dataset and scripts for training and inference to S3

### Upload CIFAR-10 dataset

In [None]:
# download CIFAR-10 and upload to S3 bucket
cifar10_dataset = torchvision.datasets.CIFAR10(
    'cifar10-dataset',
    train=True,
    download=True
)

datasets = sagemaker_session.upload_data(
    path='cifar10-dataset',
    key_prefix=f'{dataset_folder}/cifar10-dataset'
)

print(f'dataset uploaded to: {datasets}')

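`upload_data` returns an `s3://` URI, which is later passed to the training job as the input channel. When the bucket and key prefix are needed separately (for example, to list the uploaded objects), the URI can be split with the standard library. This is a minimal sketch; `split_s3_uri` is a hypothetical helper and the bucket name in the example is made up.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key/prefix' into (bucket, key_prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical bucket name, for illustration only.
bucket, prefix = split_s3_uri("s3://example-bucket/datasets/cifar10-dataset")
print(bucket, prefix)
```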

### Upload Python scripts for training and inference

In [None]:
# upload the training and inference scripts to the S3 bucket 
# to be accessed by SageMaker training and deployment job
!tar cvfz sourcedir.tar.gz --exclude=".ipynb*" -C code . 
source_s3 = sagemaker_session.upload_data(path='sourcedir.tar.gz', key_prefix='training-scripts')

print(f'scripts for training and inference uploaded to: {source_s3}')

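The `tar` command above packs the contents of `code/` at the archive root and skips notebook checkpoint files. Where a shell is not available, roughly the same archive can be built with Python's `tarfile` module. The function below is a sketch under that assumption: `make_source_archive` is a hypothetical name, and it matches the exclude pattern against each file or directory name rather than reproducing `tar`'s exact pattern semantics.

```python
import fnmatch
import os
import tarfile


def make_source_archive(code_dir="code", archive="sourcedir.tar.gz", exclude=".ipynb*"):
    """Pack the contents of code_dir at the archive root, skipping excluded names."""

    def skip_excluded(tarinfo):
        # Returning None drops the entry (and, for a directory, everything below it).
        name = os.path.basename(tarinfo.name)
        return None if fnmatch.fnmatch(name, exclude) else tarinfo

    with tarfile.open(archive, "w:gz") as tar:
        tar.add(code_dir, arcname=".", filter=skip_excluded)
    return archive
```

The archive should be written outside `code_dir`, as in the original command, so that it does not end up packing itself.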

## Set up Kubeflow Pipeline Components

### Load SageMaker Components for Kubeflow Pipelines from URL

In [None]:
# loads SageMaker Training Components v2 for Kubeflow Pipelines from the URL 
sagemaker_train_ack_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/TrainingJob/component.yaml')

# loads SageMaker model & deploy Components v1 for Kubeflow Pipelines from the URL 
sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/model/component.yaml')
sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/deploy/component.yaml')



### Create Pipeline Component from Function 

In [None]:
# use a function to create a pipeline component for retrieving the model artifact from S3
def get_s3_model_artifact(model_artifacts) -> str:
    import ast
    model_artifacts = ast.literal_eval(model_artifacts)
    return model_artifacts["s3ModelArtifacts"]

get_s3_model_artifact_op = kfp.components.create_component_from_func(
    get_s3_model_artifact, output_component_file="get_s3_model_artifact.yaml"
)

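Because the component is built from a plain Python function, its logic can be tried locally before it runs in the pipeline. The sketch below repeats the function body so the snippet runs on its own and feeds it a hypothetical payload; the bucket and job name are made up, and the exact string the training component emits is an assumption.

```python
import ast


def get_s3_model_artifact(model_artifacts) -> str:
    # Same body as the component function above.
    model_artifacts = ast.literal_eval(model_artifacts)
    return model_artifacts["s3ModelArtifacts"]


# Hypothetical payload: a dict serialized as a string, as the function expects.
sample = "{'s3ModelArtifacts': 's3://example-bucket/example-job/output/model.tar.gz'}"
print(get_s3_model_artifact(sample))
```

`ast.literal_eval` parses Python literals only, so it also accepts a double-quoted, JSON-style payload as long as it contains no `true`, `false`, or `null`.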

## Build and Execute Kubeflow Pipeline

In [None]:
# utility functions for setting up configs for training input and output
def training_input(datasets):
    return [
        {
            "channelName": "train",
            "dataSource": {
                "s3DataSource": {
                    "s3DataType": "S3Prefix",
                    "s3URI": str(datasets),
                    "s3DataDistributionType": "FullyReplicated",
                },
            },
            "compressionType": "None",
            "RecordWrapperType": "None",
            "InputMode": "File",
        }
    ]

def training_output(s3_bucket_name):
    return {"s3OutputPath": f"s3://{s3_bucket_name}"}

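SageMaker writes the trained model under the configured output path as `<s3OutputPath>/<training job name>/output/model.tar.gz`. The sketch below derives that location from the bucket and job name, which helps when looking for the artifact in S3 after a run. `expected_model_artifact` is a hypothetical helper, and `training_output` is repeated so the snippet runs on its own.

```python
def training_output(s3_bucket_name):
    # Same as the utility function above.
    return {"s3OutputPath": f"s3://{s3_bucket_name}"}


def expected_model_artifact(s3_bucket_name, training_job_name):
    """Return the S3 URI where SageMaker stores the model archive for a job."""
    base = training_output(s3_bucket_name)["s3OutputPath"].rstrip("/")
    return f"{base}/{training_job_name}/output/model.tar.gz"


# Hypothetical bucket and job name, for illustration only.
print(expected_model_artifact("example-bucket", "pytorch-cnn-dist-job-example"))
```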

### Build Kubeflow Pipeline

The following cell creates a Kubeflow Pipeline composed of the following four components:

- **Sagemaker - TrainingJob**: `SageMaker Training Components v2 for Kubeflow Pipelines` is used to train a CNN model. The model itself is defined in `code/cifar10-distributed-gpu-final.py`, and the CIFAR-10 dataset is loaded from S3 when the training job runs.
- **Get s3 model artifacts**: This component extracts the S3 location of the trained model artifact from the output of the training job.
- **Sagemaker - Create Model**: This component uses `SageMaker Components v1 for Kubeflow Pipelines` to create a model from the artifacts of the training job.
- **Sagemaker - Deploy Model**: This component uses `SageMaker Components v1 for Kubeflow Pipelines` to deploy the created model for inference.

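Kubeflow Pipelines derives the execution order from data dependencies: a step that consumes another step's output runs after it. The four components therefore form a simple chain. The sketch below models that chain with the standard library (`graphlib`, Python 3.9+) purely as an illustration; it does not use or replace the KFP SDK.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the set of steps whose output it consumes.
dependencies = {
    "Sagemaker - TrainingJob": set(),
    "Get s3 model artifacts": {"Sagemaker - TrainingJob"},
    "Sagemaker - Create Model": {"Get s3 model artifacts"},
    "Sagemaker - Deploy Model": {"Sagemaker - Create Model"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```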
In [None]:
# create job name for tracking Kubernetes PyTorchJob custom resource or SageMaker training job
pytorch_distributed_jobname=\
 f'pytorch-cnn-dist-job-{time.strftime("%Y-%m-%d-%H-%M-%S-%j", time.gmtime())}'

# create Kubeflow pipeline using Amazon SageMaker
@dsl.pipeline(
    name="pipeline for PyTorch distributed training on SageMaker",
    description="sample training job"
)
def pytorch_cnn_pipeline(
    region=region,
    train_image=aws_dlc_sagemaker_train_image,
    serving_image=aws_dlc_sagemaker_serving_image,
    training_job_name=pytorch_distributed_jobname,
):

    # distributed ML training using SageMaker Training Components v2 for Kubeflow Pipeline
    training = sagemaker_train_ack_op(
        region=region,
        algorithm_specification={
            "trainingImage": train_image,
            "trainingInputMode": "File"
        },
        training_job_name=training_job_name,
        hyper_parameters={
            "backend": "gloo",
            "batch-size": "64",
            "epochs": "2",
            "lr": "0.01",
            "model-type": "custom",
            "sagemaker_container_log_level": "20",
            "sagemaker_program": "cifar10-distributed-gpu-final.py",
            "sagemaker_region": region,
            "sagemaker_submit_directory": source_s3
        },
        resource_config={
            "instanceType": "ml.p3.2xlarge",
            "instanceCount": 2,
            "volumeSizeInGB": 50
        },
        input_data_config=training_input(datasets),
        output_data_config=training_output(bucket_name),
        enable_network_isolation=False,
        enable_inter_container_traffic_encryption=False,
        role_arn=role,
        stopping_condition={"maxRuntimeInSeconds": 300}
    )

    # retrieve the artifacts for the trained model from S3
    model_artifact_url = get_s3_model_artifact_op(
        training.outputs["model_artifacts"]
    ).output

    # This step creates SageMaker Model which refers to model artifacts and inference script to deserialize the input image
    create_model = sagemaker_model_op(
        region=region,
        model_name=training_job_name,
        image=serving_image,
        model_artifact_url=model_artifact_url,
        network_isolation='False',
        environment=(f'{{ '
                     '"SAGEMAKER_CONTAINER_LOG_LEVEL": "20",'
                     '"SAGEMAKER_PROGRAM": "inference.py",'
                     f'"SAGEMAKER_REGION": "{region}",'
                     f'"SAGEMAKER_SUBMIT_DIRECTORY": "{model_artifact_url}"'
                     f'}}'),
        role=role
    )

    # create SageMaker endpoint which will be called to run inference
    prediction = sagemaker_deploy_op(
        region=region,
        model_name_1=create_model.output,
        instance_type_1='ml.c5.xlarge'
    )

    # disable pipeline cache
    training.execution_options.caching_strategy.max_cache_staleness = "P0D"
 

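The `environment` argument of the create-model step is a JSON object passed as a string, which the pipeline assembles by hand from f-string fragments. A less error-prone way to produce the same JSON is `json.dumps`, sketched below with plain string inputs. `build_environment` is a hypothetical helper; inside the pipeline function the values are pipeline placeholders rather than plain strings, so they would need to be converted with `str()` first, which is an assumption not verified here.

```python
import json


def build_environment(region, submit_directory):
    """Serialize the inference container's environment variables as a JSON string."""
    return json.dumps({
        "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
        "SAGEMAKER_PROGRAM": "inference.py",
        "SAGEMAKER_REGION": region,
        "SAGEMAKER_SUBMIT_DIRECTORY": submit_directory,
    })


# Hypothetical artifact location, for illustration only.
print(build_environment("us-west-2", "s3://example-bucket/example-job/output/model.tar.gz"))
```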
### Compile Kubeflow Pipeline

In [None]:
# DSL Compiler that compiles pipeline functions into workflow yaml.
workflow_yaml = "pytorch_cnn_pipeline.yaml" # name of workflow yaml file

kfp.compiler.Compiler().compile(pytorch_cnn_pipeline, workflow_yaml)


### Execute Kubeflow Pipeline using Kubeflow Pipeline Client

In [None]:
# Connect to Kubeflow Pipelines using the Kubeflow Pipelines SDK client
experiment_name = "ml_workflow" # name of experiment in Kubeflow
run_name = "pytorch_cnn_pipeline" # name of run in Kubeflow

client = kfp.Client()
experiment = client.create_experiment(name=experiment_name)

# Run a specified pipeline 
my_run = client.run_pipeline(experiment.id, run_name, workflow_yaml)

# click the “Run details” link generated below this cell to view your pipeline. 
# you can click each pipeline step to see its logs. 
# you can also see the training job in the SageMaker console (go to `Training > Training jobs` in the left pane)

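Instead of watching the UI, the notebook can block until the run finishes. The sketch below assumes the KFP SDK v1 client used in this notebook, whose `wait_for_run_completion(run_id, timeout)` method returns a run detail object with the status at `.run.status`. `wait_for_run` is a hypothetical wrapper, and the one-hour timeout is an arbitrary choice.

```python
def wait_for_run(client, run_id, timeout_seconds=3600):
    """Block until the run finishes (or the timeout expires) and return its status."""
    detail = client.wait_for_run_completion(run_id, timeout_seconds)
    return detail.run.status


# Typical use after the cell above (assumes KFP SDK v1 objects):
#   status = wait_for_run(client, my_run.id)
#   print(f'run finished with status: {status}')
```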

Once the run completes successfully, use the accompanying notebook `invoke_sagemaker_inference_endpoint.ipynb` to invoke the SageMaker endpoint and submit a sample image for inference.