# Distributed PyTorch Training in SageMaker with the `%%pytorch` Magic

In [2]:
%%pytorch?

[0;31mDocstring:[0m
::

  %pytorch [--estimator_name ESTIMATOR_NAME] [--entry_point ENTRY_POINT]
               [--source_dir SOURCE_DIR] [--role ROLE]
               [--framework_version FRAMEWORK_VERSION]
               [--py_version PY_VERSION] [--instance_type INSTANCE_TYPE]
               [--instance_count INSTANCE_COUNT] [--output_path OUTPUT_PATH]
               [--hyperparameters FOO:1,BAR:0.555,BAZ:ABC | 'FOO : 1, BAR : 0.555, BAZ : ABC']
               [--channel_training CHANNEL_TRAINING]
               [--channel_testing CHANNEL_TESTING]
               [--use_spot_instances [USE_SPOT_INSTANCES]]
               [--max_wait MAX_WAIT]
               [--enable_sagemaker_metrics [ENABLE_SAGEMAKER_METRICS]]
               [--metric_definitions ['Name: loss, Regex: Loss = .*?);' ['Name: loss, Regex: Loss = (.*?;' ...]]]
               [--name_contains NAME_CONTAINS] [--max_result MAX_RESULT]
               {submit,list,status,logs,delete}

Pytorch magic command.

methods:
  {sub

### Set up S3 bucket locations

First, set up some locations in the default SageMaker bucket to store the raw input datasets and the PyTorch job output.

In [1]:
import sagemaker

sess = sagemaker.Session()

# Training job artifacts go under s3://<default bucket>/pytorch/mnist
output_path = 's3://{}/pytorch/mnist'.format(sess.default_bucket())

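MNIST is a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28x28 pixel grayscale images of hand-written digits. The dataset is split into 60,000 training images and 10,000 test images. The next cell downloads the gzipped MNIST files from the public `sagemaker-sample-files` S3 bucket into `/tmp/data`, and the cell after that uploads them to the default SageMaker bucket.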

In [2]:
import os
import json
import logging
import boto3
from botocore.exceptions import ClientError


# Download training and testing data from a public S3 bucket


def download_from_s3(data_dir='/tmp/data', train=True,
                     bucket='sagemaker-sample-files',
                     prefix='datasets/image/MNIST'):
    """Download the gzipped MNIST IDX files for one split from S3.

    The files are stored as-is (still gzipped); decoding happens later in the
    training script. Files already present in ``data_dir`` are not downloaded
    again, and no S3 client is created when nothing needs downloading.

    Args:
        data_dir (str): local directory to save the files into; created if missing
        train (bool): download the training split if True, the test (t10k) split otherwise
        bucket (str): S3 bucket holding the dataset (default: the public
            ``sagemaker-sample-files`` bucket)
        prefix (str): key prefix of the MNIST files inside ``bucket``

    Returns:
        None
    """
    os.makedirs(data_dir, exist_ok=True)

    if train:
        file_names = ["train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz"]
    else:
        file_names = ["t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz"]

    missing = [name for name in file_names
               if not os.path.exists(os.path.join(data_dir, name))]
    if not missing:
        return

    s3 = boto3.client('s3')
    for name in missing:
        # S3 keys always use '/', so build the key explicitly rather than with
        # os.path.join (which would insert '\\' on Windows).
        key = "{}/{}".format(prefix.rstrip('/'), name)
        s3.download_file(bucket, key, os.path.join(data_dir, name))
    return


# Fetch both splits (training first, then test) into /tmp/data.
download_from_s3(data_dir='/tmp/data', train=True)
download_from_s3(data_dir='/tmp/data', train=False)

In [5]:
prefix = 'mnist'
bucket = sess.default_bucket()

# Upload everything under /tmp/data to s3://<bucket>/mnist. Both channels point
# at the same prefix; the training script picks train/t10k files by name.
loc = sess.upload_data(path='/tmp/data', bucket=bucket, key_prefix=prefix)
channels = dict(training=loc, testing=loc)


In [7]:
# One location per line: job output, training channel, testing channel.
print(output_path, channels.get('training'), channels.get('testing'), sep='\n')

s3://sagemaker-eu-west-1-245582572290/pytorch/mnist
s3://sagemaker-eu-west-1-245582572290/mnist
s3://sagemaker-eu-west-1-245582572290/mnist


### Write the PyTorch script

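The source for the training script is in the cell below. The `%%pytorch submit` cell magic submits the Python script in the cell as the entry point of a SageMaker PyTorch estimator. Note that the `--output_path` and `--channel_*` arguments on the magic line are hard-coded and must match the S3 locations printed above.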

In [10]:
%%pytorch submit --enable_sagemaker_metrics  --metric_definitions 'Name: loss, Regex: Loss: (.*)' --use_spot_instances --max_wait 86400 --output_path s3://sagemaker-eu-west-1-245582572290/pytorch/mnist --channel_training s3://sagemaker-eu-west-1-245582572290/mnist --channel_testing s3://sagemaker-eu-west-1-245582572290/mnist --hyperparameters 'batch-size: 128 ,epochs: 20, learning-rate: 1e-3, log-interval: 100, backend: gloo'  
# --instance_type ml.g4dn.xlarge --instance_count 2 
# --instance_type ml.c4.xlarge --instance_count 2 


import argparse
import gzip
import json
import logging
import os
import sys

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.distributed as dist


# Module logger: DEBUG level, echoed to stdout next to the script's print() output.
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)

# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
class Net(nn.Module):
    """Small CNN for MNIST: two 5x5 conv layers followed by two linear layers.

    ``forward`` returns per-class log-probabilities (log-softmax over 10 classes).
    """

    def __init__(self):
        super().__init__()
        # Registration order fixes the state_dict key order and the order in
        # which weights are initialized; keep it stable.
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        # For a 1x28x28 input: conv5 -> 24x24, pool -> 12x12,
        # conv5 -> 8x8, pool -> 4x4, giving 20 * 4 * 4 = 320 features.
        stage1 = F.relu(F.max_pool2d(self.conv1(x), 2))
        stage2 = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(stage1)), 2))
        flat = stage2.view(-1, 320)
        hidden = F.dropout(F.relu(self.fc1(flat)), training=self.training)
        logits = self.fc2(hidden)
        return F.log_softmax(logits, dim=1)


# Decode binary data from SM_CHANNEL_TRAINING
# Decode and preprocess data
# Create map dataset


def normalize(x, axis):
    """Standardize ``x`` to zero mean and unit standard deviation along ``axis``.

    Machine epsilon is added to the standard deviation so that constant
    slices map to zeros instead of dividing by zero.
    """
    centered = x - np.mean(x, axis=axis, keepdims=True)
    scale = np.std(x, axis=axis, keepdims=True) + np.finfo(float).eps
    return centered / scale

def convert_to_tensor(data_dir, images_file, labels_file):
    """Decode a pair of gzipped MNIST IDX files into torch tensors.

    The IDX headers are validated (magic number, item count) and the image
    size is taken from the header instead of being assumed to be 28x28.

    Args:
        data_dir (str): directory containing both files.
        images_file (str): gzipped IDX3 image file (e.g. ``train-images-idx3-ubyte.gz``).
        labels_file (str): gzipped IDX1 label file (e.g. ``train-labels-idx1-ubyte.gz``).

    Returns:
        tuple: ``(images, labels)``. ``images`` is a float32 tensor of shape
        ``(N, 1, rows, cols)`` with every image standardized to zero mean and
        unit std; ``labels`` is an int64 tensor of shape ``(N,)``.

    Raises:
        ValueError: if a header is truncated, a magic number is wrong, the
            payload does not match the header, or image and label counts differ.
    """
    import struct

    with gzip.open(os.path.join(data_dir, images_file), 'rb') as f:
        image_bytes = f.read()
    with gzip.open(os.path.join(data_dir, labels_file), 'rb') as f:
        label_bytes = f.read()

    if len(image_bytes) < 16 or len(label_bytes) < 8:
        raise ValueError('truncated IDX header in {} or {}'.format(
            images_file, labels_file))

    # Headers are big-endian uint32 fields: images = (magic, count, rows, cols),
    # labels = (magic, count). 2051 / 2049 are the IDX3 / IDX1 magic numbers.
    img_magic, num_images, rows, cols = struct.unpack('>4I', image_bytes[:16])
    lbl_magic, num_labels = struct.unpack('>2I', label_bytes[:8])
    if img_magic != 2051:
        raise ValueError('{}: not an IDX3 image file (magic {})'.format(
            images_file, img_magic))
    if lbl_magic != 2049:
        raise ValueError('{}: not an IDX1 label file (magic {})'.format(
            labels_file, lbl_magic))
    if num_images != num_labels:
        raise ValueError('{} has {} images but {} has {} labels'.format(
            images_file, num_images, labels_file, num_labels))

    # reshape raises ValueError if the payload size disagrees with the header
    images = np.frombuffer(image_bytes, np.uint8, offset=16).reshape(
            num_images, rows, cols).astype(np.float32)
    labels = np.frombuffer(label_bytes, np.uint8, offset=8).astype(np.int64)
    if labels.size != num_labels:
        raise ValueError('{}: expected {} labels, found {}'.format(
            labels_file, num_labels, labels.size))

    # standardize each image independently over its pixels
    images = normalize(images, axis=(1, 2))

    # add channel dimension (depth-major): (N, rows, cols) -> (N, 1, rows, cols)
    images = np.expand_dims(images, axis=1)

    images = torch.tensor(images, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.int64)
    return images, labels

        
class MNIST(Dataset):
    """Map-style dataset holding one MNIST split fully in memory.

    Args:
        data_dir (str): directory holding the gzipped IDX files.
        train (bool): load the training split if truthy, else the test (t10k) split.
    """

    def __init__(self, data_dir, train=True):
        images_file, labels_file = (
            ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz")
            if train
            else ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz")
        )
        decoded = convert_to_tensor(data_dir, images_file, labels_file)
        self.images, self.labels = decoded

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        return image, label


def train(args):
    """Train ``Net`` on MNIST, evaluate after every epoch and save the weights.

    When ``args.hosts`` lists more than one host, a ``torch.distributed``
    process group is initialised with one process per host (rank = position
    of ``args.current_host`` in ``args.hosts``). The model is then wrapped in
    ``DistributedDataParallel`` so gradients are averaged across hosts, and
    each host trains on its own shard of the training set via
    ``DistributedSampler``.

    Args:
        args (argparse.Namespace): parsed command line. Reads hosts,
            current_host, backend, num_gpus, seed, train, test, batch_size,
            test_batch_size, learning_rate, beta_1, beta_2, weight_decay,
            epochs, log_interval and model_dir.

    Returns:
        None
    """
    from torch.utils.data.distributed import DistributedSampler

    # GPU if the container reports any, otherwise CPU
    use_cuda = args.num_gpus > 0
    device = torch.device("cuda" if use_cuda else "cpu")

    # Initialize the distributed environment.
    is_distributed = len(args.hosts) > 1
    if is_distributed:
        world_size = len(args.hosts)
        host_rank = args.hosts.index(args.current_host)
        os.environ['WORLD_SIZE'] = str(world_size)
        os.environ['RANK'] = str(host_rank)
        # init_process_group cannot take backend=None; pick one that fits the device
        backend = args.backend or ('nccl' if use_cuda else 'gloo')
        dist.init_process_group(backend=backend, rank=host_rank,
                                world_size=world_size)
        logger.info('Initialized {} process group: rank {} of {}'.format(
            backend, host_rank, world_size))

    torch.manual_seed(args.seed)
    if use_cuda:
        torch.cuda.manual_seed(args.seed)

    train_set = MNIST(args.train, train=True)
    # Shard the training set across hosts; the sampler does its own shuffling,
    # so DataLoader must not shuffle when a sampler is given.
    train_sampler = DistributedSampler(train_set) if is_distributed else None
    train_loader = DataLoader(train_set, batch_size=args.batch_size,
            shuffle=(train_sampler is None), sampler=train_sampler)
    test_loader = DataLoader(MNIST(args.test, train=False),
            batch_size=args.test_batch_size, shuffle=False)

    model = Net().to(device)
    # DDP all-reduces gradients during backward(); single-host runs train directly
    net = nn.parallel.DistributedDataParallel(model) if is_distributed else model
    # Net.forward already returns log-probabilities, so pair it with NLLLoss
    loss_fn = nn.NLLLoss()
    optimizer = optim.Adam(net.parameters(),
            lr=args.learning_rate,
            betas=(args.beta_1, args.beta_2),
            weight_decay=args.weight_decay)

    logger.info("Start training ...")
    for epoch in range(1, args.epochs + 1):
        if train_sampler is not None:
            # reseed the sampler so each epoch uses a different shuffle
            train_sampler.set_epoch(epoch)
        net.train()
        for batch_idx, (imgs, labels) in enumerate(train_loader, 1):
            imgs, labels = imgs.to(device), labels.to(device)
            output = net(imgs)
            loss = loss_fn(output, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # "Loss: <value>" is matched by the job's metric_definitions regex
            if batch_idx % args.log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}'.format(
                    epoch, batch_idx * len(imgs), len(train_loader.sampler),
                    100. * batch_idx / len(train_loader), loss.item()))

        # evaluate the unwrapped module so no collective ops run during eval
        test(model, test_loader, device)

    # save the unwrapped module so state_dict keys carry no 'module.' prefix
    save_model(model, args.model_dir)
    return

def test(model, test_loader, device):
    """Evaluate ``model`` on every batch of ``test_loader`` and log loss/accuracy.

    Args:
        model (nn.Module): classifier returning per-class log-probabilities
            (as ``Net.forward`` does); switched to eval mode here.
        test_loader (DataLoader): yields ``(images, labels)`` batches.
        device (torch.device): device the batches are moved to.

    Returns:
        None
    """
    model.eval()
    num_samples = len(test_loader.dataset)
    if num_samples == 0:
        logger.warning('Test set is empty; skipping evaluation')
        return

    total_loss = 0.0
    correct = 0
    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            output = model(imgs)
            # summed, not averaged, so we can divide by the dataset size below
            total_loss += F.nll_loss(output, labels, reduction='sum').item()
            correct += (output.argmax(dim=1) == labels).sum().item()

    logger.info('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        total_loss / num_samples, correct, num_samples,
        100.0 * correct / num_samples))
    return

def save_model(model, model_dir):
    """Move ``model`` to the CPU and save its state_dict as ``<model_dir>/model.pth``.

    Note: ``model.cpu()`` moves the module's parameters in place.
    """
    logger.info('Saving the model')
    cpu_state = model.cpu().state_dict()
    torch.save(cpu_state, os.path.join(model_dir, 'model.pth'))

def parse_args():
    """Parse training hyperparameters and SageMaker container settings.

    Hyperparameters are read from command-line flags. Container settings
    default to the ``SM_*`` environment variables, which are read eagerly, so
    this raises ``KeyError`` if any of them is unset.

    Returns:
        argparse.Namespace: the parsed arguments.
    """
    parser = argparse.ArgumentParser()

    # Training hyperparameters
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=1, metavar='N',
                        help='number of epochs to train (default: 1)')
    parser.add_argument('--learning-rate', type=float, default=0.001, metavar='LR',
                        help='learning rate (default: 0.001)')
    # Both spellings accepted: dashes match the other flags, underscores are
    # kept for backward compatibility.
    parser.add_argument('--beta-1', '--beta_1', dest='beta_1', type=float,
                        default=0.9, metavar='BETA1',
                        help='beta1 (default: 0.9)')
    parser.add_argument('--beta-2', '--beta_2', dest='beta_2', type=float,
                        default=0.999, metavar='BETA2',
                        help='beta2 (default: 0.999)')
    parser.add_argument('--weight-decay', type=float, default=1e-4, metavar='WD',
                        help='L2 weight decay (default: 1e-4)')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--backend', type=str, default=None,
                        help='backend for distributed training: gloo (CPU or GPU) or nccl (GPU only)')

    # Container environment (SM_* environment variables)
    # type=json.loads (not list) so a value given on the command line, e.g.
    # '["algo-1", "algo-2"]', parses into host names rather than characters.
    parser.add_argument('--hosts', type=json.loads, default=json.loads(os.environ['SM_HOSTS']))
    parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--test', type=str, default=os.environ['SM_CHANNEL_TESTING'])
    # string default from the environment is converted by argparse via type=int
    parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])

    return parser.parse_args()


if __name__ == "__main__":
    # Hyperparameters arrive as CLI flags; container settings come from SM_* env vars.
    train(parse_args())

Couldn't call 'get_role' to get Role ARN from role name workshop-sagemaker to get Role path.


submit:
 {
    "channel_testing": "s3://sagemaker-eu-west-1-245582572290/mnist",
    "channel_training": "s3://sagemaker-eu-west-1-245582572290/mnist",
    "enable_sagemaker_metrics": true,
    "entry_point": "/tmp/tmp-497a920d-e287-4f8e-ba53-251c4ffb80ad.py",
    "estimator_name": "___PyTorch_estimator",
    "framework_version": "1.5.0",
    "hyperparameters": {
        "backend": "gloo",
        "batch-size": "128",
        "epochs": "20",
        "learning-rate": "1e-3",
        "log-interval": "100"
    },
    "instance_count": 1,
    "instance_type": "ml.c4.xlarge",
    "max_result": 10,
    "max_wait": 86400,
    "metric_definitions": [
        {
            "Name": "loss",
            "Regex": "Loss: (.*)"
        }
    ],
    "name_contains": "pytorch",
    "output_path": "s3://sagemaker-eu-west-1-245582572290/pytorch/mnist",
    "py_version": "py3",
    "role": "arn:aws:iam::245582572290:role/workshop-sagemaker",
    "use_spot_instances": true
}
{
    "___PyTorch_latest_job_na

## Stop the latest training job

In [9]:
%pytorch delete

{
    "AlgorithmSpecification": {
        "EnableSageMakerMetricsTimeSeries": true,
        "MetricDefinitions": [
            {
                "Name": "loss",
                "Regex": "Loss: (.*)"
            }
        ],
        "TrainingImage": "763104351884.dkr.ecr.eu-west-1.amazonaws.com/pytorch-training:1.5.0-cpu-py3",
        "TrainingInputMode": "File"
    },
    "CreationTime": "2020-12-17 17:26:09.297000+00:00",
    "DebugHookConfig": {
        "CollectionConfigurations": [],
        "S3OutputPath": "s3://sagemaker-eu-west-1-245582572290/pytorch/mnist"
    },
    "EnableInterContainerTrafficEncryption": false,
    "EnableManagedSpotTraining": true,
    "EnableNetworkIsolation": false,
    "HyperParameters": {
        "backend": "\"gloo\"",
        "batch-size": "\"128\"",
        "epochs": "\"20\"",
        "learning-rate": "\"1e-3\"",
        "log-interval": "\"100\"",
        "sagemaker_container_log_level": "20",
        "sagemaker_job_name": "\"pytorch-training-2020-12-1

## Describe the latest training job

In [25]:
%pytorch status

{
    "AlgorithmSpecification": {
        "EnableSageMakerMetricsTimeSeries": true,
        "MetricDefinitions": [
            {
                "Name": "loss",
                "Regex": "Loss: (.*)"
            }
        ],
        "TrainingImage": "763104351884.dkr.ecr.eu-west-1.amazonaws.com/pytorch-training:1.5.0-cpu-py3",
        "TrainingInputMode": "File"
    },
    "BillableTimeInSeconds": 107,
    "CreationTime": "2020-12-17 17:26:21.532000+00:00",
    "DebugHookConfig": {
        "CollectionConfigurations": [],
        "S3OutputPath": "s3://sagemaker-eu-west-1-245582572290/pytorch/mnist"
    },
    "EnableInterContainerTrafficEncryption": false,
    "EnableManagedSpotTraining": true,
    "EnableNetworkIsolation": false,
    "FinalMetricDataList": [
        {
            "MetricName": "loss",
            "Timestamp": "1970-01-19 14:43:46.439000+00:00",
            "Value": 0.10897199809551239
        }
    ],
    "HyperParameters": {
        "backend": "\"gloo\"",
        "batc

## Show logs for the latest training job

In [20]:
%pytorch logs

2020-12-17 17:30:27 Starting - Preparing the instances for training
2020-12-17 17:30:27 Downloading - Downloading input data
2020-12-17 17:30:27 Training - Training image download completed. Training in progress.[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34m2020-12-17 17:29:46,602 sagemaker-containers INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2020-12-17 17:29:46,605 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2020-12-17 17:29:46,618 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.[0m
[34m2020-12-17 17:29:49,653 sagemaker_pytorch_container.training INFO     Invoking user training script.[0m
[34m2020-12-17 17:29:49,959 sagemaker-containers INFO     Module default_user_module_name does not provide a setup.py. [0m
[34mGenerating setup.py[0m
[34m2020-12-17 17:29:49,959 sagemaker-conta

## List training jobs

In [13]:
%pytorch list 

{
    "NextToken": "cIws2QhTXUIa8bi8X9aU7gCAR0Xdc3x9L/Ofg4vsVMTtcNqRqLcpBqE42+cDc29TFQi5WMntyYF8Dtfi7hilXAF3S3jOJ0DmOuxvXC7MuU1Q6+20eQKMbbovB90pwL5DPnINepnlLEmFhvO87tIVNZR4vTy3ef5rgF6dqbA0VVq0m92q6y2SOofBaZP49sRdVnJtTcTQaS5EqeYVIuH5KxlQ5w0j5RQq6GZMUD+Yb2yDOCsvqfy1owdkaN5KZ4FGRNtmDYtMraMQRqc2qtXJqyeGVeTI2ay1LmtZUOEhRDz3vRe0Pt1v0C4FDO31aVce2uKbzUS+TI0u9eGnRVRpTlPz1+OKucA7cCkpc6121h+TvwCWzUFRylDK5bA9jdGxQ5WqvdHLF3P4RVNx8ltlG36ht76Wv1twVDcPQ1wZIMTfWInXZp1IY5V31hwN0wFMHIJj86VYFo1G/D59pwDhXnd9Iyj0APFLa9mhymUH3TrC9JytAPYKP8yt92QVOBs+CVElLY9l+EVZF26L+spMwJMg9NPWqPQ8T+uwHRkAsKxOLnlc6qKLB6EMy7Q8ZrrNUHxUJwbNLQ==",
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "3674",
            "content-type": "application/x-amz-json-1.1",
            "date": "Thu, 17 Dec 2020 17:26:39 GMT",
            "x-amzn-requestid": "69a07fc8-5268-4436-a7ee-4ae1a16a3ba8"
        },
        "HTTPStatusCode": 200,
        "RequestId": "69a07fc8-5268-4436-a7ee-4ae1a16a3ba8",
        "RetryA

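## Use the estimator variable

The magic stores the estimator it created in the `___PyTorch_estimator` variable (the `estimator_name` shown in the submit output), so it can be used directly in Python.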

In [26]:
___PyTorch_estimator.training_job_analytics.name

'pytorch-training-2020-12-17-17-26-21-133'

In [27]:
___PyTorch_estimator.training_job_analytics.dataframe()

Unnamed: 0,timestamp,metric_name,value
0,0.0,loss,0.214329
1,60.0,loss,0.141576
2,120.0,loss,0.123643
3,180.0,loss,0.117714
4,240.0,loss,0.108972


## Notebook metadata

In [28]:
%less /opt/ml/metadata/resource-metadata.json

{"AppType":"KernelGateway","DomainId":"d-yu5msju0ejog","UserProfileName":"lblokhin-custom","ResourceArn":"arn:aws:sagemaker:eu-west-1:245582572290:app/d-yu5msju0ejog/lblokhin-custom/KernelGateway/lblokhin-ml-t3-medium-879b63d5fe00d0d11cf7ff2a5992","ResourceName":"lblokhin-ml-t3-medium-879b63d5fe00d0d11cf7ff2a5992","AppImageVersion":""}

In [45]:
%env

{'LANGUAGE': 'en_US.UTF-8',
 'REGION_NAME': 'eu-west-1',
 'HOSTNAME': 'lblokhin-ml-t3-medium-411fee9dd41a1713fdc0a5c3fa84',
 'HOME': '/home/jovyan',
 'CONDA_VERSION': '4.9.0',
 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/_sagemaker-instance-credentials/6ad65d45a08640ebd438a0158820c7dac41b815a19816ea80cc6f0756a6c2e09',
 'NB_USER': 'jovyan',
 'AWS_DEFAULT_REGION': 'eu-west-1',
 'PATH': '/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tmp/miniconda3/condabin:/tmp/anaconda3/condabin:/tmp/miniconda2/condabin:/tmp/anaconda2/condabin',
 'NB_GID': '100',
 'LANG': 'en_US.UTF-8',
 'AWS_ACCOUNT_ID': '245582572290',
 'DEBIAN_FRONTEND': 'noninteractive',
 'SHELL': '/bin/bash',
 'AWS_REGION': 'eu-west-1',
 'AWS_INTERNAL_IMAGE_OWNER': 'Custom',
 'CONDA_DIR': '/opt/.sagemakerinternal/conda',
 'LC_ALL': 'en_US.UTF-8',
 'PWD': '/home/jovyan/work',
 'SAGEMAKER_LOG_FILE': '/var/log/studio/kernel_gateway.log',
 'NB_UID': '1000',
 'JUPYTER_PATH': '/opt/conda/share/jupyter/',
 'K