{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# DeepFM TensorFlow Horovod on SageMaker Sample"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### This sample demonstrates how to run DeepFM training code with TensorFlow and Horovod on SageMaker\n",
    "\n",
    "Notice:\n",
    "\n",
    "1. The dataset format is TFRecord.\n",
    "\n",
    "2. Model training uses **GPU** instances.\n",
    "\n",
    "3. The notebook uses [SageMaker Python SDK 2.x](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html).\n",
    "4. The TensorFlow version is 1.14 or 1.15.2."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.25.1\n"
     ]
    }
   ],
   "source": [
    "import sagemaker\n",
    "print(sagemaker.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## File mode"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "from sagemaker.tensorflow.estimator import TensorFlow\n",
    "from datetime import datetime\n",
    "import os\n",
    "\n",
    "# Timestamp that gives each run its own checkpoint prefix.\n",
    "dt_now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')\n",
    "\n",
    "# Defaults to the session's default SageMaker bucket; replace with YOUR BUCKET NAME if needed.\n",
    "bucket = sagemaker.Session().default_bucket()\n",
    "checkpoint_s3_uri = 's3://{}/deepfm-checkpoint/{}'.format(bucket, dt_now)  # Change to your own path if you want to save ckpt during training\n",
    "checkpoint_dir = '/opt/ml/deepfm/checkpoints'\n",
    "model_dir = '/opt/ml/model'\n",
    "output_path = 's3://{}/deepfm-2021'.format(bucket)\n",
    "\n",
    "training_channel_name = 'training'\n",
    "evaluation_channel_name = 'evaluation'\n",
    "\n",
    "train_instance_type = 'ml.p3.8xlarge'\n",
    "hvd_processes_per_host = 4\n",
    "train_instance_count = 1\n",
    "\n",
    "train_use_spot_instances = True\n",
    "enable_s3_shard = True\n",
    "enable_data_multi_path = True\n",
    "\n",
    "# enable pipe mode (0 in this File mode section, 1 in the Pipe mode section)\n",
    "pipe_mode = 0\n",
    "\n",
    "train_max_run = 36000 * 2\n",
    "# Spot training needs max_wait >= max_run, so derive it instead of repeating the literal.\n",
    "train_max_wait = train_max_run if train_use_spot_instances else None\n",
    "\n",
    "distributions = {\n",
    "    'mpi': {\n",
    "        'enabled': True,\n",
    "        'processes_per_host': hvd_processes_per_host,\n",
    "        'custom_mpi_options': '-verbose --NCCL_DEBUG=INFO -x OMPI_MCA_btl_vader_single_copy_mechanism=none'\n",
    "    }\n",
    "}\n",
    "\n",
    "deep_layer = '128,64,32'\n",
    "\n",
    "batch_size = 1024\n",
    "feature_size = 117581\n",
    "\n",
    "base_job_name = 'tf-scriptmode-deepfm'\n",
    "\n",
    "# Hyperparameters forwarded to the entry-point script. The data directories are\n",
    "# built from the channel names so they stay in sync with the inputs passed to fit().\n",
    "hyperparameters = {\n",
    "    'servable_model_dir': model_dir,\n",
    "    'checkpoint_dir': checkpoint_dir,\n",
    "    'training_data_dir': '/opt/ml/input/data/{}/'.format(training_channel_name),\n",
    "    'val_data_dir': '/opt/ml/input/data/{}/'.format(evaluation_channel_name),\n",
    "    'log_steps': 10,\n",
    "    'num_epochs': 10,\n",
    "    'field_size': 39,\n",
    "    'feature_size': feature_size,\n",
    "    'deep_layers': deep_layer,\n",
    "    'perform_shuffle': 0,\n",
    "    'batch_size': batch_size,\n",
    "    'pipe_mode': pipe_mode,\n",
    "    'enable_s3_shard': enable_s3_shard,\n",
    "    'training_channel_name': training_channel_name,\n",
    "    'evaluation_channel_name': evaluation_channel_name,\n",
    "    'worker_per_host': hvd_processes_per_host,\n",
    "    'enable_data_multi_path': enable_data_multi_path\n",
    "}\n",
    "\n",
    "estimator = TensorFlow(\n",
    "    #source_dir='./',\n",
    "    entry_point='DeepFM-hvd-tfrecord-vectorized-map.py',\n",
    "    model_dir=False,\n",
    "    # Uncomment both lines below to save checkpoints written to checkpoint_dir to S3.\n",
    "    #checkpoint_s3_uri=checkpoint_s3_uri,\n",
    "    #checkpoint_local_path=checkpoint_dir,\n",
    "    output_path=output_path,\n",
    "    instance_type=train_instance_type,\n",
    "    instance_count=train_instance_count,\n",
    "    #volume_size=500,\n",
    "    hyperparameters=hyperparameters,\n",
    "    role=sagemaker.get_execution_role(),\n",
    "    base_job_name=base_job_name,\n",
    "    framework_version='1.15.2',\n",
    "    py_version='py3',\n",
    "    script_mode=True,\n",
    "    #input_mode='Pipe',\n",
    "    distribution=distributions,\n",
    "    use_spot_instances=train_use_spot_instances,\n",
    "    max_wait=train_max_wait,\n",
    "    max_run=train_max_run,\n",
    "    debugger_hook_config=False,\n",
    "    disable_profiler=True\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.inputs import TrainingInput\n",
    "\n",
    "# TFRecord locations inside the bucket configured above; change to your own paths if needed.\n",
    "train_s3_uri = 's3://{}/tf-SM-deepctr-deepfm-sample/data-tfrecord/training/'.format(bucket)\n",
    "validate_s3_uri = 's3://{}/tf-SM-deepctr-deepfm-sample/data-tfrecord/val/'.format(bucket)\n",
    "\n",
    "# Only the training channel is sharded by S3 key; the validation channel keeps the default distribution.\n",
    "if enable_s3_shard:\n",
    "    train_input = TrainingInput(train_s3_uri, distribution='ShardedByS3Key')\n",
    "else:\n",
    "    train_input = TrainingInput(train_s3_uri)\n",
    "val_input = TrainingInput(validate_s3_uri)\n",
    "\n",
    "inputs = {training_channel_name: train_input, evaluation_channel_name: val_input}\n",
    "\n",
    "estimator.fit(inputs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipe mode"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "from sagemaker.tensorflow.estimator import TensorFlow\n",
    "from datetime import datetime\n",
    "import os\n",
    "\n",
    "# Timestamp that gives each run its own checkpoint prefix.\n",
    "dt_now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')\n",
    "\n",
    "# Defaults to the session's default SageMaker bucket; replace with YOUR BUCKET NAME if needed.\n",
    "bucket = sagemaker.Session().default_bucket()\n",
    "checkpoint_s3_uri = 's3://{}/deepfm-checkpoint/{}'.format(bucket, dt_now)  # Change to your own path if you want to save ckpt during training\n",
    "checkpoint_dir = '/opt/ml/deepfm/checkpoints'\n",
    "model_dir = '/opt/ml/model'\n",
    "output_path = 's3://{}/deepfm-2021'.format(bucket)\n",
    "\n",
    "training_channel_name = 'training'\n",
    "evaluation_channel_name = 'evaluation'\n",
    "\n",
    "train_instance_type = 'ml.p3.8xlarge'\n",
    "hvd_processes_per_host = 4\n",
    "train_instance_count = 1\n",
    "\n",
    "train_use_spot_instances = True\n",
    "enable_s3_shard = True\n",
    "enable_data_multi_path = False\n",
    "\n",
    "# enable pipe mode (1 in this Pipe mode section, 0 in the File mode section)\n",
    "pipe_mode = 1\n",
    "\n",
    "train_max_run = 36000 * 2\n",
    "# Spot training needs max_wait >= max_run, so derive it instead of repeating the literal.\n",
    "train_max_wait = train_max_run if train_use_spot_instances else None\n",
    "\n",
    "distributions = {\n",
    "    'mpi': {\n",
    "        'enabled': True,\n",
    "        'processes_per_host': hvd_processes_per_host,\n",
    "        'custom_mpi_options': '-verbose --NCCL_DEBUG=INFO -x OMPI_MCA_btl_vader_single_copy_mechanism=none'\n",
    "    }\n",
    "}\n",
    "\n",
    "deep_layer = '128,64,32'\n",
    "\n",
    "batch_size = 1024\n",
    "feature_size = 117581\n",
    "\n",
    "base_job_name = 'tf-scriptmode-deepfm'\n",
    "\n",
    "# Hyperparameters forwarded to the entry-point script. The data directories are\n",
    "# built from the channel names so they stay in sync with the inputs passed to fit().\n",
    "hyperparameters = {\n",
    "    'servable_model_dir': model_dir,\n",
    "    'checkpoint_dir': checkpoint_dir,\n",
    "    'training_data_dir': '/opt/ml/input/data/{}/'.format(training_channel_name),\n",
    "    'val_data_dir': '/opt/ml/input/data/{}/'.format(evaluation_channel_name),\n",
    "    'log_steps': 10,\n",
    "    'num_epochs': 10,\n",
    "    'field_size': 39,\n",
    "    'feature_size': feature_size,\n",
    "    'deep_layers': deep_layer,\n",
    "    'perform_shuffle': 0,\n",
    "    'batch_size': batch_size,\n",
    "    'pipe_mode': pipe_mode,\n",
    "    'enable_s3_shard': enable_s3_shard,\n",
    "    'training_channel_name': training_channel_name,\n",
    "    'evaluation_channel_name': evaluation_channel_name,\n",
    "    'worker_per_host': hvd_processes_per_host,\n",
    "    'enable_data_multi_path': enable_data_multi_path\n",
    "}\n",
    "\n",
    "estimator = TensorFlow(\n",
    "    #source_dir='./',\n",
    "    entry_point='DeepFM-hvd-tfrecord-vectorized-map.py',\n",
    "    model_dir=False,\n",
    "    # Uncomment both lines below to save checkpoints written to checkpoint_dir to S3.\n",
    "    #checkpoint_s3_uri=checkpoint_s3_uri,\n",
    "    #checkpoint_local_path=checkpoint_dir,\n",
    "    output_path=output_path,\n",
    "    instance_type=train_instance_type,\n",
    "    instance_count=train_instance_count,\n",
    "    #volume_size=500,\n",
    "    hyperparameters=hyperparameters,\n",
    "    role=sagemaker.get_execution_role(),\n",
    "    base_job_name=base_job_name,\n",
    "    framework_version='1.14',\n",
    "    py_version='py3',\n",
    "    script_mode=True,\n",
    "    input_mode='Pipe',\n",
    "    distribution=distributions,\n",
    "    use_spot_instances=train_use_spot_instances,\n",
    "    max_wait=train_max_wait,\n",
    "    max_run=train_max_run,\n",
    "    debugger_hook_config=False,\n",
    "    disable_profiler=True\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.inputs import TrainingInput\n",
    "\n",
    "# Paths to the training and validation data. They default to the same TFRecord\n",
    "# prefixes as the File mode section; change them to your own paths.\n",
    "train_s3_uri = 's3://{}/tf-SM-deepctr-deepfm-sample/data-tfrecord/training/'.format(bucket)\n",
    "validate_s3_uri = 's3://{}/tf-SM-deepctr-deepfm-sample/data-tfrecord/val/'.format(bucket)\n",
    "\n",
    "if enable_data_multi_path:  # assume we have four channels\n",
    "    # Fill in one S3 prefix per training channel.\n",
    "    train_s3_uris = ['', '', '', '']\n",
    "else:  # use one train_s3_uri for example, you could change to your real path\n",
    "    train_s3_uris = [train_s3_uri] * 4\n",
    "\n",
    "# Fail here with a clear message instead of submitting a job with an empty S3 URI.\n",
    "if not all(train_s3_uris + [validate_s3_uri]):\n",
    "    raise ValueError('Set every training/validation S3 URI before calling estimator.fit()')\n",
    "\n",
    "# Only the training channels are sharded by S3 key; the validation channel keeps the default distribution.\n",
    "if enable_s3_shard:\n",
    "    train_inputs = [TrainingInput(uri, distribution='ShardedByS3Key') for uri in train_s3_uris]\n",
    "else:\n",
    "    train_inputs = [TrainingInput(uri) for uri in train_s3_uris]\n",
    "val_input = TrainingInput(validate_s3_uri)\n",
    "\n",
    "# Channels are named training, training-1, training-2, training-3, followed by the evaluation channel.\n",
    "channel_names = [training_channel_name] + [\n",
    "    '{}-{}'.format(training_channel_name, idx) for idx in range(1, len(train_s3_uris))\n",
    "]\n",
    "inputs = dict(zip(channel_names, train_inputs))\n",
    "inputs[evaluation_channel_name] = val_input\n",
    "\n",
    "estimator.fit(inputs)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_tensorflow_p36",
   "language": "python",
   "name": "conda_tensorflow_p36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}