# Batching Hyper-parameter Tuning jobs (CPU)

In this notebook, we demonstrate how to batch hyper-parameter tuning jobs using a batching strategy that works for both Bayesian and random optimization. We cover both cold-start and warm-start examples using PyTorch.

Topics Covered in this notebook:

1. SageMaker Hyper-parameter Optimization with PyTorch
2. Batching Large-scale HPO jobs

Required Infrastructure: We will run this notebook using the PyTorch 1.6 Python 3.6 CPU Optimized kernel.

We will use a credit card default dataset from UCI published here: https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients for our example.

*Note*: In the code folder, we have also provided the files train.py and train_ray.py, which are designed to work on GPU instances. Below, we document the other changes you need to make to use GPU instances.


## Import Libraries

Here we import the libraries needed to run the code in the following cells.

In [None]:
import sys
import IPython
install_needed = True # should only be True once
if install_needed:
 print("installing deps and restarting kernel")
 !{sys.executable} -m pip install -U sagemaker
 !{sys.executable} -m pip install -U smdebug
 !{sys.executable} -m pip install -U bokeh
 !{sys.executable} -m pip install -U xlrd
 IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
import csv
import glob
import math
import os
import shutil
from time import gmtime, sleep, strftime, time
from botocore.config import Config
import boto3
import numpy as np
import pandas as pd
from sklearn.utils import resample
from smdebug.profiler.analysis.notebook_utils.timeline_charts import \
 TimelineCharts
from smdebug.profiler.analysis.notebook_utils.training_job import TrainingJob

import sagemaker
from sagemaker.analytics import ExperimentAnalytics
from sagemaker.debugger import (FrameworkProfile, ProfilerConfig, ProfilerRule,
 Rule, rule_configs)
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import (CategoricalParameter, ContinuousParameter,
 HyperparameterTuner, IntegerParameter,
 WarmStartConfig, WarmStartTypes)
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent

In [None]:
sm = boto3.client('sagemaker', config=Config(retries={'max_attempts': 20}))
sagemaker_session = sagemaker.Session(sagemaker_client=sm)

bucket = sagemaker_session.default_bucket()
prefix = 'distributed_hpo/DEMO-pytorch-hpo'

role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name
print(f'region is {region}')

## The Dataset

For this notebook, we will use the Credit Card Default dataset published by UCI: https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients. The dataset is provided alongside the code for this notebook.

This will be sufficient for testing out the SageMaker features covered in this notebook.

In [None]:
if not os.path.exists('./data'):
 os.mkdir('./data')
else:
 pass

### Load the dataset

In [None]:
df1 = pd.read_excel('credit_card_default_data.xls', header=1)
df1 = df1.drop(columns = 'ID')
#rename the defaults column
df1.rename(columns={"default payment next month": "Default"}, inplace=True)
df1.to_csv('./data/data.csv', index=False)
df1.head()

In [None]:
df1.dtypes

## Data Pre-processing

In this section we will explore the dataset, then shuffle and split it into train and test sets.

In [None]:
# histogram of the number of defaults versus non-defaults
df1.Default.hist()

In [None]:
df1.Default.value_counts()

In [None]:
# Make train folder

if not os.path.exists('./train'):
 os.mkdir('./train')
else:
 pass

# make test folder 

if not os.path.exists('./test'):
 os.mkdir('./test')
else:
 pass

### Train-test split

Split the raw data into train and test sets. Since the number of defaults is small relative to the number of non-defaults, we will oversample the training dataset to balance the number of defaulted and non-defaulted loans. Note that this can introduce training/serving skew. To avoid this, we keep a separate test set that retains the original class distribution. The model will be validated against this test set after every training epoch.
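
As a minimal sketch of the oversampling idea, using only the standard library and a made-up list of labels (the notebook itself uses `sklearn.utils.resample` on a DataFrame), the minority class is sampled with replacement until it matches the majority class. The helper name `oversample_minority` and the toy rows are assumptions for illustration only.

```python
import random


def oversample_minority(rows, label_key="Default", seed=123):
    """Resample the minority class (with replacement) until both classes
    have the same number of rows, then shuffle the result."""
    rng = random.Random(seed)
    majority = [r for r in rows if r[label_key] == 0]
    minority = [r for r in rows if r[label_key] == 1]
    if len(minority) > len(majority):
        majority, minority = minority, majority
    if not minority:
        return list(rows)  # nothing to resample from
    upsampled = [rng.choice(minority) for _ in range(len(majority))]
    balanced = majority + upsampled
    rng.shuffle(balanced)
    return balanced


# Toy data (hypothetical): 8 non-defaults and 2 defaults.
toy_train = [{"Default": 0}] * 8 + [{"Default": 1}] * 2
balanced = oversample_minority(toy_train)
counts = {label: sum(r["Default"] == label for r in balanced) for label in (0, 1)}
print(counts)  # {0: 8, 1: 8}
```

Only the training rows are passed through the helper; the test rows are left as they are so that evaluation reflects the original class balance.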


In [None]:
def train_test_split(file):
 df = pd.read_csv('./data/'+file)
 train=df.sample(frac=0.8,random_state=200) #random state is a seed value
 test=df.drop(train.index)
 print(f'Original training before upsampling shape = ......{train.shape}')
 # first upsample the minority class in training dataset
 train_majority = train[train.Default==0]
 train_minority = train[train.Default==1]
 train_minority_upsampled = resample(train_minority, 
 replace=True, # sample with replacement
 n_samples=len(train_majority), # Experiment with this on your own to see if it improves accuracy.
 random_state=123) # reproducible results
 
 # Combine majority class with upsampled minority class
 train_upsampled = pd.concat([train_majority, train_minority_upsampled], axis=0)
 train_upsampled=train_upsampled.sample(frac=1) #shuffle the data
 print(f"Train file shape = .....{train_upsampled.shape}")
 print(f"Test file shape = .....{test.shape}")
 train_upsampled.to_csv(f'./train/{file}', index=False, header=True)
 test.to_csv(f'./test/{file}', index=False, header=True)
 return len(train_upsampled), len(test)

In [None]:
total_train_rows = 0
total_test_rows = 0
for file in os.listdir('./data'): #this will work if you have multiple data files as well. 
 print(file)
 trl, tel = train_test_split(file)
 total_test_rows+=tel
 total_train_rows+=trl
print(f"Total Training Loans ={total_train_rows}")
print(f"Total Testing Loans = {total_test_rows}")

### Shard the dataset into smaller files

For PyTorch to train faster, it is recommended to shard a large dataset into much smaller files. This way, the PyTorch data loader can quickly load one CSV file of N rows at a time and train the model on it.

The code below reads the training CSV line by line and writes every 10,000 lines to a separate file, repeating the header row at the top of each file.

Then we will repeat this for the test set.
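
The sharding step can be sketched on a small temporary file as follows. This is an illustrative variant rather than the notebook's exact code: the helper `shard_csv`, the toy column names, and the shard size of 10 rows are assumptions, and every shard here holds exactly `rows_per_shard` data rows plus the header.

```python
import csv
import os
import tempfile


def shard_csv(src_path, out_dir, rows_per_shard):
    """Split src_path into numbered CSV files, each starting with the header row."""
    os.makedirs(out_dir, exist_ok=True)
    with open(src_path, newline="") as src:
        reader = csv.reader(src)
        header = next(reader)
        rows = list(reader)
    shard_paths = []
    for start in range(0, len(rows), rows_per_shard):
        path = os.path.join(out_dir, f"{len(shard_paths) + 1}.csv")
        with open(path, "w", newline="") as out:
            writer = csv.writer(out)
            writer.writerow(header)
            writer.writerows(rows[start:start + rows_per_shard])
        shard_paths.append(path)
    return shard_paths


# Hypothetical toy file: 1 header row + 25 data rows, sharded 10 rows at a time.
work_dir = tempfile.mkdtemp()
src_file = os.path.join(work_dir, "data.csv")
with open(src_file, "w", newline="") as f:
    toy_writer = csv.writer(f)
    toy_writer.writerow(["LIMIT_BAL", "Default"])
    toy_writer.writerows([[1000 * i, i % 2] for i in range(25)])

shards = shard_csv(src_file, os.path.join(work_dir, "shards"), rows_per_shard=10)
print([os.path.basename(p) for p in shards])  # ['1.csv', '2.csv', '3.csv']
```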

In [None]:
if not os.path.exists('./train_full_split'):
 os.mkdir('./train_full_split')
else:
 pass

In [None]:
# split the training data into smaller files that can be loaded using the data loader
COLS = pd.read_csv('./train/data.csv').columns
with open('./train/data.csv', 'r') as src:
    csvfile = src.readlines()
filename = 1
for i in range(len(csvfile)):
    if i % 10000 == 0:
        with open('./train_full_split/' + str(filename) + '.csv', 'w+') as f:
            if filename == 1:
                # the first chunk already starts with the header row
                f.writelines(csvfile[i:i+10000])
            else:
                # later chunks need the header written explicitly
                writer = csv.writer(f, delimiter=',')
                writer.writerow(COLS)
                f.writelines(csvfile[i:i+10000])
        filename += 1
 

In [None]:
if not os.path.exists('./test_full_split'):
 os.mkdir('./test_full_split')
else:
 pass

In [None]:
# split the test data into smaller files that can be loaded using the data loader
with open('./test/data.csv', 'r') as src:
    csvfile = src.readlines()
filename = 1
for i in range(len(csvfile)):
    if i % 10000 == 0:
        with open('./test_full_split/' + str(filename) + '.csv', 'w+') as f:
            if filename == 1:
                # the first chunk already starts with the header row
                f.writelines(csvfile[i:i+10000])
            else:
                # later chunks need the header written explicitly (same columns as the training data)
                writer = csv.writer(f, delimiter=',')
                writer.writerow(COLS)
                f.writelines(csvfile[i:i+10000])
        filename += 1
 

### Upload Training and test datasets to S3

In [None]:
#Upload Training and test data into S3
train_s3 = sagemaker_session.upload_data(path='./train_full_split/', key_prefix=prefix + '/train')
print(train_s3)
test_s3 = sagemaker_session.upload_data(path='./test_full_split/', key_prefix=prefix + '/test')
print(test_s3)

## Training Script

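Here we author the training script that we will use for parallel HPO. The GPU version of the script (train.py) uses the SageMaker Distributed DataParallel class for PyTorch for distributed training, while the CPU script used in this notebook (train_cpu.py) runs with the gloo backend.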

We will also use SageMaker Profiler to obtain metrics associated with the training job such as the CPU/GPU usage. This is particularly useful for large scale deep learning training jobs.

### 1. Create a Training Job with Profiling Enabled

You will use the standard [SageMaker Estimator API for PyTorch](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/sagemaker.pytorch.html) to create training jobs. To enable profiling, create a `ProfilerConfig` object and pass it to the `profiler_config` parameter of the `PyTorch` estimator.

### 2. Use SM Distributed DataParallel (DDP) to efficiently parallelize the training job across multiple GPUs

The training script provides the code you need for distributed data parallel (DDP) training using SMDataParallel. The training script is very similar to a PyTorch training script you might run outside of SageMaker, but modified to run with SMDataParallel. SMDataParallel's PyTorch client provides an alternative to PyTorch's native DDP. For details about how to use SMDataParallel's DDP in your native PyTorch script, see the Getting Started with SMDataParallel tutorials.

For your benefit, we have provided 2 training scripts:

1. train.py : full training script with SM DDP and SageMaker Profiler

2. train_profiler.py : training script with regular PyTorch DDP and SageMaker Profiler

In [None]:
!pygmentize code/train_cpu.py

### Train the model. 

In this CPU notebook, we train the model on a single ml.m5.xlarge instance. On GPU instances, you can additionally use the SageMaker Profiler capability to generate a report of GPU utilization and other performance metrics.

#### Estimator function options
In the following code block, you can update the estimator to use a different instance type, instance count, and distribution strategy. You also pass in the training script you reviewed in the previous cell.

**Instance types**

SMDataParallel supports model training on SageMaker with the following instance types only:

- ml.p3.16xlarge
- ml.p3dn.24xlarge [Recommended]
- ml.p4d.24xlarge [Recommended]

**Instance count**

To get the best performance and the most out of SMDataParallel, you should use at least 2 instances, but you can also use 1 for testing this example.

**Distribution strategy**

Note that to use DDP mode, you update the distribution strategy and set it to use smdistributed dataparallel.

**Code folder**

In order to run SM DistributedDataParallel, we need to ensure our training container has the latest version of the SageMaker SDK. To do this, simply pass in a requirements.txt file along with your code script in the code folder as provided here. We pass in the code folder as the source directory, pointing to the training script. 

**SageMaker Experiments**

We will also track the training jobs using SageMaker Experiments, which allows data scientists to compare different trials and to analyze and extract metadata from their training runs.

This is particularly useful with HPO, when data scientists want to compare different Hyperparameter tuning jobs against one another.

In [None]:
loan_class_experiment = Experiment.create(
 experiment_name=f"Classifying-housing-loans-{int(time())}",
 description="Classification of loans as default or not", 
 sagemaker_boto_client=sm)
print(loan_class_experiment)

In [None]:
# Now let's launch a single SageMaker training job. Next we will run a HPO job.
trial_name = f"loan-trial-base-{int(time())}"
loan_trial = Trial.create(
 trial_name=trial_name,
 experiment_name=loan_class_experiment.experiment_name,
 sagemaker_boto_client=sm,
 )

estimator = PyTorch(entry_point="train_cpu.py", 
 role=role,
 framework_version='1.6.0',
 py_version='py36',
 source_dir='./code',
 output_path = f's3://{bucket}/{prefix}/output',
 instance_count=1, 
 sagemaker_session=sagemaker_session,
 instance_type='ml.m5.xlarge', 
 hyperparameters={
 'epochs': 3,
 'backend': 'gloo' #gloo for CPU, nccl for GPU
 },
 # allows Experiments to capture metrics 
 metric_definitions=[
 {'Name':'train:loss', 'Regex':'Train Loss: (.*?);'},
 {'Name':'test:loss', 'Regex':'Test set Average loss: (.*?),'},
 {'Name':'test:accuracy', 'Regex':'Test Accuracy: (.*?)%;'},
 {'Name':'test:F1', 'Regex':'Test set F1-score: (.*?),'}
 ]
 )

# this is a fire and forget event -- this way you can continue to use the notebook below for other data exploration
# and prototyping activities. 

estimator.fit({'training': train_s3,
 'testing':test_s3},
 experiment_config={
 "TrialName": loan_trial.trial_name,
 "TrialComponentDisplayName": "Training",
 },
 wait=False)

### Run the training job on a GPU

To test this estimator with the latest smdistributed library, make the following changes:

1. Set `entry_point="train.py"`.

2. Add the following argument after `metric_definitions`: `distribution={'smdistributed': {'dataparallel': {'enabled': True}}},`

3. In `hyperparameters`, set `'backend': 'nccl'`.

4. Note that the SM distributed library requires the instance type to be one of the following: ml.p3.16xlarge, ml.p3dn.24xlarge, or ml.p4d.24xlarge.


This will instantiate the SM distributed library on the containers. To learn more about SM distributed, see this link: 
https://aws.amazon.com/sagemaker/distributed-training/
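
The four changes listed above can be collected in one place. The sketch below is only an illustration: `estimator_overrides` is a hypothetical helper that is not part of the SageMaker SDK, and the instance type picked for the GPU case is just one of the supported options.

```python
def estimator_overrides(use_gpu):
    """Return the estimator arguments that differ between the CPU and GPU runs.

    Hypothetical helper for illustration; the values come from the steps above.
    """
    if not use_gpu:
        return {
            "entry_point": "train_cpu.py",
            "instance_type": "ml.m5.xlarge",
            "hyperparameters": {"backend": "gloo"},
        }
    return {
        "entry_point": "train.py",
        "instance_type": "ml.p3.16xlarge",  # one of the supported instance types
        "hyperparameters": {"backend": "nccl"},
        "distribution": {"smdistributed": {"dataparallel": {"enabled": True}}},
    }


gpu_args = estimator_overrides(use_gpu=True)
print(gpu_args["distribution"])
```

The returned dictionary could then be merged into the keyword arguments of the `PyTorch(...)` call, after adding the remaining hyperparameters such as `epochs`.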



### Algorithm metrics

We want to extract the model metrics on the test set. To do this, we will use the SageMaker Experiments API and extract the metric for the trial above.

In [None]:
from sagemaker.analytics import ExperimentAnalytics
trial_component_analytics = ExperimentAnalytics(
 sagemaker_session=sagemaker_session, 
 experiment_name=loan_class_experiment.experiment_name,
 metric_names=['test:F1']
)
analytic_table = trial_component_analytics.dataframe()
analytic_table

In [None]:
# get details for most recent job:
analytic_table.iloc[0]

# Scaling to Tens of thousands of HPO jobs

Having seen how to launch one job, we next look at some strategies for scaling this to tens of thousands of jobs with Amazon SageMaker, for both the random and Bayesian HPO methods that come out of the box with SageMaker.

Currently, for Bayesian HPO, SageMaker has a limit of 500 *concurrent* jobs across all Bayesian HPO jobs. Below we will provide code for launching the maximum possible Bayesian trials while respecting this limit.

Let's look at both strategies in detail. 

## Random Strategy

For the random strategy, the trials in an HPO job are completely independent of one another. In this case, if we want to launch a total of N_tot training jobs, we can launch N HPO jobs concurrently, with each HPO job running M training jobs in parallel.

To see how you can launch multiple HPO jobs in parallel, refer to the code below.
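
Because random-search trials are independent, the only bookkeeping is how to divide N_tot among the HPO jobs. The following is a minimal sketch, assuming a cap of 500 trials per HPO job (the `max_candidates_per_job` default used later in this notebook); `split_trials` is a hypothetical helper, not part of any SDK.

```python
import math


def split_trials(n_total, max_trials_per_hpo_job):
    """Spread n_total independent trials over as few HPO jobs as possible,
    keeping the per-job counts as even as possible."""
    if n_total <= 0:
        return []
    n_jobs = math.ceil(n_total / max_trials_per_hpo_job)
    base, extra = divmod(n_total, n_jobs)
    # The first `extra` jobs take one more trial than the rest.
    return [base + 1 if i < extra else base for i in range(n_jobs)]


plan = split_trials(n_total=1200, max_trials_per_hpo_job=500)
print(plan)  # [400, 400, 400]
```

Each entry of `plan` would become the `max_jobs` value of one HPO job, and all of those HPO jobs can be started at the same time, subject to account limits.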

## Batched Hyper-parameter Optimization for Bayesian Optimization (Cold Start)

For Bayesian HPO, we want to take advantage of the parallelism by training multiple hyper-parameter trials in parallel so the Bayesian algorithm can use the outputs of these parallel jobs to determine the next set of parameters.

Again, we are limited by the number of concurrent jobs per account (500), and the default limit is 10 training jobs in parallel per HPO job. In that case, we can adapt the approach above as follows:

![Visual Representation of Batching jobs](img/batching.png)
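
As a worked example of the arithmetic behind the batching, the sketch below uses the default values of the function defined in the next cell and a request for 10,000 trials. The variable names `concurrent_hpo_jobs`, `total_hpo_jobs`, and `n_batches` are introduced here only for illustration.

```python
import math

total_requested_trials = 10000   # trials wanted overall
max_parallel_across_jobs = 20    # training jobs allowed to run at once
max_parallel_per_job = 10        # parallel training jobs inside one HPO job
max_candidates_per_job = 500     # training jobs per HPO job

# How many HPO jobs can run side by side without exceeding the overall limit.
concurrent_hpo_jobs = max_parallel_across_jobs // max_parallel_per_job
# How many HPO jobs are needed to cover every requested trial.
total_hpo_jobs = math.ceil(total_requested_trials / max_candidates_per_job)
# How many successive batches of HPO jobs that takes.
n_batches = math.ceil(total_hpo_jobs / concurrent_hpo_jobs)

print(concurrent_hpo_jobs, total_hpo_jobs, n_batches)  # 2 20 10
```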


In [None]:
def bayesian_batching_cold_start(total_requested_trials, max_parallel_across_jobs=20, max_parallel_per_job=10, max_candidates_per_job = 500):
    '''Given a total number of requested trials, generates the batching strategy for Bayesian HPO.
    The strategy is a list (batch_strat) where every element is the number of HPO jobs to execute in parallel. The sum of all elements in the list is
    the total number of HPO jobs needed to reach total_requested_trials. For example, batch_strat = [2, 2, 2, 1] means you will run a total of 7
    HPO jobs in batches of 2 --> 2 --> 2 --> 1.
    total_requested_trials = number of trials the user wants to run.
    max_parallel_across_jobs = max number of training jobs across all HPO jobs that SageMaker runs in parallel. Limited by instance availability
    max_parallel_per_job = max number of parallel training jobs to run per HPO job
    max_candidates_per_job = total number of training jobs per HPO job'''
    batch_strat = []
    tot_jobs_left = total_requested_trials
    max_parallel_hpo_jobs = max_parallel_across_jobs//max_parallel_per_job
    if total_requested_trials < max_parallel_hpo_jobs*max_candidates_per_job:
        # everything fits in one batch; round up so a partially filled HPO job is still counted
        batch_strat.append(math.ceil(total_requested_trials/max_candidates_per_job))
    else:
        while tot_jobs_left > max_parallel_hpo_jobs*max_candidates_per_job:
            batch_strat.append(max_parallel_hpo_jobs)
            tot_jobs_left -= max_parallel_hpo_jobs*max_candidates_per_job

        # the last batch covers whatever is left
        batch_strat.append(math.ceil(tot_jobs_left/max_candidates_per_job))
    return math.ceil(total_requested_trials/max_candidates_per_job), max_parallel_hpo_jobs, batch_strat
 
bayesian_batching_cold_start(10000)

In [None]:
# define the hyperparameter ranges, input channels, and objective metric for the HPO jobs
hyperparameter_ranges = {'lr': ContinuousParameter(0.001, 0.1),
 'momentum': CategoricalParameter(list(np.arange(0, 10)/10))}

inputs ={'training': train_s3,
 'testing':test_s3}

objective_metric_name = 'test AUC'
objective_type = 'Maximize'
metric_definitions = [{'Name': 'test AUC',
 'Regex': 'Test set AUC: ([0-9\\.]+)'}]


In [None]:
# Create the estimator for HPO
estimator = PyTorch(entry_point="train_cpu.py",
 role=role,
 framework_version='1.6.0',
 py_version='py36',
 source_dir='./code',
 output_path = f's3://{bucket}/{prefix}/output',
 instance_count=1, 
 sagemaker_session=sagemaker_session,
 instance_type='ml.m5.xlarge', 
 hyperparameters={
 'epochs': 10, # run more epochs for HPO.
 'backend': 'gloo' #gloo for cpu, nccl for gpu
 }
 )

### Run the HPO job on a GPU

As before, to run this HPO job on a GPU and test out the SM distributed library on large-scale datasets, make the following code changes:


1. Set `entry_point="train.py"`.

2. Add the following argument to the `PyTorch` estimator: `distribution={'smdistributed': {'dataparallel': {'enabled': True}}},`

3. In `hyperparameters`, set `'backend': 'nccl'`.

4. Note that the SM distributed library requires the instance type to be one of the following: ml.p3.16xlarge, ml.p3dn.24xlarge, or ml.p4d.24xlarge.

This will instantiate the SM distributed library on the containers. To learn more about SM distributed, see this link: 
https://aws.amazon.com/sagemaker/distributed-training/

### Polling jobs
You can potentially reduce your overall wall time by polling continuously for completed HPO jobs. This way, if the number of launched HPO jobs is less than a certain threshold, you can start a new one. Use the code below to implement the polling strategy.
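
Before the full implementation, the polling idea can be sketched without any AWS calls. In the sketch below, `FakeTuningService` is a made-up stand-in for the SageMaker client in which every job completes after a fixed number of polls; it is not a real API, and a real loop would sleep between polls.

```python
class FakeTuningService:
    """Stand-in for the SageMaker client: each job completes after a fixed
    number of polls. Purely illustrative; not a real API."""

    def __init__(self, polls_to_finish=3):
        self.polls_to_finish = polls_to_finish
        self.remaining = {}  # job name -> polls left before completion

    def launch(self, name):
        self.remaining[name] = self.polls_to_finish

    def poll(self):
        """Advance time by one poll and return {job name: status}."""
        for name in self.remaining:
            if self.remaining[name] > 0:
                self.remaining[name] -= 1
        return {
            name: "Completed" if left == 0 else "InProgress"
            for name, left in self.remaining.items()
        }


def poll_and_launch(service, total_jobs, max_parallel):
    """Keep up to max_parallel jobs running until total_jobs have completed."""
    launched = 0
    peak_running = 0
    while True:
        statuses = service.poll()
        completed = sum(s == "Completed" for s in statuses.values())
        running = sum(s == "InProgress" for s in statuses.values())
        if completed >= total_jobs:
            return launched, peak_running
        # Fill free slots, but never launch more than total_jobs overall.
        for _ in range(min(max_parallel - running, total_jobs - launched)):
            service.launch(f"job-{launched}")
            launched += 1
            running += 1
        peak_running = max(peak_running, running)


launched, peak = poll_and_launch(FakeTuningService(), total_jobs=7, max_parallel=3)
print(launched, peak)  # 7 3
```

The two quantities to watch are the same ones the real loop tracks: how many jobs are in progress right now, and how many have been launched in total.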

In [None]:
# helper function to launch a desired number of "n_parallel" HPO jobs simultaneously
def _parallel_hpo_no_polling(job_name_prefix, n_parallel, inputs, max_candidates_per_job, max_parallel_per_job):
    """Kicks off n_parallel Bayesian HPO jobs in parallel.
    job_name_prefix: user specified prefix for job names
    n_parallel: number of HPO jobs to start in parallel
    inputs: training and test data s3 paths
    max_candidates_per_job: number of training jobs to run in each HPO job in total
    max_parallel_per_job: number of training jobs to run in parallel in each job

    """
    # kick off n_parallel jobs simultaneously and return all the job names
    tuning_job_names = []
    for i in range(n_parallel):
        timestamp_suffix = strftime("%d-%H-%M-%S", gmtime())
        try:
            tuner = HyperparameterTuner(estimator,
                                        objective_metric_name,
                                        hyperparameter_ranges,
                                        metric_definitions,
                                        max_jobs=max_candidates_per_job,
                                        max_parallel_jobs=max_parallel_per_job,
                                        base_tuning_job_name=f'{job_name_prefix}-{timestamp_suffix}',
                                        objective_type=objective_type)
            # fit the tuner to the inputs
            tuner.fit(inputs,
                      wait=False)  # set wait=False, so you can launch other jobs in parallel.
            tuning_job_names.append(tuner.latest_tuning_job.name)
            sleep(1)  # this is required otherwise you will get an error for using the same tuning job name
            print(tuning_job_names)
        except Exception as e:
            # report the failure instead of dropping it silently, then back off
            print(f"Failed to launch HPO job: {e}")
            sleep(5)
    return tuning_job_names

#orchestration and polling logic
def poll_and_run(job_name_prefix, inputs, max_total_candidates, max_parallel_across_jobs, max_candidates_per_job, max_parallel_per_job):
    """Polls for the number of running HPO jobs. If fewer than max_parallel are running, starts new ones.
    job_name_prefix: the name prefix to give all your training jobs
    max_total_candidates: how many total trials to run across all HPO jobs
    max_candidates_per_job: how many total trials to run for 1 HPO job
    max_parallel_per_job: how many trials to run in parallel for a given HPO job (fixed to 10 without limit increases).
    max_parallel_across_jobs: how many concurrent trials to run in parallel across all HPO jobs
    """
    # get how many jobs to run in total and concurrently
    max_num, max_parallel, _ = bayesian_batching_cold_start(max_total_candidates,
                                                            max_parallel_across_jobs=max_parallel_across_jobs,
                                                            max_parallel_per_job=max_parallel_per_job,
                                                            max_candidates_per_job=max_candidates_per_job
                                                            )

    # look for existing jobs with this prefix so that an interrupted run can be resumed
    all_jobs = []  # defined up front so the except branch can always append to it
    try:
        all_jobs = sm.list_hyper_parameter_tuning_jobs(SortBy='CreationTime', SortOrder='Descending',
                                                       NameContains=job_name_prefix)['HyperParameterTuningJobSummaries']
        all_jobs = [i['HyperParameterTuningJobName'] for i in all_jobs]

        if len(all_jobs) == 0:
            raise ValueError

        else:
            print("Continuing where you left off...")
            response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus']
                             for i in all_jobs]
            num_left = max_num - response_list.count("Completed")
    except Exception as e:
        print(f"Starting a set of HPO jobs with the prefix {job_name_prefix} ...")
        num_left = max_num
        # kick off the first set of jobs
        all_jobs += _parallel_hpo_no_polling(job_name_prefix, min(max_parallel, num_left), inputs, max_candidates_per_job, max_parallel_per_job)

    # continuously poll for running jobs; if fewer than required are running, launch new ones
    while num_left > 0:
        response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus']
                         for i in all_jobs]
        running_jobs = response_list.count("InProgress")  # look for the jobs that are running.
        print(f"number of completed jobs = {response_list.count('Completed')}")
        sleep(10)
        if running_jobs < max_parallel and len(all_jobs) < max_num:
            # never launch more HPO jobs than are still needed to reach max_num in total
            n_new = min(max_parallel - running_jobs, max_num - len(all_jobs))
            all_jobs += _parallel_hpo_no_polling(job_name_prefix, n_new, inputs, max_candidates_per_job, max_parallel_per_job)
        num_left = max_num - response_list.count("Completed")

    return all_jobs

In [None]:
# Test out this loop
alljobs = poll_and_run('newtrials', inputs, max_total_candidates=260, max_parallel_across_jobs = 20, max_candidates_per_job=4, max_parallel_per_job=2)

In [None]:
# Aggregate the results from all the HPO jobs based on the custom metric specified
def get_best_job(all_jobs_list):
    """Collect the training jobs from all the completed HPO jobs, sorted with the best job first.
    Objective is to maximize a particular value such as AUC or F1 score"""
    df = pd.DataFrame()
    for job in all_jobs_list:
        tuner = sagemaker.HyperparameterTuningJobAnalytics(job)
        full_df = tuner.dataframe()
        tuning_job_result = sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=job)
        is_maximize = (tuning_job_result['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] == 'Maximize')
        if len(full_df) > 0:
            df = pd.concat([df, full_df[full_df['FinalObjectiveValue'] < float('inf')]])
    if len(df) > 0:
        # sort so that the best objective value ends up in the first row
        df = df.sort_values('FinalObjectiveValue', ascending=not is_maximize)
        print("Number of training jobs with valid objective: %d" % len(df))
        print({"lowest": min(df['FinalObjectiveValue']), "highest": max(df['FinalObjectiveValue'])})
        pd.set_option('display.max_colwidth', None)  # Don't truncate TrainingJobName
        return df
    else:
        print("No training jobs have reported valid results yet.")

In [None]:
get_best_job(alljobs)

The above example shows how the batching strategy works for cold-start cases. For warm start, we need a different strategy, because we want to use the outputs of the previous job as the inputs to the next job. The code below runs a warm-start batch.

## Serial Hyper-parameter Optimization (warm start)

For warm start, we want to use the outputs of our previous run as the input to the next one. For this reason, we need to run the jobs serially, as shown in the code below.
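
The serial chaining can be illustrated without launching anything. The sketch below only computes which job would warm-start from which; `warm_start_chain` and the `hpo` name prefix are hypothetical, and the naming scheme (a suffix holding the running total of trials) mirrors the one used in the next cell.

```python
def warm_start_chain(max_total_jobs, max_jobs_per_hpo_job, name_prefix="hpo"):
    """List (job_name, parent_name, n_trials) for a serial warm-start run.

    name_prefix is a hypothetical placeholder for the tuning job name.
    """
    chain = []
    completed = 0
    parent = None
    while completed < max_total_jobs:
        n_trials = min(max_total_jobs - completed, max_jobs_per_hpo_job)
        job_name = f"{name_prefix}-{completed + n_trials}"
        chain.append((job_name, parent, n_trials))
        parent = job_name  # the next job warm-starts from this one
        completed += n_trials
    return chain


chain = warm_start_chain(max_total_jobs=25, max_jobs_per_hpo_job=10)
for step in chain:
    print(step)
# ('hpo-10', None, 10)
# ('hpo-20', 'hpo-10', 10)
# ('hpo-25', 'hpo-20', 5)
```

Only the first job has no parent, which is why the jobs cannot be launched concurrently: each one needs the finished results of its predecessor.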

In [None]:
def large_scale_hpo_warmstart(inputs, max_total_jobs, max_jobs_per_hpo_job, max_parallel_per_job):
    """Runs HPO jobs one after another; each job warm-starts from the one before it."""
    base_hpo_job_name = 'FTW'
    timestamp_suffix = strftime("%d-%H-%M-%S", gmtime())
    tuning_job_name = lambda i : f"{base_hpo_job_name}-{timestamp_suffix}-{i}"
    current_jobs_completed = 0
    job_names_list = []
    while current_jobs_completed < max_total_jobs:
        jobs_to_launch = min(max_total_jobs - current_jobs_completed, max_jobs_per_hpo_job)

        hpo_job_config = dict(
            estimator=estimator,
            objective_metric_name=objective_metric_name,
            metric_definitions=metric_definitions,
            hyperparameter_ranges=hyperparameter_ranges,
            max_jobs=jobs_to_launch,
            strategy="Bayesian",
            objective_type=objective_type,
            max_parallel_jobs=max_parallel_per_job,
        )

        if current_jobs_completed > 0:
            # warm start from the previous job, whose name ends with the running total of trials
            parent_tuning_job_name = tuning_job_name(current_jobs_completed)
            warm_start_config = WarmStartConfig(
                WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM,
                parents={parent_tuning_job_name}
            )
            hpo_job_config.update(dict(
                base_tuning_job_name=parent_tuning_job_name,
                warm_start_config=warm_start_config
            ))

        tuner = HyperparameterTuner(**hpo_job_config)
        tuner.fit(
            inputs,
            job_name=tuning_job_name(current_jobs_completed + jobs_to_launch),
            logs=True,
        )
        tuner.wait()  # block until this job finishes so the next one can use its results
        job_names_list.append(tuner.latest_tuning_job.name)
        current_jobs_completed += jobs_to_launch
    return job_names_list

In [None]:
job_list = large_scale_hpo_warmstart(inputs, 2, 1, 1)

In [None]:
get_best_job(job_list)

## Conclusions

In this notebook we have covered a number of advanced topics suitable for ML researchers who run large scale deep learning training and HPO jobs.


1. We have covered how to run large numbers of HPO jobs using a continuous polling technique. 

2. We have provided code in the code folder to allow you to use the SM Distributed library as well as GPUs for faster training.

3. Finally, we covered how you can optimize for a custom metric by publishing it to stdout and passing the matching regex in `metric_definitions` during HPO.
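
To make the custom-metric mechanism concrete, the sketch below applies the same pattern as the `test AUC` metric definition from this notebook to a few log lines. The log lines and AUC values are invented for illustration; only the regex comes from the notebook.

```python
import re

# Same pattern as the 'test AUC' metric definition used for HPO in this notebook.
AUC_REGEX = r"Test set AUC: ([0-9\.]+)"

# Hypothetical log lines, as a training script might print them to stdout.
log_lines = [
    "Epoch 1 done",
    "Test set AUC: 0.7312",
    "Epoch 2 done",
    "Test set AUC: 0.7468",
]

auc_values = []
for line in log_lines:
    match = re.search(AUC_REGEX, line)
    if match:
        # The first capture group holds the metric value.
        auc_values.append(float(match.group(1)))

print(auc_values)  # [0.7312, 0.7468]
```

Any line the training script prints in this format can be picked up as a metric value, which is why the script and the regex have to agree on the exact wording.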

## Optional Topic (using RayTune with PyTorch)

Ray Tune is an open source library for HPO, developed out of UC Berkeley and described in this paper: https://arxiv.org/pdf/1807.05118.pdf. It integrates with many popular open and closed source HPO search algorithms and schedulers.

To see how Ray works with Amazon SageMaker, try out the estimator below which runs the train_ray_cpu.py script.


### Run the Ray job on a GPU instead

To test Ray with a single node GPU cluster, make the following code changes.

1. Set `entry_point="train_ray.py"`.

2. In `hyperparameters`, set `'backend': 'nccl'`.

Note: Since Ray uses its own distribution mechanism, we do not discuss smdistributed on top of Ray here.

In [None]:
from sagemaker.pytorch import PyTorch

estimator = PyTorch(entry_point="train_ray_cpu.py", #put requirements.txt file to install ray
 role=role,
 source_dir='./code',
 framework_version='1.6.0',
 py_version='py3',
 output_path = f's3://{bucket}/{prefix}/output',
 instance_count=1,
 instance_type='ml.m5.xlarge',
 sagemaker_session=sagemaker_session,
 hyperparameters={
 'epochs': 7,
 'backend': 'gloo' # gloo for CPU and nccl for GPU
 },
 disable_profiler=True)

inputs ={'training': train_s3,
 'testing':test_s3}

estimator.fit(inputs, wait=True)

## Conclusions

In this notebook, you have seen how to run large-scale HPO jobs on a credit card default dataset using the SageMaker HPO feature.

With minimal code changes, you can also run this notebook on GPUs, as we have provided all the required code in the code folder associated with this repo.

Happy Hyperparameter tuning!