{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Blazing Text multi label Classification example\n", "\n", "## Introduction\n", "\n", "Multi label text classification is an important aspect of Natural Language Processing, supporting many varied use cases, like search, recommendations, ranking and sentiment analysis. The goal here is to automatically classify new text input into one or more pre-defined categories, for the consuming application to decide how to handle best. Toxicity has been a problem amongst the game community for some time now, and this notebook sets out to solve this problem using SageMakers BlazingText algorithm. \n", "\n", "BlazingText implements the fastText text classification model, optimizing it to leverage GPU acceleration, meaning you can train a model on more than a billion words in a couple of minutes using a multi-core CPU or a GPU, whilst achieving performance on par with the state-of-the-art deep learning text classification algorithms. \n", "\n", "BlazingText on Amazon SageMaker further extends this model with features like Early Stopping and Model Tuning, so that you don’t have to worry about setting the right hyperparameters out of the box.\n", "\n", "The following figure, from the [launch announcement blog post](https://aws.amazon.com/blogs/machine-learning/enhanced-text-classification-and-word-vectors-using-amazon-sagemaker-blazingtext/), depicts the simple yet powerful architecture behind the text classification model:\n", "\n", "![Architecture](https://d2908q01vomqb2.cloudfront.net/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59/2018/07/11/SageMaker-blazingtext-1.gif)\n", "\n", "Word embedding is an approach for representing words using a dense vector representation, giving us an improvement over the traditional pure Bag of Words model encoding schemes and their large sparse vectors by using a continuous vector space. The position of a word within the vector space is learned from text and is based on the words that surround the word when it is used, set via the `word_ngrams` HyperParameter. \n", "The position of a word in this learned vector space is referred to as its embedding, and the number of dimensions it uses are controlled by the `vector_dim` Hyper Parameter. A lookup is done on this embedding layer to get the vector representations of the words in the sentence. The word representations are then averaged into a text representation, which is finally forwarded into a linear classifier. It is virtually impossible to have all the words that we could come across during inference in our training dataset, so generating semantic representations for these words is much more useful than other strategies, like ignoring these words altogether or using random vectors for them. BlazingText can generate meaningful vectors for out-of-vocabulary words by representing their vectors as the sum of the character n-gram vectors, making it, to a degree, tolerant of slang, misspellings, and the introduction of new words to convey familiar meanings.\n", "\n", "#### Run this notebook with the Python 3 DataScience kernel on an ml.t3.medium or above instance\n", "\n", "The notebook is divided into the following steps\n", "\n", "- [Pre-requisite install and setup](#1.\\)-Pre-requisite-install-and-setup)\n", "- [Data download and exploration](#2.\\)-Data-download-and-exploration)\n", "- [Pre-Processing](#3.\\)-Pre-Processing)\n", "- [Training](#4.\\)-Training)\n", "- [Deploy](#5.\\)-Deploy)\n", "- [Test](#6.\\)-Test)\n", "- [Cleanup (optional)](#7.\\)-Cleanup)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.) Pre-requisite install and setup\n", "\n", "Here we install essential libraries and tools and create some utility functions and helpers\n", "\n", "We start by making sure our main dependencies are installed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "deps=(\"kaggle nltk\" \"pandas\" \"sagemaker\")\n", "for dep in ${deps[@]}; do\n", " echo \"testing for ${dep}...\"\n", " pip show ${dep} > /dev/null\n", " if [ $? -ne 0 ]; then\n", " echo \"installing ${dep}\" && pip install ${dep}\n", " else\n", " echo \"found : ${dep}\"\n", " fi\n", "done\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import the general purpose libraries and helpers we need" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import boto3\n", "from sagemaker import get_execution_role\n", "from sagemaker.serializers import JSONSerializer\n", "from sagemaker.sklearn import SKLearnModel\n", "from sagemaker.serverless import ServerlessInferenceConfig\n", "from pprint import pprint\n", "import os, sys, random, time, json\n", "import pandas as pd\n", "import nltk, re" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.) Data download and exploration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this segment, we download the dataset locally using the kaggle cli utility and explore it, so we know what we need to do to prepare it for the BlazingText algorithm. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We need to check for the existence of a kaggle API key, so we can download the dataset.\n", "\n", "If this cell fails, please\n", "\n", "- sign up for a free kaggle account at www.kaggle.com\n", "- create and download an API key, as per https://www.kaggle.com/docs/api#authentication\n", "- upload it to this directory as 'kaggle.json'\n", "- accept the terms of the competition at [the Kaggle website](https://www.kaggle.com/c/jigsaw-toxic-comment-classification-challenge/rules)\n", "- run this cell again" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cwd = os.getcwd()\n", "homeDir = os.path.expanduser(\"~\")\n", "kaggleDir=\"{}/.kaggle\".format(homeDir)\n", "kaggleFile=\"kaggle.json\"\n", "kaggleFQP=\"{}/{}\".format(kaggleDir, kaggleFile)\n", "\n", "if not os.path.isdir(kaggleDir):\n", " print(\"creating kaggle directory : {}\".format(kaggleDir))\n", " os.mkdir( kaggleDir, 0o755 )\n", "\n", "print(\"checking for kaggle api key : {}\".format(kaggleFQP))\n", "if not os.path.isfile(kaggleFQP):\n", " if not os.path.isfile(kaggleFile):\n", " print(\"kaggle api credentials are missing\")\n", " print(\"you need to create a free API key at kaggle.com and upload it to this directory as '{}'\".format(kaggleFile))\n", " print(\"please see https://www.kaggle.com/docs/api#authentication for details\")\n", " sys.exit(1)\n", " else:\n", " print(\"found local '{}' file, moving it to : '{}'\".format(kaggleFile, kaggleFQP))\n", " os.rename(kaggleFile, kaggleFQP)\n", " os.chmod(kaggleFQP, 0o600)\n", "else:\n", " print(\"found kaggle api key\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can download the dataset locally using the configured kaggle cli, passing in the competition name as an identifier.\n", "\n", "If the cell below fails, make sure you accepted the the terms of the competition at [the Kaggle website](https://www.kaggle.com/c/jigsaw-toxic-comment-classification-challenge/rules), which enables your API key for this dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "kaggle competitions download -c jigsaw-toxic-comment-classification-challenge" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With the dataset downloaded, the next step is to unpack it" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "ls -l\n", "if [ -f jigsaw-toxic-comment-classification-challenge.zip ]; then\n", "\n", " if [ -d ./data ]; then\n", " rm -rf ./data\n", " fi\n", " echo \"unzipping the kaggle dataset\"\n", " unzip ./jigsaw-toxic-comment-classification-challenge.zip -d ./data\n", " rm -f jigsaw-toxic-comment-classification-challenge.zip\n", " pushd ./data\n", " for f in *.zip; do\n", " echo \"unpacking data set : ${f}\"\n", " unzip ${f} && rm -f ${f}\n", " done\n", " popd && ls -l ./data\n", "fi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The datasets are not so big, and so we will just load them directly into Pandas DataFrames for further investigation. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"loading: training data\")\n", "trainData = pd.read_csv('data/train.csv')\n", "print(\"loading: test labels\")\n", "labelData = pd.read_csv('data/test_labels.csv')\n", "print(\"loading: full data\")\n", "testData = pd.read_csv('data/test.csv')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A brief look tells us that we've actually got 5,066,773 rows of training data with 8 columns, and 2,460,933 rows of test and label data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"(rows, columns) = {}\".format(trainData.shape))\n", "print(\"(rows, columns) = {}\".format(labelData.shape))\n", "print(\"(rows, columns) = {}\".format(testData.shape))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Exploring the data sets, we can see in 'train.csv' it has been One Hot Encoded already, with each category of toxicity having its own column, and the test and test_labels datasets are logically joined on the id column. \n", "\n", "There is no column to explicitly represent *'Not toxic'*, and so we will have to handle this in the pre-processor.\n", "\n", "The text is raw text, with mis-spellings, slang , plurals, etc, and will need some work doing to it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "trainData.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Looking at the test data we see an id and comment text, unprocessed as before." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "testData.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And so we know the label data must join to the test data set via the id column, hence the matching row count.\n", "We can also see that we need to zero out negative values in the data set." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labelData.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3.) Pre-Processing\n", "\n", "In NLP projects, pre-processing can take up a lot of cpu time and memory, and so we are going to offload it onto bigger resources, using the SageMaker processing feature.\n", "\n", "As an overview, we create a script that wraps up what we want to do, and create a Processing job, defining the resources, the data location and the processing script we want run.\n", "\n", "Let's start by defining the data locations, where our inout data will be copied to, and our processed data can be found at." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "inputPrefix = \"/opt/ml/processing/input\"\n", "outputPrefix = \"/opt/ml/processing/output\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When you create a SageMaker Processing job, you have to supply a script to take the supplied inputs, transform them as required, and place them into the defined output locations. \n", "\n", "Let's start building the scripts that will do this work for us. The first step is to create a folder to keep everything." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p scripts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We're going to break out our actual line processing function into a seperate file, to re-use when we deploy our endpoint.\n", "\n", "We import all of our NLP pre-processing utilities, the Natural Language Toolkit (nltk), regular expression libraries etc.\n", "We also supress Warnings in Beautiful Soup around url's, as the input is so dirty it can contain anything.\n", "\n", "Using the `%%writefile` cell magic, we start by importing some basic libraries, and creating a couple helper functions to also install NLTK, BeautifulSoup etc onto our scikit-learn container, and the compilers they need to build. \n", "You can append to a file uising the `-a` flag." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile scripts/process.py\n", "\n", "import nltk\n", "import re\n", "import warnings\n", "from nltk.corpus import stopwords\n", "from nltk.stem import WordNetLemmatizer\n", "from nltk.tokenize import word_tokenize\n", "from nltk.corpus import wordnet\n", "from bs4 import BeautifulSoup\n", "import contractions\n", "import inflect\n", "\n", "warnings.filterwarnings(\"ignore\", category = UserWarning, module = 'bs4')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From nltk we download a tokenizer ruleset called 'punkt', a stopwords collection, our lemmatization rules, and store them locally. \n", "We then create a stop words collection and a Lemmatizer instance from the downloaded artifacts. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/process.py\n", "\n", "print(\"downloading nltk resources\")\n", "downloader = nltk.downloader.Downloader()\n", "downloader._update_index()\n", "downloader.download('punkt')\n", "downloader.download('stopwords')\n", "downloader.download('wordnet')\n", "downloader.download('omw-1.4')\n", "\n", "stopWords = stopwords.words('english')\n", "lemma = WordNetLemmatizer()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is the function that actually does all of our pre-processing on the raw text, using the libraries we prepared earlier.\n", "For each piece of training data, we\n", "- Lower case it\n", "- Strip white space\n", "- Remove markup\n", "- Tokenize it\n", "- For each token we\n", " - Convert numbers to text\n", " - Throw away\n", " - anything shorter than 2 characters\n", " - anything in the stop words collection\n", " - Lemmatize it to remove plurals, prefixes, suffixes etc\n", "- Combine the tokens back into a string\n", "- return the result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/process.py\n", "\n", "def processLine(line):\n", " \n", " if isinstance(line, str) == False:\n", " return \"\"\n", "\n", " filteredSentence = []\n", " # lower case everything, and remove basic whitespace\n", " line = line.lower()\n", " line = line.strip()\n", " line = re.sub('\\s+', ' ', line)\n", " # strip html\n", " beauty = BeautifulSoup(line,'html.parser')\n", " line = beauty.get_text()\n", " # strip url's\n", " line = re.sub(r'https\\S', '', line)\n", " # remove brackets\n", " line = re.sub('\\(.*?\\)', '', line)\n", " # expand contractions\n", " line = contractions.fix(line)\n", "\n", " for word in word_tokenize(line):\n", " if (word.isnumeric()):\n", " try:\n", " filteredSentence.append(inflect.engine().number_to_words(word))\n", " except Exception as e:\n", " # print(\"Couldn't convert number : '{}'\".format(word))\n", " pass\n", "\n", " elif (len(word) > 2) and (word not in stopWords):\n", " fixed = lemma.lemmatize(word, pos = wordnet.VERB)\n", " filteredSentence.append(fixed)\n", " result = \" \".join(filteredSentence)\n", " return result" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And that is our core processor written out. \n", "\n", "Now we can start on the wrapper script we will pass to our Processing job.\n", "Initially, we create a utility function to get our helper libraries installed onto our Processing environment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile scripts/preprocessing.py\n", "import subprocess\n", "import sys\n", "import os\n", "import argparse\n", "import pandas as pd\n", "from sklearn.utils import shuffle\n", "from sklearn.model_selection import train_test_split\n", "\n", "def pipInstall(package):\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", package])\n", "\n", "print(\"installing : NLTK\")\n", "pipInstall (\"nltk\")\n", "print(\"installing : BeautifulSoup\")\n", "pipInstall(\"bs4\")\n", "print(\"installing : Inflect\")\n", "pipInstall(\"inflect\")\n", "print(\"installing : Contractions\")\n", "pipInstall(\"contractions\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this point, we want to inline our line processing function, since we can only pass a single file to our Processing environment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!cat scripts/process.py >> scripts/preprocessing.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a simple utility function to convert a training row into the format that BlazingText expects, \n", "prepending the text with the category labels, and then adding the processed text.\n", "\n", "We also create a dictionary to hold our Classification Labels lookup." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/preprocessing.py\n", "\n", "indexToLabel = {}\n", "\n", "def transformInstance(row):\n", " resultRow = []\n", " for i in range(2, 8):\n", " if ( row.iloc[i] > 0 ):\n", " label = \"__label__\" + indexToLabel[str(i - 1)]\n", " resultRow.append(label)\n", " \n", " # add a 'none' label if we didn't get any specific category assigned\n", " if len(resultRow) == 0:\n", " resultRow.append(\"__label__none\")\n", "\n", " text = processLine(row[1])\n", " resultRow.append(text)\n", " return ' '.join(map(str, resultRow))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this point, we need a mapper between a one hot encoded column and a category, based on column index, \n", "so we create a dictionary to represent this mapping." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/preprocessing.py\n", "\n", "def createLabels(labels):\n", " print(\"generating labels\")\n", " for i in range(1, 7):\n", " label = labels.iloc[:, i].name\n", " indexToLabel[str(i)] = label.strip()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Another simple utility function to set the None category on the label data set. A `negative one` value in the label data means that row wasn't used for measuring, and so all we care about is if all our columns are zero value then we are not toxic." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/preprocessing.py\n", "\n", "def calcNone(row):\n", " for v in row.loc[\"toxic\":\"identity_hate\"]:\n", " if (v < 0): return -1\n", " if (v > 0): return 0\n", " return 1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we get to the entry point for the Pre-Processing script.\n", "This takes the following steps...\n", "\n", "- checks for any args that can override the default data input and output locations\n", "- checks that the files have landed from S3\n", "- creates our Classification Label stems\n", "- pushes each line of training data thru our filter\n", "- shuffles the result\n", "- splits into 90% training and 10% validation data sets\n", "- writes these datasets to csv, ready for uploading to S3 by SageMaker\n", "\n", "For convenience, we also combine the `test.csv` and `test_labels.csv` datasets into a new one, matching on the `id` column, and drop any rows containing a `negative one` value, as these weren't used for scoring the competition. \n", "We start by setting a common index, and concatenating them together." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/preprocessing.py\n", "\n", "if __name__ == \"__main__\":\n", "\n", " # handle any args\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument(\"--inputPrefix\", type = str, default = \"/opt/ml/processing/input\")\n", " parser.add_argument(\"--outputPrefix\", type = str, default = \"/opt/ml/processing/output\")\n", " args, _ = parser.parse_known_args()\n", "\n", " print(\"Received arguments {}\".format(args))\n", "\n", " # capture our paths\n", " inputPrefix = args.inputPrefix\n", " outputPrefix = args.outputPrefix\n", "\n", " print(\"checking files are local, and getting their sizes...\")\n", " if not os.path.isdir(inputPrefix):\n", " print(\"input prefix directory doesn't exist : {}\".format(inputPrefix))\n", "\n", " fileList = filter( lambda x: os.path.isfile(os.path.join(inputPrefix, x)), os.listdir(inputPrefix) )\n", " fileSizes = [ \n", " (fileName, os.stat(os.path.join(inputPrefix, fileName)).st_size) \n", " for fileName in fileList \n", " ]\n", " for fileName, size in fileSizes:\n", " print(inputPrefix, ' --> ', fileName, ' --> ', size, ' bytes') \n", "\n", " # Now we load all of the datasets into pandas data frames, to work with\n", " trainPath = os.path.join(inputPrefix, \"train.csv\")\n", " print(\"loading: training data from : {}\".format(trainPath))\n", " trainData = pd.read_csv(trainPath)\n", " testPath = os.path.join(inputPrefix, \"test.csv\")\n", " print(\"loading: test data from : {}\".format(testPath))\n", " testData = pd.read_csv(testPath)\n", " labelsPath = os.path.join(inputPrefix, \"test_labels.csv\")\n", " print(\"loading: test labels data from : {}\".format(labelsPath))\n", " labelData = pd.read_csv(labelsPath)\n", "\n", " # popuate the labels lookup\n", " createLabels(labelData)\n", "\n", " # kick off the pre-processing\n", " # with the training data, \n", " # writing the result into a new column.\n", " print(\"pre-processing {} rows of training data\".format(trainData.shape[0]))\n", " trainData[\"processed\"] = trainData.apply(lambda row: transformInstance(row), axis = 1)\n", "\n", " # Next we want to pre-process the test data as well, also writing it into a new column\n", " print(\"pre-processing {} rows of test data\".format(testData.shape[0]))\n", " testData[\"processed\"] = testData.apply(lambda row: processLine(row[1]), axis = 1)\n", " \n", " print(\"pre-processing {} rows of test_labels data\".format(labelData.shape[0]))\n", " # calculate the none field\n", " labelData[\"none\"] = labelData.apply(lambda row: calcNone(row), axis = 1)\n", " # join the 2 datasets into one linked one\n", " if testData.index.name != \"id\":\n", " testData.set_index(\"id\", inplace = True)\n", " if labelData.index.name != \"id\":\n", " labelData.set_index(\"id\", inplace = True)\n", " mergedDf = pd.concat([testData, labelData], axis = 1)\n", " \n", " # and drop anything that wasn't used in the competition marking\n", " mergedDf = mergedDf.loc[mergedDf[\"toxic\"] != -1]\n", " print(mergedDf.head(10))\n", " print(\"done pre-processing\")\n", "\n", " # Shuffle the data in case it is grouped\n", " shuffle(trainData)\n", "\n", " # Split into train and validation data sets\n", " train, validation = train_test_split(trainData, test_size = 0.1)\n", " print(\"train:validation split = {}:{} rows\".format(train.shape[0], validation.shape[0]))\n", "\n", " try:\n", " print(\"creating output directory : {}\".format(outputPrefix))\n", " if not os.path.isdir(outputPrefix):\n", " os.makedirs(outputPrefix)\n", " print(\"successfully created directories\")\n", " except Exception as e:\n", " print(e)\n", " print(\"could not make directories\")\n", " pass\n", "\n", " train.to_csv(os.path.join(outputPrefix, \"train.csv\"), columns = [\"processed\"], header = False, index = False)\n", " validation.to_csv(os.path.join(outputPrefix, \"validation.csv\"), columns=[\"processed\"], header = False, index = False)\n", " trainData.to_csv(os.path.join(outputPrefix, \"full.csv\"), index = False)\n", " mergedDf.to_csv(os.path.join(outputPrefix, \"test.csv\"), index = True)\n", "\n", " print(\"csv files successfully written to : {}\".format(outputPrefix))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we have the pre-processing script, we can create the job definition, and run it on a suitably sized machine. \n", "This brings our processing time down to around 36 minutes from over 100 minutes if run locally on an ml.t3.medium, allowing us to keep our Jupyter notebook kernel as small and economic as possible. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "role = get_execution_role()\n", "\n", "sklearnProcessor = SKLearnProcessor(\n", " framework_version = \"0.23-1\", role = role, instance_type = \"ml.c5.4xlarge\", instance_count = 1\n", ")\n", "\n", "sklearnProcessor.run(\n", " code = \"scripts/preprocessing.py\",\n", " inputs = [\n", " ProcessingInput(source = \"data\", destination = inputPrefix, input_name = \"input\")\n", " ], \n", " outputs = [\n", " ProcessingOutput(source = outputPrefix, output_name = \"output\")\n", " ], \n", " arguments = [\"--inputPrefix\", inputPrefix, \"--outputPrefix\", outputPrefix]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To load the pre-processed data for validation, and for our subsequent training job, we need the S3 path the results were stored to. We can get from the Processing Job Description, as follows." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "jobDescription = sklearnProcessor.jobs[-1].describe()\n", "outputConfig = jobDescription[\"ProcessingOutputConfig\"]\n", "print(json.dumps(outputConfig, indent = 2))\n", "output = outputConfig[\"Outputs\"][0]\n", "s3Path = output[\"S3Output\"][\"S3Uri\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this point we can load a sample from our pre-processed datasets into pandas data frames directly from S3, to check the pre-processing is as expected." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "s3TrainData = os.path.join(s3Path, 'train.csv')\n", "s3ValidationData = os.path.join(s3Path, 'validation.csv')\n", "print(\"loading: full data\")\n", "s3FullData = os.path.join(s3Path, 'full.csv')\n", "fullData = pd.read_csv(s3FullData)\n", "print(\"loading: test data\")\n", "s3TestData = os.path.join(s3Path, 'test.csv')\n", "testData = pd.read_csv(s3TestData)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can compare the before and after states from pre-processing the text, and confirm the results [look good](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html) for BlazingText to train on." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test, processed = fullData.iloc[2][\"comment_text\"], fullData.iloc[2][\"processed\"]\n", "print(\"raw : '{}'\\npre-processed : '{}'\".format(test, processed))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4.) Training" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Time to start setting up SageMaker for training on our prepared data sets. \n", "\n", "To do this, we define a prefix for that bucket, to isolate this workload\n", "and then we grab \n", "\n", "- a reference to our session\n", "- our default bucket name\n", "- the role that SageMaker is running under" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "prefix = 'toxicity/blazingtext'\n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket()\n", "s3OutputLocation = 's3://{}/{}/output'.format(bucket, prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can tell SageMaker where to find its training and validation data via the `data_channels` construct. \n", "\n", "This requires \n", "- the path to the data in S3\n", "- the distribution mode of the data. `FullyReplicated` means that each training node will get a full copy of the data. The alternate setting is `ShardedByS3Key`, for when you are training with a cluster and can optimize by partitioning the data. We can only use one trainig node with the Classification mode of Blazing Text, and so we select `FullyReplicated`.\n", "- the content (mime) type of the data\n", "- The S3 data type, being one of `S3Prefix`, `ManifestFile`, `AugmentedManifestFile`. This denotes what the first parameter is pointing to, in our case, an S3 Bucket and key prefix.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "trainInput = sagemaker.inputs.TrainingInput(\n", " s3TrainData, \n", " distribution = 'FullyReplicated', \n", " content_type = 'text/plain', \n", " s3_data_type = 'S3Prefix')\n", "\n", "validationInput = sagemaker.inputs.TrainingInput(\n", " s3ValidationData, \n", " distribution = 'FullyReplicated', \n", " content_type = 'text/plain', \n", " s3_data_type = 'S3Prefix')\n", "\n", "dataChannels = { 'train': trainInput, 'validation': validationInput }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The last piece of information we need to configure our training job is the Fully Qualified `URI` for the latest public BlazingText container from the ECR service, in our current region." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "regionName = sess.boto_region_name\n", "container = sagemaker.image_uris.retrieve(\"blazingtext\", regionName, \"latest\", py_version = \"py3\")\n", "print('Using SageMaker BlazingText container: {} ({})'.format(container, regionName))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With this information, we can define the SageMaker training job itself, by setting our [algorithm specific hyperparameters](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext_hyperparameters.html) and the generic SageMaker training parameters via the standard [Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) construct.\n", "\n", "The Algorithm Specic Hyperparameters for Blazing Text in Classification mode can be sumarized as\n", "\n", "\n", "| Parameter Name | Parameter Type | Recommended Ranges or Values | Required | Default | Description | |\n", "|----------------|-----------------------------|---------------------------------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------|---|\n", "| buckets | `IntegerParameterRange` | [1000000-10000000] | N | 2000000 | The number of hash buckets to use for word n-grams | |\n", "| early_stopping | `Boolean` | [`True`, `False`] | N | `False` | Whether to stop training if validation accuracy doesn't improve after a patience number of epochs | |\n", "| epochs | `IntegerParameterRange` | [5-15] | N | 5 | The maximum number of complete passes through the training data | |\n", "| learning_rate | `ContinuousParameterRange` | MinValue: 0.005, MaxValue: 0.01 | N | 0.05 | The step size used for parameter updates | |\n", "| min_count | `IntegerParameterRange` | [0-100] | N | 5 | Words that appear less than min_count times are discarded | |\n", "| min_epochs | `IntegerParameterRange` | [0-100] | N | 5 | The minimum number of epochs to train before early stopping logic is invoked | |\n", "| mode | `CategoricalParameterRange` | [`supervised`] | Y | N/A | The training mode | |\n", "| patience | `IntegerParameterRange` | [0-100] | N | 4 | The number of epochs to wait before applying early stopping when no progress is made on the validation set. Used only when early_stopping is True | |\n", "| vector_dim | `IntegerParameterRange` | [32-300] | N | 100 | The dimension of the embedding layer | |\n", "| word_ngrams | `IntegerParameterRange` | [1-3] | N | 2 | The number of word n-gram features to use | |" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see from the size of our hyper parameters construct that we are accepting most of the defaults, we set the training mode to supervized and then only override the values for \n", "\n", "- number of epochs the training should run for\n", "- minimum word count\n", "- the number of vector dimensions\n", "\n", "For the general Estimator properties, we are using one instance of an ml.c5.4xlarge, with a default 30GB EBS Volume attached, and sending the data via `File` mode, where the process starts off by pulling all the data from S3 to the local attached EBS volume and ingests it as a file from there." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hp = {\n", " \"mode\": \"supervised\",\n", " \"epochs\": 50,\n", " \"min_count\": 2,\n", " \"vector_dim\": 10\n", "}\n", "\n", "bt_estimator = sagemaker.estimator.Estimator(container,\n", " role, \n", " instance_count = 1, \n", " instance_type = 'ml.c5.4xlarge',\n", " input_mode = 'File',\n", " output_path = s3OutputLocation,\n", " hyperparameters = hp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now invoke the training, passing in our training and validation data set path, from S3.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The supervized BlazingText algorithm measures success by looking to maximize accuracy, defined as the classification accuracy on the user-specified validation dataset\n", "\n", "You should achieve something in the area of 94% accuracy using the hyperparameter settings from above, after around just 5 minutes of elapsed training time." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bt_estimator.fit(inputs = dataChannels, logs = True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With training completed, we can retrieve the location in s3 where our model artifact has been saved." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_job_name = bt_estimator.latest_training_job.name\n", "training_job_info = sess.sagemaker_client.describe_training_job(TrainingJobName = training_job_name)\n", "s3_model_artifact = training_job_info['ModelArtifacts']['S3ModelArtifacts']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5.) Deploy\n", "\n", "We can now go about deploying the model into production, using initially one instance of an ml.t2.large virtual machine. For real traffic volumes you would likely use a bigger machine.\n", "\n", "Because we are optimizing for cost, and we want the consumer of our endpoint to be freed from knowing and re-implementing our pre-processing rules, we will take our BlazingText trained model, and host it in a generic SciKit Learn container, using fastText and our pre-processing code from before. \n", "The provided algorithm containers do not support injecting your own code, but the framework containers do, and provide a number of lifecycle hooks for us to implement what we need." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we create a requirements.txt file that holds our dependencies, that the SageMaker platform will install for us at launch.\n", "\n", "For more on this, please see [Using Scikit-learn with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile scripts/requirements.txt\n", "fastText\n", "numpy\n", "sklearn\n", "pandas\n", "nltk\n", "bs4\n", "inflect\n", "contractions\n", "sagemaker" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The last component is to build our pre-processing code up, as before.\n", "\n", "We start with some standard imports, and pull in our processLine function that we created before." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile scripts/serve.py\n", "import json\n", "import os\n", "import fasttext as ft\n", "import numpy as np\n", "from process import processLine\n", "from sagemaker.serializers import CSVSerializer\n", "import multiprocessing\n", "import os\n", "\n", "cpu_count = multiprocessing.cpu_count()\n", "model_server_timeout = os.environ.get('MODEL_SERVER_TIMEOUT', 120)\n", "model_server_workers = int(os.environ.get('MODEL_SERVER_WORKERS', cpu_count))\n", "\n", "CONTENT_TYPE_JSON = 'application/json'\n", "CONTENT_TYPE_CSV = \"text/csv\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this point we just need to implement the SageMaker lifecycle hooks to achieve 4 objectives. \n", "The first is to load our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/serve.py\n", "\n", "def model_fn(model_dir):\n", " model_path = os.path.join(model_dir, \"model.bin\")\n", " if not os.path.isfile(model_path):\n", " print(\"model_fn : no model at : {}\".format(model_path))\n", " for everything in os.listdir(model_dir):\n", " print(\"model_fn : found file : {}\".format(everything))\n", " return None\n", " return ft.load_model(model_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The second is to transform our input from JSON" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/serve.py\n", "\n", "def input_fn(serialized_input_data, content_type = CONTENT_TYPE_JSON):\n", " #print(\"input_fn : {}\".format(serialized_input_data))\n", " #print(\"input_fn : Using '{}' Parser\".format(content_type))\n", " if (CONTENT_TYPE_JSON == content_type):\n", " input_data = json.loads(serialized_input_data)\n", " return input_data\n", " elif (CONTENT_TYPE_CSV == content_type):\n", " input_data = {\n", " \"configuration\": { \"k\": 7 }, \n", " \"instances\" : serialized_input_data.splitlines()\n", " }\n", " return input_data\n", " else:\n", " raise Exception('Unsupported content type')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Third, we need to implement the prediction call. Here we are looping thru all of the submitted lines for inference, and passing them thru our pre-processor function, before calling into the model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/serve.py\n", "\n", "def predict_fn(input_data, model):\n", " k = 2\n", " processed = []\n", " #print(\"predict_fn : \".format(input_data))\n", " if not input_data[\"configuration\"] is None:\n", " config = input_data[\"configuration\"]\n", " if not config[\"k\"] is None:\n", " k = int(config[\"k\"])\n", " if not input_data[\"instances\"] is None:\n", " instances = input_data[\"instances\"]\n", " for line in instances:\n", " processedLine = processLine(line)\n", " processed.append(processedLine)\n", "\n", " prediction = model.predict(processed, k)\n", " return np.array(prediction)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Last, we transform the results back into the correct output format" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile -a scripts/serve.py\n", "\n", "def output_fn(prediction_output, accept = CONTENT_TYPE_JSON):\n", " #print(\"output_fn : Using '{}' Parser\".format(accept))\n", " if CONTENT_TYPE_JSON == accept:\n", " return json.dumps(prediction_output.tolist()), accept\n", " elif CONTENT_TYPE_CSV == accept:\n", " return CSVSerializer().serialize(prediction_output.tolist()), accept\n", " raise Exception('Unsupported content type')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This gives us everything we need to create a Model object to deploy. \n", "We copy the whole `scripts` folder across, and set the entry point to be our `serve.py` file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sklearn_model = SKLearnModel(\n", " model_data = s3_model_artifact,\n", " source_dir = \"scripts\", \n", " entry_point = \"serve.py\",\n", " framework_version = \"0.23-1\",\n", " role = role\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll use the new [Serverless Inference feature](https://docs.aws.amazon.com/sagemaker/latest/dg/serverless-endpoints.html), so we can leave our endpoint deployed and only be charged when we actually make predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "serverless_config = ServerlessInferenceConfig(\n", " memory_size_in_mb = 2048,\n", " max_concurrency = 5,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We attach a JSONSerializer for marshalling JSON inputs to the inference endpoint in its native format, and deploy the endpoint. \n", "\n", "(Because we are optimizing for cost and using the scikit learn container as a base, we need to install all of our dependencies at deployment time, which can add some time overhead to our deployment. In a production environment, you would either want to create a finalised container for using with serverless inference, or use the auto-scaling server backed option.)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "classifier_endpoint = sklearn_model.deploy(\n", " serverless_inference_config = serverless_config, \n", " serializer = JSONSerializer()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we get back the SageMaker Endpoint name, which we use for testing, and you would also use if you were to invoke the endpoint from a Lambda function or similar, using the AWS SDK" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_name = classifier_endpoint.endpoint_name\n", "endpoint_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 6.) Test\n", "\n", "At this stage we have a live inference endpoint available to us, so we can run some test queries against it. \n", "\n", "Let's specify the index so we can pull specific examples out by their id." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if testData.index.name != \"id\":\n", " testData.set_index(\"id\", inplace = True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First up, we'll test a potentially contentious sentence, that has some potential trigger words it, but no malice.\n", "\n", "We want to test with raw examples, and make sure our pre-processing happens on the inference side, so we can just create a Plain Old Javascript Object (POJO) and send it off to our inference endpoint by calling `predict`, and finally print the results back." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "simpleTest = testData.loc[\"001d39c71fce6f78\", \"comment_text\":\"none\"]\n", "print(simpleTest)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We pack this example inside a POJO as the `instances` property. \n", "\n", "With this payload we invoke the endpoint, and print the results out." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see from the results, with the `None` category gettng the highest weighting, that our model wasn't fooled by this simple test." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "runtime_client = boto3.client(\"runtime.sagemaker\")\n", "payload = { \"instances\" : [simpleTest.comment_text], \"configuration\" : { \"k\": 1 } }\n", "response = runtime_client.invoke_endpoint(\n", " EndpointName = endpoint_name, ContentType = \"application/json\", Body = json.dumps(payload)\n", ")\n", "response_payload = json.loads(response['Body'].read().decode(\"utf-8\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print (\"response_payload: {}\".format(pprint(response_payload)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, let's try making many predictions against our endpoint using a mix of samples pulled from our test dataset.\n", "We create a helper function here to find us a random example of a toxic or non-toxic sentence, determined by the functions only parameter." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "testRowCount = testData.shape[0]\n", "def findTestCandidate(toxic):\n", " randId = random.randint(0, testRowCount - 1)\n", " result = testData.iloc[randId]\n", " if type(result[\"comment_text\"]) is str:\n", " if result.none != toxic:\n", " return result\n", " return findTestCandidate(toxic)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can test our endpoint, pulling back 4 categories by using the `configuration.k` parameter. \n", "This option tells the Blazing Text Classification algorithm how many matched categories with weighting we want to get back in each result.\n", "\n", "You can run this cell as often as you like, and experiment with changing the `True` to `False` to select non-toxic examples." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "result = findTestCandidate(True)\n", "payload = {\n", " \"instances\" : [result.processed],\n", " \"configuration\": { \"k\": 4 }\n", " }\n", "# uncomment this next line if you are ok seeing the sentence\n", "# print(json.dumps(payload, indent = 2))\n", "response = runtime_client.invoke_endpoint(\n", " EndpointName = endpoint_name, ContentType = \"application/json\", Body = json.dumps(payload)\n", ")\n", "response_payload = json.loads(response['Body'].read().decode(\"utf-8\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print (\"response_payload: {}\".format(response_payload))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 7.) Cleanup\n", "\n", "Finally, we can optionally delete the serverless inference endpoint, but this isn't essential" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#classifier_endpoint.delete_endpoint()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And that's it :)" ] } ], "metadata": { "instance_type": "", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }