{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Implementing a Review classification model with Keras, MXNet and SageMaker\n", " This solution demonstrates a distributed learning system that uses a custom generator to train on sparse libsvm data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Background " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Data set: Amazon Customer Reviews Dataset\n", " \n", " Amazon Customer Reviews (a.k.a. Product Reviews) is one of Amazon’s iconic products. In a period of over two decades since the first review in 1995, millions of Amazon customers have contributed over a hundred million reviews to express opinions and describe their experiences regarding products on the Amazon.com website. Over 130 million customer reviews are available to researchers as part of this dataset.\n", " \n", "#### Approach\n", " 1. The review classification model is an NLP model that predicts whether a review posted by a customer is positive or negative. For simplicity, we converted the customer's star rating into a binary target variable that equals 1 when the rating is 4 or 5 and 0 otherwise.\n", "\n", " 2. After splitting the data into train, validation and test sets, we built a feature engineering pipeline on the training data using natural language processing techniques. Each set was then split into 10 chunks to demonstrate distributed learning: during training, the custom generator loads one chunk at a time and feeds the model batches of at most 1024 (batch_size) records.\n", " \n", " \n", "#### Setup\n", "This notebook was created and tested on an ml.p2.xlarge notebook instance.\n", "\n", "Let's start by specifying:\n", "\n", "1. The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the notebook instance, training, and hosting.\n", "2. The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note, if more than one role is required for notebook instances, training, and/or hosting, please replace the get_execution_role() call with the appropriate full IAM role ARN string(s)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker import get_execution_role\n", "from sagemaker.session import Session\n", "import boto3\n", "\n", "bucket = ''  # replace with the name of your S3 bucket\n", "custom_code_upload_location = 's3://{}/sagemaker/DEMO-Distributed-learning-Keras/customcode'.format(bucket)\n", "model_artifacts_location = 's3://{}/sagemaker/DEMO-Distributed-learning-Keras/artifacts'.format(bucket)\n", "\n", "role = get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data\n", "##### Explore\n", "Let's start by bringing in our dataset from an S3 public bucket. It contains 1 to 5 star ratings from over 2M Amazon customers on over 160K digital videos. More details on this dataset can be found at its AWS Public Datasets page.\n", "\n", "Note, because this dataset is over half a gigabyte, the load from S3 may take ~10 minutes. Also, since Amazon SageMaker notebooks start with a 5GB persistent volume by default, we read the compressed file directly from S3 with pandas instead of copying it to the instance.\n", "\n", "\n", "Let's read the data into a Pandas DataFrame so that we can begin to understand it.\n", "\n", "Note, we'll set error_bad_lines=False when reading the file in as there appear to be a very small number of records which would create a problem otherwise."
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "b'Skipping line 92523: expected 15 fields, saw 22\\n'\n", "b'Skipping line 343254: expected 15 fields, saw 22\\n'\n", "b'Skipping line 524626: expected 15 fields, saw 22\\n'\n", "b'Skipping line 623024: expected 15 fields, saw 22\\n'\n", "b'Skipping line 977412: expected 15 fields, saw 22\\n'\n", "b'Skipping line 1496867: expected 15 fields, saw 22\\n'\n", "b'Skipping line 1711638: expected 15 fields, saw 22\\n'\n", "b'Skipping line 1787213: expected 15 fields, saw 22\\n'\n", "b'Skipping line 2395306: expected 15 fields, saw 22\\n'\n", "b'Skipping line 2527690: expected 15 fields, saw 22\\n'\n" ] } ], "source": [ "import pandas as pd\n", "\n", "fname = 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz'\n", "# Tab-separated file; skip the few malformed lines instead of raising an error\n", "df = pd.read_csv(fname, sep='\\t', error_bad_lines=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "This dataset includes information like:\n", "\n", " 1. marketplace: 2-letter country code (in this case all \"US\").\n", " 2. customer_id: Random identifier that can be used to aggregate reviews written by a single author.\n", " 3. review_id: A unique ID for the review.\n", " 4. product_id: The Amazon Standard Identification Number (ASIN). http://www.amazon.com/dp/ links to the product's detail page.\n", " 5. product_parent: The parent of that ASIN. Multiple ASINs (color or format variations of the same product) can roll up into a single parent.\n", " 6. product_title: Title description of the product.\n", " 7. product_category: Broad product category that can be used to group reviews (in this case digital videos).\n", " 8. star_rating: The review's rating (1 to 5 stars).\n", " 9. helpful_votes: Number of helpful votes for the review.\n", " 10. total_votes: Number of total votes the review received.\n", " 11. vine: Was the review written as part of the Vine program?\n", " 12. verified_purchase: Was the review from a verified purchase?\n", " 13. review_headline: The title of the review itself.\n", " 14. review_body: The text of the review.\n", " 15. review_date: The date the review was written.\n", " \n", " \n", "For this example, let's limit ourselves to verified_purchase, review_headline, review_body, product_title, helpful_votes and star_rating.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Pre-processing\n", "1. Replace missing review text and product titles with empty strings, missing verified_purchase values with 'Unk', and any other missing values with 0\n", "2. Create a binary target column from the star rating (1 for 4 or 5 stars, 0 otherwise)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "def pre_process(df):\n", "    # Missing text becomes an empty string so the vectorizers can process it\n", "    df.fillna(\n", "        value={'review_body': '', 'review_headline': '', 'product_title': ''}, inplace=True)\n", "    # A missing purchase flag becomes its own 'Unk' category\n", "    df.fillna(\n", "        value={'verified_purchase': 'Unk'}, inplace=True)\n", "\n", "    # Any remaining missing values (e.g. numeric columns) become 0\n", "    df.fillna(0, inplace=True)\n", "    return df\n", "\n", "df = pre_process(df)\n", "\n", "df['review_date'] = pd.to_datetime(df['review_date'])\n", "# Positive review (1) when the rating is 4 or 5 stars, negative (0) otherwise\n", "df['target'] = np.where(df['star_rating'] >= 4, 1, 0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Split the data into train, validation and test sets\n", "Note: we used stratified sampling to split the data; stratifying on the target variable ensures the same distribution of the target across all the splits."
] }, { "cell_type": "code", "execution_count": 75, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.7920667165021078\n", "0.7920667275251041\n", "0.7920667165021078\n" ] } ], "source": [ "from sklearn.model_selection import StratifiedShuffleSplit\n", "sss = StratifiedShuffleSplit(n_splits=2, test_size=0.10, random_state=0)\n", "\n", "# Hold out 10% of the data as the test set, stratified on the target\n", "# (the loop keeps the last of the n_splits shuffles)\n", "for train_index, test_index in sss.split(df, df['target']):\n", "    X_train_valid, X_test = df.iloc[train_index], df.iloc[test_index]\n", "\n", "# Split the remaining 90% again: 10% of it becomes the validation set\n", "for train_index, test_index in sss.split(X_train_valid, X_train_valid['target']):\n", "    X_train, X_valid = X_train_valid.iloc[train_index], X_train_valid.iloc[test_index]\n", "\n", "# Share of positive reviews in the train, test and validation sets\n", "print(X_train.target.value_counts()[1] / X_train.target.value_counts().sum())\n", "print(X_test.target.value_counts()[1] / X_test.target.value_counts().sum())\n", "print(X_valid.target.value_counts()[1] / X_valid.target.value_counts().sum())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Feature Engineering\n", " 1. One-hot encoder for 'verified_purchase'\n", " 2. CountVectorizer on character 3-grams followed by MaxAbsScaler for 'review_headline'\n", " 3. CountVectorizer on character 3-grams followed by MaxAbsScaler for 'review_body'\n", " 4. CountVectorizer on character 3-grams followed by MaxAbsScaler for 'product_title'\n", " 5. SimpleImputer followed by MinMaxScaler for 'helpful_votes'\n", " 6. Concatenate all the resulting feature vectors with a ColumnTransformer in one Pipeline" ] }, { "cell_type": "code", "execution_count": 68, "metadata": {}, "outputs": [], "source": [ "import sklearn.pipeline\n", "from sklearn.feature_extraction.text import CountVectorizer\n", "from sklearn.preprocessing import OneHotEncoder, MaxAbsScaler, MinMaxScaler\n", "from sklearn.impute import SimpleImputer\n", "from sklearn.compose import ColumnTransformer\n", "\n", "cat_transformer = sklearn.pipeline.Pipeline(steps=[('cat_Vectorizer',\n", "                                                    OneHotEncoder(handle_unknown='ignore'))])\n", "\n", "def text_transformer(max_features):\n", "    # Counts of character 3-grams within word boundaries, scaled to [0, 1] per feature.\n", "    # stop_words is not set because scikit-learn only applies it when analyzer='word'.\n", "    vectorizer = CountVectorizer(binary=False,\n", "                                 max_df=0.7,\n", "                                 min_df=100,\n", "                                 ngram_range=(3, 3),\n", "                                 max_features=max_features,\n", "                                 analyzer='char_wb')\n", "    return sklearn.pipeline.Pipeline(steps=[\n", "        ('Vectorizer', vectorizer),\n", "        ('feature_vec_sclr', MaxAbsScaler())])\n", "\n", "headline_transformer = text_transformer(max_features=500)\n", "body_transformer = text_transformer(max_features=5000)\n", "product_transformer = text_transformer(max_features=500)\n", "\n", "steps = [('feature_Imputer', SimpleImputer(strategy='most_frequent')),\n", "         ('feature_scaler', MinMaxScaler())]\n", "numeric_transformer = sklearn.pipeline.Pipeline(steps)\n", "\n", "# Columns are selected by position in X[cols]: 0 verified_purchase, 1 review_headline,\n", "# 2 review_body, 3 product_title, 4 helpful_votes\n", "union = ColumnTransformer(\n", "    transformers=[('cat', cat_transformer, [0]),\n", "                  ('headline', headline_transformer, 1),\n", "                  ('body', body_transformer, 
2),\n", "                  ('product', product_transformer, 3),\n", "                  ('num', numeric_transformer, [4])\n", "    ],\n", "    remainder=\"drop\")\n", "\n", "steps = [('features', union)]\n", "pipeline = sklearn.pipeline.Pipeline(steps)" ] }, { "cell_type": "code", "execution_count": 76, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "cols = ['verified_purchase', 'review_headline', 'review_body', 'product_title', 'helpful_votes']\n", "start = time.time()\n", "train_vec = pipeline.fit_transform(X_train[cols])\n", "end = time.time()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It took approximately 10 minutes to fit the pipeline on about 3M training records." ] }, { "cell_type": "code", "execution_count": 112, "metadata": {}, "outputs": [], "source": [ "! mkdir -p ./data/train ./data/valid ./data/holdout" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Splitting the data into 10 different chunks to demonstrate distributed learning" ] }, { "cell_type": "code", "execution_count": 200, "metadata": {}, "outputs": [], "source": [ "from sklearn.datasets import dump_svmlight_file\n", "\n", "# Transform each split with the fitted pipeline and write it out as 10 libsvm files\n", "for split_df, folder in [(X_train, 'train'), (X_valid, 'valid'), (X_test, 'holdout')]:\n", "    for i, df_split in enumerate(np.array_split(split_df, 10)):\n", "        trans = pipeline.transform(df_split[cols])\n", "        dump_svmlight_file(trans, df_split['target'].values, 'data/{}/file_{}'.format(folder, i))" ] }, { "cell_type": "code", "execution_count": 297, "metadata": {}, "outputs": [], "source": [ "# IPython expands {bucket} from the Python variable defined in the setup cell\n", "! aws s3 cp ./data/ s3://{bucket}/sagemaker/DEMO-Distributed-learning-Keras/data/ --recursive --acl bucket-owner-full-control" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train with SageMaker\n", "\n", "#### Wrap code\n", "To use SageMaker's pre-built MXNet container, we need to put our training code into a Python script (Mailerclass_mxnet_dnn_II.py, printed below). There's a great deal of flexibility in using SageMaker's pre-built containers, and detailed documentation is available, but for our example the script consists of:\n", "\n", " 1. f1: a custom, batch-wise F1 metric reported during training\n", " 2. readfile: reads an individual libsvm file into a list of labels and a dense NumPy matrix\n", " 3. generator: a custom generator that loops through the files and records and yields batches in sequence\n", " 4. parse_args: parses the hyperparameters and the SageMaker environment settings\n", " 5. train: builds and trains the neural network model\n", " 6. threadsafe_iter class and threadsafe_generator decorator: helpers to make the custom generator thread-safe (defined, but not applied to the generator in this script)\n", " 7. input_fn: converts the inference payload (libsvm-formatted lines) into an MXNet NDArrayIter\n" ] }, { "cell_type": "code", "execution_count": 299, "metadata": { "collapsed": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "# #Standard Libraries\r\n", "import sys\r\n", "import os\r\n", "from os import walk\r\n", "\r\n", "import json\r\n", "import threading\r\n", "\r\n", "import argparse\r\n", "\r\n", "\r\n", "import numpy as np\r\n", "\r\n", "\r\n", "import mxnet as mx\r\n", "\r\n", "##Keras Imports\r\n", "import keras\r\n", "from keras import backend as K\r\n", "from keras.layers import Dense, Dropout, Input, BatchNormalization, Activation\r\n", "\r\n", "from keras.models import Sequential\r\n", "from keras.models import save_mxnet_model\r\n", "from keras.models import load_model\r\n", "from keras.optimizers import RMSprop, Adam, SGD\r\n", "\r\n", "from keras.utils import multi_gpu_model\r\n", "\r\n", "\r\n", "input_shape = (6003,)\r\n", "\r\n", 
"\r\n", "def f1(y_true, y_pred):\r\n", " def recall(y_true, y_pred):\r\n", " \"\"\"Recall metric.\r\n", "\r\n", " Only computes a batch-wise average of recall.\r\n", "\r\n", " Computes the recall, a metric for multi-label classification of\r\n", " how many relevant items are selected.\r\n", " \"\"\"\r\n", " true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))\r\n", " possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))\r\n", " recall = true_positives / (possible_positives + K.epsilon())\r\n", " return recall\r\n", "\r\n", " def precision(y_true, y_pred):\r\n", " \"\"\"Precision metric.\r\n", "\r\n", " Only computes a batch-wise average of precision.\r\n", "\r\n", " Computes the precision, a metric for multi-label classification of\r\n", " how many selected items are relevant.\r\n", " \"\"\"\r\n", " true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))\r\n", " predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))\r\n", " precision = true_positives / (predicted_positives + K.epsilon())\r\n", " return precision\r\n", "\r\n", " precision = precision(y_true, y_pred)\r\n", " recall = recall(y_true, y_pred)\r\n", " return 2 * ((precision * recall) / (precision + recall + K.epsilon()))\r\n", "\r\n", "\r\n", "def input_fn(request_body, request_content_type):\r\n", " if request_content_type == 'text/csv':\r\n", " print('I am here:')\r\n", " Matrix = []\r\n", " print(type(request_body))\r\n", " for line in request_body.splitlines():\r\n", " data = line.split()\r\n", " target = float(data[0])\r\n", " row = np.zeros(input_shape[0], float)\r\n", " for i, (idx, value) in enumerate([item.split(':') for item in data[1:]]):\r\n", " row[int(idx)] = value\r\n", " Matrix.append(np.array(row))\r\n", " Matrix = np.array(Matrix)\r\n", " print('Matrix Shape', Matrix.shape)\r\n", " print('Matrix Type', type(Matrix))\r\n", " return mx.io.NDArrayIter(Matrix)\r\n", " else:\r\n", " # Handle other content-types here or raise an Exception\r\n", " # if the content type 
is not supported.\r\n", " pass\r\n", "\r\n", "\r\n", "\r\n", "class threadsafe_iter:\r\n", " \"\"\"Takes an iterator/generator and makes it thread-safe by\r\n", " serializing call to the `next` method of given iterator/generator.\r\n", " \"\"\"\r\n", "\r\n", " def __init__(self, it):\r\n", " self.it = it\r\n", " self.lock = threading.Lock()\r\n", "\r\n", " def __iter__(self):\r\n", " return self\r\n", "\r\n", " def next(self):\r\n", " with self.lock:\r\n", " return self.it.next()\r\n", "\r\n", "\r\n", "def threadsafe_generator(f):\r\n", " \"\"\"A decorator that takes a generator function and makes it thread-safe.\r\n", " \"\"\"\r\n", "\r\n", " def g(*a, **kw):\r\n", " return threadsafe_iter(f(*a, **kw))\r\n", "\r\n", " return g\r\n", "\r\n", "\r\n", "def readfile(filepath):\r\n", " with open(filepath) as fp:\r\n", " lines = fp.readlines()\r\n", " Matrix = []\r\n", " labels = []\r\n", " for line in lines:\r\n", " data = line.split()\r\n", " target = labels.append(float(data[0]))\r\n", " row = np.zeros(input_shape[0], float)\r\n", " for i, (idx, value) in enumerate([item.split(':') for item in data[1:]]):\r\n", " row[int(idx)] = value\r\n", " Matrix.append(np.array(row))\r\n", " Matrix = np.array(Matrix)\r\n", " return labels, Matrix\r\n", "\r\n", "\r\n", "def generator(files, batch_size):\r\n", " # print('start generator')\r\n", " while 1:\r\n", " # print('loop generator')\r\n", " for file in files:\r\n", " try:\r\n", " # data = load_svmlight_file(file)\r\n", " Y, X = readfile(file)\r\n", " recs = X.shape[0]\r\n", " batches = int(np.ceil(recs / batch_size))\r\n", " for i in range(0, batches):\r\n", " x_batch = X[i * batch_size:min(len(X), i * batch_size + batch_size), ]\r\n", " y_batch = Y[i * batch_size:min(len(X), i * batch_size + batch_size)]\r\n", " yield x_batch, y_batch\r\n", " except EOFError:\r\n", " print(\"error\" + file)\r\n", "\r\n", "\r\n", "def parse_args():\r\n", " parser = argparse.ArgumentParser()\r\n", "\r\n", " # retrieve the hyperparameters we 
set in notebook (with some defaults)\r\n", " parser.add_argument('--epochs', type=int, default=5)\r\n", " parser.add_argument('--learning-rate', type=float, default=0.01)\r\n", "\r\n", " parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])\r\n", " parser.add_argument('--training_channel', type=str, default=os.environ['SM_CHANNEL_TRAIN'])\r\n", " parser.add_argument('--test_channel', type=str, default=os.environ['SM_CHANNEL_TEST'])\r\n", "\r\n", " parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])\r\n", " parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))\r\n", "\r\n", " parser.add_argument('--batch_size', type=int, default=1024)\r\n", "\r\n", " return parser.parse_args()\r\n", "\r\n", "\r\n", "def train(current_host, hosts, num_cpus, num_gpus, training_dir, val_dir, model_dir, batch_size, epochs, learning_rate):\r\n", " print('Parameters: ', num_cpus, num_gpus, training_dir, val_dir, model_dir, batch_size, epochs, learning_rate)\r\n", "\r\n", " train_files = [os.path.join(training_dir, file) for file in os.listdir(training_dir)]\r\n", " train_files = [x for x in train_files if x.split('/')[-1] != '.ipynb_checkpoints']\r\n", " train_gen = generator(train_files, batch_size)\r\n", " print('Number of training files: ', len(train_files))\r\n", "\r\n", " test_files = [os.path.join(val_dir, file) for file in os.listdir(val_dir)]\r\n", " test_files = [x for x in test_files if x.split('/')[-1] != '.ipynb_checkpoints']\r\n", " test_gen = generator(test_files, batch_size)\r\n", " print('Number of test files: ', len(test_files))\r\n", "\r\n", " # shape = load_svmlight_file(train_files[0])[0].shape[1:]\r\n", " # shape = (scipy.sparse.load_npz(train_files[0]).shape[1] -1 ,)## Substracting 1 for label columns\r\n", " print('shape:', input_shape)\r\n", "\r\n", " # steps = np.array([int(np.ceil(scipy.sparse.load_npz(x).shape[0]/batch_size)) for x in train_files]).sum()\r\n", " # 
steps_val = np.array([int(np.ceil(scipy.sparse.load_npz(x).shape[0]/batch_size)) for x in test_files]).sum()\r\n", "\r\n", " steps = np.array([int(np.ceil(readfile(x)[1].shape[0] / batch_size)) for x in train_files]).sum()\r\n", " steps_val = np.array([int(np.ceil(readfile(x)[1].shape[0] / batch_size)) for x in test_files]).sum()\r\n", " print('Steps', steps, steps_val)\r\n", " print('Number of gpus', num_gpus)\r\n", "\r\n", " model = Sequential()\r\n", " model.add(Dense(1024, input_shape=input_shape))\r\n", " model.add(Activation('relu'))\r\n", " model.add(Dropout(0.25))\r\n", "\r\n", " model.add(Dense(128))\r\n", " model.add(Activation('relu'))\r\n", " model.add(Dropout(0.25))\r\n", "\r\n", " model.add(Dense(32))\r\n", " model.add(Activation('relu'))\r\n", " model.add(Dropout(0.25))\r\n", "\r\n", " model.add(Dense(1))\r\n", " model.add(Activation('sigmoid'))\r\n", "\r\n", " print(model.summary())\r\n", "\r\n", " if num_gpus > 1:\r\n", " model = multi_gpu_model(model, gpus=num_gpus)\r\n", "\r\n", " filepath = 'best_model.h5'\r\n", " checkpoint = keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss',\r\n", " verbose=1, save_best_only=True, mode='min')\r\n", " callbacks_list = [checkpoint]\r\n", " opt = Adam(lr=learning_rate, rescale_grad=1. 
/ batch_size)\r\n", "\r\n", " model.compile(optimizer=opt, loss='binary_crossentropy', metrics=[f1], callbacks=callbacks_list)\r\n", " # model.fit_generator(train, epochs=epochs, shuffle=True, class_weight=[0.6,0.4],\r\n", " # validation_data=test, callbacks=[es, mc, lrp], max_queue_size=30, verbose=verbose)\r\n", "\r\n", " history = model.fit_generator(train_gen, steps_per_epoch=steps, epochs=epochs,\r\n", " shuffle=True,\r\n", " validation_data=test_gen,\r\n", " validation_steps=steps_val,\r\n", " callbacks=callbacks_list,\r\n", " verbose=2)\r\n", "\r\n", " print(np.sqrt(history.history['loss']))\r\n", " print(np.sqrt(history.history['val_loss']))\r\n", " print('model trained: ')\r\n", " return model\r\n", "\r\n", "\r\n", "if __name__ == '__main__':\r\n", " args = parse_args()\r\n", " num_cpus = int(os.environ['SM_NUM_CPUS'])\r\n", " num_gpus = int(os.environ['SM_NUM_GPUS'])\r\n", " print('number of cpus', num_cpus)\r\n", " print('number of gpus', num_gpus)\r\n", "\r\n", " os.environ['MXNET_ENABLE_GPU_P2P'] = '0'\r\n", " os.environ['MXNET_CUDNN_AUTOTUNE_DEFAULT'] = '0'\r\n", "\r\n", " model = train(args.current_host, args.hosts, num_cpus, num_gpus, args.training_channel, args.test_channel,\r\n", " args.model_dir,\r\n", " (args.batch_size), args.epochs, args.learning_rate)\r\n", "\r\n", " if args.current_host == args.hosts[0]:\r\n", " model.save(os.path.join(args.model_dir, 'model'))\r\n", " print('model saved')\r\n", " # Get our best model\r\n", " # best_model = load_model(os.path.join(args.model_dir, 'model'))\r\n", " dependencies = {'f1': f1}\r\n", " best_model = load_model('best_model.h5', custom_objects=dependencies)\r\n", " print('model loaded:-')\r\n", " print( os.path.join(args.model_dir, 'model'))\r\n", " #\r\n", " # Make one random prediction to initialize for the save\r\n", " # sample = np.random.randint(args.vocab_size, size=(args.maxlen,))\r\n", " # best_model.predict(np.array([sample]))\r\n", "\r\n", " test_files = [os.path.join(args.test_channel, 
file) for file in os.listdir(args.test_channel)]\r\n", " # sample = load_svmlight_file(test_files[0])[0].todense()\r\n", " Y, X = readfile(test_files[0])\r\n", " print(test_files[0])\r\n", " print(X.shape)\r\n", " best_model.predict(X)\r\n", "\r\n", " print( os.path.join(args.model_dir, 'model'))\r\n", " data_names, data_shapes = save_mxnet_model(best_model, os.path.join(args.model_dir, 'model'))\r\n", "\r\n", " signature = [{'name': data_names[0], 'shape': [dim for dim in data_desc.shape]} for data_desc in data_shapes]\r\n", " with open(os.path.join(args.model_dir, 'model-shapes.json'), 'w') as f:\r\n", " json.dump(signature, f)\r\n" ] } ], "source": [ "!cat Mailerclass_mxnet_dnn_II.py" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Submit a SageMaker training job (SageMaker Python SDK v1 parameter names)\n", "\n", "from sagemaker.mxnet import MXNet\n", "review_dnn_estimator = MXNet(entry_point='Mailerclass_mxnet_dnn_II.py',\n", "                             role=role,\n", "                             output_path=model_artifacts_location,\n", "                             code_location=custom_code_upload_location,\n", "                             train_instance_count=1,\n", "                             train_instance_type='ml.p3.16xlarge',\n", "                             framework_version='1.4.1',\n", "                             py_version='py3',\n", "                             train_volume_size=100,\n", "                             distributions={'parameter_server': {'enabled': True}},\n", "                             hyperparameters={'learning-rate': 0.01})\n", "\n", "region = boto3.Session().region_name\n", "train_data_location = 's3://'+bucket+'/sagemaker/DEMO-Distributed-learning-Keras/data/train/'\n", "test_data_location = 's3://'+bucket+'/sagemaker/DEMO-Distributed-learning-Keras/data/valid/'\n", "\n", "# The 'train' and 'test' channels become SM_CHANNEL_TRAIN and SM_CHANNEL_TEST in the script\n", "review_dnn_estimator.fit({'train': train_data_location, 'test': test_data_location})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Batch inference\n", "Run batch inference on the holdout (test) set" ] }, { "cell_type": "code", "execution_count": 272, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Parameter image will be renamed to image_uri in SageMaker Python SDK v2.\n", "'create_image_uri' will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.\n" ] } ], "source": [ "output_path = 's3://'+bucket+'/sagemaker/DEMO-Distributed-learning-Keras/data/holdout-predictions/'\n", "input_path = 's3://'+bucket+'/sagemaker/DEMO-Distributed-learning-Keras/data/holdout/'\n", "\n", "# The payload is libsvm-formatted lines; input_fn in the script parses them under the 'text/csv' content type\n", "transformer = review_dnn_estimator.transformer(instance_count=2,\n", "                                               instance_type='ml.p3.16xlarge',\n", "                                               assemble_with='Line',\n", "                                               accept='text/csv',\n", "                                               max_payload=3,\n", "                                               output_path=output_path,\n", "                                               env={'SAGEMAKER_MODEL_SERVER_TIMEOUT': '3600'})\n", "transformer.transform(input_path,\n", "                      content_type='text/csv',\n", "                      #input_filter=\"$[1:]\" ,\n", "                      #join_source='Input',\n", "                      split_type='Line')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Evaluation\n", "1. read_predictions: read the results of the batch transform job\n", "2. analyze_results: compute and plot the ROC curve and the precision-recall curve, with their AUCs" ] }, { "cell_type": "code", "execution_count": 284, "metadata": {}, "outputs": [], "source": [ "# Reading and analyzing the results\n", "from sklearn import metrics\n", "import pandas as pd\n", "from matplotlib import pyplot as plt\n", "\n", "%matplotlib inline\n", "\n", "def analyze_results(labels, predictions):\n", " precision, recall, thresholds = metrics.precision_recall_curve(labels, predictions)\n", " auc = metrics.auc(recall, precision)\n", " \n", " fpr, tpr, _ = metrics.roc_curve(labels, predictions)\n", " roc_auc_score = metrics.roc_auc_score(labels, predictions)\n", " \n", " print('Neural-Nets: ROC auc=%.3f' % ( roc_auc_score))\n", " \n", " plt.plot(fpr, tpr, label=\"Neural-Nets, auc=\" + str(roc_auc_score))\n", " plt.xlabel('1-Specificity')\n", " plt.ylabel('Sensitivity')\n", " plt.legend(loc=4)\n", " plt.show()\n", " \n", " \n", " lr_precision, lr_recall, _ = metrics.precision_recall_curve(labels, predictions)\n", " lr_auc = 
metrics.auc(lr_recall, lr_precision)\n", " # summarize scores\n", " print('Neural-Nets: PR auc=%.3f' % ( lr_auc))\n", " # plot the precision-recall curves\n", " no_skill = len(labels[labels==1.0]) / len(labels)\n", " plt.plot([0, 1], [no_skill, no_skill], linestyle='--', label='No Skill')\n", " \n", " plt.plot(lr_recall, lr_precision, marker='.', label='Neural-Nets')\n", " # axis labels\n", " plt.xlabel('Recall')\n", " plt.ylabel('Precision')\n", " # show the legend\n", " plt.legend()\n", " # show the plot\n", " plt.show()\n", " \n", " \n", " return auc\n", "\n", "def read_predictions(prefix):\n", "    s3 = boto3.resource('s3')\n", "    my_bucket = s3.Bucket(bucket)\n", "    preds = []\n", "    # S3 lists keys in lexicographic order (file_0.out ... file_9.out), which\n", "    # matches the order in which the X_test chunks were written\n", "    for object_summary in my_bucket.objects.filter(Prefix=prefix):\n", "        print(object_summary.key)\n", "        filename = 's3://' + bucket + '/' + object_summary.key\n", "        res = pd.read_csv(filename, header=None, sep='\\t')\n", "        print('length:', len(res))\n", "        # One predicted probability per input record, in the first column\n", "        preds = preds + res.iloc[:, 0].values.tolist()\n", "    return preds\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "prefix = 'sagemaker/DEMO-Distributed-learning-Keras/data/holdout-predictions/'\n", "y_dnn_pred = read_predictions(prefix)" ] }, { "cell_type": "code", "execution_count": 286, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Neural-Nets: ROC auc=0.975\n" ] }, { "data": { "text/plain": [ "[Figure: ROC curve, Sensitivity vs. 1-Specificity]" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Neural-Nets: PR auc=0.993\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/home/ec2-user/anaconda3/envs/mxnet_p36/lib/python3.6/site-packages/IPython/core/pylabtools.py:132: UserWarning: Creating legend with loc=\"best\" can be slow with large amounts of data.\n", " fig.canvas.print_figure(bytes_io, **kw)\n" ] }, { "data": { "text/plain": [ "[Figure: precision-recall curve, Precision vs. Recall, with the No Skill baseline]" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "data": { "text/plain": [ "0.992858168571398" ] }, "execution_count": 286, "metadata": {}, "output_type": "execute_result" } ], "source": [ "analyze_results(X_test['target'].values, y_dnn_pred)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Results\n", "The model achieved an ROC-AUC of 0.975 and a PR-AUC of 0.993 on the holdout set. Below are some reviews classified as good and bad by the model.\n", "\n", "#### Reviews classified as Good by the model\n", " 1. Awesome show, wish they would have continued it!\n", " 2. Great movie.\n", " 3. Absolutely beautiful!!!\n", " 4. I love this show <3\n", " 5. One of my favorite kind of shows. Love I!\n", " \n", "#### Reviews classified as Bad by the model\n", " 1. If I could give this less than 1 star I would. The worst movie I've seen in ages. The acting was less than high school level. The Mockingjay character was mind numbingly boring and droll. She was terrible. The characters were beyond pathetic. 1 hour and 40 minutes of pure torture and inane plot, script combined with terrible CGI and effects I found myself wanting to consume mass quantities of alcohol. The whole Hunger Game sage is a total waste of time. How anyone can say this is anything but pure boredom is beyond me.\n", " 2. slow and boring, boring, boring, just like the book\n", " 3. Awful. A waste of money. A waste of time. Boring. Predictable. I gave up for the sake of my sanity about 20 minutes before it was over. Sandler and Barrymore must have been bored to tears with nothing to do, under contract to do something, and just didn't care........\n", " 4. Horrible movie. Mediocre acting, terrible plot, confusing and boring.\n", " 5. The movie is a real waste of time and money, poor actors, poor story, poor locations... really awful movie. I waste my money with this piece of crap.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_mxnet_p36", "language": "python", "name": "conda_mxnet_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }