{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Prepare CINIC-10 as sharded TFRecords\n",
    "\n",
    "Downloads [CINIC-10](https://datashare.is.ed.ac.uk/handle/10283/3192), pools all of its `.png` images and re-splits them 80/10/10 into train/validation/test with a fixed seed, writes each split as sharded TFRecord files, checks the shards, and uploads them to the SageMaker default S3 bucket under `distributed_training_demo/data/`.\n",
    "\n",
    "Local outputs (under `../data`):\n",
    "- `label_dictionary.json` and `inverted_label_dictionary.json`: class name to integer label, and back\n",
    "- `sharded_tfrecords/{train,validation,test}/`: TFRecord shards; every record has an `image` feature (raw `uint8` RGB bytes, 32x32x3) and an integer `label` feature"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "from IPython.display import display\n",
    "import json\n",
    "import matplotlib.pyplot as plt\n",
    "from multiprocessing import cpu_count, Pool\n",
    "import os\n",
    "import numpy as np\n",
    "import pickle\n",
    "from PIL import Image\n",
    "import random\n",
    "import sagemaker\n",
    "from sklearn.model_selection import train_test_split\n",
    "import ssl\n",
    "import tarfile\n",
    "import tensorflow as tf\n",
    "from tqdm import tqdm\n",
    "import urllib.request"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## configuration\n",
    "local_data_folder = '../data'\n",
    "cinic_filename = 'CINIC-10.tar.gz'\n",
    "cinic_download_url = 'https://datashare.is.ed.ac.uk/bitstream/handle/10283/3192/' + cinic_filename\n",
    "cinic_extracted_folder = 'cinic-10'\n",
    "cinic_archive_path = os.path.join(local_data_folder, cinic_filename)\n",
    "extracted_path = os.path.join(local_data_folder, cinic_extracted_folder)\n",
    "tfrecords_folder = os.path.join(local_data_folder, 'sharded_tfrecords')\n",
    "split_names = ('train', 'validation', 'test')\n",
    "\n",
    "# set to False only if the dataset host's certificate cannot be validated in your environment;\n",
    "# verification is then skipped for that one download instead of for the whole process\n",
    "verify_ssl = True\n",
    "\n",
    "random_seed = 24601        # fixed seed so the train/validation/test split is reproducible\n",
    "image_shape = (32, 32, 3)  # CINIC-10 images are 32x32; stored as raw RGB bytes\n",
    "num_files_to_create = 96   # some multiple of 8 as we will use 8 GPUs\n",
    "num_workers = max(1, cpu_count() - 1)  # leave a core free, but never ask Pool for 0 processes\n",
    "\n",
    "s3_prefix = 'distributed_training_demo/data'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## set s3 bucket\n",
    "sm_session = sagemaker.Session()\n",
    "bucket = sm_session.default_bucket()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Download CINIC-10\n",
    "\n",
    "The archive is about 688 MB (the original run took ~26 minutes), so the download is skipped if the file already exists. Data is streamed to a `.part` file and renamed only when complete, so an interrupted download is never mistaken for a finished one. TLS certificates are verified unless `verify_ssl` is set to `False`, and that setting only affects this one request."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## download helper\n",
    "def download_url(url, output_path, verify_ssl=True, chunk_size=1024 * 1024):\n",
    "    \"\"\"Download `url` to `output_path`, showing a tqdm progress bar.\n",
    "\n",
    "    Does nothing if `output_path` already exists. Bytes are streamed into\n",
    "    `output_path + '.part'`, which is renamed to `output_path` only after the\n",
    "    whole response has been read. With `verify_ssl=False` TLS certificate\n",
    "    checks are disabled for this request only.\n",
    "    \"\"\"\n",
    "    if os.path.exists(output_path):\n",
    "        print('{} already exists, skipping download.'.format(output_path))\n",
    "        return\n",
    "\n",
    "    context = None if verify_ssl else ssl._create_unverified_context()\n",
    "    partial_path = output_path + '.part'\n",
    "    with urllib.request.urlopen(url, context=context) as response, open(partial_path, 'wb') as out_file:\n",
    "        # Content-Length may be missing; tqdm then shows a running byte count without a total\n",
    "        total_bytes = int(response.headers.get('Content-Length', 0)) or None\n",
    "        with tqdm(total=total_bytes, unit='B', unit_scale=True, miniters=1,\n",
    "                  desc=url.split('/')[-1]) as progress:\n",
    "            for chunk in iter(lambda: response.read(chunk_size), b''):\n",
    "                out_file.write(chunk)\n",
    "                progress.update(len(chunk))\n",
    "    os.replace(partial_path, output_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## download cinic 10 dataset\n",
    "# https://datashare.is.ed.ac.uk/handle/10283/3192\n",
    "os.makedirs(local_data_folder, exist_ok=True)\n",
    "download_url(cinic_download_url, cinic_archive_path, verify_ssl=verify_ssl)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Extract\n",
    "\n",
    "Extraction is skipped when `../data/cinic-10` already has content. Archive members whose path would resolve outside the destination are rejected before anything is extracted."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## extract files\n",
    "def safe_extract_tar(archive_path, destination):\n",
    "    \"\"\"Extract the gzipped tar `archive_path` into `destination`.\n",
    "\n",
    "    Raises ValueError, before extracting anything, if any member path would\n",
    "    resolve outside `destination`.\n",
    "    \"\"\"\n",
    "    destination = os.path.realpath(destination)\n",
    "    with tarfile.open(archive_path, 'r:gz') as archive:\n",
    "        members = archive.getmembers()\n",
    "        for member in members:\n",
    "            target = os.path.realpath(os.path.join(destination, member.name))\n",
    "            if os.path.commonpath([destination, target]) != destination:\n",
    "                raise ValueError('Refusing to extract {!r} outside {}'.format(member.name, destination))\n",
    "        archive.extractall(destination, members=members)\n",
    "\n",
    "\n",
    "if os.path.isdir(extracted_path) and os.listdir(extracted_path):\n",
    "    print('{} is not empty, skipping extraction.'.format(extracted_path))\n",
    "else:\n",
    "    safe_extract_tar(cinic_archive_path, extracted_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Split, inspect and label\n",
    "\n",
    "All `.png` files found under the extracted folder are pooled and re-split 80/10/10. The file list is sorted before splitting so the split depends only on `random_seed`, not on the order in which the filesystem happens to list files. The count assertion (270,000 images) catches an incomplete extraction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## train/validation/test split\n",
    "def label_from_path(image_path):\n",
    "    \"\"\"Return the class name of an image, i.e. the name of its parent directory.\"\"\"\n",
    "    return os.path.basename(os.path.dirname(image_path))\n",
    "\n",
    "\n",
    "# sort so the split is identical on every machine for the same random_seed\n",
    "image_files = sorted(os.path.join(root, name)\n",
    "                     for root, _, names in os.walk(extracted_path)\n",
    "                     for name in names if name.endswith('.png'))\n",
    "assert len(image_files) == 270000, 'expected 270000 images, found {}'.format(len(image_files))\n",
    "\n",
    "# shuffle and split 80/10/10\n",
    "train_filenames, validation_test_filenames = train_test_split(\n",
    "    image_files, train_size=0.8, test_size=0.2, random_state=random_seed)\n",
    "validation_filenames, test_filenames = train_test_split(\n",
    "    validation_test_filenames, train_size=0.5, test_size=0.5, random_state=random_seed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## visualize CINIC-10 data\n",
    "# plot the first 5 samples of the training split\n",
    "fig, axes = plt.subplots(1, 5, figsize=(15, 5))\n",
    "for ax, image_path in zip(axes, train_filenames[:5]):\n",
    "    with Image.open(image_path) as image:\n",
    "        ax.imshow(np.asarray(image))\n",
    "    ax.set_axis_off()\n",
    "    ax.set_title(label_from_path(image_path))\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## create label dictionary\n",
    "all_labels = sorted({label_from_path(filename) for filename in image_files})\n",
    "label_dictionary = {label: index for index, label in enumerate(all_labels)}\n",
    "inverted_label_dictionary = {index: label for label, index in label_dictionary.items()}\n",
    "\n",
    "# JSON object keys are always strings, so the inverted dictionary is saved with keys '0', '1', ...\n",
    "with open(os.path.join(local_data_folder, 'label_dictionary.json'), 'w') as f:\n",
    "    json.dump(label_dictionary, f)\n",
    "with open(os.path.join(local_data_folder, 'inverted_label_dictionary.json'), 'w') as f:\n",
    "    json.dump(inverted_label_dictionary, f)\n",
    "\n",
    "print(inverted_label_dictionary)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Write sharded TFRecords\n",
    "\n",
    "Each split is written as `num_files_to_create` shards by parallel worker processes. The worker function reads the notebook-level `label_dictionary`, `image_shape` and `tfrecords_folder`, which relies on `multiprocessing` using the `fork` start method (the Linux default). If you change `num_files_to_create`, delete `../data/sharded_tfrecords` first; otherwise stale shards from the earlier run would also be counted and uploaded (the count check below fails in that case)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## helper functions to build the tfrecords protocol buffer\n",
    "# https://www.tensorflow.org/tutorials/load_data/tfrecord\n",
    "\n",
    "def _bytes_feature(value):\n",
    "    \"\"\"Returns a bytes_list from a string / byte.\"\"\"\n",
    "    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))\n",
    "\n",
    "def _int64_feature(value):\n",
    "    \"\"\"Returns an int64_list from a bool / enum / int / uint.\"\"\"\n",
    "    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))\n",
    "\n",
    "def _float_feature(value):\n",
    "    \"\"\"Returns a float_list from a float / double.\"\"\"\n",
    "    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## functions to shard and convert to tfrecords\n",
    "\n",
    "def create_tfrecords_in_shards(data_filenames, record_index, dataset_name):\n",
    "    \"\"\"Write one shard: `<tfrecords_folder>/<dataset_name>/<dataset_name>_<record_index:03d>.tfrecords`.\n",
    "\n",
    "    Each record has an `image` feature (raw uint8 RGB bytes of shape `image_shape`)\n",
    "    and an int64 `label` feature looked up in `label_dictionary`.\n",
    "    Returns (number of samples written, output file path).\n",
    "    Raises ValueError if an image does not have shape `image_shape` after RGB conversion.\n",
    "    \"\"\"\n",
    "    output_path = os.path.join(tfrecords_folder, dataset_name)\n",
    "    output_file = os.path.join(output_path, '{}_{:03d}.tfrecords'.format(dataset_name, record_index))\n",
    "    os.makedirs(output_path, exist_ok=True)\n",
    "\n",
    "    number_of_samples_written = 0\n",
    "    with tf.io.TFRecordWriter(output_file) as record_writer:\n",
    "        for input_file in data_filenames:\n",
    "            label_int = label_dictionary[label_from_path(input_file)]\n",
    "            with Image.open(input_file) as image:\n",
    "                image_np = np.asarray(image.convert('RGB'))\n",
    "            # readers reshape the raw bytes to image_shape, so reject anything else here\n",
    "            if image_np.shape != image_shape:\n",
    "                raise ValueError('{} has shape {}, expected {}'.format(input_file, image_np.shape, image_shape))\n",
    "\n",
    "            example = tf.train.Example(features=tf.train.Features(feature={\n",
    "                'image': _bytes_feature(image_np.tobytes()),\n",
    "                'label': _int64_feature(label_int),\n",
    "            }))\n",
    "            record_writer.write(example.SerializeToString())\n",
    "            number_of_samples_written += 1\n",
    "\n",
    "    return number_of_samples_written, output_file\n",
    "\n",
    "\n",
    "def multi_create_tfrecords_in_shards(data_filenames, dataset_name, num_files_to_create, num_workers):\n",
    "    \"\"\"Split `data_filenames` into `num_files_to_create` shards and write them in a process pool.\n",
    "\n",
    "    Returns (total number of samples written, list of shard file paths in shard order).\n",
    "    \"\"\"\n",
    "    split_data_filenames = np.array_split(data_filenames, num_files_to_create)\n",
    "    tfrecord_args = [(shard_filenames, shard_index, dataset_name)\n",
    "                     for shard_index, shard_filenames in enumerate(split_data_filenames)]\n",
    "\n",
    "    # Pool(processes=0) raises, so always use at least one worker\n",
    "    with Pool(processes=max(1, num_workers)) as pool:\n",
    "        tfrecords_func_output = pool.starmap(create_tfrecords_in_shards, tfrecord_args)\n",
    "\n",
    "    sample_counts = [count for count, _ in tfrecords_func_output]\n",
    "    output_files = [path for _, path in tfrecords_func_output]\n",
    "    return sum(sample_counts), output_files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## create tfrecords for train, validation and test set\n",
    "split_filenames = {\n",
    "    'train': train_filenames,\n",
    "    'validation': validation_filenames,\n",
    "    'test': test_filenames,\n",
    "}\n",
    "\n",
    "num_samples_written = {}\n",
    "tfrecord_files = {}\n",
    "for split_name in split_names:\n",
    "    num_samples_written[split_name], tfrecord_files[split_name] = multi_create_tfrecords_in_shards(\n",
    "        data_filenames=split_filenames[split_name],\n",
    "        dataset_name=split_name,\n",
    "        num_files_to_create=num_files_to_create,\n",
    "        num_workers=num_workers,\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for split_name in split_names:\n",
    "    print('Wrote {} samples in the {} set.'.format(num_samples_written[split_name], split_name))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Verify the TFRecords\n",
    "\n",
    "Decode the first 1,001 training records, display the last of them, then recount every split from disk and compare with the number of samples written."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## parse and confirm data in tfrecords\n",
    "\n",
    "# Create a dictionary describing the features.\n",
    "sample_feature_description = {\n",
    "    'image': tf.io.FixedLenFeature([], tf.string),\n",
    "    'label': tf.io.FixedLenFeature([], tf.int64),\n",
    "}\n",
    "\n",
    "def _parse_sample_function(example_proto):\n",
    "    return tf.io.parse_single_example(example_proto, sample_feature_description)\n",
    "\n",
    "parsed_sample_dataset = tf.data.TFRecordDataset(tfrecord_files['train']).map(_parse_sample_function)\n",
    "\n",
    "# decoding fails loudly on a malformed record; `image`/`label` keep the last sample for display below\n",
    "for sample_features in parsed_sample_dataset.take(1001):\n",
    "    image = tf.reshape(tf.io.decode_raw(sample_features['image'], tf.uint8), image_shape)\n",
    "    label = sample_features['label']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(Image.fromarray(image.numpy()).resize((128, 128)))\n",
    "print(inverted_label_dictionary[int(label.numpy())])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## count number of samples in tfrecords and check them against what was written\n",
    "def count_tfrecord_samples(directory):\n",
    "    \"\"\"Return the total number of records in all files under `directory`.\"\"\"\n",
    "    files = sorted(os.path.join(root, name) for root, _, names in os.walk(directory) for name in names)\n",
    "    return sum(1 for _ in tf.data.TFRecordDataset(files))\n",
    "\n",
    "\n",
    "for split_name in split_names:\n",
    "    num_counted = count_tfrecord_samples(os.path.join(tfrecords_folder, split_name))\n",
    "    print('Counted {} samples in {} set.'.format(num_counted, split_name))\n",
    "    assert num_counted == num_samples_written[split_name], (\n",
    "        '{}: {} records on disk but {} written'.format(split_name, num_counted, num_samples_written[split_name]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Upload to S3\n",
    "\n",
    "Each split directory is uploaded to `s3://<default bucket>/distributed_training_demo/data/<split>/`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## upload to s3\n",
    "for split_name in split_names:\n",
    "    s3_uri = sm_session.upload_data(\n",
    "        path=os.path.join(tfrecords_folder, split_name),\n",
    "        bucket=bucket,\n",
    "        key_prefix='{}/{}'.format(s3_prefix, split_name))\n",
    "    print('Uploaded {} set to {}'.format(split_name, s3_uri))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_tensorflow2_p36",
   "language": "python",
   "name": "conda_tensorflow2_p36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}