{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Batching Hyper-parameter Tuning jobs (CPU)\n", "\n", "In this notebook, we demonstrate how to batch hyper-parameter tuining jobs using a batching strategy valid for both Bayesian and Random Optimization. We will cover both cold and warm start examples using PyTorch.\n", "\n", "Topics Covered in this notebook:\n", "\n", "1. SageMaker Hyper-parameter Optimization with PyTorch\n", "2. Batching Large-scale HPO jobs\n", "\n", "Required Infrastructure: We will run this notebook using the PyTorch 1.6 Python 3.6 CPU Optimized kernel.\n", "\n", "We will use a credit card default dataset from UCI published here: https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients for our example.\n", "\n", "*Note*: In the code folder, we have also provided files train.py, and train_ray.py which are designed to work on GPU instances. We provide documentation on any other changes you need to make to use GPU instances below. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import Libraries\n", "\n", "Here you import the necessary libraries we need in order to run the code in the cells." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import IPython\n", "install_needed = True # should only be True once\n", "if install_needed:\n", " print(\"installing deps and restarting kernel\")\n", " !{sys.executable} -m pip install -U sagemaker\n", " !{sys.executable} -m pip install -U smdebug\n", " !{sys.executable} -m pip install -U bokeh\n", " !{sys.executable} -m pip install -U xlrd\n", " IPython.Application.instance().kernel.do_shutdown(True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import csv\n", "import glob\n", "import math\n", "import os\n", "import shutil\n", "from time import gmtime, sleep, strftime, time\n", "from botocore.config import Config\n", "import boto3\n", "import numpy as np\n", "import pandas as pd\n", "from sklearn.utils import resample\n", "from smdebug.profiler.analysis.notebook_utils.timeline_charts import \\\n", " TimelineCharts\n", "from smdebug.profiler.analysis.notebook_utils.training_job import TrainingJob\n", "\n", "import sagemaker\n", "from sagemaker.analytics import ExperimentAnalytics\n", "from sagemaker.debugger import (FrameworkProfile, ProfilerConfig, ProfilerRule,\n", " Rule, rule_configs)\n", "from sagemaker.pytorch import PyTorch\n", "from sagemaker.tuner import (CategoricalParameter, ContinuousParameter,\n", " HyperparameterTuner, IntegerParameter,\n", " WarmStartConfig, WarmStartTypes)\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "from smexperiments.trial_component import TrialComponent" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm = boto3.client('sagemaker', config=Config(retries={'max_attempts': 20}))\n", "sagemaker_session = sagemaker.Session(sagemaker_client=sm)\n", "\n", "bucket = sagemaker_session.default_bucket()\n", "prefix = 'distributed_hpo/DEMO-pytorch-hpo'\n", "\n", "role = sagemaker.get_execution_role()\n", "region = boto3.session.Session().region_name\n", "print(f'region is {region}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The Dataset\n", "\n", "For this notebook, we will use a Credit Card Default Dataset published by UCI: https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients. Here we provide the dataset as part of the code in this notebook. \n", "\n", "This will be sufficient for testing out the SageMaker features above. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.path.exists('./data'):\n", " os.mkdir('./data')\n", "else:\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df1 = pd.read_excel('credit_card_default_data.xls', header=1)\n", "df1 = df1.drop(columns = 'ID')\n", "#rename the defaults column\n", "df1.rename(columns={\"default payment next month\": \"Default\"}, inplace=True)\n", "df1.to_csv('./data/data.csv', index=False)\n", "df1.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df1.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Pre-processing\n", "\n", "In this section we will shuffle and split the data into train and test and explore the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# histgram of the number of defaults versus not\n", "df1.Default.hist()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df1.Default.value_counts()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Make train folder\n", "\n", "if not os.path.exists('./train'):\n", " os.mkdir('./train')\n", "else:\n", " pass\n", "\n", "# make test folder \n", "\n", "if not os.path.exists('./test'):\n", " os.mkdir('./test')\n", "else:\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train-test split\n", "\n", "Split the raw data into train and test sets. Since the number of foreclosures is very small, we will oversample the training dataset to balance the number of defaulted and non-defaulted loans in the training dataset. Note that this can introduce training/serving skew -- to avoid this, we will prepare a separate test set, derived from the test data which retains the original distribution. The model will be validated against this test set after every training epoch.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def train_test_split(file):\n", " df = pd.read_csv('./data/'+file)\n", " train=df.sample(frac=0.8,random_state=200) #random state is a seed value\n", " test=df.drop(train.index)\n", " print(f'Original training before upsampling shape = ......{train.shape}')\n", " # first upsample the minority class in training dataset\n", " train_majority = train[train.Default==0]\n", " train_minority = train[train.Default==1]\n", " train_minority_upsampled = resample(train_minority, \n", " replace=True, # sample with replacement\n", " n_samples=len(train_majority), # Experiment with this on your own to see if it improves accuracy.\n", " random_state=123) # reproducible results\n", " \n", " # Combine majority class with upsampled minority class\n", " train_upsampled = pd.concat([train_majority, train_minority_upsampled], axis=0)\n", " train_upsampled=train_upsampled.sample(frac=1) #shuffle the data\n", " print(f\"Train file shape = .....{train_upsampled.shape}\")\n", " print(f\"Test file shape = .....{test.shape}\")\n", " train_upsampled.to_csv(f'./train/{file}', index=False, header=True)\n", " test.to_csv(f'./test/{file}', index=False, header=True)\n", " return len(train_upsampled), len(test)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total_train_rows = 0\n", "total_test_rows = 0\n", "for file in os.listdir('./data'): #this will work if you have multiple data files as well. \n", " print(file)\n", " trl, tel = train_test_split(file)\n", " total_test_rows+=tel\n", " total_train_rows+=trl\n", "print(f\"Total Training Loans ={total_train_rows}\")\n", "print(f\"Total Testing Loans = {total_test_rows}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Shard the dataset into smaller files\n", "\n", "In order for PyTorch to train faster, it is recommended to shard your large dataest into much smaller files. This way the PyTorch dataloader can quickly load one csv at a time consisting of N rows and train the model on that batch. \n", "\n", "The code below will read in each line from the primary dataframe line by line and store it in a separate dataframe. \n", "\n", "Then we will repeat this for the test set." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.path.exists('./train_full_split'):\n", " os.mkdir('./train_full_split')\n", "else:\n", " pass" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# split the training data into smaller files that can be loaded using the data loader\n", "COLS = pd.read_csv('./train/data.csv').columns\n", "csvfile = open('./train/data.csv', 'r').readlines()\n", "filename = 1\n", "for i in range(len(csvfile)):\n", " if i % 10000 == 0:\n", " with open('./train_full_split/' + str(filename) + '.csv', 'w+') as f:\n", " if filename == 1:\n", " f.writelines(csvfile[i:i+10000])\n", " else:\n", " writer = csv.writer(f, delimiter=',')\n", " writer.writerow(COLS)\n", " f.writelines(csvfile[i:i+10000])\n", " filename += 1\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.path.exists('./test_full_split'):\n", " os.mkdir('./test_full_split')\n", "else:\n", " pass" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# split the training data into smaller files that can be loaded using the data loader\n", "csvfile = open('./test/data.csv', 'r').readlines()\n", "filename = 1\n", "for i in range(len(csvfile)):\n", " if i % 10000 == 0:\n", " with open('./test_full_split/' + str(filename) + '.csv', 'w+') as f:\n", " if filename == 1:\n", " f.writelines(csvfile[i:i+10000])\n", " else:\n", " writer = csv.writer(f, delimiter=',')\n", " writer.writerow(COLS)\n", " f.writelines(csvfile[i:i+10000])\n", " filename += 1\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload Training and test datasets to S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Upload Training and test data into S3\n", "train_s3 = sagemaker_session.upload_data(path='./train_full_split/', key_prefix=prefix + '/train')\n", "print(train_s3)\n", "test_s3 = sagemaker_session.upload_data(path='./test_full_split/', key_prefix=prefix + '/test')\n", "print(test_s3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training Script\n", "\n", "Here we author our training script which we will use for parallel HPO. This train script uses the SageMaker Distributed DataParallel class for PyTorch for distributed training. \n", "\n", "We will also use SageMaker Profiler to obtain metrics associated with the training job such as the CPU/GPU usage. This is particularly useful for large scale deep learning training jobs.\n", "\n", "### 1. Create a Training Job with Profiling Enabled\n", "\n", "You will use the standard [SageMaker Estimator API for PyTorch ](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.pytorch.html) to create training jobs. To enable profiling, create a `ProfilerConfig` object and pass it to the `profiler_config` parameter of the `PyTorch` estimator.\n", "\n", "### 2. Use SM Distributed DataParallel (DDP) to effiently parallelize the training job across multiple GPUs. \n", "\n", "The training script provides the code you need for distributed data parallel (DDP) training using SMDataParallel. The training script is very similar to a PyTorch training script you might run outside of SageMaker, but modified to run with SMDataParallel. SMDataParallel's PyTorch client provides an alternative to PyTorch's native DDP. For details about how to use SMDataParallel's DDP in your native PyTorch script, see the Getting Started with SMDataParallel tutorials.\n", "\n", "For your benefit, we have provided 2 training scripts:\n", "\n", "1. train.py : full training script with SM DDP and SageMaker Profiler\n", "\n", "2. train_profiler.py : training script with regular PyTorch DDP and SageMaker Profiler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pygmentize code/train_cpu.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train the model. \n", "\n", "To train the model, we will now use 2 GPUs and also use SageMaker Profiler Capability to generate a report of the GPU utilization, and other performance metrics.\n", "\n", "#### Estimator function options\n", "In the following code block, you can update the estimator function to use a different instance type, instance count, and distrubtion strategy. You're also passing in the training script you reviewed in the previous cell.\n", "\n", "**Instance types**\n", "\n", "SMDataParallel supports model training on SageMaker with the following instance types only:\n", "ml.p3.16xlarge\n", "ml.p3dn.24xlarge [Recommended]\n", "ml.p4d.24xlarge [Recommended]\n", "\n", "**Instance count**\n", "To get the best performance and the most out of SMDataParallel, you should use at least 2 instances, but you can also use 1 for testing this example.\n", "Distribution strategy\n", "Note that to use DDP mode, you update the the distribution strategy, and set it to use smdistributed dataparallel.\n", "\n", "**Code folder**\n", "\n", "In order to run SM DistributedDataParallel, we need to ensure our training container has the latest version of the SageMaker SDK. To do this, simply pass in a requirements.txt file along with your code script in the code folder as provided here. We pass in the code folder as the source directory, pointing to the training script. \n", "\n", "**SageMaker Experiments**\n", "\n", "We will also track the training jobs using SageMaker Experiments, which will allow data scientist to compare different trials, analyze and extract any metadata from their training runs and compare jobs.\n", "\n", "This is particularly useful with HPO, when data scientists want to compare different Hyperparameter tuning jobs against one another." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "loan_class_experiment = Experiment.create(\n", " experiment_name=f\"Classifying-housing-loans-{int(time())}\",\n", " description=\"Classification of loans as default or not\", \n", " sagemaker_boto_client=sm)\n", "print(loan_class_experiment)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Now let's launch a single SageMaker training job. Next we will run a HPO job.\n", "trial_name = f\"loan-trial-base-{int(time())}\"\n", "loan_trial = Trial.create(\n", " trial_name=trial_name,\n", " experiment_name=loan_class_experiment.experiment_name,\n", " sagemaker_boto_client=sm,\n", " )\n", "\n", "estimator = PyTorch(entry_point=\"train_cpu.py\", \n", " role=role,\n", " framework_version='1.6.0',\n", " py_version='py36',\n", " source_dir='./code',\n", " output_path = f's3://{bucket}/{prefix}/output',\n", " instance_count=1, \n", " sagemaker_session=sagemaker_session,\n", " instance_type='ml.m5.xlarge', \n", " hyperparameters={\n", " 'epochs': 3,\n", " 'backend': 'gloo' #gloo for CPU, nccl for GPU\n", " },\n", " # allows Experiments to capture metrics \n", " metric_definitions=[\n", " {'Name':'train:loss', 'Regex':'Train Loss: (.*?);'},\n", " {'Name':'test:loss', 'Regex':'Test set Average loss: (.*?),'},\n", " {'Name':'test:accuracy', 'Regex':'Test Accuracy: (.*?)%;'},\n", " {'Name':'test:F1', 'Regex':'Test set F1-score: (.*?),'}\n", " ]\n", " )\n", "\n", "# this is a fire and forget event -- this way you can continue to use the notebook below for other data exploration\n", "# and prototyping activities. \n", "\n", "estimator.fit({'training': train_s3,\n", " 'testing':test_s3},\n", " experiment_config={\n", " \"TrialName\": loan_trial.trial_name,\n", " \"TrialComponentDisplayName\": \"Training\",\n", " },\n", " wait=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the training job on a GPU\n", "\n", "To test this estimator out with the latest smdistributed library, make the following changes:\n", "\n", "1. replace entry_point = \"train.py\"\n", "\n", "2. add the following code in \"\" after metric_definitions: \"distribution={'smdistributed':{\n", " 'dataparallel':{\n", " 'enabled': True\n", " }\n", " }\n", " },\"\n", " \n", " \n", "3. In hyperparameters, replace 'backend': 'nccl'\n", "\n", "4. Note that the SM distributed library requires the instance types to be one of the following: ml.p3.16xlarge, ml.p3dn.24xlarge, and ml.p4d.24xlarge. \n", "\n", "\n", "This will instantiate the sm distributed library on the containers. To learn more about Sm distributed, see this link: \n", "https://aws.amazon.com/sagemaker/distributed-training/\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Algorithm metrics\n", "\n", "We want to extract the model metrics on the test set. To do this, we will use the SageMaker Experiments API and extract the metric for the trail above. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.analytics import ExperimentAnalytics\n", "trial_component_analytics = ExperimentAnalytics(\n", " sagemaker_session=sagemaker_session, \n", " experiment_name=loan_class_experiment.experiment_name,\n", " metric_names=['test:F1']\n", ")\n", "analytic_table = trial_component_analytics.dataframe()\n", "analytic_table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get details for most recent job:\n", "analytic_table.iloc[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Scaling to Tens of thousands of HPO jobs\n", "\n", "Having seen how we can launch 1 job, next we will look at some strategies for scaling this to tens of thousands of jobs with Amazon SageMaker for both random and bayesian HPO methods that are come out-of-the-box with SageMaker. \n", "\n", "Currently, for Bayesian HPO, SageMaker has a limit of 500 *concurrent* jobs across all Bayesian HPO jobs. Below we will provide code for launching the maximum possible Bayesian trials while respecting this limit.\n", "\n", "Let's look at both strategies in detail. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Random Strategy\n", "\n", "For random strategy, each trial in an HPO job is completely independent of one another. In this case, if we want to launch a total of N_tot jobs, we can choose to launch N HPO jobs concurrently with each HPO job having M parallel jobs.\n", "\n", "To see how you can launch multiple HPO jobs in parallel, refer to the code below." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Batched Hyper-parameter Optimization for Bayesian Optimization (Cold Start)\n", "\n", "For Bayesian HPO, we want to take advantage of the parallelism by training multiple hyper-parameter trials in parallel so the Bayesian algorithm can use the outputs of these parallel jobs to determine the next set of parameters.\n", "\n", "Again we are limited by the number of concurrent jobs/account = 500, and the default limits are 10 jobs in parallel per job. In that case we can adapt the formula above as follows:\n", "\n", "![Visual Representation of Batching jobs](img/batching.png)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def bayesian_batching_cold_start(total_requested_trials, max_parallel_across_jobs=20, max_parallel_per_job=10, max_candidates_per_job = 500):\n", " '''Given a total number of requested trials, generates the strategy for Bayesian HPO\n", " The strategy is a list (batch_strat) where every element is the number of jobs to execute in parallel. The sum of all elements in the list is\n", " the total number of HPO jobs needed to reach total_requested_trials. For example if batch_strat = [2, 2, 2, 1], means you will run a total of 7\n", " HPO jobs starting with 2 --> 2 ---> 2 ---> 1. \n", " total_requested_trials = number of trails user wants to run.\n", " max_parallel_across_jobs = max number of training jobs across all trials Sagemaker runs in parallel. Limited by instance availability\n", " max_parallel_per_job = max number of parallel jobs to run per HPO job\n", " max_candidates_per_job = total number of training jobs per HPO job'''\n", " batch_strat = [] \n", " tot_jobs_left = total_requested_trials\n", " max_parallel_hpo_jobs = max_parallel_across_jobs//max_parallel_per_job\n", " if total_requested_trials < max_parallel_hpo_jobs*max_candidates_per_job:\n", " batch_strat.append(total_requested_trials//max_candidates_per_job)\n", " else:\n", " while tot_jobs_left > max_parallel_hpo_jobs*max_candidates_per_job:\n", " batch_strat.append(max_parallel_hpo_jobs)\n", " tot_jobs_left -=max_parallel_hpo_jobs*max_candidates_per_job\n", "\n", " batch_strat.append(math.ceil((tot_jobs_left)/max_candidates_per_job))\n", " return math.ceil(total_requested_trials/max_candidates_per_job), max_parallel_hpo_jobs, batch_strat\n", " \n", "bayesian_batching_cold_start(10000)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# let's redefine a simpler estimator with just 1 instance. \n", "hyperparameter_ranges = {'lr': ContinuousParameter(0.001, 0.1),\n", " 'momentum': CategoricalParameter(list(np.arange(0, 10)/10))}\n", "\n", "inputs ={'training': train_s3,\n", " 'testing':test_s3}\n", "\n", "objective_metric_name = 'test AUC'\n", "objective_type = 'Maximize'\n", "metric_definitions = [{'Name': 'test AUC',\n", " 'Regex': 'Test set AUC: ([0-9\\\\.]+)'}]\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the estimator for HPO\n", "estimator = PyTorch(entry_point=\"train_cpu.py\",\n", " role=role,\n", " framework_version='1.6.0',\n", " py_version='py36',\n", " source_dir='./code',\n", " output_path = f's3://{bucket}/{prefix}/output',\n", " instance_count=1, \n", " sagemaker_session=sagemaker_session,\n", " instance_type='ml.m5.xlarge', \n", " hyperparameters={\n", " 'epochs': 10, # run more epochs for HPO.\n", " 'backend': 'gloo' #gloo for cpu, nccl for gpu\n", " }\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the HPO job on a GPU\n", "\n", "As before, to run this HPO job using a GPU and test out the SM distributed library on large scale datasets, make the following code changes.\n", "\n", "\n", "1. replace entry_point = \"train.py\"\n", "\n", "2. add the following code in \"\" after metric_definitions: \"distribution={'smdistributed':{\n", " 'dataparallel':{\n", " 'enabled': True\n", " }\n", " }\n", " },\"\n", " \n", " \n", "3. In hyperparameters, replace 'backend': 'nccl'\n", "\n", "4. Note that the SM distributed library requires the instance types to be one of the following: ml.p3.16xlarge, ml.p3dn.24xlarge, and ml.p4d.24xlarge. \n", "\n", "This will instantiate the sm distributed library on the containers. To learn more about Sm distributed, see this link: \n", "https://aws.amazon.com/sagemaker/distributed-training/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Polling jobs\n", "You can potentially reduce your overall wall time by polling continuously for completed HPO jobs. This way, if the number of launched HPO jobs is less than a certain threshold, you can start a new one. Use the code below to implement the polling strategy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# helper function to launch a desired number of \"n_parallel\" HPO jobs simultaneously\n", "def _parallel_hpo_no_polling(job_name_prefix, n_parallel, inputs, max_candidates_per_job, max_parallel_per_job):\n", " \"\"\"kicks off N_parallel Bayesian HPO jobs in parallel\n", " job_name_prefix: user specified prefix for job names\n", " n_parallel: Number of HPO jobs to start in parallel\n", " inputs: training and test data s3 paths\n", " max_candidates_per_job: number of training jobs to run in each HPO job in total\n", " max_parallel_per_job: number of training jobs to run in parallel in each job\n", " \n", " \"\"\"\n", " # kick off n_parallel jobs simultaneously and returns all the job names \n", " tuning_job_names = []\n", " for i in range(n_parallel):\n", " timestamp_suffix = strftime(\"%d-%H-%M-%S\", gmtime())\n", " try:\n", " tuner = HyperparameterTuner(estimator,\n", " objective_metric_name,\n", " hyperparameter_ranges,\n", " metric_definitions,\n", " max_jobs=max_candidates_per_job,\n", " max_parallel_jobs=max_parallel_per_job,\n", " base_tuning_job_name=f'{job_name_prefix}-{timestamp_suffix}',\n", " objective_type=objective_type)\n", " # fit the tuner to the inputs and include it as part of experiments\n", " tuner.fit(inputs, \n", " wait=False) # set wait=False, so you can launch other jobs in parallel.\n", " tuning_job_names.append(tuner.latest_tuning_job.name)\n", " sleep(1) #this is required otherwise you will get an error for using the same tuning job name\n", " print(tuning_job_names)\n", " except Exception as e:\n", " sleep(5)\n", " return tuning_job_names\n", "\n", "#orchestration and polling logic\n", "def poll_and_run(job_name_prefix, inputs, max_total_candidates, max_parallel_across_jobs, max_candidates_per_job, max_parallel_per_job):\n", " \"\"\"Polls for number of running HPO jobs. If less than max_parallel , starts a new one. \n", " job_name_prefix: the name prefix to give all your training jobs\n", " max_total_candidates: how many total trails to run across all HPO jobs\n", " max_candidates_per_job: how many total trails to run for 1 HPO job \n", " max_parallel_per_job: how many trials to run in parallel for a given HPO job (fixed to 10 without limit increases). \n", " max_parallel_across_jobs: how many concurrent trials to run in parallel across all HPO jobs\n", " \"\"\"\n", " #get how many jobs to run in total and concurrently\n", " max_num, max_parallel, _ = bayesian_batching_cold_start(max_total_candidates, \n", " max_parallel_across_jobs=max_parallel_across_jobs,\n", " max_parallel_per_job=max_parallel_per_job,\n", " max_candidates_per_job = max_candidates_per_job\n", " )\n", " \n", " # continuously polls for running jobs -- if they are less than the required number, then launches a new one. \n", " try:\n", " all_jobs = sm.list_hyper_parameter_tuning_jobs(SortBy='CreationTime', SortOrder='Descending', \n", " NameContains=job_name_prefix)['HyperParameterTuningJobSummaries']\n", " all_jobs = [i['HyperParameterTuningJobName'] for i in all_jobs]\n", "\n", " if len(all_jobs)==0:\n", " raise ValueError\n", " \n", " else:\n", " print(\"Continuing where you left off...\")\n", " response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus']\n", " for i in all_jobs]\n", " num_left = max_num - response_list.count(\"Completed\")\n", " except Exception as e:\n", " print(f\"Starting a set of HPO jobs with the prefix {job_name_prefix} ...\")\n", " num_left = max_num\n", " #kick off the first set of jobs\n", " all_jobs += _parallel_hpo_no_polling(job_name_prefix, min(max_parallel, num_left), inputs, max_candidates_per_job, max_parallel_per_job)\n", " \n", " \n", " while num_left >0:\n", " response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus']\n", " for i in all_jobs]\n", " running_jobs = response_list.count(\"InProgress\") # look for the jobs that are running. \n", " print(f\"number of completed jobs = {response_list.count('Completed')}\")\n", " sleep(10)\n", " if running_jobs < max_parallel and len(all_jobs) < max_num:\n", " all_jobs += _parallel_hpo_no_polling(job_name_prefix, min(max_parallel-running_jobs, num_left), inputs, max_candidates_per_job, max_parallel_per_job)\n", " num_left = max_num - response_list.count(\"Completed\")\n", " \n", " return all_jobs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test out this loop\n", "alljobs = poll_and_run('newtrials', inputs, max_total_candidates=260, max_parallel_across_jobs = 20, max_candidates_per_job=4, max_parallel_per_job=2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Aggregate the results from all the HPO jobs based on the custom metric specified\n", "def get_best_job(all_jobs_list):\n", " \"\"\"Get the best job from the list of all the jobs completed.\n", " Objective is to maximize a particular value such as AUC or F1 score\"\"\"\n", " df = pd.DataFrame()\n", " for job in all_jobs_list:\n", " tuner = sagemaker.HyperparameterTuningJobAnalytics(job)\n", " full_df = tuner.dataframe()\n", " tuning_job_result = sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=job)\n", " is_maximize = (tuning_job_result['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] == 'Maximize')\n", " if len(full_df) > 0:\n", " df = pd.concat([df, full_df[full_df['FinalObjectiveValue'] < float('inf')]])\n", " if len(df) > 0:\n", " df = df.sort_values('FinalObjectiveValue', ascending=is_maximize)\n", " print(\"Number of training jobs with valid objective: %d\" % len(df))\n", " print({\"lowest\":min(df['FinalObjectiveValue']),\"highest\": max(df['FinalObjectiveValue'])})\n", " pd.set_option('display.max_colwidth', -1) # Don't truncate TrainingJobName\n", " return df\n", " else:\n", " print(\"No training jobs have reported valid results yet.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "get_best_job(alljobs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The above example shows how the batching strategy works for cold-start cases. For warm start, we need a different strategy as we want to use the outputs of the previous job as the inputs into the next job. Look at the code below to run a warm start batch" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Serial Hyper-parameter Optimization (warm start)\n", "\n", "For warm start -- we want to use the outputs of our previous run, as the input of the next one. For this reason, we need to run the jobs serially, as shown in the code below. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def large_scale_hpo_warmstart(inputs, max_total_jobs, max_jobs_per_hpo_job, max_parallel_per_job):\n", " base_hpo_job_name = 'FTW'\n", " timestamp_suffix = strftime(\"%d-%H-%M-%S\", gmtime())\n", " tuning_job_name = lambda i : f\"{base_hpo_job_name}-{timestamp_suffix}-{i}\"\n", " current_jobs_completed = 0\n", " job_names_list = []\n", " while current_jobs_completed < max_total_jobs:\n", " jobs_to_launch = min(max_total_jobs - current_jobs_completed, max_jobs_per_hpo_job)\n", "\n", " hpo_job_config = dict(\n", " estimator=estimator,\n", " objective_metric_name=objective_metric_name,\n", " metric_definitions=metric_definitions,\n", " hyperparameter_ranges=hyperparameter_ranges,\n", " max_jobs=jobs_to_launch,\n", " strategy=\"Bayesian\",\n", " objective_type=objective_type,\n", " max_parallel_jobs=max_parallel_per_job,\n", " )\n", "\n", " if current_jobs_completed > 0:\n", " parent_tuning_job_name = tuning_job_name(current_jobs_completed)\n", " warm_start_config = WarmStartConfig(\n", " WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM,\n", " parents={parent_tuning_job_name}\n", " )\n", " hpo_job_config.update(dict(\n", " base_tuning_job_name=parent_tuning_job_name,\n", " warm_start_config=warm_start_config\n", " ))\n", "\n", " tuner = HyperparameterTuner(**hpo_job_config)\n", " tuner.fit(\n", " inputs,\n", " job_name=tuning_job_name(current_jobs_completed + jobs_to_launch),\n", " logs=True,\n", " )\n", " tuner.wait()\n", " job_names_list.append(tuner.latest_tuning_job.name)\n", " current_jobs_completed += jobs_to_launch\n", " return job_names_list" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_list = large_scale_hpo_warmstart(inputs, 2, 1, 1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "get_best_job(job_list)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook we have covered a number of advanced topics suitable for ML researchers who run large scale deep learning training and HPO jobs.\n", "\n", "\n", "1. We have covered how to run large numbers of HPO jobs using a continuous polling technique. \n", "\n", "2. We have provided code in the code folder to allow you to use the SM Distributed library as well as GPUs for faster training\n", "\n", "3. Finally we covered how you can optimize for a custom metric, by publishing your custom metric to stdout and passing in the regex in the metric_definition during HPO." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Optional Topic (using RayTune with PyTorch)\n", "\n", "Ray is an open source library for HPO developed from this paper: https://arxiv.org/pdf/1807.05118.pdf our of UC Berkeley. Ray integrates with many of the popular open and closed source HPO search algorithms and schedulers. \n", "\n", "To see how Ray works with Amazon SageMaker, try out the estimator below which runs the train_ray_cpu.py script.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the Ray job on a GPU instead\n", "\n", "To test Ray with a single node GPU cluster, make the following code changes.\n", "\n", "1. replace entry_point = \"train_ray.py\"\n", "\n", "2. In hyperparameters, replace 'backend': 'nccl'\n", "\n", "Note: Since Ray uses its own distribution mechanism we do not discuss smdistrubted on top of Ray here. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.pytorch import PyTorch\n", "\n", "estimator = PyTorch(entry_point=\"train_ray_cpu.py\", #put requirements.txt file to install ray\n", " role=role,\n", " source_dir='./code',\n", " framework_version='1.6.0',\n", " py_version='py3',\n", " output_path = f's3://{bucket}/{prefix}/output',\n", " instance_count=1,\n", " instance_type='ml.m5.xlarge',\n", " sagemaker_session=sagemaker_session,\n", " hyperparameters={\n", " 'epochs': 7,\n", " 'backend': 'gloo' # gloo for CPU and nccl for GPU\n", " },\n", " disable_profiler=True)\n", "\n", "inputs ={'training': train_s3,\n", " 'testing':test_s3}\n", "\n", "estimator.fit(inputs, wait=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusions\n", "\n", "In this notebook, you have seen how to run large scale HPO jobs using SageMaker HPO tuning feature on a credit card default dataset. \n", "\n", "Wherever possible, with minimal code changes you can use this notebook to also use GPUs, as we have provided all the required code in the code folder associated with this repo. \n", "\n", "Happy Hyperparameter tuning!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.g4dn.xlarge", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 4 }