In [1]:
!pip3 install -U torch torchvision pandas scikit-learn tensorboard ray[all]==2.0.0rc0
  #https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl

Defaulting to user installation because normal site-packages is not writeable
Requirement already up-to-date: torch in /home/emr-notebook/.local/lib/python3.7/site-packages (1.12.1)
Requirement already up-to-date: torchvision in /home/emr-notebook/.local/lib/python3.7/site-packages (0.13.1)
Requirement already up-to-date: pandas in /home/emr-notebook/.local/lib/python3.7/site-packages (1.3.5)
Requirement already up-to-date: datasets in /home/emr-notebook/.local/lib/python3.7/site-packages (2.4.0)
Requirement already up-to-date: accelerate in /home/emr-notebook/.local/lib/python3.7/site-packages (0.12.0)
Requirement already up-to-date: scikit-learn in /home/emr-notebook/.local/lib/python3.7/site-packages (1.0.2)
Requirement already up-to-date: mlflow in /home/emr-notebook/.local/lib/python3.7/site-packages (1.27.0)
Requirement already up-to-date: tensorboard in /home/emr-notebook/.local/lib/python3.7/site-packages (2.9.1)
Requirement already up-to-date: ray[all]==2.0.0rc0 in /home/emr-n

In [2]:
# Restart the kernel so the packages installed by the previous cell are importable.
# NOTE(review): a programmatic restart likely cancels any queued cells (e.g. during
# "Run All"); run the remaining cells once the restart has completed.
import IPython

kernel_app = IPython.Application.instance()
# restart=True brings the kernel back up after shutdown; the returned status dict
# is this cell's displayed output.
kernel_app.kernel.do_shutdown(restart=True)

{'status': 'ok', 'restart': True}

In [3]:
# Opt out of Ray usage-stats collection. As the command's output states, this only
# applies to clusters started afterwards; an already-running cluster must be restarted.
!ray disable-usage-stats

