# Training with Pipe Mode using PipeModeDataset
Amazon SageMaker allows users to create training jobs using Pipe input mode. With Pipe input mode, your dataset is streamed directly to your training instances instead of being downloaded first. This means that your training jobs start sooner, finish quicker, and need less disk space.

SageMaker TensorFlow provides an implementation of `tf.data.Dataset` that makes it easy to take advantage of Pipe input mode in SageMaker. You can replace your `tf.data.Dataset` with a `sagemaker_tensorflow.PipeModeDataset` to read TFRecords as they are streamed to your training instances.

In your `entry_point` script, you can use `PipeModeDataset` like any other `Dataset`. The following example creates a `PipeModeDataset` that reads TFRecords from the `training` channel:

```python
import tensorflow as tf
from sagemaker_tensorflow import PipeModeDataset

# Schema of each serialized tf.Example in the TFRecord stream
features = {
    'data': tf.FixedLenFeature([], tf.string),
    'labels': tf.FixedLenFeature([], tf.int64),
}

def parse(record):
    # Decode one serialized example into a (features, label) pair
    parsed = tf.parse_single_example(record, features)
    return ({
        'data': tf.decode_raw(parsed['data'], tf.float64)
    }, parsed['labels'])

def train_input_fn(training_dir, hyperparameters):
    # Stream TFRecords from the 'training' channel instead of reading local files
    ds = PipeModeDataset(channel='training', record_format='TFRecord')
    ds = ds.repeat(20)
    ds = ds.prefetch(10)
    ds = ds.map(parse, num_parallel_calls=10)
    ds = ds.batch(64)
    return ds
```
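
The order of `repeat`, `map`, and `batch` in `train_input_fn` determines what each batch contains. The following is a plain-Python analogy of that ordering, not TensorFlow code: the helpers `repeat` and `batch`, the five-record epoch, and `parse_record` are all hypothetical stand-ins chosen for illustration. `prefetch` is left out because it only buffers elements and does not change their order.

```python
from itertools import chain, islice

def repeat(make_epoch, count):
    """Chain `count` passes over the data into one continuous stream."""
    return chain.from_iterable(make_epoch() for _ in range(count))

def batch(stream, size):
    """Group consecutive items into lists of at most `size` items."""
    iterator = iter(stream)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk

# Hypothetical stand-ins: five "records" per epoch and a trivial parse step.
def one_epoch():
    return iter(range(5))

def parse_record(record):
    return record * 10

stream = repeat(one_epoch, 2)          # analogous to ds.repeat(2)
stream = map(parse_record, stream)     # analogous to ds.map(parse)
batches = list(batch(stream, 4))       # analogous to ds.batch(4)
print(batches)  # → [[0, 10, 20, 30], [40, 0, 10, 20], [30, 40]]
```

Because repeating happens before batching, a batch can span an epoch boundary, and the final batch may be smaller than the requested size.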

To run a training job with Pipe input mode, pass `input_mode='Pipe'` to your TensorFlow Estimator:

```python
from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train-with-pipemodedataset.py', role='SageMakerRole',
                          train_instance_count=1, train_instance_type='ml.c5.2xlarge',
                          framework_version='1.12.0', input_mode='Pipe')

tf_estimator.fit('s3://bucket/path/to/training/data')
```

## Create a training script that supports Pipe mode datasets
Create a copy of the script (`training_script/cifar10_keras_sm.py`) and save it as `training_script/cifar10_keras_pipe.py`.

In `cifar10_keras_pipe.py`, import `PipeModeDataset`:
```python
from sagemaker_tensorflow import PipeModeDataset
```
Update
```python
def _input(epochs, batch_size, channel, channel_name):
```
so that it creates the `dataset` variable using:
```python
dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')
```

The new `_input` function should look like the following:
```python
def _input(epochs, batch_size, channel, channel_name):
    dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')

    dataset = dataset.repeat(epochs)
    dataset = dataset.prefetch(10)
    ...
```
For more information, see the SageMaker Python SDK [documentation](https://sagemaker.readthedocs.io/en/stable/using_tf.html#training-with-pipe-mode-using-pipemodedataset).

Run the previous job again, this time with the new script (`cifar10_keras_pipe.py`).
Run the job for 20 epochs and configure it with `input_mode='Pipe'`.

In [None]:
import os
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()

### Load the SageMaker experiment

In [None]:
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
import time
cifar10_experiment = Experiment.load(
    experiment_name="TensorFlow-cifar10-experiment")

In [None]:
# create a new trial
trial_name = f"cifar10-training-job-pipemode-{int(time.time())}"
trial = Trial.create(
    trial_name=trial_name,
    experiment_name=cifar10_experiment.experiment_name
)

In [None]:
# Configure the dataset location variable
dataset_location = sagemaker_session.upload_data(path='data', key_prefix='data/DEMO-cifar10')
display(dataset_location)

In [None]:
metric_definitions = [
    {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\.]+) - acc: [0-9\\.]+'},
    {'Name': 'train:accuracy', 'Regex': 'loss: [0-9\\.]+ - acc: ([0-9\\.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_loss: [0-9\\.]+ - val_acc: ([0-9\\.]+)'},
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9\\.]+) - val_acc: [0-9\\.]+'},
]
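
Each metric definition pairs a name with a regular expression whose single capture group holds the metric value. As a rough local check, the sketch below applies the same patterns to one log line using Python's `re` module. The sample line is a made-up example in the style of Keras progress output, and `extract_metrics` is a hypothetical helper, not part of the SageMaker SDK.

```python
import re

# Same patterns as the metric_definitions cell, repeated so this sketch is self-contained.
metric_definitions = [
    {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\.]+) - acc: [0-9\\.]+'},
    {'Name': 'train:accuracy', 'Regex': 'loss: [0-9\\.]+ - acc: ([0-9\\.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_loss: [0-9\\.]+ - val_acc: ([0-9\\.]+)'},
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9\\.]+) - val_acc: [0-9\\.]+'},
]

# Hypothetical log line (not real training output).
sample_log_line = "390/390 - 25s - loss: 1.2345 - acc: 0.5678 - val_loss: 1.1000 - val_acc: 0.6000"

def extract_metrics(line, definitions):
    """Return {metric name: float} for every pattern that matches the line."""
    found = {}
    for definition in definitions:
        match = re.search(definition['Regex'], line)
        if match:
            # group(1) is the captured number for this metric
            found[definition['Name']] = float(match.group(1))
    return found

extracted = extract_metrics(sample_log_line, metric_definitions)
print(extracted)
```

If a pattern does not match your script's actual log format, the corresponding metric will simply be missing, so it can be worth testing the expressions against a real log line before launching a job.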

In [None]:
from sagemaker.tensorflow import TensorFlow
# Add the metric_definitions argument to all of your jobs
# Change base_job_name to 'cifar10-pipe' for console visibility
# Remember to configure input_mode='Pipe'
estimator = ... 

Connect the trial configured above to the job by adding the experiment config to the `fit` call:
```python
experiment_config={
    "ExperimentName": cifar10_experiment.experiment_name,
    "TrialName": trial.trial_name,
    "TrialComponentDisplayName": "Training"}
```

In [None]:
estimator.fit({'train' : 'train_data_location',
 'validation' : 'validation_data_location',
 'eval' : 'eval_data_location'},
 experiment_config=)

### Analyze the experiments

In [None]:
search_expression = {
    "Filters": [
        {
            "Name": "DisplayName",
            "Operator": "Equals",
            "Value": "Training",
        }
    ],
}
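
To make the effect of this filter concrete, here is a plain-Python sketch that applies the same kind of equality filter to a few made-up records. The records and the `matches` helper are hypothetical and only illustrate the idea; the actual filtering is performed by SageMaker when the analytics query runs.

```python
search_expression = {
    "Filters": [
        {"Name": "DisplayName", "Operator": "Equals", "Value": "Training"},
    ],
}

# Hypothetical trial component records; real ones are returned by SageMaker.
trial_components = [
    {"TrialComponentName": "job-a", "DisplayName": "Training"},
    {"TrialComponentName": "job-b", "DisplayName": "Preprocessing"},
    {"TrialComponentName": "job-c", "DisplayName": "Training"},
]

def matches(record, expression):
    """True if the record satisfies every 'Equals' filter in the expression."""
    for condition in expression["Filters"]:
        if condition["Operator"] != "Equals":
            # This sketch only models the operator used in this notebook.
            raise ValueError(f"unsupported operator: {condition['Operator']}")
        if record.get(condition["Name"]) != condition["Value"]:
            return False
    return True

selected = [r["TrialComponentName"] for r in trial_components
            if matches(r, search_expression)]
print(selected)  # → ['job-a', 'job-c']
```

Only components whose display name is `Training` are kept, which corresponds to the `TrialComponentDisplayName` set in the experiment config.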

In [None]:
import pandas as pd 
pd.options.display.max_columns = 500

from sagemaker.analytics import ExperimentAnalytics
trial_component_analytics = ExperimentAnalytics(
    sagemaker_session=sagemaker_session,
    experiment_name=cifar10_experiment.experiment_name,
    search_expression=search_expression
)

table = trial_component_analytics.dataframe(force_refresh=True)
display(table)

**Good job!** 
You can now use Pipe mode datasets. With large datasets, Pipe mode reduces both training time and local disk requirements.
Before continuing to the next notebook, review the Pipe mode job's metrics in CloudWatch and TensorBoard.