"""SageMaker Pipe Mode integration/benchmark script.

Trains a tf.estimator.LinearClassifier from a PipeModeDataset channel,
evaluates it, then validates that PipeModeDataset instances can be
created, discarded, and recreated on the same channel after training.
Runs inside a SageMaker training container (reads hyperparameters and
channel pipes from /opt/ml/...).
"""
import json
import multiprocessing
import os
import tempfile

import tensorflow as tf
from sagemaker_tensorflow import PipeModeDataset

print("Starting estimator script")

# Smoke-test that a PipeModeDataset can be constructed before any training.
ds = PipeModeDataset("elizabeth")


class BenchmarkConfig(object):
    """Typed accessors over the SageMaker hyperparameters.json file."""

    def __init__(self):
        # Use a context manager so the file handle is closed
        # (the original leaked it via json.load(open(...))).
        with open('/opt/ml/input/config/hyperparameters.json') as f:
            self.hp = json.load(f)

    @property
    def batch_size(self):
        # Records per training batch.
        return int(self.hp.get('batch_size', 5))

    @property
    def prefetch_size(self):
        # Number of records to prefetch; values <= 0 disable prefetching.
        return int(self.hp.get('prefetch_size', 1000))

    @property
    def channel(self):
        # SageMaker input channel name to read from.
        return self.hp.get('channel', 'elizabeth')

    @property
    def dimension(self):
        # Feature-vector length. Required (no default) so a missing value
        # fails fast with a KeyError.
        return int(self.hp['dimension'])

    @property
    def epochs(self):
        return int(self.hp.get('epochs', 3))

    @property
    def parallel_transform_calls(self):
        # Default leaves two cores free (pipe reader + main thread),
        # but never goes below one.
        return int(self.hp.get('parallel_transform_calls',
                               max(1, multiprocessing.cpu_count() - 2)))

    def __repr__(self):
        """Return all properties"""
        return str(vars(self))


config = BenchmarkConfig()


def input_fn():
    """Build the tf.data input pipeline for the Estimator.

    Reads TFRecords from the configured pipe-mode channel, parses each
    record into a float64 'data' feature and an int64 label, and applies
    repeat/prefetch/map/batch according to ``config``.
    """
    features = {
        'data': tf.io.FixedLenFeature([], tf.string),
        'labels': tf.io.FixedLenFeature([], tf.int64),
    }

    def parse(record):
        # Each record is a serialized tf.train.Example; 'data' holds a raw
        # byte string that decodes to a float64 vector.
        parsed = tf.io.parse_single_example(serialized=record, features=features)
        return ({
            'data': tf.io.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    ds = PipeModeDataset(config.channel)
    if config.epochs > 1:
        ds = ds.repeat(config.epochs)
    if config.prefetch_size > 0:
        ds = ds.prefetch(config.prefetch_size)
    ds = ds.map(parse, num_parallel_calls=config.parallel_transform_calls)
    ds = ds.batch(config.batch_size)
    return ds


# Perform Estimator training
column = tf.feature_column.numeric_column('data', shape=(config.dimension, ))
model_dir = tempfile.mkdtemp()
estimator = tf.estimator.LinearClassifier(feature_columns=[column])
print("About to call train")
estimator.train(input_fn=input_fn)

# Confirm that we have read the correct number of pipes: SageMaker creates a
# new numbered pipe for each pass, so pipe index epochs + 1 should now exist.
assert os.path.exists('/opt/ml/input/data/{}_{}'.format(config.channel, config.epochs + 1))

print("About to call evaluate")
result = estimator.evaluate(input_fn=input_fn)
for key, value in sorted(result.items()):
    print('%s: %s' % (key, value))

# Test that we can create a new PipeModeDataset after training has run
print("Validate that new PipeModeDataset on existing channel can be created")
tf.compat.v1.disable_eager_execution()
ds = PipeModeDataset(config.channel)
with tf.compat.v1.Session() as sess:
    it = tf.compat.v1.data.make_one_shot_iterator(ds)
    # Renamed from `next` to avoid shadowing the builtin.
    next_record = it.get_next()
    sess.run(next_record)

print("Validate create, read, discard, recreate")
# Test that we can create a PipeModeDataset, discard it, and read from a new one
ds = PipeModeDataset(config.channel)
with tf.compat.v1.Session() as sess:
    # Intentionally build the iterator without running it, then discard it.
    it = tf.compat.v1.data.make_one_shot_iterator(ds)
    next_record = it.get_next()
with tf.compat.v1.Session() as sess:
    it = tf.compat.v1.data.make_one_shot_iterator(ds)
    next_record = it.get_next()
    sess.run(next_record)

print("Validate recreate")
ds = PipeModeDataset(config.channel)