{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "This example has been adapted to use the [Women's Clothing Review Dataset](https://github.com/AFAgarap/ecommerce-reviews-analysis/blob/7062594/Womens%20Clothing%20E-Commerce%20Reviews.csv) from https://github.com/ray-project/ray/blob/b9a4f64/python/ray/train/examples/transformers/transformers_example.py (which was adapted to use Ray Train from https://github.com/huggingface/transformers/blob/75259b4/examples/pytorch/text-classification/run_glue_no_trainer.py)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%pip install torch transformers pandas datasets accelerate scikit-learn mlflow tensorboard s3fs ray[all]==2.0.0rc0" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# restart kernel to pick up the pip installs above\n", "import IPython\n", "\n", "IPython.Application.instance().kernel.do_shutdown(True) #automatically restarts kernel" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import ray\n", "\n", "import argparse\n", "import logging\n", "import math\n", "import os\n", "os.environ[\"TOKENIZERS_PARALLELISM\"] = \"true\"\n", "\n", "from ray import train\n", "\n", "from typing import Dict, Any\n", "import random\n", "import torch\n", "\n", "import datasets\n", "import ray\n", "import transformers\n", "from accelerate import Accelerator\n", "from datasets import load_dataset, load_metric\n", "from torch.utils.data.dataloader import DataLoader\n", "import tempfile\n", "\n", "from ray.air import session, Checkpoint\n", "\n", "from tqdm.auto import tqdm\n", "\n", "from transformers import (\n", " AdamW,\n", " AutoConfig,\n", " AutoModelForSequenceClassification,\n", " AutoTokenizer,\n", " DataCollatorWithPadding,\n", " PretrainedConfig,\n", " SchedulerType,\n", " default_data_collator,\n", " get_scheduler,\n", " set_seed,\n", ")\n", "from transformers.utils.versions import require_version \n", "\n", "def train_func(): \n", " model_name_or_path = \"roberta-base\"\n", " use_slow_tokenizer = False\n", " per_device_train_batch_size = 64\n", " learning_rate = 5e-5\n", " weight_decay = 0.0\n", " num_train_epochs = 1\n", " max_train_steps = 1\n", " gradient_accumulation_steps = 1\n", " lr_scheduler_type = \"linear\"\n", " num_warmup_steps = 0\n", " output_dir = None\n", " seed = None\n", " num_workers = 20\n", " use_gpu = False \n", " max_length = 64\n", "\n", " accelerator = Accelerator()\n", " \n", " import s3fs\n", "\n", " s3_file = s3fs.S3FileSystem()\n", " s3_path = \"s3://dsoaws/data/\"\n", " data_path = tempfile.mkdtemp()\n", " s3_file.get(s3_path, data_path, recursive=True)\n", "\n", " data_files = {}\n", " data_files[\"train\"] = f\"{data_path}/test/part-algo-1-womens_clothing_ecommerce_reviews.csv\"\n", " extension = \"csv\"\n", "\n", " raw_datasets = load_dataset(extension, data_files=data_files)\n", "\n", " label_list = raw_datasets[\"train\"].unique(\"sentiment\")\n", "\n", " # Sort for determinism\n", " label_list.sort() \n", " \n", " num_labels = len(label_list)\n", "\n", " config = AutoConfig.from_pretrained(\n", " model_name_or_path, num_labels=num_labels, \n", " )\n", " tokenizer = AutoTokenizer.from_pretrained(\n", " model_name_or_path, use_fast=not use_slow_tokenizer\n", " )\n", " model = AutoModelForSequenceClassification.from_pretrained(\n", " model_name_or_path,\n", " config=config,\n", " )\n", "\n", " sentence1_key, 
sentence2_key = \"review_body\", None\n", "\n", " label_to_id = None\n", " label_to_id = {v: i for i, v in enumerate(label_list)}\n", "\n", " if label_to_id is not None:\n", " model.config.label2id = label_to_id\n", " model.config.id2label = {id: label for label, id in config.label2id.items()}\n", "\n", " def preprocess_function(examples):\n", " texts = (\n", " (examples[sentence1_key],)\n", " if sentence2_key is None\n", " else (examples[sentence1_key], examples[sentence2_key])\n", " )\n", " result = tokenizer(\n", " *texts, padding=\"max_length\", max_length=max_length, truncation=True\n", " )\n", "\n", " if \"sentiment\" in examples:\n", " if label_to_id is not None:\n", " result[\"labels\"] = [\n", " label_to_id[l] for l in examples[\"sentiment\"]\n", " ]\n", " else:\n", " result[\"labels\"] = examples[\"sentiment\"]\n", "\n", " return result\n", "\n", " processed_datasets = raw_datasets.map(\n", " preprocess_function,\n", " batched=True,\n", " remove_columns=raw_datasets[\"train\"].column_names,\n", " desc=\"Running tokenizer on dataset\",\n", " )\n", "\n", " train_dataset = processed_datasets[\"train\"]\n", "\n", " train_dataloader = DataLoader(\n", " train_dataset,\n", " shuffle=True,\n", " collate_fn=default_data_collator,\n", " batch_size=per_device_train_batch_size,\n", " )\n", "\n", " no_decay = [\"bias\", \"LayerNorm.weight\"]\n", " optimizer_grouped_parameters = [\n", " {\n", " \"params\": [\n", " p\n", " for n, p in model.named_parameters()\n", " if not any(nd in n for nd in no_decay)\n", " ],\n", " \"weight_decay\": weight_decay,\n", " },\n", " {\n", " \"params\": [\n", " p\n", " for n, p in model.named_parameters()\n", " if any(nd in n for nd in no_decay)\n", " ],\n", " \"weight_decay\": 0.0,\n", " },\n", " ]\n", " \n", " optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)\n", "\n", " model, optimizer, train_dataloader = accelerator.prepare(\n", " model, optimizer, train_dataloader\n", " )\n", "\n", " num_update_steps_per_epoch = math.ceil(\n", " len(train_dataloader) / gradient_accumulation_steps\n", " )\n", " if max_train_steps is None:\n", " max_train_steps = num_train_epochs * num_update_steps_per_epoch\n", " else:\n", " num_train_epochs = math.ceil(\n", " max_train_steps / num_update_steps_per_epoch\n", " )\n", "\n", " lr_scheduler = get_scheduler(\n", " name=lr_scheduler_type,\n", " optimizer=optimizer,\n", " num_warmup_steps=num_warmup_steps,\n", " num_training_steps=max_train_steps,\n", " )\n", "\n", " metric = load_metric(\"accuracy\")\n", "\n", " total_batch_size = (\n", " per_device_train_batch_size\n", " * accelerator.num_processes\n", " * gradient_accumulation_steps\n", " )\n", "\n", " print(\"***** Training *****\")\n", " print(f\" Num examples = {len(train_dataset)}\")\n", " print(f\" Num epochs = {num_train_epochs}\")\n", " print(\n", " f\" Instantaneous batch size per device =\"\n", " f\" {per_device_train_batch_size}\"\n", " )\n", " print(\n", " f\" Total train batch size (w. 
parallel, distributed & accumulation) \"\n", " f\"= {total_batch_size}\"\n", " )\n", " print(f\" Gradient Accumulation steps = {gradient_accumulation_steps}\")\n", " print(f\" Total optimization steps = {max_train_steps}\")\n", "\n", " progress_bar = tqdm(\n", " range(max_train_steps), disable=not accelerator.is_local_main_process\n", " )\n", " completed_steps = 0\n", "\n", " running_train_loss = 0.0\n", " \n", " model = train.torch.prepare_model(model)\n", " \n", " for epoch in range(num_train_epochs):\n", " model.train()\n", " for step, batch in enumerate(train_dataloader):\n", " outputs = model(**batch)\n", " loss = outputs.loss\n", " \n", " running_train_loss += loss\n", "\n", " loss = loss / gradient_accumulation_steps\n", " accelerator.backward(loss)\n", " if (\n", " step % gradient_accumulation_steps == 0\n", " or step == len(train_dataloader) - 1\n", " ):\n", " optimizer.step()\n", " lr_scheduler.step()\n", " optimizer.zero_grad()\n", " progress_bar.update(1)\n", " completed_steps += 1\n", "\n", " if completed_steps >= max_train_steps:\n", " break\n", " \n", " session.report(\n", " {\n", " \"running_train_loss\": running_train_loss,\n", " },\n", " checkpoint=Checkpoint.from_dict(dict(model=model.module.state_dict()))\n", " )\n", " \n", " if output_dir is not None:\n", " accelerator.wait_for_everyone()\n", " unwrapped_model = accelerator.unwrap_model(model)\n", " unwrapped_model.save_pretrained(output_dir, save_function=accelerator.save)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import ray\n", "from ray.train.torch import TorchTrainer\n", "from ray.train.huggingface import HuggingFaceTrainer\n", "\n", "from ray.air.config import ScalingConfig, RunConfig\n", "from ray.tune import SyncConfig\n", "\n", "ray.shutdown()\n", "ray.init(address=\"ray://localhost:10001\",\n", " runtime_env={\"pip\": [\n", " \"torch\", \n", " \"scikit-learn\",\n", " \"transformers\",\n", " \"pandas\",\n", " \"datasets\",\n", " \"accelerate\",\n", " \"scikit-learn\",\n", " \"mlflow\", \n", " \"tensorboard\",\n", " \"s3fs\",\n", " ]\n", " }\n", " )\n", "\n", "s3_checkpoint_prefix=\"s3://dsoaws/ray_output\"\n", "\n", "\n", "trainer = TorchTrainer(\n", " train_loop_per_worker=train_func,\n", " train_loop_config={\n", " \"batch_size\": 64,\n", " \"epochs\": 10\n", " },\n", " # Increase num_workers to scale out the cluster\n", " scaling_config=ScalingConfig(num_workers=2),\n", " run_config = RunConfig(\n", " sync_config=SyncConfig(\n", " # This will store checkpoints in S3.\n", " upload_dir=s3_checkpoint_prefix\n", " )\n", " )\n", ")\n", "\n", "results = trainer.fit()\n", "print(results.metrics)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_uri = f\"s3://dsoaws/ray_output/{results.log_dir.as_posix().split('/')[-2]}/{results.log_dir.as_posix().split('/')[-1]}/checkpoint_000000/\"\n", "print(s3_uri)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }