# Distributed TensorFlow in SageMaker with Magic

In [1]:
%%tfjob?

[0;31mDocstring:[0m
::

 %tfjob [--estimator_name ESTIMATOR_NAME] [--entry_point ENTRY_POINT]
 [--source_dir SOURCE_DIR] [--role ROLE]
 [--framework_version FRAMEWORK_VERSION] [--py_version PY_VERSION]
 [--instance_type INSTANCE_TYPE] [--instance_count INSTANCE_COUNT]
 [--output_path OUTPUT_PATH]
 [--hyperparameters FOO:1,BAR:0.555,BAZ:ABC | 'FOO : 1, BAR : 0.555, BAZ : ABC']
 [--channel_training CHANNEL_TRAINING]
 [--channel_testing CHANNEL_TESTING]
 [--use_spot_instances [USE_SPOT_INSTANCES]] [--max_wait MAX_WAIT]
 [--enable_sagemaker_metrics [ENABLE_SAGEMAKER_METRICS]]
 [--metric_definitions ['Name: ganloss, Regex: GAN_loss=.*?);' ['Name: ganloss, Regex: GAN_loss=(.*?;' ...]]]
 [--distribution {parameter_server,horovod}]
 [--mpi_processes_per_host MPI_PROCESSES_PER_HOST]
 [--mpi_custom_mpi_options MPI_CUSTOM_MPI_OPTIONS]
 [--name_contains NAME_CONTAINS] [--max_result MAX_RESULT]
 {submit,list,status,logs,delete}

Tensorflow magic command.

methods:
 {submit,list,status,logs,delete

### Set up S3 bucket locations

First, set up locations in the default SageMaker bucket for the raw input datasets and for the TensorFlow job output.

In [1]:
import sagemaker

sess = sagemaker.Session()

# Job output must NOT live under a key that starts with "mnist". The
# training/testing channels point at s3://<bucket>/mnist. A plain S3 URI
# channel is an S3Prefix channel, so SageMaker downloads every object whose
# key starts with that prefix. Job output written under mnist/ (model_dir,
# debugger output, model.tar.gz) would therefore be fed into the input
# channels of every later job.
# NOTE(review): artifacts from earlier runs already stored under
# s3://<bucket>/mnist/ are not cleaned up by this change.
output_path = f"s3://{sess.default_bucket()}/output/mnist"

Couldn't call 'get_role' to get Role ARN from role name workshop-sagemaker to get Role path.


MNIST is a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28x28 pixel grayscale images of hand-written digits. The dataset is split into 60,000 training images and 10,000 test images.

In [2]:
import os
import json
import logging
import boto3
from botocore.exceptions import ClientError
# Download training and testing data from a public S3 bucket

def download_from_s3(data_dir='/tmp/data', train=True, bucket="sagemaker-sample-files",
                     prefix="datasets/image/MNIST", s3_client=None):
    """Download one split of the raw MNIST dataset from a public S3 bucket.

    The gzipped IDX files are copied unchanged into ``data_dir``; no decoding
    happens here. Files that already exist locally are skipped, so re-running
    the cell is cheap.

    Args:
        data_dir (str): local directory to save the files into; created if missing
        train (bool): download the training split if truthy, the t10k test split otherwise
        bucket (str): S3 bucket that holds the MNIST files
        prefix (str): key prefix of the MNIST files inside ``bucket``
        s3_client: object with a boto3-style ``download_file(bucket, key, dest)``
            method. When omitted, a ``boto3.client('s3')`` is created, and only
            if something actually needs downloading.

    Returns:
        None
    """
    os.makedirs(data_dir, exist_ok=True)

    if train:
        files = ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz")
    else:
        files = ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz")

    for name in files:
        dest = os.path.join(data_dir, name)
        if os.path.exists(dest):
            continue  # already downloaded by an earlier run
        if s3_client is None:
            s3_client = boto3.client('s3')
        # S3 keys always use "/", whatever the local OS path separator is
        key = "/".join((prefix.rstrip("/"), name))
        s3_client.download_file(bucket, key, dest)


# Fetch the training split first, then the test split, into /tmp/data.
for is_training_split in (True, False):
    download_from_s3('/tmp/data', is_training_split)

In [3]:
# Upload the local MNIST files to s3://<default bucket>/mnist and use that
# single location for both the training and the testing channel.
prefix = 'mnist'
bucket = sess.default_bucket()
loc = sess.upload_data(path='/tmp/data', bucket=bucket, key_prefix=prefix)

channels = dict.fromkeys(("training", "testing"), loc)



### Write the TensorFlow script

The cell below contains the training script. Its first line, `%%tfjob submit`, saves the cell body as a temporary entry-point file and submits it as a SageMaker training job through a TensorFlow estimator.

In [9]:
%%tfjob submit --output_path s3://sagemaker-eu-west-1-245582572290/mnist --channel_training s3://sagemaker-eu-west-1-245582572290/mnist --channel_testing s3://sagemaker-eu-west-1-245582572290/mnist 

from __future__ import print_function

import argparse
import logging
import os
import json
import gzip
import numpy as np
import traceback

import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model


logging.basicConfig(level=logging.DEBUG)

# Define the model object

class SmallConv(Model):
    """Small convolutional classifier.

    Layers: Conv2D(32 filters, 3x3, ReLU) -> Flatten -> Dense(128, ReLU) ->
    Dense(10). The last layer has no activation, so the model returns raw
    logits. The training loss is built with ``from_logits=True`` to match.
    """

    def __init__(self):
        super().__init__()
        # Attribute names are kept as-is: Keras tracks layer variables by them.
        self.conv1 = Conv2D(filters=32, kernel_size=3, activation='relu')
        self.flatten = Flatten()
        self.d1 = Dense(units=128, activation='relu')
        self.d2 = Dense(units=10)

    def call(self, x):
        # Apply the hidden layers in order, then project to the 10 logits.
        for layer in (self.conv1, self.flatten, self.d1):
            x = layer(x)
        return self.d2(x)


# Decode and preprocess data
def convert_to_numpy(data_dir, images_file, labels_file):
    """Decode a pair of gzipped MNIST IDX files into numpy arrays.

    Args:
        data_dir (str): directory containing both files
        images_file (str): file name of the gzipped image file
        labels_file (str): file name of the gzipped label file

    Returns:
        tuple: ``(images, labels)``. ``images`` has shape (N, 28, 28) and
        ``labels`` is 1-D; both are uint8 views of the decompressed bytes.
    """
    def _payload(name, header_size):
        # Skip the fixed-size header and read the rest as raw bytes.
        with gzip.open(os.path.join(data_dir, name), 'rb') as stream:
            return np.frombuffer(stream.read(), np.uint8, offset=header_size)

    images = _payload(images_file, 16).reshape(-1, 28, 28)
    labels = _payload(labels_file, 8)
    return images, labels

def mnist_to_numpy(data_dir, train):
    """Load one raw MNIST split from ``data_dir`` as numpy arrays.

    Args:
        data_dir (str): directory holding the gzipped MNIST files. In the
            training container this is the channel directory from
            SM_CHANNEL_TRAINING / SM_CHANNEL_TESTING.
        train (bool): load the training split if truthy, the t10k test split otherwise

    Returns:
        tuple: ``(images, labels)`` numpy arrays, as produced by ``convert_to_numpy``
    """
    split_files = {
        True: ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz"),
        False: ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz"),
    }
    images_file, labels_file = split_files[bool(train)]
    return convert_to_numpy(data_dir, images_file, labels_file)


def normalize(x, axis):
    """Standardize ``x`` to zero mean and unit standard deviation along ``axis``.

    Args:
        x (np.ndarray): input array
        axis (int | tuple[int, ...]): axis or axes reduced when computing the
            mean and standard deviation

    Returns:
        np.ndarray: same shape as ``x``
    """
    centered = x - np.mean(x, axis=axis, keepdims=True)
    # A tiny epsilon keeps constant slices (std == 0) from dividing by zero.
    spread = np.std(x, axis=axis, keepdims=True) + np.finfo(float).eps
    return centered / spread

# Training logic

def _build_datasets(args):
    """Load, standardize and batch the MNIST train/test splits.

    Args:
        args (argparse.Namespace): must provide ``train`` and ``test`` (channel
            directories) and ``batch_size``

    Returns:
        tuple: ``(train_loader, test_loader)`` as ``tf.data.Dataset`` objects.
        The training set is shuffled over its full length; the test set is not.
    """
    x_train, y_train = mnist_to_numpy(data_dir=args.train, train=True)
    x_test, y_test = mnist_to_numpy(data_dir=args.test, train=False)

    x_train, x_test = x_train.astype(np.float32), x_test.astype(np.float32)

    # Standardize each image over its pixels (axes 1 and 2) to mean 0 and std 1.
    x_train, x_test = normalize(x_train, (1, 2)), normalize(x_test, (1, 2))

    # Add a trailing channel axis: tf uses the depth-minor (channels-last) convention.
    x_train, x_test = np.expand_dims(x_train, axis=3), np.expand_dims(x_test, axis=3)

    train_loader = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(len(x_train)).batch(args.batch_size)
    test_loader = tf.data.Dataset.from_tensor_slices(
        (x_test, y_test)).batch(args.batch_size)
    return train_loader, test_loader


def train(args):
    """Train SmallConv on MNIST with a custom loop and save it for serving.

    Each epoch makes one pass over the training set, then evaluates on the
    test set, then prints loss and accuracy for both. At the end the model is
    written with ``model.save()`` to ``<args.sm_model_dir>/00000000``.

    Args:
        args (argparse.Namespace): parsed command line (see ``parse_args``).
            Uses ``train``, ``test``, ``batch_size``, ``epochs``,
            ``learning_rate``, ``beta_1``, ``beta_2`` and ``sm_model_dir``.

    Raises:
        ValueError: if ``args.sm_model_dir`` is None, i.e. SM_MODEL_DIR is not
            set. This is checked before training so the job fails fast instead
            of after all epochs.
    """
    if args.sm_model_dir is None:
        raise ValueError(
            "--sm-model-dir is not set and SM_MODEL_DIR is missing from the environment")

    train_loader, test_loader = _build_datasets(args)

    model = SmallConv()
    # NOTE(review): the custom loop below never uses compile(). The call is kept
    # so that what model.save() writes stays unchanged; confirm before removing.
    model.compile()
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=args.learning_rate,
        beta_1=args.beta_1,
        beta_2=args.beta_2,
    )

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

    test_loss = tf.keras.metrics.Mean(name='test_loss')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

    @tf.function
    def train_step(images, labels):
        # One optimizer update on a batch; accumulates the training metrics.
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_fn(labels, predictions)
        grad = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grad, model.trainable_variables))

        train_loss(loss)
        train_accuracy(labels, predictions)

    @tf.function
    def test_step(images, labels):
        # Forward pass only; accumulates the test metrics.
        predictions = model(images, training=False)
        t_loss = loss_fn(labels, predictions)
        test_loss(t_loss)
        test_accuracy(labels, predictions)

    print("Training starts ...")
    for epoch in range(args.epochs):
        # Metrics are per epoch, so clear what the previous epoch accumulated.
        for metric in (train_loss, train_accuracy, test_loss, test_accuracy):
            metric.reset_states()

        for images, labels in train_loader:
            train_step(images, labels)

        for images, labels in test_loader:
            test_step(images, labels)

        print(
            f'Epoch {epoch + 1}, '
            f'Loss: {train_loss.result()}, '
            f'Accuracy: {train_accuracy.result() * 100}, '
            f'Test Loss: {test_loss.result()}, '
            f'Test Accuracy: {test_accuracy.result() * 100}'
        )

    # The serving container needs a version-numbered subdirectory to load the model.
    version = '00000000'
    ckpt_dir = os.path.join(args.sm_model_dir, version)
    os.makedirs(ckpt_dir, exist_ok=True)
    model.save(ckpt_dir)


def parse_args(argv=None):
    """Parse hyperparameters and SageMaker environment settings.

    Hyperparameters come from the command line. Channel, host and model-dir
    options default to the ``SM_*`` environment variables, but can be
    overridden with explicit flags. A missing variable yields a None default
    (``[]`` for ``--hosts``) instead of a KeyError at parser construction.
    This means the script can also be run outside SageMaker by passing
    ``--train``/``--test`` explicitly.

    Args:
        argv (list[str] | None): arguments to parse; None means ``sys.argv[1:]``

    Returns:
        argparse.Namespace: the parsed arguments

    Raises:
        SystemExit: via ``parser.error`` when ``--train`` or ``--test`` is
            neither given nor available from the environment
    """
    parser = argparse.ArgumentParser()

    # Hyperparameters
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--epochs', type=int, default=1)
    parser.add_argument('--learning-rate', type=float, default=1e-3)
    parser.add_argument('--beta_1', type=float, default=0.9)
    parser.add_argument('--beta_2', type=float, default=0.999)

    # Environment variables given by the training image. Read with .get() so a
    # missing variable does not abort before an explicit flag can supply it.
    env = os.environ
    parser.add_argument('--train', type=str, default=env.get('SM_CHANNEL_TRAINING'))
    parser.add_argument('--test', type=str, default=env.get('SM_CHANNEL_TESTING'))

    parser.add_argument('--current-host', type=str, default=env.get('SM_CURRENT_HOST'))
    # SM_HOSTS is a JSON list. CLI values are decoded the same way, because
    # type=list would split a string like "algo-1" into single characters.
    parser.add_argument('--hosts', type=json.loads, default=json.loads(env.get('SM_HOSTS', '[]')))

    # Data, model, and output directories.
    # model_dir is passed in by SageMaker as a hyperparameter; it is an S3
    # location such as <output_path>/<job name>/model.
    parser.add_argument('--model_dir', type=str)
    parser.add_argument('--sm-model-dir', type=str, default=env.get('SM_MODEL_DIR'))

    args = parser.parse_args(argv)

    missing = [flag for flag, value in (('--train', args.train), ('--test', args.test))
               if value is None]
    if missing:
        parser.error(
            f"{', '.join(missing)} not given and the matching SM_CHANNEL_* "
            "environment variable is not set")
    return args



if __name__ == '__main__':
    # Script entry point: parse the command line and environment, then train.
    train(parse_args())



Couldn't call 'get_role' to get Role ARN from role name workshop-sagemaker to get Role path.


submit:
 {
 "channel_testing": "s3://sagemaker-eu-west-1-245582572290/mnist",
 "channel_training": "s3://sagemaker-eu-west-1-245582572290/mnist",
 "enable_sagemaker_metrics": false,
 "entry_point": "/tmp/tmp-4c621cef-148d-4252-918a-52e58e750496.py",
 "estimator_name": "___TensorFlow_estimator",
 "framework_version": "2.3.0",
 "instance_count": 1,
 "instance_type": "ml.c4.xlarge",
 "max_result": 10,
 "mpi_custom_mpi_options": "--NCCL_DEBUG INFO",
 "mpi_processes_per_host": 4,
 "name_contains": "tensorflow",
 "output_path": "s3://sagemaker-eu-west-1-245582572290/mnist",
 "py_version": "py37",
 "role": "arn:aws:iam::245582572290:role/workshop-sagemaker",
 "use_spot_instances": false
}
{
 "___TensorFlow_latest_job_name": "tensorflow-training-2020-12-17-17-28-10-885",
 "estimator_variable": "___TensorFlow_estimator"
}


In [12]:
# Name of the most recent training job submitted with %%tfjob submit. The magic
# creates this notebook variable (see the "___TensorFlow_latest_job_name" key in
# the submit output), so it only exists after the training cell has run.
___TensorFlow_latest_job_name

'tensorflow-training-2020-12-17-17-28-10-885'

## Stop the latest training job

In [8]:
%tfjob delete

{
 "AlgorithmSpecification": {
 "EnableSageMakerMetricsTimeSeries": false,
 "TrainingImage": "763104351884.dkr.ecr.eu-west-1.amazonaws.com/tensorflow-training:2.3.0-cpu-py37",
 "TrainingInputMode": "File"
 },
 "CreationTime": "2020-12-17 17:27:44.884000+00:00",
 "DebugHookConfig": {
 "CollectionConfigurations": [],
 "S3OutputPath": "s3://sagemaker-eu-west-1-245582572290/mnist"
 },
 "EnableInterContainerTrafficEncryption": false,
 "EnableManagedSpotTraining": false,
 "EnableNetworkIsolation": false,
 "HyperParameters": {
 "model_dir": "\"s3://sagemaker-eu-west-1-245582572290/mnist/tensorflow-training-2020-12-17-17-27-44-577/model\"",
 "sagemaker_container_log_level": "20",
 "sagemaker_job_name": "\"tensorflow-training-2020-12-17-17-27-44-577\"",
 "sagemaker_program": "\"tmp-223ca4e1-019c-489c-9b25-d553c2cc311d.py\"",
 "sagemaker_region": "\"eu-west-1\"",
 "sagemaker_submit_directory": "\"s3://sagemaker-eu-west-1-245582572290/tensorflow-training-2020-12-17-17-27-44-577/source/sourcedir.t

## Describe the latest training job

In [7]:
%tfjob status

{
 "AlgorithmSpecification": {
 "EnableSageMakerMetricsTimeSeries": false,
 "TrainingImage": "763104351884.dkr.ecr.eu-west-1.amazonaws.com/tensorflow-training:2.3.0-cpu-py37",
 "TrainingInputMode": "File"
 },
 "CreationTime": "2020-12-17 17:27:44.884000+00:00",
 "DebugHookConfig": {
 "CollectionConfigurations": [],
 "S3OutputPath": "s3://sagemaker-eu-west-1-245582572290/mnist"
 },
 "EnableInterContainerTrafficEncryption": false,
 "EnableManagedSpotTraining": false,
 "EnableNetworkIsolation": false,
 "HyperParameters": {
 "model_dir": "\"s3://sagemaker-eu-west-1-245582572290/mnist/tensorflow-training-2020-12-17-17-27-44-577/model\"",
 "sagemaker_container_log_level": "20",
 "sagemaker_job_name": "\"tensorflow-training-2020-12-17-17-27-44-577\"",
 "sagemaker_program": "\"tmp-223ca4e1-019c-489c-9b25-d553c2cc311d.py\"",
 "sagemaker_region": "\"eu-west-1\"",
 "sagemaker_submit_directory": "\"s3://sagemaker-eu-west-1-245582572290/tensorflow-training-2020-12-17-17-27-44-577/source/sourcedir.t

## Show logs for the latest training job

In [13]:
%tfjob logs


2020-12-17 17:28:33 Starting - Launching requested ML instances.null


## List training jobs

In [6]:
%tfjob list

{
 "NextToken": "cIws2QhTXUIa8bi8X9aU7gCAR0Xdc3x9L/Ofg4vsVMTtcNqRqLcpBqE42+cDc29TFQi5WMntyYF8Dtfi7hilXAF3S3jOJ0DmOuxvXC7MuU1Q6+20eQKMbbovB90pwL5DPnINepnlLEmFhvO87tIVNZR4vTy3ef5rgF6dqbA0VVq0m92q6y2SOofBaZP49sRdVnJtTcTQaS5EqeYVIuH5KxlQ5w0j5RQq6GZMUD+Yb2yDOCsvqfy1owdkaN5KZ4UOJZ02BEXJUQQfH9slv9djHeoqy1UQeAT4Uj9JfW7GSGGhGbqBQpALEjcsu7VkI9wUADMYCWYOlP+lUtW8E2nSCujGvcsgo9AYnWN4Mg9fAZA1v19kYU5BB3xHSRprEU/tz/yJxJdxE2jqVM3qT/9GmPydPS9kETLj6F4XNZHCzJgu8qXvXFjinhEQdrLD1c3EhItDsieIUexyFvCnBZ2zc5gUDNcBOUTPIKpF17tfSgav50UtzbMbcYSaK2kGOSl3i6+ahsHMUSm91mecIbqrxUjSdqToSsJwf3n0hkGr/bDXAA+L+TosLKfp4NkhLxwo0n6xKnMMAO+WIA==",
 "ResponseMetadata": {
 "HTTPHeaders": {
 "content-length": "917",
 "content-type": "application/x-amz-json-1.1",
 "date": "Thu, 17 Dec 2020 17:27:48 GMT",
 "x-amzn-requestid": "229a98f7-db12-44e8-bfc8-4cf522d8542a"
 },
 "HTTPStatusCode": 200,
 "RequestId": "229a98f7-db12-44e8-bfc8-4cf522d8542a",
 "RetryAttempts": 0
 },
 "TrainingJobSummaries": [
 {
 "CreationTime": "2020-12-17 17:27:4