# DeepFM with TensorFlow and Horovod on SageMaker: Sample

### This sample demonstrates how to run DeepFM training code with TensorFlow and Horovod on Amazon SageMaker

Notice:

1. The dataset format is TFRecord.

2. Training runs on **GPU** instances.

3. The code uses the [SageMaker Python SDK 2.x](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html).

4. The TensorFlow version is 1.15.2 (File mode) or 1.14 (Pipe mode).

In [1]:
import sagemaker

# The cells below are written against the SageMaker Python SDK 2.x API,
# so show which SDK version is installed in this kernel.
sdk_version = sagemaker.__version__
print(sdk_version)

2.25.1


## File mode

In [None]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

# Configure a File-mode Horovod (MPI) TensorFlow estimator for the DeepFM script.

dt_now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

# S3 bucket for checkpoints and model artifacts. Defaults to the session's
# default bucket; replace with YOUR BUCKET NAME to use a different one.
bucket = sagemaker.Session().default_bucket()
# Change to your own path if you want to save checkpoints during training
# (and uncomment checkpoint_s3_uri / checkpoint_local_path in the estimator).
checkpoint_s3_uri = 's3://{}/deepfm-checkpoint/{}'.format(bucket, dt_now)
checkpoint_dir = '/opt/ml/deepfm/checkpoints'
model_dir = '/opt/ml/model'
output_path = 's3://{}/deepfm-2021'.format(bucket)

training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

train_instance_type = 'ml.p3.8xlarge'
hvd_processes_per_host = 4
train_instance_count = 1

train_use_spot_instances = True
enable_s3_shard = True
enable_data_multi_path = True

# 0 = File mode, 1 = Pipe mode (forwarded to the entry point as a hyperparameter).
pipe_mode = 0

train_max_run = 36000 * 2
# Spot training requires max_wait >= max_run; derive it from max_run so that
# raising max_run cannot produce an invalid job configuration.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {
    'mpi': {
        'enabled': True,
        'processes_per_host': hvd_processes_per_host,
        'custom_mpi_options': '-verbose --NCCL_DEBUG=INFO -x OMPI_MCA_btl_vader_single_copy_mechanism=none',
    }
}

deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name = 'tf-scriptmode-deepfm'

hyperparameters = {
    'servable_model_dir': model_dir,
    'checkpoint_dir': checkpoint_dir,
    # Derived from the channel names so renaming a channel keeps the paths in sync.
    'training_data_dir': '/opt/ml/input/data/{}/'.format(training_channel_name),
    'val_data_dir': '/opt/ml/input/data/{}/'.format(evaluation_channel_name),
    'log_steps': 10,
    'num_epochs': 10,
    'field_size': 39,
    'feature_size': feature_size,
    'deep_layers': deep_layer,
    'perform_shuffle': 0,
    'batch_size': batch_size,
    'pipe_mode': pipe_mode,
    'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name,
    'evaluation_channel_name': evaluation_channel_name,
    'worker_per_host': hvd_processes_per_host,
    'enable_data_multi_path': enable_data_multi_path,
}

estimator = TensorFlow(
    # source_dir='./',
    entry_point='DeepFM-hvd-tfrecord-vectorized-map.py',
    # False: SageMaker does not pass model_dir to the script; it receives
    # servable_model_dir through the hyperparameters instead.
    model_dir=False,
    # To sync checkpoints to S3, uncomment both lines below:
    # checkpoint_s3_uri=checkpoint_s3_uri,
    # checkpoint_local_path=checkpoint_dir,
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    # volume_size=500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.15.2',
    py_version='py3',
    script_mode=True,
    # input_mode='Pipe',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    debugger_hook_config=False,
    disable_profiler=True,
)

In [None]:
from sagemaker.inputs import TrainingInput

# Sample TFRecord data locations; point these at your own S3 prefixes.
train_s3_uri = 's3://sagemaker-us-west-2-169088282855/tf-SM-deepctr-deepfm-sample/data-tfrecord/training/'
validate_s3_uri = 's3://sagemaker-us-west-2-169088282855/tf-SM-deepctr-deepfm-sample/data-tfrecord/val/'

# Only the training channel is ever sharded across instances; None leaves
# TrainingInput at its default distribution.
train_distribution = 'ShardedByS3Key' if enable_s3_shard else None
train_input = TrainingInput(train_s3_uri, distribution=train_distribution)
val_input = TrainingInput(validate_s3_uri)

inputs = {
    training_channel_name: train_input,
    evaluation_channel_name: val_input,
}

estimator.fit(inputs)

## Pipe mode

In [None]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

# Configure a Pipe-mode Horovod (MPI) TensorFlow estimator for the DeepFM script.

dt_now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

# S3 bucket for checkpoints and model artifacts. The session's default bucket is
# named 'sagemaker-<region>-<account id>'; replace with YOUR BUCKET NAME to override.
bucket = sagemaker.Session().default_bucket()
# Change to your own path if you want to save checkpoints during training
# (and uncomment checkpoint_s3_uri / checkpoint_local_path in the estimator).
checkpoint_s3_uri = 's3://{}/deepfm-checkpoint/{}'.format(bucket, dt_now)
checkpoint_dir = '/opt/ml/deepfm/checkpoints'
model_dir = '/opt/ml/model'
output_path = 's3://{}/deepfm-2021'.format(bucket)

training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

train_instance_type = 'ml.p3.8xlarge'
hvd_processes_per_host = 4
train_instance_count = 1

train_use_spot_instances = True
enable_s3_shard = True
enable_data_multi_path = False

# 0 = File mode, 1 = Pipe mode (forwarded to the entry point as a hyperparameter;
# keep it consistent with input_mode='Pipe' below).
pipe_mode = 1

train_max_run = 36000 * 2
# Spot training requires max_wait >= max_run; derive it from max_run so that
# raising max_run cannot produce an invalid job configuration.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {
    'mpi': {
        'enabled': True,
        'processes_per_host': hvd_processes_per_host,
        'custom_mpi_options': '-verbose --NCCL_DEBUG=INFO -x OMPI_MCA_btl_vader_single_copy_mechanism=none',
    }
}

deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name = 'tf-scriptmode-deepfm'

hyperparameters = {
    'servable_model_dir': model_dir,
    'checkpoint_dir': checkpoint_dir,
    # Derived from the channel names so renaming a channel keeps the paths in sync.
    'training_data_dir': '/opt/ml/input/data/{}/'.format(training_channel_name),
    'val_data_dir': '/opt/ml/input/data/{}/'.format(evaluation_channel_name),
    'log_steps': 10,
    'num_epochs': 10,
    'field_size': 39,
    'feature_size': feature_size,
    'deep_layers': deep_layer,
    'perform_shuffle': 0,
    'batch_size': batch_size,
    'pipe_mode': pipe_mode,
    'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name,
    'evaluation_channel_name': evaluation_channel_name,
    'worker_per_host': hvd_processes_per_host,
    'enable_data_multi_path': enable_data_multi_path,
}

estimator = TensorFlow(
    # source_dir='./',
    entry_point='DeepFM-hvd-tfrecord-vectorized-map.py',
    # False: SageMaker does not pass model_dir to the script; it receives
    # servable_model_dir through the hyperparameters instead.
    model_dir=False,
    # To sync checkpoints to S3, uncomment both lines below:
    # checkpoint_s3_uri=checkpoint_s3_uri,
    # checkpoint_local_path=checkpoint_dir,
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    # volume_size=500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.14',
    py_version='py3',
    script_mode=True,
    input_mode='Pipe',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    debugger_hook_config=False,
    disable_profiler=True,
)

In [None]:
from sagemaker.inputs import TrainingInput

# Build the Pipe-mode input channels and start training.

train_s3_uri = ''  # Path to training data
validate_s3_uri = ''  # Path to validation data

if enable_data_multi_path:  # assume we have four channels
    # One S3 prefix per training channel; fill in your real paths.
    train_s3_uri_1 = ''
    train_s3_uri_2 = ''
    train_s3_uri_3 = ''
    train_s3_uri_4 = ''
    train_s3_uris = [train_s3_uri_1, train_s3_uri_2, train_s3_uri_3, train_s3_uri_4]
else:
    # Use one train_s3_uri for all four channels as an example; change to your real path.
    train_s3_uris = [train_s3_uri] * 4

# 'ShardedByS3Key' splits each training channel's objects across instances;
# None leaves TrainingInput at its default distribution. Evaluation is never sharded.
train_distribution = 'ShardedByS3Key' if enable_s3_shard else None

# Channel names, in order: '<training_channel_name>', '<training_channel_name>-1',
# '<training_channel_name>-2', '<training_channel_name>-3', then the evaluation channel.
inputs = {}
for index, uri in enumerate(train_s3_uris):
    suffix = '' if index == 0 else '-{}'.format(index)
    inputs[training_channel_name + suffix] = TrainingInput(uri, distribution=train_distribution)
inputs[evaluation_channel_name] = TrainingInput(validate_s3_uri)

estimator.fit(inputs)