# MNIST TensorFlow training options

The **SageMaker Python SDK** helps you deploy your models for training and hosting in optimized, production-ready containers in SageMaker. The SageMaker Python SDK is easy to use, modular, extensible and compatible with TensorFlow and MXNet. This tutorial focuses on how to create a convolutional neural network model and train it on the [MNIST dataset](http://yann.lecun.com/exdb/mnist/) using **TensorFlow training**.



### Set up the environment

In [None]:
import os
import sagemaker
from sagemaker import get_execution_role
import project_path # path to helper methods
from lib import tf_scripts
from lib import workshop
import boto3

sagemaker_session = sagemaker.Session()
cfn = boto3.client('cloudformation')

role = get_execution_role()

session = boto3.session.Session()
region = session.region_name

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket(region, session, 'sage-')
print(bucket)

### Download the [MNIST dataset](https://en.wikipedia.org/wiki/MNIST_database#Dataset)

In this step, we convert the MNIST dataset into [TFRecord](https://www.tensorflow.org/tutorials/load_data/tf-records) binary files for training, validation, and testing.

In [None]:
from tensorflow.contrib.learn.python.learn.datasets import mnist
import tensorflow as tf

data_sets = mnist.read_data_sets('data', dtype=tf.uint8, reshape=False, validation_size=5000)

tf_scripts.convert_to(data_sets.train, 'train', 'data')
tf_scripts.convert_to(data_sets.validation, 'validation', 'data')
tf_scripts.convert_to(data_sets.test, 'test', 'data')

### Upload the data
We use the ```sagemaker.Session.upload_data``` function to upload our datasets to an S3 location. The return value, `inputs`, identifies the S3 location; we will use it later when we start the training job.

In [None]:
inputs = sagemaker_session.upload_data(bucket=bucket, path='data', key_prefix='data/DEMO-mnist')
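
As a rough illustration of what `inputs` holds, the sketch below composes the S3 URI from the bucket and key prefix, following the format the SDK documents for directory uploads. `expected_s3_uri` is a hypothetical helper, not part of the SageMaker SDK, and the bucket name is a placeholder.

```python
def expected_s3_uri(bucket, key_prefix):
    """Compose the URI a directory upload is documented to return (hypothetical helper)."""
    return "s3://{}/{}".format(bucket, key_prefix.strip("/"))


# Placeholder bucket name; the real one comes from workshop.create_bucket above.
example_uri = expected_s3_uri("sage-example-bucket", "data/DEMO-mnist")
print(example_uri)
```

Printing `inputs` in the notebook is the reliable way to confirm the actual location.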

# Construct a script for distributed training 
Here is the full code for the network model:

In [None]:
!pygmentize 'mnist.py'

## Create a training job using the sagemaker.TensorFlow estimator 

The script here is an adaptation of the [TensorFlow MNIST example](https://github.com/tensorflow/models/tree/master/official/mnist). It provides a ```model_fn(features, labels, mode)```, which is used for training, evaluation and inference. 

## A regular ```model_fn```

A regular **```model_fn```** follows the pattern:
1. [defines a neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L96)
2. [applies the ```features``` in the neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L178)
3. [if the ```mode``` is ```PREDICT```, returns the output from the neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L186)
4. [calculates the loss function comparing the output with the ```labels```](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L188)
5. [creates an optimizer and minimizes the loss function to improve the neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L193)
6. [returns the output, optimizer and loss function](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L205)
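
The control flow of that pattern can be sketched without TensorFlow. The toy below is only an analogy: `toy_model_fn`, its single-weight "network", and the string modes are all hypothetical and do not use the `tf.estimator` API.

```python
def toy_model_fn(features, labels, mode, weight=0.5, learning_rate=0.1):
    """Framework-free imitation of the model_fn control flow (not the tf.estimator API)."""
    # Steps 1-2: the "network" is a single weight applied to each feature.
    outputs = [weight * x for x in features]

    # Step 3: in PREDICT mode, return the network output only.
    if mode == "PREDICT":
        return {"predictions": outputs}

    # Step 4: mean squared error between the outputs and the labels.
    n = len(labels)
    loss = sum((o - y) ** 2 for o, y in zip(outputs, labels)) / n

    # Step 5: the "optimizer" takes one gradient-descent step on the weight.
    grad = sum(2 * (o - y) * x for o, y, x in zip(outputs, labels, features)) / n
    new_weight = weight - learning_rate * grad

    # Step 6: return the outputs, the updated parameter, and the loss.
    return {"predictions": outputs, "loss": loss, "weight": new_weight}


prediction_only = toy_model_fn([1.0, 2.0], None, "PREDICT")
training_result = toy_model_fn([1.0, 2.0], [2.0, 4.0], "TRAIN")
print(prediction_only, training_result)
```

Note that `labels` is never touched in `PREDICT` mode, which is why the early return sits before the loss calculation.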

## Writing a ```model_fn``` for distributed training
In distributed training, the same neural network is sent to multiple training instances. Each instance runs predictions on a batch of the dataset, calculates the loss, and runs the optimizer to minimize it. One complete pass through this loop is called a **training step**.

### Synchronizing training steps
A [global step](https://www.tensorflow.org/api_docs/python/tf/train/global_step) is a global variable shared between the instances. It's necessary for distributed training, so the optimizer will keep track of the number of **training steps** between runs: 

```python
train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())
```

That is the only required change for distributed training!
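
To see why a shared counter matters, here is a toy analogy that uses threads in a single process instead of separate training instances. `GlobalStep` and `run_workers` are hypothetical names for this sketch; TensorFlow's real global step is a variable managed by the framework, not a Python lock.

```python
import threading


class GlobalStep:
    """A counter shared by all workers, standing in for TensorFlow's global step."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def claim_next_step(self, limit):
        """Atomically claim one step; return False once the limit has been reached."""
        with self._lock:
            if self.value >= limit:
                return False
            self.value += 1
            return True


def run_workers(num_workers, total_steps):
    step = GlobalStep()
    per_worker = [0] * num_workers

    def worker(index):
        # Each worker keeps training until the shared counter hits the limit.
        while step.claim_next_step(total_steps):
            per_worker[index] += 1

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return step.value, per_worker


final_step, steps_per_worker = run_workers(num_workers=2, total_steps=1000)
print(final_step, steps_per_worker)
```

Because both workers count against the same limit, the total number of steps is 1000 however the work happens to be split between them.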

In [None]:
from sagemaker.session import Session

# Location to save your custom code in tar.gz format.
custom_code_upload_location = 's3://{}/customcode/tensorflow_pipemode'.format(bucket)

# Location where results of model training are saved.
model_artifacts_location = 's3://{}/sagemaker/artifacts'.format(bucket)

In [None]:
%%time
from sagemaker.tensorflow import TensorFlow

mnist_estimator = TensorFlow(entry_point='mnist.py',
 role=role,
 framework_version='1.11.0',
 training_steps=1000, 
 evaluation_steps=100,
 output_path=model_artifacts_location,
 train_instance_count=2,
 train_instance_type='ml.c4.xlarge',
 tags=[{"Key":"Project", "Value":"Tensorflow_Demo"}])

mnist_estimator.fit(inputs)

The **```fit```** method creates a training job on two **ml.c4.xlarge** instances. The logs above show the instances doing training, evaluation, and incrementing the number of **training steps**. 

At the end of training, the training job generates a saved model for TF Serving.

# Deploy the trained model to prepare for predictions

The deploy() method creates an endpoint which serves prediction requests in real-time.

In [None]:
mnist_predictor = mnist_estimator.deploy(initial_instance_count=1,
 instance_type='ml.m4.xlarge')

# Perform inference

Now that we've trained a model, we're going to use it to perform inference through a SageMaker endpoint. The request handling behavior of the deployed endpoint is determined by the `mnist.py` script we looked at earlier.

## Invoking the endpoint

We will use matplotlib to show sample images from the MNIST dataset. We then run 5 samples through the inference endpoint we just created and compare each prediction with its label.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (2,10)

def show_digit(arr):
 two_d = (np.reshape(arr, (28, 28)) * 255).astype(np.uint8)
 plt.imshow(two_d, interpolation='nearest')
 return plt

In [None]:
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

for i in range(5):
 data = mnist.test.images[i].tolist()
 tensor_proto = tf.make_tensor_proto(values=np.asarray(data), shape=[1, len(data)], dtype=tf.float32)
 predict_response = mnist_predictor.predict(tensor_proto)
 
 print("========================================")
 label = np.argmax(mnist.test.labels[i])
 print("label is {}".format(label))
 show_digit(mnist.test.images[i]).show()
 prediction = predict_response['outputs']['classes']['int64_val'][0]
 print("prediction is {}".format(prediction))

# Deploy the trained model using Neo

Now the model is ready to be compiled by Neo to be optimized for our hardware of choice. We use the ``TensorFlowEstimator.compile_model`` method to do this. For this example, our target hardware is ``'ml_c5'``. You can change this to other supported target hardware if you prefer. See the [Amazon SageMaker Neo Blog](https://aws.amazon.com/blogs/aws/amazon-sagemaker-neo-train-your-machine-learning-models-once-run-them-anywhere/) for more information.

## Compiling the model
The ``input_shape`` is the definition of the model's input tensor and ``output_path`` is where the compiled model will be stored in S3. **Important: if the following command results in a permission error, scroll up and locate the value of the execution role returned by `get_execution_role()`. The role must have access to the S3 bucket specified in ``output_path``.**

In [None]:
!pygmentize mnist.py

The script here is an adaptation of the [TensorFlow MNIST example](https://github.com/tensorflow/models/tree/master/official/mnist). It provides a ```model_fn(features, labels, mode)```, which is used for training, evaluation and inference. See [TensorFlow MNIST distributed training notebook](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_distributed_mnist/tensorflow_distributed_mnist.ipynb) for more details about the training script.

At the end of the training script, there are two additional functions, to be used with Neo Deep Learning Runtime:
* `neo_preprocess(payload, content_type)`: Function that takes in the payload and Content-Type of each incoming request and returns a NumPy array
* `neo_postprocess(result)`: Function that takes the prediction results produced by the Deep Learning Runtime and returns the response body

In [None]:
output_path = '/'.join(mnist_estimator.output_path.split('/')[:-1])
optimized_estimator = mnist_estimator.compile_model(target_instance_family='ml_c5', 
 input_shape={'data':[1, 784]}, # Batch size 1, 784 values per image (a flattened 28x28 digit).
 output_path=output_path,
 framework='tensorflow', framework_version='1.11.0')
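
The first line of the cell above trims the last path component from the estimator's output path. A small sketch of that string manipulation, using a placeholder bucket name and a hypothetical helper `parent_s3_prefix`:

```python
def parent_s3_prefix(s3_uri):
    """Drop the last '/'-separated component of an S3 URI."""
    return "/".join(s3_uri.split("/")[:-1])


# Placeholder URI in the same shape as model_artifacts_location.
compiled_output = parent_s3_prefix("s3://example-bucket/sagemaker/artifacts")
print(compiled_output)
```

If the URI ends with a trailing slash, the last component is an empty string, so only the slash is removed.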

## Deploying the compiled model

With the optimized model now created, we will deploy a new inference endpoint in SageMaker. We will also set the `content_type` and `serializer` to use when making a request to the endpoint.

In [None]:
optimized_predictor = optimized_estimator.deploy(initial_instance_count = 1,
 instance_type = 'ml.c5.xlarge')

In [None]:
# The neo_preprocess() function expects an image in the request body
# But the MNIST example data is saved as NumPy array.
# So we convert it to PNG before invoking the endpoint
def png_serializer(data):
 im = PIL.Image.fromarray(data.reshape((28,28))*255).convert('L')
 f = io.BytesIO()
 im.save(f, format='png')
 f.seek(0)
 return f.read()

optimized_predictor.content_type = 'application/x-image'
optimized_predictor.serializer = png_serializer

## Invoking the endpoint

In [None]:
from tensorflow.examples.tutorials.mnist import input_data
from IPython import display
import PIL.Image
import io

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

for i in range(5):
 data = mnist.test.images[i]
 print(data.shape)
 # Display image
 im = PIL.Image.fromarray(data.reshape((28,28))*255).convert('L')
 display.display(im)
 # Invoke endpoint with image
 predict_response = optimized_predictor.predict(data)
 
 print("========================================")
 label = np.argmax(mnist.test.labels[i])
 print("label is {}".format(label))
 prediction = predict_response
 print("prediction is {}".format(prediction))

## AWS Serverless Application Model (SAM)

The AWS Serverless Application Model is an extension of AWS CloudFormation to provide a simplified way of defining Amazon API Gateway APIs, AWS Lambda functions, and Amazon DynamoDB tables needed by your serverless application. In the next few steps we will create a serverless application to invoke the SageMaker Neo endpoint through an API Gateway.

### Deploy using SAM 
The `service.py` file in the zip file contains the code required to invoke the SageMaker endpoint.

```python
from __future__ import print_function

import boto3
from PIL import Image
from io import BytesIO
import base64
import os 
import json
import os.path
import sys
import urllib
import ast
import numpy as np 

def handler(event, context):
    print(event)
    print(context)
    body = json.loads(event['body'])
    endpoint = os.environ['ENDPOINT_NAME']

    print("%s" % (endpoint))
    image_str = base64.b64decode(body['data'])
    image = Image.open(BytesIO(image_str))

    with BytesIO() as output:
        with image as img:
            img.save(output, 'PNG')
        data = output.getvalue()

    runtime = boto3.Session().client(service_name='sagemaker-runtime')
    response = runtime.invoke_endpoint(EndpointName=endpoint, ContentType='application/x-image', Body=data)

    result = response['Body'].read()
    predictions = ast.literal_eval(result)
    prediction = np.argmax(predictions)
    response = {
        "statusCode": "200",
        "headers": {
            'Content-Type': 'application/json'
        },
        "body": json.dumps({
            'prediction' : prediction
        })
    }

    return response

```

### Preparing the [AWS Lambda Layer](https://docs.aws.amazon.com/lambda/latest/dg/configuration-layers.html)

We will be making a custom [AWS Lambda Layer](https://medium.com/@adhorn/getting-started-with-aws-lambda-layers-for-python-6e10b1f9a5d) to make a common library for the dependencies we require on [NumPy](http://www.numpy.org/) and [Pillow](https://python-pillow.org/). To do so we must first create the directory `layers/pil_layer/python` that will be used to `pip` install the necessary libraries from a `requirements.txt` file locally.

In [None]:
!mkdir -p layers/pil_layer/python

In [None]:
%%writefile layers/pil_layer/requirements.txt

numpy
pillow

In [None]:
!pip install -r layers/pil_layer/requirements.txt -t layers/pil_layer/python 

### Zip the Lambda Layer

**NOTE: One trick to packaging a Lambda Layer for Python is that the contents of the layer must sit within a folder with one of the following names. You should not place the contents of the layer into the root of the .zip file.**

Acceptable folder names for Python are: python, python/lib/python3.7/site-packages

More info on including library dependencies in a layer [here](https://docs.aws.amazon.com/lambda/latest/dg/configuration-layers.html#configuration-layers-path)
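
One way to check that packaging rule before publishing is to list the archive entries and flag anything outside the top-level `python/` folder. This is a minimal sketch using only the standard library; the helper names and the in-memory example archives are hypothetical.

```python
import io
import zipfile


def entries_outside_python_dir(zip_bytes):
    """Return archive entries that do not sit under the top-level python/ folder."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return [name for name in archive.namelist() if not name.startswith("python/")]


def build_example_zip(paths):
    """Build a small in-memory zip with placeholder files (for illustration only)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for path in paths:
            archive.writestr(path, "# placeholder\n")
    return buffer.getvalue()


good_zip = build_example_zip(["python/numpy/__init__.py", "python/PIL/Image.py"])
bad_zip = build_example_zip(["numpy/__init__.py"])

print(entries_outside_python_dir(good_zip))
print(entries_outside_python_dir(bad_zip))
```

To check a real layer archive, read the file in binary mode and pass its bytes to `entries_outside_python_dir`.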

In [None]:
%%bash
cd layers/pil_layer/

zip -r ../pil-python27-layer.zip python/*

### [Publish the Lambda Layer](https://docs.aws.amazon.com/lambda/latest/dg/configuration-layers.html#configuration-layers-manage)

To create a layer you can use the `publish-layer-version` command with a name, description, archive file, and a list of compatible runtimes with the layer. The list of runtimes is optional, but it makes the layer easier to discover.

In [None]:
!aws lambda publish-layer-version --layer-name pil_numpy_layer_2_7 --zip-file fileb://layers/pil-python27-layer.zip --compatible-runtimes python2.7 --description 'Numpy, Pillow dependencies layer' --license-info "MIT"


### [Create AWS Lambda Function](https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model.html)

We now want to create the AWS Lambda function that will be used to invoke the SageMaker endpoint. All files required for the application will be loaded into `lambda_func` directory in the notebook. We will pass the endpoint name in an environment variable `ENDPOINT_NAME` that will be used with boto3 to invoke the SageMaker endpoint.

In [None]:
!mkdir lambda_func

In [None]:
%%writefile lambda_func/service.py

from __future__ import print_function

import boto3
from PIL import Image
from io import BytesIO
import base64
import os 
import json
import os.path
import sys
import urllib
import ast
import numpy as np 

def handler(event, context):
    print(event)
    print(context)
    body = json.loads(event['body'])
    endpoint = os.environ['ENDPOINT_NAME']

    print("%s" % (endpoint))
    image_str = base64.b64decode(body['data'])
    image = Image.open(BytesIO(image_str))

    with BytesIO() as output:
        with image as img:
            img.save(output, 'PNG')
        data = output.getvalue()

    runtime = boto3.Session().client(service_name='sagemaker-runtime')
    response = runtime.invoke_endpoint(EndpointName=endpoint, ContentType='application/x-image', Body=data)

    result = response['Body'].read()
    predictions = ast.literal_eval(result)
    prediction = np.argmax(predictions)
    response = {
        "statusCode": "200",
        "headers": {
            'Content-Type': 'application/json'
        },
        "body": json.dumps({
            'prediction' : prediction
        })
    }

    return response


### Get the Neo optimized endpoint from the estimator

In [None]:
print(optimized_predictor.endpoint)

### Create the SAM yaml file to deploy our service

The YAML file below is the CloudFormation template needed to deploy our service. Its `CodeUri` points to the `lambda_func/` directory we created above, and it defines an environment variable `ENDPOINT_NAME` to pass into the Lambda function. Find the `{{optimized_predictor.endpoint}}` placeholder below and replace it with the result of the cell above.

In [None]:
%%writefile tensorflow-lambda.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: Tensorflow MNIST Inference REST API
Resources:
  InferenceApi:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: lambda_func/
      Handler: service.handler
      Runtime: python2.7
      MemorySize: 1024
      Timeout: 30
      Layers:
        - Ref: PILLayer
      Events:
        Endpoint:
          Type: Api
          Properties:
            Path: /predict
            Method: post
      Policies:
        # Give your Lambda function full access to SageMaker
        - AmazonSageMakerFullAccess
      Environment:
        Variables:
          ENDPOINT_NAME: {{optimized_predictor.endpoint}} #Replace with result from cell above
  PILLayer:
    Type: 'AWS::Serverless::LayerVersion'
    Properties:
      LayerName: pil-python27
      Description: Pillow and NumPy for Python 2.7
      ContentUri: ./layers/pil-python27-layer.zip
      CompatibleRuntimes:
        - python2.7
      RetentionPolicy: Retain

Outputs:
  InferenceApi:
    Description: "API Gateway endpoint URL for Prod stage for the inference function"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/predict/"

  InferenceFunction:
    Description: "Inference Lambda Function ARN"
    Value: !GetAtt InferenceApi.Arn

### [Validate template](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-validate-template.html)

The `aws cloudformation validate-template` command is designed to check only the syntax of your template. It does not ensure that the property values that you have specified for a resource are valid for that resource. Nor does it determine the number of resources that will exist when the stack is created.

In [None]:
!aws cloudformation validate-template --template-body file://tensorflow-lambda.yaml

### [Package deployment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-cli-package.html)

For some resource properties that require an Amazon S3 location (a bucket name and filename), you can specify local references instead. For example, you might specify the S3 location of your AWS Lambda function's source code or an Amazon API Gateway REST API's OpenAPI (formerly Swagger) file. Instead of manually uploading the files to an S3 bucket and then adding the location to your template, you can specify local references, called local artifacts, in your template and then use the package command to quickly upload them. A local artifact is a path to a file or folder that the package command uploads to Amazon S3. For example, an artifact can be a local path to your AWS Lambda function's source code or an Amazon API Gateway REST API's OpenAPI file.

With the YAML file created, we can now `package` our CloudFormation template to prepare it for deployment and finally call the `deploy` command on the CloudFormation service to build the API service for our inference endpoint.

In [None]:
prefix = os.path.join('sagemaker', 'lambda')

In [None]:
!aws cloudformation package \
 --template-file tensorflow-lambda.yaml \
 --output-template-file tensorflow-lambda-out.yaml \
 --s3-bucket $bucket \
 --s3-prefix $prefix

### [Deploy Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-cli-deploy.html)

AWS CloudFormation requires you to use a change set to create a stack from a template that includes transforms. Instead of independently creating and then executing a change set, use the `aws cloudformation deploy` command. When you run this command, it creates a change set, executes the change set, and then terminates. This command reduces the number of required steps when you create or update a stack that includes transforms.

In [None]:
stack_name = "Tensorflow-Lambda-MNIST"

!aws cloudformation deploy \
--template-file tensorflow-lambda-out.yaml \
--stack-name $stack_name \
--capabilities CAPABILITY_IAM \
--region $region

### Get API Gateway Endpoint

In [None]:
stacks = cfn.describe_stacks(StackName=stack_name)
stack = stacks["Stacks"][0]

for output in stack["Outputs"]:
    # Remember the API Gateway URL; print every output for reference.
    if output["OutputKey"] == 'InferenceApi':
        api_endpoint = output["OutputValue"]
    print('%s=%s (%s)' % (output["OutputKey"], output["OutputValue"], output["Description"]))

print(api_endpoint)
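
The lookup can also be wrapped in a small helper that returns `None` when the key is missing. This is a sketch against a hand-written dictionary in the shape `describe_stacks` returns; the helper name, URL, and ARN below are made-up placeholders.

```python
def get_stack_output(stack, output_key):
    """Return the OutputValue for output_key, or None if the stack has no such output."""
    for output in stack.get("Outputs", []):
        if output["OutputKey"] == output_key:
            return output["OutputValue"]
    return None


# Hand-written stand-in for stacks["Stacks"][0]; all values are placeholders.
example_stack = {
    "Outputs": [
        {
            "OutputKey": "InferenceApi",
            "OutputValue": "https://example.execute-api.us-east-1.amazonaws.com/Prod/predict/",
        },
        {
            "OutputKey": "InferenceFunction",
            "OutputValue": "arn:aws:lambda:us-east-1:123456789012:function:example",
        },
    ]
}

print(get_stack_output(example_stack, "InferenceApi"))
```

Returning `None` instead of leaving a variable unassigned makes a missing output easier to detect before making requests.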

### Evaluate
We can now use this predictor to classify hand-written digits. Drawing into the image box loads the pixel data into a `data` variable in this notebook, which we can then pass to the `predictor`.

In [None]:
from IPython.display import HTML
HTML(open("input.html").read())

### Save the output of the html reader to PNG

In [None]:
%matplotlib inline
import numpy as np
import imageio
from PIL import Image

image = np.array([data], dtype=np.float32)
two_d = (np.reshape(image, (28, 28)) * 255).astype(np.uint8)
imageio.imwrite("output.png", two_d)

### Using the requests library to make a request to the new API Gateway endpoint

In this section we take the PNG we saved locally, base64-encode it, and send it to the API Gateway endpoint. The endpoint is deployed to the Prod stage and has a POST method at the /predict path. The Lambda function calls the Neo-optimized hosted model, which uses the `neo_preprocess` method of the model to convert the image into the appropriate input format.

In [None]:
import requests
import json
import base64
import numpy as np
from io import BytesIO
import ast

print(api_endpoint)

# prepare headers for http request
content_type = 'application/json'
headers = {'content-type': content_type}

file_name = "output.png"
encoded_string = ""
with open(file_name, "rb") as image_file:
 encoded_string = base64.b64encode(image_file.read()).decode('utf-8')  # text, so json.dumps can serialize it

body = {
 "data": encoded_string
}
print(body)
response = requests.post(api_endpoint, data=json.dumps(body), headers=headers)
print(response)
prediction = response.json()
print("prediction is {}".format(prediction['prediction']))
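
The request body is a JSON object whose `data` field carries the base64-encoded image, and the Lambda function reverses that encoding. The round trip can be sketched with the standard library alone. On Python 3, `base64.b64encode` returns bytes, which `json.dumps` cannot serialize, so the sketch decodes to text first. The helper names and the fake image bytes are hypothetical.

```python
import base64
import json


def encode_request_body(image_bytes):
    """Client side: base64-encode the image and wrap it in a JSON body."""
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return json.dumps({"data": encoded})


def decode_request_body(body):
    """Server side: recover the original image bytes from the JSON body."""
    return base64.b64decode(json.loads(body)["data"])


# Not a valid image, just the PNG signature followed by padding bytes.
fake_png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 8
request_body = encode_request_body(fake_png)
round_tripped = decode_request_body(request_body)
print(round_tripped == fake_png)
```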

## Set up hyperparameter tuning job
*Note, with the default setting below, the hyperparameter tuning job can take about 30 minutes to complete.*

Now we will set up the hyperparameter tuning job using the SageMaker Python SDK, following these steps:
* Create an estimator to set up the TensorFlow training job
* Define the ranges of hyperparameters we plan to tune; in this example, we are tuning "learning_rate"
* Define the objective metric for the tuning job to optimize
* Create a hyperparameter tuner with the above settings, as well as tuning resource configurations 

Similar to training a single TensorFlow job in SageMaker, we define our TensorFlow estimator passing in the TensorFlow script, IAM role, and (per job) hardware configuration.

In [None]:
import boto3
from time import gmtime, strftime
from sagemaker.tensorflow import TensorFlow
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

estimator = TensorFlow(entry_point='mnist.py',
 role=role,
 framework_version='1.11.0',
 training_steps=1000, 
 evaluation_steps=100,
 train_instance_count=1,
 train_instance_type='ml.m4.xlarge',
 base_job_name='DEMO-hpo-tensorflow')

Once we've defined our estimator we can specify the hyperparameters we'd like to tune and their possible values. We have three different types of hyperparameters.
- Categorical parameters need to take one value from a discrete set. We define this by passing the list of possible values to `CategoricalParameter(list)`
- Continuous parameters can take any real number value between the minimum and maximum value, defined by `ContinuousParameter(min, max)`
- Integer parameters can take any integer value between the minimum and maximum value, defined by `IntegerParameter(min, max)`

*Note, if possible, it's almost always best to specify a value as the least restrictive type. For example, tuning learning rate as a continuous value between 0.01 and 0.2 is likely to yield a better result than tuning as a categorical parameter with values 0.01, 0.1, 0.15, or 0.2.*
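
A toy comparison shows the intuition behind that note. It assumes a made-up objective whose best learning rate is 0.07, and it uses plain random search for the continuous case; SageMaker's tuner uses its own search strategy, so this is only an illustration of the search space, not of the service.

```python
import random


def toy_loss(learning_rate):
    """Made-up objective with its minimum at a learning rate of 0.07."""
    return (learning_rate - 0.07) ** 2


# Categorical: only the listed values can ever be tried.
categorical_values = [0.01, 0.1, 0.15, 0.2]
best_categorical = min(categorical_values, key=toy_loss)

# Continuous: any value in the range can be tried (random search as a stand-in).
rng = random.Random(0)
continuous_samples = [rng.uniform(0.01, 0.2) for _ in range(200)]
best_continuous = min(continuous_samples, key=toy_loss)

print(best_categorical, toy_loss(best_categorical))
print(best_continuous, toy_loss(best_continuous))
```

The categorical search can never get closer to the optimum than its nearest listed value, while the continuous range places no such floor on the result.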

In [None]:
hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.01, 0.2)}

Next we'll specify the objective metric that we'd like to tune and its definition, which includes the regular expression (regex) needed to extract that metric from the CloudWatch logs of the training job. In this case, our script emits the loss value, which we use as the objective metric. We also set `objective_type` to 'Minimize' so that hyperparameter tuning seeks to minimize the objective metric when searching for the best hyperparameter setting. By default, `objective_type` is set to 'Maximize'.

In [None]:
objective_metric_name = 'loss'
objective_type = 'Minimize'
metric_definitions = [{'Name': 'loss',
 'Regex': 'loss = ([0-9\\.]+)'}]
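
It can help to try the regex locally before launching a tuning job. The sketch below applies the same pattern with Python's `re` module; the sample log line is invented for illustration and may not match the exact format your training script emits.

```python
import re

metric_regex = 'loss = ([0-9\\.]+)'

# Invented log line; check your own CloudWatch logs for the real format.
sample_log_line = 'INFO:tensorflow:loss = 0.2345, step = 900'

match = re.search(metric_regex, sample_log_line)
loss_value = float(match.group(1))
print(loss_value)

# A line without the metric yields no match at all.
no_match = re.search(metric_regex, 'INFO:tensorflow:global_step/sec: 12.5')
print(no_match)
```

One limitation worth knowing: the character class only covers digits and dots, so a value logged in scientific notation such as `1e-05` would be captured as just `1`.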

Now, we'll create a `HyperparameterTuner` object, to which we pass:
- The TensorFlow estimator we created above
- Our hyperparameter ranges
- Objective metric name and definition
- Tuning resource configurations, such as the total number of training jobs to run and how many training jobs can run in parallel.

In [None]:
tuner = HyperparameterTuner(estimator,
 objective_metric_name,
 hyperparameter_ranges,
 metric_definitions,
 max_jobs=9,
 max_parallel_jobs=3,
 objective_type=objective_type)

## Launch hyperparameter tuning job
And finally, we can start our hyperparameter tuning job by calling `.fit()` and passing in the S3 path to our train and test dataset.

After the hyperparameter tuning job is created, you can describe the tuning job to see its progress in the next step, or go to SageMaker console->Jobs to check on its progress.

In [None]:
tuner.fit(inputs)

Let's run a quick check of the hyperparameter tuning job's status to make sure it started successfully.

In [None]:
boto3.client('sagemaker').describe_hyper_parameter_tuning_job(
 HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']
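
If you would rather wait for the job than check it by hand, the status call can be wrapped in a polling loop. This is a minimal sketch: `wait_for_tuning_job` is a hypothetical helper, and the status source is injected as a function so the example can run with a fake sequence instead of calling AWS.

```python
import time

# Statuses after which the tuning job will not change any further.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_tuning_job(get_status, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or max_polls is reached."""
    status = None
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            break
        sleep(poll_seconds)
    return status


# Fake status sequence standing in for the describe call above.
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_tuning_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

In the notebook, `get_status` would be a function that returns the `HyperParameterTuningJobStatus` field from the describe call shown above.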

## Analyze tuning job results - after tuning job is completed
Please refer to "HPO_Analyze_TuningJob_Results.ipynb" to see example code to analyze the tuning job results.

## Deploy the best model
Now that we have got the best model, we can deploy it to an endpoint. Please refer to other SageMaker sample notebooks or SageMaker documentation to see how to deploy a model.

## Extra Credit: Training with SageMaker Pipe Mode and TensorFlow using the SageMaker Python SDK

SageMaker Pipe Mode is an input mechanism for SageMaker training containers based on Linux named pipes. SageMaker makes the data available to the training container using named pipes, which allows data to be downloaded from S3 to the container while training is running. For larger datasets, this dramatically improves the time to start training, as the data does not need to be first downloaded to the container. To learn more about pipe mode, please consult the AWS documentation at: https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-trainingdata.

In this tutorial, we show you how to train a tf.estimator using data read with SageMaker Pipe Mode. We'll use the SageMaker `PipeModeDataset` class - a special TensorFlow `Dataset` built specifically to read from SageMaker Pipe Mode data. This `Dataset` is available in our TensorFlow containers for TensorFlow versions 1.7.0 and up. It's also open-sourced at https://github.com/aws/sagemaker-tensorflow-extensions and can be built into custom TensorFlow images for use in SageMaker. 

Although you can also build the PipeModeDataset into your own containers, in this tutorial we'll show how you can use the PipeModeDataset by launching training from the SageMaker Python SDK. The SageMaker Python SDK helps you deploy your models for training and hosting in optimized, production ready containers in SageMaker. The SageMaker Python SDK is easy to use, modular, extensible and compatible with TensorFlow and MXNet. 

Different collections of S3 files can be made available to the training container while it's running. These are referred to as "channels" in SageMaker. In this example, we use two channels - one for training data and one for evaluation data. Each channel is mapped to S3 files from different directories. The SageMaker PipeModeDataset knows how to read from the named pipes for each channel given just the channel name. When we launch SageMaker training we tell SageMaker what channels we have and where in S3 to read the data for each channel.
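
To make the named-pipe mechanism concrete, the sketch below streams a few records through a Linux FIFO, with one thread playing the role of the data producer and the main thread playing the training code. It illustrates named pipes in general and does not reproduce how SageMaker names or feeds its channel pipes; the pipe name `training_0` and the helper are hypothetical, and `os.mkfifo` is only available on POSIX systems.

```python
import os
import tempfile
import threading


def stream_through_named_pipe(records):
    """Write records into a FIFO from one thread and read them back in another."""
    workdir = tempfile.mkdtemp()
    fifo_path = os.path.join(workdir, "training_0")  # hypothetical pipe name
    os.mkfifo(fifo_path)  # POSIX only

    def producer():
        # Opening a FIFO for writing blocks until a reader opens the other end.
        with open(fifo_path, "w") as pipe:
            for record in records:
                pipe.write(record + "\n")

    writer = threading.Thread(target=producer)
    writer.start()

    received = []
    with open(fifo_path, "r") as pipe:
        # The reader consumes records as they become available, without
        # needing the full dataset on disk first.
        for line in pipe:
            received.append(line.rstrip("\n"))

    writer.join()
    os.remove(fifo_path)
    os.rmdir(workdir)
    return received


received_records = stream_through_named_pipe(["record-1", "record-2", "record-3"])
print(received_records)
```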


## Complete training source code 

In this section of the tutorial we train a TensorFlow LinearClassifier using pipe mode data. The TensorFlow training script is contained in following file:

In [None]:
!pygmentize 'pipemode.py'

The above script implements all the functions required for a sagemaker tensorflow training script (See: [Preparing TensorFlow Training Script](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/tensorflow/README.rst#preparing-the-tensorflow-training-script)). 

### Using a PipeModeDataset in an input_fn
To train an estimator using a Pipe Mode channel, we must construct an input_fn that reads from the channel. To do this, we use the SageMaker PipeModeDataset. This is a TensorFlow Dataset specifically created to read from a SageMaker Pipe Mode channel. A PipeModeDataset is a fully-featured TensorFlow Dataset and can be used in exactly the same ways as a regular TensorFlow Dataset can be used.

The training and evaluation data used in this tutorial is synthetic. It contains a series of records stored in a TensorFlow Example protobuf object. Each record contains a numeric class label and an array of 1024 floating point numbers. Each array is sampled from a multi-dimensional Gaussian distribution with a class-specific mean, so a TensorFlow linear classifier can learn to classify the examples well. Each record is separated using RecordIO encoding (the PipeModeDataset class also supports the TFRecord format). 

The training and evaluation data were produced using the benchmarking source code in the sagemaker-tensorflow-extensions benchmarking sub-package. If you want to investigate this further, please visit the GitHub repository for sagemaker-tensorflow-extensions at https://github.com/aws/sagemaker-tensorflow-extensions. 

The following example code shows how to use a PipeModeDataset in an input_fn.

```python
import tensorflow as tf
from sagemaker_tensorflow import PipeModeDataset

def input_fn():
    # Simple example data - a labeled vector.
    features = {
        'data': tf.FixedLenFeature([], tf.string),
        'labels': tf.FixedLenFeature([], tf.int64),
    }

    # A function to parse record bytes to a labeled vector record
    def parse(record):
        parsed = tf.parse_single_example(record, features)
        return ({
            'data': tf.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    # Construct a PipeModeDataset reading from a 'training' channel, using
    # the TF Record encoding.
    ds = PipeModeDataset(channel='training', record_format='TFRecord')

    # The PipeModeDataset is a TensorFlow Dataset and provides standard Dataset methods
    ds = ds.repeat(20)
    ds = ds.prefetch(10)
    ds = ds.map(parse, num_parallel_calls=10)
    ds = ds.batch(64)

    return ds
```

## Running training using the Python SDK

We can use the SDK to run our local training script on SageMaker infrastructure.

1. Pass the path to the pipemode.py file, which contains the functions for defining your estimator, to the sagemaker.TensorFlow init method.
2. Pass the S3 location that we uploaded our data to previously to the fit() method.

In [None]:
from sagemaker.tensorflow import TensorFlow

tensorflow = TensorFlow(entry_point='pipemode.py',
 role=role,
 framework_version='1.11.0',
 input_mode='Pipe',
 output_path=model_artifacts_location,
 code_location=custom_code_upload_location,
 train_instance_count=1,
 training_steps=1000,
 evaluation_steps=100,
 train_instance_type='ml.c4.xlarge')

After we've created the SageMaker Python SDK TensorFlow object, we can call fit to launch TensorFlow training:

In [None]:
%%time
import boto3

# use the region-specific sample data bucket
region = boto3.Session().region_name

# training data that is already partitioned
train_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/train'.format(region)
eval_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/eval'.format(region)

tensorflow.fit({'train':train_data, 'eval':eval_data})

After ``fit`` returns, you've successfully trained a TensorFlow LinearClassifier using SageMaker pipe mode! The TensorFlow model data will be stored under '``s3://<bucket>/sagemaker/artifacts``', where '``<bucket>``' is the name of the bucket you supplied earlier.

## Final cleanup

### Delete the CloudFormation stack

In [None]:
!aws cloudformation delete-stack --stack-name $stack_name

### Delete the SageMaker Neo endpoint

In [None]:
sagemaker.Session().delete_endpoint(optimized_predictor.endpoint)

### Delete the SageMaker endpoint

In [None]:
sagemaker.Session().delete_endpoint(mnist_predictor.endpoint)

### Delete S3 Bucket

In [None]:
workshop.delete_bucket_completely(bucket)