{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Training with Pipe Mode using PipeModeDataset\n", "Amazon SageMaker allows users to create training jobs using Pipe input mode. With Pipe input mode, your dataset is streamed directly to your training instances instead of being downloaded first. This means that your training jobs start sooner, finish quicker, and need less disk space.\n", "\n", "SageMaker TensorFlow provides an implementation of `tf.data.Dataset` that makes it easy to take advantage of Pipe input mode in SageMaker. You can replace your `tf.data.Dataset` with a `sagemaker_tensorflow.PipeModeDataset` to read TFRecords as they are streamed to your training instances.\n", "\n", "In your entry_point script, you can use `PipeModeDataset` like a `Dataset`. In this example, we create a `PipeModeDataset` to read TFRecords from the ‘training’ channel:\n", "\n", "```python\n", "import tensorflow as tf\n", "from sagemaker_tensorflow import PipeModeDataset\n", "\n", "features = {\n", " 'data': tf.FixedLenFeature([], tf.string),\n", " 'labels': tf.FixedLenFeature([], tf.int64),\n", "}\n", "\n", "def parse(record):\n", " parsed = tf.parse_single_example(record, features)\n", " return ({\n", " 'data': tf.decode_raw(parsed['data'], tf.float64)\n", " }, parsed['labels'])\n", "\n", "def train_input_fn(training_dir, hyperparameters):\n", " ds = PipeModeDataset(channel='training', record_format='TFRecord')\n", " ds = ds.repeat(20)\n", " ds = ds.prefetch(10)\n", " ds = ds.map(parse, num_parallel_calls=10)\n", " ds = ds.batch(64)\n", " return ds\n", "```\n", "\n", "To run a training job with Pipe input mode, pass `input_mode='Pipe'` to your TensorFlow Estimator:\n", "\n", "```python\n", "from sagemaker.tensorflow import TensorFlow\n", "\n", "tf_estimator = TensorFlow(entry_point='tf-train-with-pipemodedataset.py', role='SageMakerRole',\n", " train_instance_count=1, train_instance_type='ml.c5.2xlarge',\n", " framework_version='1.12.0', input_mode='Pipe')\n", "\n", 
"tf_estimator.fit('s3://bucket/path/to/training/data')\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a training script that support pipemode datasets\n", "Create a copy of the script (training_script/cifar10_keras_sm.py) and save it as training_script/cifar10_keras_pipe.py.\n", "\n", "In cifar10_keras_pipe.py, import the PIpeModeDataset using:\n", "```python\n", "from sagemaker_tensorflow import PipeModeDataset\n", "```\n", "update \n", "```python\n", "def _input(epochs, batch_size, channel, channel_name):\n", "```\n", "to create the dataset variable using\n", "```python\n", "dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')\n", "```\n", "\n", "The new _input function should look as following:\n", "```python\n", "def _input(epochs, batch_size, channel, channel_name):\n", " dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')\n", "\n", " dataset = dataset.repeat(epochs)\n", " dataset = dataset.prefetch(10)\n", " ...\n", "```\n", "For info see the SageMaker-python-sdk [documentation](https://sagemaker.readthedocs.io/en/stable/using_tf.html#training-with-pipe-mode-using-pipemodedataset)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the previous job, this time use the new script (cifar10_keras_pipe.py)\n", "Run the job for 20 epochs and configure it with `input_mode='Pipe'`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "\n", "sagemaker_session = sagemaker.Session()\n", "\n", "role = get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load the SageMaker experiment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "import time\n", "cifar10_experiment = 
Experiment.load(\n", " experiment_name=\"TensorFlow-cifar10-experiment\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a new trial\n", "trial_name = f\"cifar10-training-job-pipemode-{int(time.time())}\"\n", "trial = Trial.create(\n", " trial_name=trial_name, \n", " experiment_name=cifar10_experiment.experiment_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# Configure the dataset location variable\n", "dataset_location = sagemaker_session.upload_data(path='data', key_prefix='data/DEMO-cifar10')\n", "display(dataset_location)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "metric_definitions = [\n", " {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\\\.]+) - acc: [0-9\\\\.]+'},\n", " {'Name': 'train:accuracy', 'Regex': 'loss: [0-9\\\\.]+ - acc: ([0-9\\\\.]+)'},\n", " {'Name': 'validation:accuracy', 'Regex': 'val_loss: [0-9\\\\.]+ - val_acc: ([0-9\\\\.]+)'},\n", " {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9\\\\.]+) - val_acc: [0-9\\\\.]+'},\n", "]" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tensorflow import TensorFlow\n", "# You should add the metric_definitions arguments to all of your jobs\n", "# Change base_job_name to 'cifar10-pipe' for console visibility\n", "# Remember to configure input_mode='Pipe' \n", "estimator = ... " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Connect the trial configured above to the job. 
Add the experiment config to the fit function.\n", "```python\n", "experiment_config={\n", " \"ExperimentName\": cifar10_experiment.experiment_name, \n", " \"TrialName\": trial.trial_name,\n", " \"TrialComponentDisplayName\": \"Training\"}\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.fit({'train' : 'train_data_location',\n", " 'validation' : 'validation_data_location',\n", " 'eval' : 'eval_data_location'},\n", " experiment_config=)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze the experiments" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "search_expression = {\n", " \"Filters\":[\n", " {\n", " \"Name\": \"DisplayName\",\n", " \"Operator\": \"Equals\",\n", " \"Value\": \"Training\",\n", " }\n", " ],\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd \n", "pd.options.display.max_columns = 500\n", "\n", "from sagemaker.analytics import ExperimentAnalytics\n", "trial_component_analytics = ExperimentAnalytics(\n", " sagemaker_session=sagemaker_session, \n", " experiment_name=cifar10_experiment.experiment_name,\n", " search_expression=search_expression\n", ")\n", "\n", "table = trial_component_analytics.dataframe(force_refresh=True)\n", "display(table)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Good job!** \n", "You can now use Pipe mode datasets. With large datasets, Pipe mode reduces both training time and local disk requirements.\n", "Before continuing to the next notebook, look at the Pipe mode job metrics in CloudWatch and TensorBoard." 
] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "pycharm": { "stem_cell": { "cell_type": "raw", "source": [], "metadata": { "collapsed": false } } } }, "nbformat": 4, "nbformat_minor": 4 }