# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # Permission is hereby granted, free of charge, to any person obtaining a copy of # this software and associated documentation files (the "Software"), to deal in # the Software without restriction, including without limitation the rights to # use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of # the Software, and to permit persons to whom the Software is furnished to do so. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS # FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR # COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # Original license # Copyright 2018 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Efficient ImageNet input pipeline using tf.data.Dataset.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import abc from collections import namedtuple import functools import os import tensorflow as tf import preprocessing from utils import data_utils from utils import hvd_utils # for dali processing def list_filenames_in_dataset(data_dir, mode, count=True): if mode not in ["train", 'validation']: raise ValueError("Unknown mode received: %s" % mode) if not os.path.exists(data_dir): raise FileNotFoundError("The data directory: `%s` can't be found" % data_dir) filename_pattern = os.path.join(data_dir, '%s-*' % mode) file_list = sorted(tf.gfile.Glob(filename_pattern)) num_samples = 0 if count: def count_records(tf_record_filename): count = 0 for _ in tf.python_io.tf_record_iterator(tf_record_filename): count += 1 return count n_files = len(file_list) num_samples = (count_records(file_list[0]) * (n_files - 1) + count_records(file_list[-1])) return file_list, num_samples def parse_dali_idx_dataset(data_idx_dir, mode): if data_idx_dir is not None: filenames, _ = list_filenames_in_dataset(data_dir=data_idx_dir, mode=mode, count=False) return filenames def build_image_serving_input_fn(image_size): """Builds a serving input fn for raw images.""" def _image_serving_input_fn(): """Serving input fn for raw images.""" def _preprocess_image(image_bytes): """Preprocess a single raw image.""" image = preprocessing.preprocess_image( image_bytes=image_bytes, is_training=False, image_size=image_size) return image image_bytes_list = tf.placeholder( shape=[None], dtype=tf.string, ) images = tf.map_fn( _preprocess_image, image_bytes_list, back_prop=False, dtype=tf.float32) return tf.estimator.export.ServingInputReceiver( images, {'image_bytes': image_bytes_list}) return _image_serving_input_fn class ImageNetTFExampleInput(object): """Base class for ImageNet input_fn generator. Args: is_training: `bool` for whether the input is for training use_bfloat16: If True, use bfloat16 precision; else use float32. num_cores: `int` for the number of TPU cores image_size: `int` for image size (both width and height). transpose_input: 'bool' for whether to use the double transpose trick """ __metaclass__ = abc.ABCMeta def __init__(self, is_training, use_bfloat16, num_cores=64, image_size=224, transpose_input=False): self.image_preprocessing_fn = preprocessing.preprocess_image self.is_training = is_training self.use_bfloat16 = use_bfloat16 self.num_cores = num_cores self.transpose_input = transpose_input self.image_size = image_size def set_shapes(self, batch_size, images, labels): """Statically set the batch_size dimension.""" if self.transpose_input: images.set_shape(images.get_shape().merge_with( tf.TensorShape([None, None, None, batch_size]))) labels.set_shape(labels.get_shape().merge_with( tf.TensorShape([batch_size]))) else: images.set_shape(images.get_shape().merge_with( tf.TensorShape([batch_size, None, None, None]))) labels.set_shape(labels.get_shape().merge_with( tf.TensorShape([batch_size]))) return images, labels def dataset_parser(self, value): """Parses an image and its label from a serialized ResNet-50 TFExample. Args: value: serialized string containing an ImageNet TFExample. Returns: Returns a tuple of (image, label) from the TFExample. """ keys_to_features = { 'image/encoded': tf.FixedLenFeature((), tf.string, ''), 'image/class/label': tf.FixedLenFeature([], tf.int64, -1), } parsed = tf.parse_single_example(value, keys_to_features) image_bytes = tf.reshape(parsed['image/encoded'], shape=[]) image = self.image_preprocessing_fn( image_bytes=image_bytes, is_training=self.is_training, image_size=self.image_size, use_bfloat16=self.use_bfloat16) # Subtract one so that labels are in [0, 1000). label = tf.cast( tf.reshape(parsed['image/class/label'], shape=[]), dtype=tf.int32) - 1 return image, label @abc.abstractmethod def make_source_dataset(self, index, num_hosts): """Makes dataset of serialized TFExamples. The returned dataset will contain `tf.string` tensors, but these strings are serialized `TFExample` records that will be parsed by `dataset_parser`. If self.is_training, the dataset should be infinite. Args: index: current host index. num_hosts: total number of hosts. Returns: A `tf.data.Dataset` object. """ return def input_fn(self, params): """Input function which provides a single batch for train or eval. Args: params: `dict` of parameters passed from the `TPUEstimator`. `params['batch_size']` is always provided and should be used as the effective batch size. Returns: A `tf.data.Dataset` object. """ # Retrieves the batch size for the current shard. The # of shards is # computed according to the input pipeline deployment. See # tf.contrib.tpu.RunConfig for details. batch_size = params['batch_size'] # Mnas optimize - added hvd to params to enable sharding for horovod if 'hvd' in params and self.is_training: tf.logging.info('Mnas optimize - hvd data sharding. Curr host: %d. Num hosts: %d', \ params['hvd_curr_host'], params['hvd_num_hosts']) current_host = params['hvd_curr_host'] num_hosts = params['hvd_num_hosts'] elif 'context' in params: current_host = params['context'].current_input_fn_deployment()[1] num_hosts = params['context'].num_hosts else: current_host = 0 num_hosts = 1 dataset = self.make_source_dataset(current_host, num_hosts) # Use the fused map-and-batch operation. # # For XLA, we must used fixed shapes. Because we repeat the source training # dataset indefinitely, we can use `drop_remainder=True` to get fixed-size # batches without dropping any training examples. # # When evaluating, `drop_remainder=True` prevents accidentally evaluating # the same image twice by dropping the final batch if it is less than a full # batch size. As long as this validation is done with consistent batch size, # exactly the same images will be used. dataset = dataset.apply( tf.contrib.data.map_and_batch( self.dataset_parser, batch_size=batch_size, num_parallel_batches=self.num_cores, drop_remainder=True)) # Transpose for performance on TPU if self.transpose_input: dataset = dataset.map( lambda images, labels: (tf.transpose(images, [1, 2, 3, 0]), labels), num_parallel_calls=self.num_cores) # Assign static batch size dimension dataset = dataset.map(functools.partial(self.set_shapes, batch_size), num_parallel_calls=self.num_cores) # Prefetch overlaps in-feed with training dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE) return dataset class ImageNetInput(ImageNetTFExampleInput): """Generates ImageNet input_fn from a series of TFRecord files. The training data is assumed to be in TFRecord format with keys as specified in the dataset_parser below, sharded across 1024 files, named sequentially: train-00000-of-01024 train-00001-of-01024 ... train-01023-of-01024 The validation data is in the same format but sharded in 128 files. The format of the data required is created by the script at: https://github.com/tensorflow/tpu/blob/master/tools/datasets/imagenet_to_gcs.py """ def __init__(self, is_training, use_bfloat16, transpose_input, data_dir, image_size=224, num_parallel_calls=16, cache=False): """Create an input from TFRecord files. Args: is_training: `bool` for whether the input is for training use_bfloat16: If True, use bfloat16 precision; else use float32. transpose_input: 'bool' for whether to use the double transpose trick data_dir: `str` for the directory of the training and validation data; if 'null' (the literal string 'null') or implicitly False then construct a null pipeline, consisting of empty images and blank labels. image_size: `int` for image size (both width and height). num_parallel_calls: concurrency level to use when reading data from disk. cache: if true, fill the dataset by repeating from its cache """ super(ImageNetInput, self).__init__( is_training=is_training, image_size=image_size, use_bfloat16=use_bfloat16, transpose_input=transpose_input) self.data_dir = data_dir if self.data_dir == 'null' or not self.data_dir: self.data_dir = None self.num_parallel_calls = num_parallel_calls self.cache = cache def _get_null_input(self, data): """Returns a null image (all black pixels). Args: data: element of a dataset, ignored in this method, since it produces the same null image regardless of the element. Returns: a tensor representing a null image. """ del data # Unused since output is constant regardless of input return tf.zeros([self.image_size, self.image_size, 3], tf.bfloat16 if self.use_bfloat16 else tf.float32) def dataset_parser(self, value): """See base class.""" if not self.data_dir: return value, tf.constant(0, tf.int32) return super(ImageNetInput, self).dataset_parser(value) def make_source_dataset(self, index, num_hosts): """See base class.""" if not self.data_dir: tf.logging.info('Undefined data_dir implies null input') return tf.data.Dataset.range(1).repeat().map(self._get_null_input) # Shuffle the filenames to ensure better randomization. file_pattern = os.path.join( self.data_dir, 'train-*' if self.is_training else 'validation-*') # For multi-host training, we want each hosts to always process the same # subset of files. Each host only sees a subset of the entire dataset, # allowing us to cache larger datasets in memory. dataset = tf.data.Dataset.list_files(file_pattern, shuffle=False) dataset = dataset.shard(num_hosts, index) if self.is_training and not self.cache: dataset = dataset.repeat() def fetch_dataset(filename): buffer_size = 8 * 1024 * 1024 # 8 MiB per file dataset = tf.data.TFRecordDataset(filename, buffer_size=buffer_size) return dataset # Read the data from disk in parallel dataset = dataset.apply( tf.contrib.data.parallel_interleave( fetch_dataset, cycle_length=self.num_parallel_calls, sloppy=True, prefetch_input_elements=16)) if self.cache: dataset = dataset.cache().apply( tf.contrib.data.shuffle_and_repeat(1024*16)) else: dataset = dataset.shuffle(1024) return dataset # for dali def train_data_fn(self, params): data_dir = '/data' data_index_dir = '/data/index_files' if self.is_training: mode = 'train' else: mode = 'validation' if data_dir is not None: #if hvd.rank() == 0: tf.logging.info("Using DALI input... ") filenames, num_samples = list_filenames_in_dataset(data_dir=data_dir, mode=mode) idx_filenames = parse_dali_idx_dataset(data_idx_dir=data_index_dir,mode=mode) return data_utils.get_dali_input_fn( filenames=filenames, idx_filenames=idx_filenames, batch_size=params['batch_size'], height=self.image_size, width=self.image_size, training=self.is_training, distort_color=False, num_threads=4, deterministic=False ) # Defines a selection of data from a Cloud Bigtable. BigtableSelection = namedtuple('BigtableSelection', ['project', 'instance', 'table', 'prefix', 'column_family', 'column_qualifier']) class ImageNetBigtableInput(ImageNetTFExampleInput): """Generates ImageNet input_fn from a Bigtable for training or evaluation. """ def __init__(self, is_training, use_bfloat16, transpose_input, selection): """Constructs an ImageNet input from a BigtableSelection. Args: is_training: `bool` for whether the input is for training use_bfloat16: If True, use bfloat16 precision; else use float32. transpose_input: 'bool' for whether to use the double transpose trick selection: a BigtableSelection specifying a part of a Bigtable. """ super(ImageNetBigtableInput, self).__init__( is_training=is_training, use_bfloat16=use_bfloat16, transpose_input=transpose_input) self.selection = selection def make_source_dataset(self, index, num_hosts): """See base class.""" data = self.selection client = tf.contrib.cloud.BigtableClient(data.project, data.instance) table = client.table(data.table) ds = table.parallel_scan_prefix(data.prefix, columns=[(data.column_family, data.column_qualifier)]) # The Bigtable datasets will have the shape (row_key, data) ds_data = ds.map(lambda index, data: data) if self.is_training: ds_data = ds_data.repeat() return ds_data