{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "%glue_ray\n",
    "%session_id_prefix pytorch-ray\n",
    "%additional_python_modules ray[ml],torch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "%min_workers 1\n",
    "%number_of_workers 2\n",
    "%object_memory_worker 10"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch\n",
    "import torch.nn as nn\n",
    "\n",
    "import ray\n",
    "from ray import train\n",
    "from ray.air import session, Checkpoint\n",
    "from ray.train.torch import TorchTrainer\n",
    "from ray.air.config import ScalingConfig\n",
    "from ray.air.config import RunConfig\n",
    "\n",
    "# Hyperparameters for the toy y = 2x + 1 regression below.\n",
    "input_size = 1\n",
    "layer_size = 15\n",
    "output_size = 1\n",
    "num_epochs = 3\n",
    "\n",
    "\n",
    "class NeuralNetwork(nn.Module):\n",
    "    \"\"\"Small 1-in/1-out MLP with a single hidden ReLU layer.\"\"\"\n",
    "\n",
    "    def __init__(self):\n",
    "        super(NeuralNetwork, self).__init__()\n",
    "        self.layer1 = nn.Linear(input_size, layer_size)\n",
    "        self.relu = nn.ReLU()\n",
    "        self.layer2 = nn.Linear(layer_size, output_size)\n",
    "\n",
    "    def forward(self, input):\n",
    "        return self.layer2(self.relu(self.layer1(input)))\n",
    "\n",
    "\n",
    "def train_loop_per_worker():\n",
    "    \"\"\"Per-worker training loop executed by TorchTrainer on each Ray worker.\n",
    "\n",
    "    Reads the 'train' dataset shard assigned to this worker, trains the\n",
    "    model with SGD + MSE for num_epochs, and reports a checkpoint with\n",
    "    the model weights once per epoch.\n",
    "    \"\"\"\n",
    "    dataset_shard = session.get_dataset_shard(\"train\")\n",
    "    model = NeuralNetwork()\n",
    "    loss_fn = nn.MSELoss()\n",
    "    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)\n",
    "\n",
    "    # Wraps the model for distributed training and device placement.\n",
    "    model = train.torch.prepare_model(model)\n",
    "\n",
    "    for epoch in range(num_epochs):\n",
    "        for batches in dataset_shard.iter_torch_batches(\n",
    "            batch_size=32, dtypes=torch.float\n",
    "        ):\n",
    "            inputs = torch.unsqueeze(batches[\"x\"], 1)\n",
    "            # Unsqueeze labels to (batch, 1) as well: the model output is\n",
    "            # (batch, 1), and MSELoss would otherwise broadcast it against\n",
    "            # a (batch,) target into a (batch, batch) matrix and compute\n",
    "            # the wrong loss (PyTorch emits a UserWarning for this).\n",
    "            labels = torch.unsqueeze(batches[\"y\"], 1)\n",
    "            output = model(inputs)\n",
    "            loss = loss_fn(output, labels)\n",
    "            optimizer.zero_grad()\n",
    "            loss.backward()\n",
    "            optimizer.step()\n",
    "            print(f\"epoch: {epoch}, loss: {loss.item()}\")\n",
    "\n",
    "        # Report once per epoch and checkpoint the model weights.\n",
    "        # NOTE(review): after prepare_model the state_dict keys may carry\n",
    "        # a \"module.\" prefix when DDP wrapping is applied -- confirm\n",
    "        # before loading this checkpoint into a bare NeuralNetwork.\n",
    "        session.report(\n",
    "            {},\n",
    "            checkpoint=Checkpoint.from_dict(\n",
    "                dict(epoch=epoch, model=model.state_dict())\n",
    "            ),\n",
    "        )\n",
    "\n",
    "\n",
    "# Synthetic dataset: 200 points on the line y = 2x + 1.\n",
    "train_dataset = ray.data.from_items([{\"x\": x, \"y\": 2 * x + 1} for x in range(200)])\n",
    "scaling_config = ScalingConfig(num_workers=2)\n",
    "# If using GPUs, use the below scaling config instead.\n",
    "# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)\n",
    "trainer = TorchTrainer(\n",
    "    train_loop_per_worker=train_loop_per_worker,\n",
    "    scaling_config=scaling_config,\n",
    "    run_config=RunConfig(local_dir=\"/tmp/ray_results\"),\n",
    "    datasets={\"train\": train_dataset},\n",
    ")\n",
    "result = trainer.fit()\n",
    "print(result.metrics)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Glue Python [PySpark and Ray] (SparkAnalytics 1.0)",
   "language": "python",
   "name": "conda-env-sm_glue_is-glue_pyspark__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-sparkanalytics-v1"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "Python_Glue_Session",
   "pygments_lexer": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}