{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Sentiment Analysis with TensorFlow\n", "\n", "Sentiment analysis is a very common text analytics task that involves determining whether a text sample is positive or negative about its subject. Several kinds of algorithms can perform this task, including statistical algorithms and deep learning algorithms. Among deep learning approaches, a Convolutional Neural Network (CNN) is sometimes used for this purpose. In this notebook we'll use a CNN built with TensorFlow to perform sentiment analysis in Amazon SageMaker on the IMDB dataset, which consists of movie reviews labeled as having positive or negative sentiment. Two aspects of Amazon SageMaker will be demonstrated:\n", "\n", "- How to use Script Mode with Distributed Training using Parameter Server, along with a training script similar to one you would use outside SageMaker.\n", "- Batch Transform for offline, asynchronous predictions on large batches of data.\n", "\n", "# Prepare Dataset\n", "\n", "We'll begin by loading the reviews dataset and padding the reviews so that they all have the same length. Each review is represented as an array of numbers, where each number represents an indexed word. Training data for both Local Mode and Hosted Training must be saved as files, so we'll also save the transformed data to files." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "import numpy as np\n", "import sagemaker\n", "from sagemaker.tensorflow import TensorFlow\n", "from tensorflow.keras.datasets import imdb\n", "from tensorflow.keras.preprocessing import sequence\n", "\n", "max_features = 20000  # vocabulary size: keep only the most frequent words\n", "maxlen = 400  # every review is padded or truncated to this many tokens" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)\n", "print(len(x_train), 'train sequences')\n", "print(len(x_test), 'test sequences')\n", "\n", "x_train = sequence.pad_sequences(x_train, maxlen=maxlen)\n", "x_test = sequence.pad_sequences(x_test, maxlen=maxlen)\n", "print('x_train shape:', x_train.shape)\n", "print('x_test shape:', x_test.shape)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_dir = os.path.join(os.getcwd(), 'data')\n", "os.makedirs(data_dir, exist_ok=True)\n", "\n", "train_dir = os.path.join(data_dir, 'train')\n", "os.makedirs(train_dir, exist_ok=True)\n", "\n", "test_dir = os.path.join(data_dir, 'test')\n", "os.makedirs(test_dir, exist_ok=True)\n", "\n", "csv_test_dir = os.path.join(data_dir, 'csv-test')\n", "os.makedirs(csv_test_dir, exist_ok=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "np.save(os.path.join(train_dir, 'x_train.npy'), x_train)\n", "np.save(os.path.join(train_dir, 'y_train.npy'), y_train)\n", "np.save(os.path.join(test_dir, 'x_test.npy'), x_test)\n", "np.save(os.path.join(test_dir, 'y_test.npy'), y_test)\n", "\n", "# First 100 padded test reviews as CSV, used later as Batch Transform input\n", "np.savetxt(os.path.join(csv_test_dir, 'csv-test.csv'), np.array(x_test[:100], dtype=np.int32), fmt='%d', delimiter=\",\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed Training using Parameter Server\n", "\n", "A common pattern in distributed training is to use 
dedicated processes to collect gradients computed by “worker” processes, then aggregate them and distribute the updated parameters back to the workers. These processes are known as parameter servers. In general, they can be run either on their own machines or co-located on the same machines as the workers. In a parameter server cluster, each parameter server communicates with all workers (“all-to-all”). The Amazon SageMaker prebuilt TensorFlow container comes with a built-in option to use parameter servers for distributed training. The container runs a parameter server thread in each training instance, so there is a 1:1 ratio of parameter servers to workers. With this built-in option, gradient updates are made asynchronously (though some other parameter server implementations use synchronous updates).\n", "\n", "Script Mode requires a training script, which in this case is the `sentiment.py` file in the /distributed training subdirectory of the related distributed training example GitHub repository. Once a training script is ready, the next step is to set up an Amazon SageMaker TensorFlow Estimator object with the details of the training job. It is very similar to an Estimator for training on a single machine, except that we specify a `distributions` parameter to start a parameter server on each training instance." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Hosted Training\n", "\n", "We now move on to SageMaker's hosted training with parameter servers, which uses compute resources separate from your notebook instance. Hosted training spins up a cluster of one or more instances for training, and then tears the cluster down when training is complete. In general, hosted training is preferred for doing actual training, especially for large-scale, distributed training. Before starting hosted training, the data must be present in storage that can be accessed by SageMaker. 
The storage options are: Amazon S3 (object storage service), Amazon EFS (elastic NFS file system service), and Amazon FSx for Lustre (high-performance file system service). For this example, we'll upload the data to S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_prefix = 'tf-keras-sentiment'\n", "\n", "traindata_s3_prefix = '{}/data/train'.format(s3_prefix)\n", "testdata_s3_prefix = '{}/data/test'.format(s3_prefix)\n", "\n", "# Upload to the session's default bucket; upload_data returns the S3 URI of the uploaded data\n", "sess = sagemaker.Session()\n", "train_s3 = sess.upload_data(path='./data/train/', key_prefix=traindata_s3_prefix)\n", "test_s3 = sess.upload_data(path='./data/test/', key_prefix=testdata_s3_prefix)\n", "\n", "inputs = {'train': train_s3, 'test': test_s3}\n", "print(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With the training data now in S3, we're ready to set up an Estimator object for hosted training. It is similar to an Estimator for Local Mode, except that `train_instance_type` is set to an ML instance type instead of a local type. Additionally, we've set the number of epochs to more than one for actual training, as opposed to just testing the code." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_dir = '/opt/ml/model'\n", "train_instance_type = 'ml.c5.xlarge'\n", "instance_count = 2\n", "\n", "# Enable the container's built-in parameter server on every training instance\n", "distributions = {'parameter_server': {'enabled': True}}\n", "\n", "hyperparameters = {'epochs': 5, 'batch_size': 128}\n", "\n", "estimator = TensorFlow(\n", "    source_dir='tf-sentiment-script-mode',\n", "    entry_point='sentiment.py',\n", "    model_dir=model_dir,\n", "    train_instance_type=train_instance_type,\n", "    train_instance_count=instance_count,\n", "    hyperparameters=hyperparameters,\n", "    role=sagemaker.get_execution_role(),\n", "    base_job_name='tf-keras-sentiment',\n", "    framework_version='1.13',\n", "    py_version='py3',\n", "    distributions=distributions,\n", "    script_mode=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With an ML instance type and multiple epochs configured, we simply call `fit` to start the actual hosted training. At the end of hosted training, you'll see from the logs below the cell that accuracy on the training set rises substantially over the epochs, and accuracy on the validation set is around 90%. The model may be overfitting now (less able to generalize to data it has not yet seen), even though we are employing dropout as a regularization technique. In a production situation, further investigation would be necessary." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Batch Prediction\n", "\n", "\n", "If our use case requires individual predictions in near real-time, SageMaker hosted endpoints can be created. 
Hosted endpoints can also be used for pseudo-batch prediction, but the process is more involved than simply using SageMaker's Batch Transform feature, which is designed for large-scale, asynchronous batch inference.\n", "\n", "To use Batch Transform, we must first upload some input test data in CSV format to S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "csvtestdata_s3_prefix = '{}/data/csv-test'.format(s3_prefix)\n", "csvtest_s3 = sagemaker.Session().upload_data(path='./data/csv-test/', key_prefix=csvtestdata_s3_prefix)\n", "print(csvtest_s3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A Transformer object must be set up to describe the Batch Transform job, including the amount and type of inference hardware to be used. The transform job itself is then started with a call to the `transform` method of the Transformer." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformer = estimator.transformer(instance_count=1, instance_type='ml.m5.xlarge')\n", "transformer.transform(csvtest_s3, content_type='text/csv')\n", "print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)\n", "transformer.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now download the batch predictions from S3 to the local filesystem on the notebook instance; the predictions are contained in a file with a .out extension, and are embedded in JSON. Next we'll load the JSON and examine the predictions, which are confidence scores from 0.0 to 1.0, where numbers close to 1.0 indicate positive sentiment and numbers close to 0.0 indicate negative sentiment." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "batch_output = transformer.output_path\n", "!mkdir -p batch_data/output\n", "!aws s3 cp --recursive $batch_output/ batch_data/output/\n", "\n", "with open('batch_data/output/csv-test.csv.out', 'r') as f:\n", "    response = json.load(f)\n", "\n", "# Flatten the nested prediction lists and keep three decimal places per score\n", "results = [float('%.3f' % item) for sublist in response['predictions'] for item in sublist]\n", "print(results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's look at the text of some actual reviews to see the predictions in action. First, we have to convert the integers representing the words back to the words themselves by using a reversed dictionary. Next we can decode the reviews, taking into account that the first 3 indices were reserved for \"padding\", \"start of sequence\", and \"unknown\", and removing the run of `?` placeholders that the padding and start-of-sequence tokens leave at the start of the review." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "# Matches the leading run of '?' placeholders and whitespace in a decoded review\n", "regex = re.compile(r'^[\\?\\s]+')\n", "\n", "word_index = imdb.get_word_index()\n", "reverse_word_index = {value: key for key, value in word_index.items()}\n", "# Indices are offset by 3 because 0, 1 and 2 are reserved\n", "first_decoded_review = ' '.join([reverse_word_index.get(i - 3, '?') for i in x_test[0]])\n", "regex.sub('', first_decoded_review)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Overall, this review looks fairly negative. 
Let's compare the actual label with the prediction:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_sentiment(score):\n", "    # Labels are 0 or 1 and predictions lie in [0, 1], so one threshold serves both\n", "    return 'positive' if score > 0.5 else 'negative'\n", "\n", "print('Labeled sentiment for this review is {}, predicted sentiment is {}'.format(\n", "    get_sentiment(y_test[0]), get_sentiment(results[0])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Training deep learning models is a stochastic process, so your results may vary -- there is no guarantee that the predicted result will match the actual label. However, it is likely that the sentiment prediction agrees with the label for this review. Let's now examine another review:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "second_decoded_review = ' '.join([reverse_word_index.get(i - 3, '?') for i in x_test[5]])\n", "regex.sub('', second_decoded_review)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('Labeled sentiment for this review is {}, predicted sentiment is {}'.format(\n", "    get_sentiment(y_test[5]), get_sentiment(results[5])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Again, it is likely (but not guaranteed) that the prediction agrees with the label for this review. Note that there is no need to clean up any Batch Transform resources: after the transform job is complete, the cluster used to make inferences is torn down.\n", "\n", "Now that we've reviewed some sample predictions as a sanity check, we're finished. Of course, in a typical production situation, the data science project lifecycle is iterative, with repeated cycles of refining the model using a tool such as Amazon SageMaker's Automatic Model Tuning feature, and gathering more data. 
" ] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }