# DeepFM Tensorflow Parameter Server on SageMaker Sample

### This sample demonstrates how to run DeepFM training code with the TensorFlow parameter server strategy on Amazon SageMaker

Notice:

1. The dataset format is TFRecord.

2. Training uses **CPU** instances: in our experience, running the DeepFM script with TensorFlow parameter servers on CPU is more efficient and more cost-effective.

3. The sample uses [SageMaker Python SDK 2.x](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html).
4. The TensorFlow version is 1.15.2 (File mode) or 1.14 (Pipe mode).

In [1]:
import sagemaker

# The rest of this notebook uses SageMaker Python SDK 2.x APIs
# (`instance_count`, `sagemaker.inputs.TrainingInput`, ...), so fail early
# with a clear message on an older SDK instead of deep inside an estimator call.
print(sagemaker.__version__)
assert int(sagemaker.__version__.split('.')[0]) >= 2, (
    "SageMaker Python SDK 2.x or later is required; found {}".format(sagemaker.__version__))

2.25.1


## File mode

In [None]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

# --- S3 locations -------------------------------------------------------------
# Leave `bucket` empty to fall back to this account's SageMaker default bucket;
# an empty name would otherwise produce invalid 's3:///...' URIs below.
bucket = '' # YOUR_BUCKET_NAME
bucket = bucket or sagemaker.Session().default_bucket()
checkpoint_s3_uri = 's3://{}/deepfm-checkpoint'.format(bucket) # Change to your own path if you want to save checkpoints during training
checkpoint_local_path = '/opt/ml/checkpoints'
# Timestamped so every run writes its TensorFlow checkpoints to a fresh prefix.
model_dir = 's3://{}/deepfm-ps-ckpt/{}'.format(bucket, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))
output_path = 's3://{}/deepfm-2021'.format(bucket)

# --- Input channels -----------------------------------------------------------
training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

# The estimator's input mode and the script's `pipe_mode` hyperparameter must
# agree, so both are derived from this single setting.
input_mode = 'File'

# --- Compute ------------------------------------------------------------------
train_instance_type = 'ml.c5.18xlarge'
train_instance_count = 2

train_use_spot_instances = True
# When True, the training channel uses ShardedByS3Key (see the fit cell).
enable_s3_shard = True

train_max_run = 20 * 60 * 60  # 20 hours, in seconds
# SageMaker requires max_wait >= max_run for managed spot training; derive it
# from max_run so changing one cannot invalidate the other. Increase it to allow
# extra time waiting for spot capacity.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {'parameter_server': {'enabled': True}}

# --- Model hyperparameters ----------------------------------------------------
deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name = 'tf-scriptmode-deepfm'

hyperparameters = {
    'servable_model_dir': '/opt/ml/model',
    # SageMaker exposes each channel under /opt/ml/input/data/<channel_name>/.
    'training_data_dir': '/opt/ml/input/data/{}/'.format(training_channel_name),
    'val_data_dir': '/opt/ml/input/data/{}/'.format(evaluation_channel_name),
    'log_steps': 10,
    'num_epochs': 10,
    'field_size': 39,
    'feature_size': feature_size,
    'deep_layers': deep_layer,
    'perform_shuffle': 0,
    'batch_size': batch_size,
    'pipe_mode': int(input_mode == 'Pipe'),
    'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name,
    'evaluation_channel_name': evaluation_channel_name,
}

# Script mode is the only TensorFlow mode in SDK 2.x, so no `script_mode` flag is passed.
estimator = TensorFlow(
    entry_point='DeepFM-dist-ps-for-multipleCPU-multiInstance.py',
    #source_dir='./',
    model_dir=model_dir,
    # Uncomment to sync checkpoint_local_path with checkpoint_s3_uri during training:
    #checkpoint_s3_uri=checkpoint_s3_uri,
    #checkpoint_local_path=checkpoint_local_path,
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size=500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.15.2',
    py_version='py3',
    input_mode=input_mode,
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    debugger_hook_config=False,
    disable_profiler=True,
)

In [None]:
# File mode
from sagemaker.inputs import TrainingInput

train_s3_uri = ''# Path to training data
validate_s3_uri = ''# Path to validation data

if enable_s3_shard:
    train_input = TrainingInput(train_s3_uri, distribution='ShardedByS3Key')
    val_input = TrainingInput(validate_s3_uri)
else :
    train_input = TrainingInput(train_s3_uri)
    val_input = TrainingInput(validate_s3_uri)

inputs = {training_channel_name : train_input, evaluation_channel_name : val_input}

estimator.fit(inputs)

## Pipe mode

In [None]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

# --- S3 locations -------------------------------------------------------------
# Do not hard-code a bucket that embeds an AWS account ID. Leave `bucket` empty
# to fall back to this account's SageMaker default bucket
# (sagemaker-<region>-<account-id>), or set your own bucket name.
bucket = '' # YOUR_BUCKET_NAME
bucket = bucket or sagemaker.Session().default_bucket()
checkpoint_s3_uri = 's3://{}/deepfm-checkpoint'.format(bucket) # Change to your own path if you want to save checkpoints during training
checkpoint_local_path = '/opt/ml/checkpoints'
# Timestamped so every run writes its TensorFlow checkpoints to a fresh prefix.
model_dir = 's3://{}/deepfm-ps-ckpt/{}'.format(bucket, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))
output_path = 's3://{}/deepfm-2021'.format(bucket)

# --- Input channels -----------------------------------------------------------
training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

# The estimator's input mode and the script's `pipe_mode` hyperparameter must
# agree, so both are derived from this single setting.
input_mode = 'Pipe'

# --- Compute ------------------------------------------------------------------
train_instance_type = 'ml.c5.18xlarge'
train_instance_count = 2

train_use_spot_instances = True
# When True, the training channel uses ShardedByS3Key (see the fit cell).
enable_s3_shard = True

train_max_run = 20 * 60 * 60  # 20 hours, in seconds
# SageMaker requires max_wait >= max_run for managed spot training; derive it
# from max_run so changing one cannot invalidate the other. Increase it to allow
# extra time waiting for spot capacity.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {'parameter_server': {'enabled': True}}

# --- Model hyperparameters ----------------------------------------------------
deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name = 'tf-scriptmode-deepfm'

hyperparameters = {
    'servable_model_dir': '/opt/ml/model',
    # Channel paths follow SageMaker's /opt/ml/input/data/<channel_name>/ layout.
    'training_data_dir': '/opt/ml/input/data/{}/'.format(training_channel_name),
    'val_data_dir': '/opt/ml/input/data/{}/'.format(evaluation_channel_name),
    'log_steps': 10,
    'num_epochs': 10,
    'field_size': 39,
    'feature_size': feature_size,
    'deep_layers': deep_layer,
    'perform_shuffle': 0,
    'batch_size': batch_size,
    'pipe_mode': int(input_mode == 'Pipe'),
    'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name,
    'evaluation_channel_name': evaluation_channel_name,
}

# Script mode is the only TensorFlow mode in SDK 2.x, so no `script_mode` flag is passed.
estimator = TensorFlow(
    entry_point='DeepFM-dist-ps-for-multipleCPU-multiInstance.py',
    #source_dir='./',
    model_dir=model_dir,
    # Uncomment to sync checkpoint_local_path with checkpoint_s3_uri during training:
    #checkpoint_s3_uri=checkpoint_s3_uri,
    #checkpoint_local_path=checkpoint_local_path,
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size=500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.14',
    py_version='py3',
    input_mode=input_mode,
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    debugger_hook_config=False,
    disable_profiler=True,
)

In [None]:
from sagemaker.inputs import TrainingInput

train_s3_uri = ''# Path to training data
validate_s3_uri = '' # Path to validation data

# Fail fast with a clear message instead of an opaque error from the SageMaker API.
assert train_s3_uri and validate_s3_uri, (
    "Set train_s3_uri and validate_s3_uri to the S3 locations of your TFRecord data.")

# With sharding, each training instance receives a disjoint subset of the S3
# objects under the training prefix. The evaluation channel is configured the
# same way in both cases. Both channels stream in the estimator's Pipe input mode.
if enable_s3_shard:
    train_input = TrainingInput(train_s3_uri, distribution='ShardedByS3Key')
else:
    train_input = TrainingInput(train_s3_uri)
val_input = TrainingInput(validate_s3_uri)

inputs = {training_channel_name: train_input, evaluation_channel_name: val_input}

estimator.fit(inputs)