{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"import os\n",
"import sys\n",
"import traceback\n",
"import datetime\n",
"import numpy as np\n",
"import keras\n",
"import tensorflow as tf\n",
"import pandas as pd\n",
"import glob\n",
"import json\n",
"import random\n",
"import time\n",
"from PIL import Image\n",
"\n",
"BATCH_SIZE = 128\n",
"TRAINING_SPLIT = 0.8\n",
"EPOCHS = 20\n",
"\n",
"prefix = '/opt/ml/'\n",
"input_path = prefix + 'input/data'\n",
"output_path = os.path.join(prefix, 'output')\n",
"model_path = os.path.join(prefix, 'model')\n",
"param_path = os.path.join(prefix, 'input/config/hyperparameters.json')\n",
"\n",
"model_loc = os.path.join(model_path, 'car-model.pkl')\n",
"\n",
"# This algorithm has a single channel of input data called 'training'. Since we run in\n",
"# File mode, the input files are copied to the directory specified here.\n",
"channel_name = 'training'\n",
"training_path = os.path.join(input_path, channel_name)\n",
"\n",
"INPUT_TENSOR_NAME = \"inputs\"\n",
"SIGNATURE_NAME = \"serving_default\"\n",
"LEARNING_RATE = 0.001\n",
"\n",
"\n",
"def train():\n",
"\n",
"    print('Starting the training.')\n",
"    try:\n",
"        # Read in any hyperparameters that the user passed with the training job.\n",
"        with open(param_path, 'r') as tc:\n",
"            trainingParams = json.load(tc)\n",
"\n",
"        input_files = [os.path.join(training_path, file) for file in os.listdir(training_path)]\n",
"        if len(input_files) == 0:\n",
"            raise ValueError(('There are no files in {}.\\n' +\n",
"                              'This usually indicates that the channel ({}) was incorrectly specified,\\n' +\n",
"                              'the data specification in S3 was incorrectly specified or the role specified\\n' +\n",
"                              'does not have permission to access the data.').format(training_path, channel_name))\n",
"\n",
"        tubgroup = TubGroup(training_path)\n",
"\n",
"        total_records = len(tubgroup.df)\n",
"        total_train = int(total_records * TRAINING_SPLIT)\n",
"        total_val = total_records - total_train\n",
"        steps_per_epoch = total_train // BATCH_SIZE\n",
"        validation_steps = max(1, total_val // BATCH_SIZE)\n",
"        X_keys = ['cam/image_array']\n",
"        y_keys = ['user/angle', 'user/throttle']\n",
"        train_gen, val_gen = tubgroup.get_train_val_gen(X_keys, y_keys, record_transform=rt,\n",
"                                                        batch_size=BATCH_SIZE,\n",
"                                                        train_frac=TRAINING_SPLIT)\n",
"\n",
"        # Checkpoint only the best model seen so far (lowest validation loss).\n",
"        save_best = keras.callbacks.ModelCheckpoint(model_loc,\n",
"                                                    monitor='val_loss',\n",
"                                                    verbose=1,\n",
"                                                    save_best_only=True,\n",
"                                                    mode='min')\n",
"\n",
"        # Stop training if the validation error stops improving.\n",
"        early_stop = keras.callbacks.EarlyStopping(monitor='val_loss',\n",
"                                                   min_delta=0.0005,\n",
"                                                   patience=5,\n",
"                                                   verbose=1,\n",
"                                                   mode='auto')\n",
"        callbacks_list = [save_best, early_stop]\n",
"\n",
"        model = default_categorical()\n",
"        hist = model.fit_generator(\n",
"            train_gen,\n",
"            steps_per_epoch=steps_per_epoch,\n",
"            epochs=EPOCHS,\n",
"            verbose=1,\n",
"            validation_data=val_gen,\n",
"            callbacks=callbacks_list,\n",
"            validation_steps=validation_steps)\n",
"        print('Training complete.')\n",
"\n",
"    except Exception as e:\n",
"        # Write out an error file. This will be returned as the failureReason in the\n",
"        # DescribeTrainingJob result.\n",
"        trc = traceback.format_exc()\n",
"        with open(os.path.join(output_path, 'failure'), 'w') as s:\n",
"            s.write('Exception during training: ' + str(e) + '\\n' + trc)\n",
"        # Printing this causes the exception to be in the training job logs, as well.\n",
"        print('Exception during training: ' + str(e) + '\\n' + trc, file=sys.stderr)\n",
"        # A non-zero exit code causes the training job to be marked as Failed.\n",
"        sys.exit(255)\n",
"\n",
"\n",
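"# The hyperparameters read inside train() arrive as strings from\n",
"# /opt/ml/input/config/hyperparameters.json. A minimal, hedged sketch of how they\n",
"# could override the defaults above; the key names 'epochs' and 'batch_size' are\n",
"# assumptions, not part of the original job definition:\n",
"#\n",
"#     epochs = int(trainingParams.get('epochs', EPOCHS))\n",
"#     batch_size = int(trainingParams.get('batch_size', BATCH_SIZE))\n",
"\n",
"\n",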
"class Tub(object):\n",
"    \"\"\"\n",
"    A datastore that stores sensor data in a key/value format.\n",
"\n",
"    Accepts str, int, float, image_array, image, and array data types.\n",
"\n",
"    For example:\n",
"\n",
"    # Create a tub to store speed values.\n",
"    >>> path = '~/mydonkey/test_tub'\n",
"    >>> inputs = ['user/speed', 'cam/image']\n",
"    >>> types = ['float', 'image']\n",
"    >>> t = Tub(path=path, inputs=inputs, types=types)\n",
"\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, path, inputs=None, types=None):\n",
"\n",
"        self.path = os.path.expanduser(path)\n",
"        print('path_in_tub:', self.path)\n",
"        self.meta_path = os.path.join(self.path, 'meta.json')\n",
"        self.df = None\n",
"\n",
"        exists = os.path.exists(self.path)\n",
"\n",
"        if exists:\n",
"            # Load the existing log and meta information.\n",
"            print(\"Tub exists: {}\".format(self.path))\n",
"            with open(self.meta_path, 'r') as f:\n",
"                self.meta = json.load(f)\n",
"            self.current_ix = self.get_last_ix() + 1\n",
"\n",
"        elif not exists and inputs:\n",
"            print('Tub does NOT exist. Creating new tub...')\n",
"            # Create the log and save the meta information.\n",
"            os.makedirs(self.path)\n",
"            self.meta = {'inputs': inputs, 'types': types}\n",
"            with open(self.meta_path, 'w') as f:\n",
"                json.dump(self.meta, f)\n",
"            self.current_ix = 0\n",
"            print('New tub created at: {}'.format(self.path))\n",
"        else:\n",
"            msg = \"The tub path you provided doesn't exist and you didn't pass any meta info (inputs & types) \" + \\\n",
"                  \"to create a new tub. 
Please check your tub path or provide meta info to create a new tub.\"\n", "\n", " raise AttributeError(msg)\n", "\n", " self.start_time = time.time()\n", "\n", "\n", " def get_last_ix(self):\n", " index = self.get_index()\n", " return max(index)\n", "\n", " def update_df(self):\n", " df = pd.DataFrame([self.get_json_record(i) for i in self.get_index(shuffled=False)])\n", " self.df = df\n", "\n", " def get_df(self):\n", " if self.df is None:\n", " self.update_df()\n", " return self.df\n", "\n", "\n", " def get_index(self, shuffled=True):\n", " files = next(os.walk(self.path))[2]\n", " record_files = [f for f in files if f[:6]=='record']\n", " \n", " def get_file_ix(file_name):\n", " try:\n", " name = file_name.split('.')[0]\n", " num = int(name.split('_')[1])\n", " except:\n", " num = 0\n", " return num\n", "\n", " nums = [get_file_ix(f) for f in record_files]\n", " \n", " if shuffled:\n", " random.shuffle(nums)\n", " else:\n", " nums = sorted(nums)\n", " \n", " return nums \n", "\n", "\n", " @property\n", " def inputs(self):\n", " return list(self.meta['inputs'])\n", "\n", " @property\n", " def types(self):\n", " return list(self.meta['types'])\n", "\n", " def get_input_type(self, key):\n", " input_types = dict(zip(self.inputs, self.types))\n", " return input_types.get(key)\n", "\n", " def write_json_record(self, json_data):\n", " path = self.get_json_record_path(self.current_ix)\n", " try:\n", " with open(path, 'w') as fp:\n", " json.dump(json_data, fp)\n", " #print('wrote record:', json_data)\n", " except TypeError:\n", " print('troubles with record:', json_data)\n", " except FileNotFoundError:\n", " raise\n", " except:\n", " print(\"Unexpected error:\", sys.exc_info()[0])\n", " raise\n", "\n", " def get_num_records(self):\n", " import glob\n", " files = glob.glob(os.path.join(self.path, 'record_*.json'))\n", " return len(files)\n", "\n", "\n", " def make_record_paths_absolute(self, record_dict):\n", " #make paths absolute\n", " d = {}\n", " for k, v in record_dict.items():\n", " if type(v) == str: #filename\n", " if '.' in v:\n", " v = os.path.join(self.path, v)\n", " d[k] = v\n", "\n", " return d\n", "\n", "\n", "\n", "\n", " def check(self, fix=False):\n", " '''\n", " Iterate over all records and make sure we can load them.\n", " Optionally remove records that cause a problem.\n", " '''\n", " print('Checking tub:%s.' % self.path)\n", " print('Found: %d records.' 
% self.get_num_records())\n", " problems = False\n", " for ix in self.get_index(shuffled=False):\n", " try:\n", " self.get_record(ix)\n", " except:\n", " problems = True\n", " if fix == False:\n", " print('problems with record:', self.path, ix)\n", " else:\n", " print('problems with record, removing:', self.path, ix)\n", " self.remove_record(ix)\n", " if not problems:\n", " print(\"No problems found.\")\n", "\n", " def remove_record(self, ix):\n", " '''\n", " remove data associate with a record\n", " '''\n", " record = self.get_json_record_path(ix)\n", " os.unlink(record)\n", "\n", " def put_record(self, data):\n", " \"\"\"\n", " Save values like images that can't be saved in the csv log and\n", " return a record with references to the saved values that can\n", " be saved in a csv.\n", " \"\"\"\n", " json_data = {}\n", " self.current_ix += 1\n", " \n", " for key, val in data.items():\n", " typ = self.get_input_type(key)\n", "\n", " if typ in ['str', 'float', 'int', 'boolean']:\n", " json_data[key] = val\n", "\n", " elif typ is 'image':\n", " path = self.make_file_path(key)\n", " val.save(path)\n", " json_data[key]=path\n", "\n", " elif typ == 'image_array':\n", " img = Image.fromarray(np.uint8(val))\n", " name = self.make_file_name(key, ext='.jpg')\n", " img.save(os.path.join(self.path, name))\n", " json_data[key]=name\n", "\n", " else:\n", " msg = 'Tub does not know what to do with this type {}'.format(typ)\n", " raise TypeError(msg)\n", "\n", " self.write_json_record(json_data)\n", " return self.current_ix\n", "\n", "\n", " def get_json_record_path(self, ix):\n", " return os.path.join(self.path, 'record_'+str(ix)+'.json')\n", "\n", " def get_json_record(self, ix):\n", " path = self.get_json_record_path(ix)\n", " try:\n", " with open(path, 'r') as fp:\n", " json_data = json.load(fp)\n", " except UnicodeDecodeError:\n", " raise Exception('bad record: %d. You may want to run `python manage.py check --fix`' % ix)\n", " except FileNotFoundError:\n", " raise\n", " except:\n", " print(\"Unexpected error:\", sys.exc_info()[0])\n", " raise\n", "\n", " record_dict = self.make_record_paths_absolute(json_data)\n", " return record_dict\n", "\n", "\n", " def get_record(self, ix):\n", "\n", " json_data = self.get_json_record(ix)\n", " data = self.read_record(json_data)\n", " return data\n", "\n", "\n", "\n", " def read_record(self, record_dict):\n", " data={}\n", " for key, val in record_dict.items():\n", " typ = self.get_input_type(key)\n", "\n", " #load objects that were saved as separate files\n", " if typ == 'image_array':\n", " img = Image.open((val))\n", " val = np.array(img)\n", "\n", " data[key] = val\n", "\n", "\n", " return data\n", "\n", "\n", " def make_file_name(self, key, ext='.png'):\n", " name = '_'.join([str(self.current_ix), key, ext])\n", " name = name = name.replace('/', '-')\n", " return name\n", "\n", " def delete(self):\n", " \"\"\" Delete the folder and files for this tub. 
\"\"\"\n",
"        import shutil\n",
"        shutil.rmtree(self.path)\n",
"\n",
"    def shutdown(self):\n",
"        pass\n",
"\n",
"    def get_record_gen(self, record_transform=None, shuffle=True, df=None):\n",
"\n",
"        if df is None:\n",
"            df = self.get_df()\n",
"\n",
"        while True:\n",
"            for _, row in df.iterrows():\n",
"                if shuffle:\n",
"                    record_dict = df.sample(n=1).to_dict(orient='records')[0]\n",
"                else:\n",
"                    record_dict = row.to_dict()\n",
"\n",
"                if record_transform:\n",
"                    record_dict = record_transform(record_dict)\n",
"\n",
"                record_dict = self.read_record(record_dict)\n",
"\n",
"                yield record_dict\n",
"\n",
"    def get_batch_gen(self, keys, record_transform=None, batch_size=128, shuffle=True, df=None):\n",
"\n",
"        record_gen = self.get_record_gen(record_transform, shuffle=shuffle, df=df)\n",
"\n",
"        if keys is None:\n",
"            keys = list(self.df.columns)\n",
"\n",
"        while True:\n",
"            record_list = []\n",
"            for _ in range(batch_size):\n",
"                record_list.append(next(record_gen))\n",
"\n",
"            batch_arrays = {}\n",
"            for i, k in enumerate(keys):\n",
"                arr = np.array([r[k] for r in record_list])\n",
"                batch_arrays[k] = arr\n",
"\n",
"            yield batch_arrays\n",
"\n",
"    def get_train_gen(self, X_keys, Y_keys, batch_size=128, record_transform=None, df=None):\n",
"\n",
"        batch_gen = self.get_batch_gen(X_keys + Y_keys,\n",
"                                       batch_size=batch_size, record_transform=record_transform, df=df)\n",
"\n",
"        while True:\n",
"            batch = next(batch_gen)\n",
"            X = [batch[k] for k in X_keys]\n",
"            Y = [batch[k] for k in Y_keys]\n",
"            yield X, Y\n",
"\n",
"    def get_train_val_gen(self, X_keys, Y_keys, batch_size=128, record_transform=None, train_frac=.8):\n",
"        train_df = self.df.sample(frac=train_frac, random_state=200)\n",
"        val_df = self.df.drop(train_df.index)\n",
"\n",
"        train_gen = self.get_train_gen(X_keys=X_keys, Y_keys=Y_keys, batch_size=batch_size,\n",
"                                       record_transform=record_transform, df=train_df)\n",
"\n",
"        val_gen = self.get_train_gen(X_keys=X_keys, Y_keys=Y_keys, batch_size=batch_size,\n",
"                                     record_transform=record_transform, df=val_df)\n",
"\n",
"        return train_gen, val_gen\n",
"\n",
"\n",
"class TubHandler():\n",
"    def __init__(self, path):\n",
"        self.path = os.path.expanduser(path)\n",
"\n",
"    def get_tub_list(self, path):\n",
"        folders = next(os.walk(path))[1]\n",
"        return folders\n",
"\n",
"    def next_tub_number(self, path):\n",
"        def get_tub_num(tub_name):\n",
"            try:\n",
"                num = int(tub_name.split('_')[1])\n",
"            except:\n",
"                num = 0\n",
"            return num\n",
"\n",
"        folders = self.get_tub_list(path)\n",
"        numbers = [get_tub_num(x) for x in folders]\n",
"        next_number = max(numbers + [0]) + 1\n",
"        return next_number\n",
"\n",
"    def create_tub_path(self):\n",
"        tub_num = self.next_tub_number(self.path)\n",
"        date = datetime.datetime.now().strftime('%y-%m-%d')\n",
"        name = '_'.join(['tub', str(tub_num), date])\n",
"        tub_path = os.path.join(self.path, name)\n",
"        return tub_path\n",
"\n",
"    def new_tub_writer(self, inputs, types):\n",
"        # NOTE: TubWriter is not defined in this cell; it comes from the donkeycar\n",
"        # project's datastore module.\n",
"        tub_path = self.create_tub_path()\n",
"        tw = TubWriter(path=tub_path, inputs=inputs, types=types)\n",
"        return tw\n",
"\n",
"\n",
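"# A minimal, hedged example of the Tub API defined above (the path and values are\n",
"# illustrative only): write one record containing an image array plus the two\n",
"# float labels, then read it back as a dict of numpy/scalar values.\n",
"#\n",
"#     t = Tub(path='~/mydonkey/test_tub',\n",
"#             inputs=['cam/image_array', 'user/angle', 'user/throttle'],\n",
"#             types=['image_array', 'float', 'float'])\n",
"#     ix = t.put_record({'cam/image_array': np.zeros((120, 160, 3), dtype=np.uint8),\n",
"#                        'user/angle': 0.0,\n",
"#                        'user/throttle': 0.2})\n",
"#     rec = t.get_record(ix)  # the image comes back as a numpy array\n",
"\n",
"\n",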
"class TubImageStacker(Tub):\n",
"    '''\n",
"    A Tub for training a NN with images that are the last three records stacked\n",
"    together as 3 channels of a single image. The idea is to give a simple feedforward\n",
"    NN some chance of building a model based on motion.\n",
"    If you drive with the ImageFIFO part, then you don't need this.\n",
"    Just make sure your inference pass uses the ImageFIFO that the NN will now expect.\n",
"    '''\n",
"\n",
"    def rgb2gray(self, rgb):\n",
"        '''\n",
"        Take a numpy RGB image and return a new single-channel greyscale image.\n",
"        '''\n",
"        return np.dot(rgb[..., :3], [0.299, 0.587, 0.114])\n",
"\n",
"    def stack3Images(self, img_a, img_b, img_c):\n",
"        '''\n",
"        Convert 3 RGB images to grayscale and put them into the 3 channels of\n",
"        a single output image.\n",
"        '''\n",
"        width, height, _ = img_a.shape\n",
"\n",
"        gray_a = self.rgb2gray(img_a)\n",
"        gray_b = self.rgb2gray(img_b)\n",
"        gray_c = self.rgb2gray(img_c)\n",
"\n",
"        img_arr = np.zeros([width, height, 3], dtype=np.dtype('B'))\n",
"\n",
"        img_arr[..., 0] = np.reshape(gray_a, (width, height))\n",
"        img_arr[..., 1] = np.reshape(gray_b, (width, height))\n",
"        img_arr[..., 2] = np.reshape(gray_c, (width, height))\n",
"\n",
"        return img_arr\n",
"\n",
"    def get_record(self, ix):\n",
"        '''\n",
"        Get the current record and the two previous ones, then\n",
"        stack the 3 images into a single image.\n",
"        '''\n",
"        data = super(TubImageStacker, self).get_record(ix)\n",
"\n",
"        if ix > 1:\n",
"            data_ch1 = super(TubImageStacker, self).get_record(ix - 1)\n",
"            data_ch0 = super(TubImageStacker, self).get_record(ix - 2)\n",
"\n",
"            json_data = self.get_json_record(ix)\n",
"            for key, val in json_data.items():\n",
"                typ = self.get_input_type(key)\n",
"\n",
"                # Stack objects that were saved as separate image files.\n",
"                if typ == 'image':\n",
"                    val = self.stack3Images(data_ch0[key], data_ch1[key], data[key])\n",
"                    data[key] = val\n",
"                elif typ == 'image_array':\n",
"                    img = self.stack3Images(data_ch0[key], data_ch1[key], data[key])\n",
"                    data[key] = np.array(img)\n",
"\n",
"        return data\n",
"\n",
"\n",
"class TubTimeStacker(TubImageStacker):\n",
"    '''\n",
"    A Tub for training a NN with records stacked through time.\n",
"    The idea here is to force the network to learn to look ahead in time.\n",
"    Init with an array of time offsets from the current time.\n",
"    '''\n",
"\n",
"    def __init__(self, frame_list, *args, **kwargs):\n",
"        '''\n",
"        frame_list of [0, 10] would stack the current record and the record 10 frames\n",
"        from now together in a single record, with just the current image returned.\n",
"        [5, 90, 200] would return 3 frames of records, offset 5, 90, and 200 frames in the future.\n",
"        '''\n",
"        super(TubTimeStacker, self).__init__(*args, **kwargs)\n",
"        self.frame_list = frame_list\n",
"\n",
"    def get_record(self, ix):\n",
"        '''\n",
"        Stack the N records into a single record.\n",
"        Each key value has the record index with a suffix of _N where N is\n",
"        the frame offset into the data.\n",
"        '''\n",
"        data = {}\n",
"        for i, iOffset in enumerate(self.frame_list):\n",
"            iRec = ix + iOffset\n",
"\n",
"            try:\n",
"                json_data = self.get_json_record(iRec)\n",
"            except FileNotFoundError:\n",
"                continue\n",
"            except:\n",
"                continue\n",
"\n",
"            for key, val in json_data.items():\n",
"                typ = self.get_input_type(key)\n",
"\n",
"                # Load only the first image; it is saved as a separate file.\n",
"                if typ == 'image' and i == 0:\n",
"                    val = Image.open(os.path.join(self.path, val))\n",
"                    data[key] = val\n",
"                elif typ == 'image_array' and i == 0:\n",
"                    d = super(TubTimeStacker, self).get_record(ix)\n",
"                    data[key] = d[key]\n",
"                else:\n",
"                    '''\n",
"                    We append an _offset to the key,\n",
"                    so user/angle would now be user/angle_0.\n",
"                    '''\n",
"                    new_key = key + \"_\" + str(iOffset)\n",
"                    data[new_key] = val\n",
"        return data\n",
"\n",
"\n",
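"# A hedged illustration of what TubTimeStacker.get_record() returns (the tub path and\n",
"# offsets below are examples only). With frame_list=[0, 10], scalar keys are suffixed\n",
"# with their offset, only the current image is loaded, and image keys for later\n",
"# offsets keep their raw file names:\n",
"#\n",
"#     stacker = TubTimeStacker([0, 10], path='/opt/ml/input/data/training/tub_1')\n",
"#     rec = stacker.get_record(5)\n",
"#     # e.g. {'cam/image_array': <numpy image>, 'user/angle_0': ..., 'user/throttle_0': ...,\n",
"#     #       'user/angle_10': ..., 'user/throttle_10': ..., 'cam/image_array_10': 'record file name'}\n",
"\n",
"\n",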
"class TubGroup(Tub):\n",
"    def __init__(self, tub_paths_arg):\n",
"        tub_paths = expand_path_arg(tub_paths_arg)\n",
"\n",
"        print('TubGroup:tubpaths:', tub_paths)\n",
"        tubs = [Tub(path) for path in tub_paths]\n",
"        self.input_types = {}\n",
"\n",
"        record_count = 0\n",
"        for t in tubs:\n",
"            t.update_df()\n",
"            record_count += len(t.df)\n",
"            self.input_types.update(dict(zip(t.inputs, t.types)))\n",
"\n",
"        print('Joining the tubs: {} records together. This could take {} minutes.'.format(record_count,\n",
"                                                                                          int(record_count / 300000)))\n",
"\n",
"        self.meta = {'inputs': list(self.input_types.keys()),\n",
"                     'types': list(self.input_types.values())}\n",
"\n",
"        self.df = pd.concat([t.df for t in tubs], axis=0, join='inner')\n",
"\n",
"\n",
"def expand_path_arg(path_str):\n",
"    path_list = path_str.split(\",\")\n",
"    expanded_paths = []\n",
"    for path in path_list:\n",
"        paths = expand_path_mask(path)\n",
"        expanded_paths += paths\n",
"    return expanded_paths\n",
"\n",
"\n",
"def expand_path_mask(path):\n",
"    matches = []\n",
"    path = os.path.expanduser(path)\n",
"    for file in glob.glob(path):\n",
"        if os.path.isdir(file):\n",
"            matches.append(os.path.abspath(file))\n",
"    return matches\n",
"\n",
"\n",
"def linear_bin(a):\n",
"    a = a + 1\n",
"    b = round(a / (2 / 14))\n",
"    arr = np.zeros(15)\n",
"    arr[int(b)] = 1\n",
"    return arr\n",
"\n",
"\n",
"def rt(record):\n",
"    record['user/angle'] = linear_bin(record['user/angle'])\n",
"    return record\n",
"\n",
"\n",
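"# linear_bin() maps a steering angle in [-1, 1] onto a one-hot vector of 15 bins,\n",
"# and rt() applies it to each record so steering becomes a classification target\n",
"# matching the 15-way softmax output below. For example:\n",
"#\n",
"#     linear_bin(-1.0)  # -> bin 0 is hot\n",
"#     linear_bin(0.0)   # -> bin 7 is hot\n",
"#     linear_bin(1.0)   # -> bin 14 is hot\n",
"\n",
"\n",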
"def default_categorical():\n",
"    from keras.layers import Input, Dense\n",
"    from keras.models import Model\n",
"    from keras.layers import Convolution2D\n",
"    from keras.layers import Dropout, Flatten\n",
"\n",
"    # First (input) layer; the shape comes from the camera resolution, RGB.\n",
"    img_in = Input(shape=(160, 120, 3), name='img_in')\n",
"    x = img_in\n",
"    x = Convolution2D(24, (5, 5), strides=(2, 2), activation='relu')(x)  # 24 features, 5x5 pixel kernel (convolution, feature) window, 2wx2h stride, relu activation\n",
"    x = Convolution2D(32, (5, 5), strides=(2, 2), activation='relu')(x)  # 32 features, 5px5p kernel window, 2wx2h stride, relu activation\n",
"    x = Convolution2D(64, (5, 5), strides=(2, 2), activation='relu')(x)  # 64 features, 5px5p kernel window, 2wx2h stride, relu\n",
"    x = Convolution2D(64, (3, 3), strides=(2, 2), activation='relu')(x)  # 64 features, 3px3p kernel window, 2wx2h stride, relu\n",
"    x = Convolution2D(64, (3, 3), strides=(1, 1), activation='relu')(x)  # 64 features, 3px3p kernel window, 1wx1h stride, relu\n",
"\n",
"    # Possibly add MaxPooling (would make it less sensitive to position in the image).\n",
"    # The camera angle is fixed, so it may not be needed.\n",
"\n",
"    x = Flatten(name='flattened')(x)  # Flatten to 1D (fully connected)\n",
"    x = Dense(100, activation='relu')(x)  # Classify the data into 100 features, make all negatives 0\n",
"    x = Dropout(.1)(x)  # Randomly drop out (turn off) 10% of the neurons (prevents overfitting)\n",
"    x = Dense(50, activation='relu')(x)  # Classify the data into 50 features, make all negatives 0\n",
"    x = Dropout(.1)(x)  # Randomly drop out 10% of the neurons (prevents overfitting)\n",
"\n",
"    # Categorical output of the angle: 15 softmax units, one per steering bin,\n",
"    # each giving a probability between 0.0 and 1.0; the best bin is the highest one.\n",
"    angle_out = Dense(15, activation='softmax', name='angle_out')(x)\n",
"\n",
"    # Continuous output of the throttle: reduce to 1 number, positive only.\n",
"    throttle_out = Dense(1, activation='relu', name='throttle_out')(x)\n",
"\n",
"    model = Model(inputs=[img_in], outputs=[angle_out, throttle_out])\n",
"    model.compile(optimizer='adam',\n",
"                  loss={'angle_out': 'categorical_crossentropy',\n",
"                        'throttle_out': 'mean_absolute_error'},\n",
"                  loss_weights={'angle_out': 0.9, 'throttle_out': .001})\n",
"\n",
"    return model\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
"    train()\n",
"\n",
"    # A zero exit code causes the job to be marked as Succeeded.\n",
"    sys.exit(0)"
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.4.3" } }, "nbformat": 4, "nbformat_minor": 0 }