Usage stats disabled for future clusters. Restart any current running clusters for this to take effect.
[0m

In [1]:
import ray

# Ray Client address of the cluster head (the ``ray://`` scheme selects Ray Client).
address = 'ray://localhost:10001'

# Drop any connection left over from an earlier run before connecting; the
# returned context is displayed as this cell's output.
ray.shutdown()
ray.init(address=address)

0,1
Python version:,3.7.10
Ray version:,2.0.0rc0
Dashboard:,http://127.0.0.1:8265


In [2]:
import ray
from ray.data.datasource import SimpleTorchDatasource
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms import ToTensor

# Reconnect with a runtime environment so the cluster's workers install the
# libraries the remote code needs. Each library is listed once. torch,
# torchvision, pandas and scikit-learn are pinned to the versions installed on
# this driver (see the pip output of the first cell) to avoid driver/worker
# version skew when objects are shipped between them.
ray.shutdown()
ray.init(
    address='ray://localhost:10001',
    runtime_env={
        "pip": [
            "torch==1.12.1",
            "torchvision==0.13.1",
            "pandas==1.3.5",
            "scikit-learn==1.0.2",
            "tensorboardx",
        ]
    },
)

def _fashion_mnist(train):
    # Shared loader for both splits: downloads FashionMNIST into ./data when it is
    # not already there and converts every image to a tensor via ToTensor().
    return torchvision.datasets.FashionMNIST(
        root="./data",
        download=True,
        train=train,
        transform=ToTensor(),
    )


def train_dataset_factory():
    """Build the FashionMNIST training split (60,000 samples)."""
    return _fashion_mnist(train=True)


def test_dataset_factory():
    """Build the FashionMNIST test split."""
    return _fashion_mnist(train=False)

# Wrap each torchvision dataset as a Ray Dataset. Factories (not dataset
# instances) are handed over so the datasource builds the dataset itself.
train_dataset: ray.data.Dataset = ray.data.read_datasource(
    SimpleTorchDatasource(),
    dataset_factory=train_dataset_factory,
)
test_dataset: ray.data.Dataset = ray.data.read_datasource(
    SimpleTorchDatasource(),
    dataset_factory=test_dataset_factory,
)



In [3]:
# Raw Ray Dataset: each row is still an (image tensor, label) tuple at this point.
train_dataset

Dataset(num_blocks=1, num_rows=60000, schema=<class 'tuple'>)

In [4]:
from typing import List, Tuple
import pandas as pd
from ray.data.extensions import TensorArray
import torch


def convert_batch_to_pandas(batch: List[Tuple[torch.Tensor, int]]) -> pd.DataFrame:
    """Convert a batch of ``(image, label)`` samples into a pandas DataFrame.

    Args:
        batch: A sequence of ``(image tensor, label)`` pairs, as produced by the
            FashionMNIST datasource (one pair per sample).

    Returns:
        A DataFrame with an ``image`` column backed by Ray's ``TensorArray``
        extension (each cell holds a whole image array) and a ``label`` column.
    """
    images = TensorArray([image.numpy() for image, _ in batch])
    labels = [label for _, label in batch]

    return pd.DataFrame({"image": images, "label": labels})


# Convert both datasets from (image tensor, label) tuples into pandas batches.
# NOTE: this rebinds the names, so re-running this cell without first re-running
# the dataset-reading cell would try to convert already-converted data.
train_dataset, test_dataset = [
    ds.map_batches(convert_batch_to_pandas) for ds in (train_dataset, test_dataset)
]

Read->Map_Batches: 100%|██████████| 1/1 [00:08<00:00,  8.42s/it]
Read->Map_Batches: 100%|██████████| 1/1 [00:01<00:00,  1.11s/it]


In [5]:
# After conversion: a tensor-extension "image" column (shape (1, 28, 28), float32)
# and an int64 "label" column.
train_dataset

Dataset(num_blocks=1, num_rows=60000, schema={image: TensorDtype(shape=(1, 28, 28), dtype=float32), label: int64})

In [6]:
import torch
from torch import nn

# Define model
class NeuralNetwork(nn.Module):
    """Fully connected classifier for 28x28 single-channel images.

    Each input is flattened (from dim 1 onwards) to 784 features, passed
    through two 512-unit ReLU hidden layers, and mapped to 10 raw class
    scores (logits).
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            # No activation after the output layer: nn.CrossEntropyLoss expects
            # unnormalized logits, and a ReLU here would clamp every negative
            # score to 0 (and zero its gradient).
            nn.Linear(512, 10),
        )

    def forward(self, x):
        """Return logits of shape (batch, 10).

        ``x`` is expected to flatten to 784 features per sample, e.g. a
        (batch, 1, 28, 28) image tensor.
        """
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [7]:
from ray import train
from ray.air import session, Checkpoint
import torch.optim as optim


def train_loop_per_worker(config):
    """Train ``NeuralNetwork`` on this worker's shard of the ``"train"`` dataset.

    Executed by every Ray Train worker. After each epoch it calls
    ``session.report`` with loss metrics and a checkpoint holding the model's
    ``state_dict`` under the ``"model"`` key.

    Args:
        config: The trainer's ``train_loop_config`` dict. Required keys:
            ``"batch_size"`` and ``"epochs"``. Optional keys, defaulting to the
            values previously hard-coded here: ``"lr"`` (0.001),
            ``"momentum"`` (0.9) and ``"log_every"`` (print the windowed mean
            loss every 2000 mini-batches).

    Reported metrics (once per epoch):
        running_loss: sum of batch losses since the last progress print
            (unnormalized; kept for backward compatibility).
        loss: mean batch loss over the whole epoch.
    """
    model = train.torch.prepare_model(NeuralNetwork())
    # prepare_model may return the model without a DistributedDataParallel
    # wrapper (e.g. with a single worker), in which case ``model.module`` does
    # not exist. Resolve the underlying module once so checkpointing works
    # either way; it shares its parameters with ``model``.
    base_model = getattr(model, "module", model)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        model.parameters(),
        lr=config.get("lr", 0.001),
        momentum=config.get("momentum", 0.9),
    )
    log_every = config.get("log_every", 2000)

    # unsqueeze_label_tensor=False keeps labels 1-D, the class-index shape
    # nn.CrossEntropyLoss expects.
    train_dataset_shard: torch.utils.data.IterableDataset = session.get_dataset_shard("train").to_torch(
        feature_columns=["image"],
        label_column="label",
        batch_size=config["batch_size"],
        unsqueeze_feature_tensors=False,
        unsqueeze_label_tensor=False,
    )

    for epoch in range(config["epochs"]):
        running_loss = 0.0  # windowed sum, reset after each progress print
        epoch_loss = 0.0  # sum over the whole epoch
        num_batches = 0
        for i, (inputs, labels) in enumerate(train_dataset_shard):
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # accumulate statistics
            batch_loss = loss.item()
            running_loss += batch_loss
            epoch_loss += batch_loss
            num_batches += 1
            if i % log_every == log_every - 1:
                print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / log_every:.3f}")
                running_loss = 0.0

        session.report(
            dict(
                running_loss=running_loss,
                # max(..., 1) guards an empty shard against division by zero.
                loss=epoch_loss / max(num_batches, 1),
            ),
            checkpoint=Checkpoint.from_dict(dict(model=base_model.state_dict())),
        )

In [15]:
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig, RunConfig
from ray.tune import SyncConfig

# S3 prefix that trial results and checkpoints are uploaded to.
s3_checkpoint_prefix = "s3://dsoaws/ray_output"

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=dict(batch_size=64, epochs=1),
    datasets=dict(train=train_dataset),
    scaling_config=ScalingConfig(num_workers=2),  # two training workers
    run_config=RunConfig(
        sync_config=SyncConfig(upload_dir=s3_checkpoint_prefix),
    ),
)

result = trainer.fit()

[2m[36m(TunerInternal pid=11224)[0m == Status ==
[2m[36m(TunerInternal pid=11224)[0m Current time: 2022-08-06 02:05:05 (running for 00:00:02.50)
[2m[36m(TunerInternal pid=11224)[0m Memory usage on this node: 12.5/62.1 GiB
[2m[36m(TunerInternal pid=11224)[0m Using FIFO scheduling algorithm.
[2m[36m(TunerInternal pid=11224)[0m Resources requested: 3.0/24 CPUs, 0/0 GPUs, 0.0/114.74 GiB heap, 0.0/51.5 GiB objects
[2m[36m(TunerInternal pid=11224)[0m Result logdir: /home/hadoop/ray_results/TorchTrainer_2022-08-06_02-05-03
[2m[36m(TunerInternal pid=11224)[0m Number of trials: 1/1 (1 RUNNING)
[2m[36m(TunerInternal pid=11224)[0m +--------------------------+----------+---------------------+
[2m[36m(TunerInternal pid=11224)[0m | Trial name               | status   | loc                 |
[2m[36m(TunerInternal pid=11224)[0m |--------------------------+----------+---------------------|
[2m[36m(TunerInternal pid=11224)[0m | TorchTrainer_2f23a_00000 | RUNNING  | 172.3

[2m[36m(RayTrainWorker pid=25353, ip=172.31.16.116)[0m 2022-08-06 02:05:07,794	INFO config.py:72 -- Setting up process group for: env:// [rank=0, world_size=2]
[2m[36m(RayTrainWorker pid=25354, ip=172.31.16.116)[0m A value is trying to be set on a copy of a slice from a DataFrame.
[2m[36m(RayTrainWorker pid=25354, ip=172.31.16.116)[0m Try using .loc[row_indexer,col_indexer] = value instead
[2m[36m(RayTrainWorker pid=25354, ip=172.31.16.116)[0m 
[2m[36m(RayTrainWorker pid=25354, ip=172.31.16.116)[0m See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
[2m[36m(RayTrainWorker pid=25354, ip=172.31.16.116)[0m   self._setitem_single_column(ilocs[0], value, pi)
[2m[36m(RayTrainWorker pid=25354, ip=172.31.16.116)[0m   return torch.as_tensor(vals, dtype=dtype)
[2m[36m(RayTrainWorker pid=25353, ip=172.31.16.116)[0m 2022-08-06 02:05:10,434	INFO train_loop_utils.py:300 -- Moving model to d

[2m[36m(TunerInternal pid=11224)[0m == Status ==
[2m[36m(TunerInternal pid=11224)[0m Current time: 2022-08-06 02:05:10 (running for 00:00:07.50)
[2m[36m(TunerInternal pid=11224)[0m Memory usage on this node: 12.5/62.1 GiB
[2m[36m(TunerInternal pid=11224)[0m Using FIFO scheduling algorithm.
[2m[36m(TunerInternal pid=11224)[0m Resources requested: 3.0/24 CPUs, 0/0 GPUs, 0.0/114.74 GiB heap, 0.0/51.5 GiB objects
[2m[36m(TunerInternal pid=11224)[0m Result logdir: /home/hadoop/ray_results/TorchTrainer_2022-08-06_02-05-03
[2m[36m(TunerInternal pid=11224)[0m Number of trials: 1/1 (1 RUNNING)
[2m[36m(TunerInternal pid=11224)[0m +--------------------------+----------+---------------------+
[2m[36m(TunerInternal pid=11224)[0m | Trial name               | status   | loc                 |
[2m[36m(TunerInternal pid=11224)[0m |--------------------------+----------+---------------------|
[2m[36m(TunerInternal pid=11224)[0m | TorchTrainer_2f23a_00000 | RUNNING  | 172.3

In [16]:
from pathlib import Path

# The trial's local log dir is <results root>/<experiment>/<trial>; the same two
# trailing components are reused under the S3 upload prefix.
trial_log_dir = Path(result.log_dir)
# NOTE(review): checkpoint_000000 is the first checkpoint reported (one is
# reported per epoch); with epochs > 1 the final model presumably lives under a
# higher index -- confirm before relying on this path.
checkpoint_uri = (
    f"{s3_checkpoint_prefix}/{trial_log_dir.parent.name}/{trial_log_dir.name}/checkpoint_000000"
)

print("Your model checkpoint files are here:")
print(checkpoint_uri)

Your model checkpoint files are here:
s3://dsoaws/ray_output/TorchTrainer_2022-08-06_02-05-03/TorchTrainer_2f23a_00000_0_2022-08-06_02-05-03/checkpoint_000000


[2m[36m(TunerInternal pid=11224)[0m 2022-08-06 02:05:16,173	INFO tune.py:759 -- Total run time: 13.02 seconds (12.86 seconds for the tuning loop).
