{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# An Introduction to SageMaker Random Cut Forests\n", "\n", "***Unsupervised anomaly detection on time series data using the Random Cut Forest algorithm.***\n", "\n", "---\n", "\n", "1. [Introduction](#Introduction)\n", "1. [Setup](#Setup)\n", "1. [Training](#Training)\n", "1. [Inference](#Inference)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Introduction\n", "***\n", "\n", "Amazon SageMaker Random Cut Forest (RCF) is an algorithm designed to detect anomalous data points within a dataset. Examples of when anomalies are important to detect include when website activity uncharacteristically spikes, when temperature data diverges from a periodic behavior, or when changes to public transit ridership reflect the occurrence of a special event.\n", "\n", "In this notebook, we will use the SageMaker RCF algorithm to train an RCF model on satellite communications signal-to-noise ratio (SNR) values. We will then use this model to predict anomalous events by emitting an \"anomaly score\" for each data point. The main goals of this notebook are:\n", "\n", "* to learn how to obtain, transform, and store data for use in Amazon SageMaker;\n", "* to create an Amazon SageMaker training job on a dataset to produce an RCF model;\n", "* to use the RCF model to perform inference with an Amazon SageMaker endpoint.\n", "\n", "The following are ***not*** goals of this notebook:\n", "\n", "* to deeply understand the RCF model;\n", "* to understand how the Amazon SageMaker RCF algorithm works.\n", "\n", "If you would like to know more, please check out the [SageMaker RCF Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/randomcutforest.html)." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup\n", "\n", "***\n", "\n", "*This notebook was tested in Amazon SageMaker Studio on an ml.t3.medium instance with the Python 3 (Data Science) kernel.*\n", "\n", "Our first step is to set up our AWS credentials so that AWS SageMaker can store and access training data and model artifacts. We also need some data to inspect and to train upon." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Select Amazon S3 Bucket\n", "\n", "We first need to specify the locations where the original data is stored and where we will store our training data and trained model artifacts. ***This is the only cell of this notebook that you will need to edit.*** In particular, we need the following data:\n", "\n", "* `bucket` - An S3 bucket accessible by this account.\n", "* `prefix` - The location in the bucket where this notebook's input and output data will be stored. (The default value is sufficient.)\n", "* `downloaded_data_bucket` - The S3 bucket containing the output data from pipeline 1.\n", "* `downloaded_data_prefix` - The location in the bucket where the data is stored." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "isConfigCell": true, "tags": [ "parameters" ] }, "outputs": [], "source": [ "import boto3\n", "import botocore\n", "import sagemaker\n", "import json\n", "from time import gmtime, strftime\n", "import pandas as pd\n", "import io\n", "\n", "\n", "bucket = sagemaker.Session().default_bucket() # Feel free to change to another bucket you have access to\n", "prefix = \"sagemaker/rcf-benchmarks\"\n", "execution_role = sagemaker.get_execution_role()\n", "region = boto3.Session().region_name\n", "print(f\"Using the following role: {execution_role}\")\n", "\n", "# *** Edit the following bucket name and prefix to read the JSON Lines part files ***\n", "downloaded_data_bucket = \"BUCKET_NAME\"\n", "# To read multiple part files, specify the prefix leading to the files, e.g. 
\"year=2022/month=12/day=21/hour=16/\"\n", "downloaded_data_prefix = \"BUCKET_PREFIX\"\n", "\n", "def check_bucket_permission(bucket):\n", " # check if the bucket exists\n", " permission = False\n", " try:\n", " boto3.Session().client(\"s3\").head_bucket(Bucket=bucket)\n", " except botocore.exceptions.ParamValidationError as e:\n", " print(\n", " \"Hey! You either forgot to specify your S3 bucket\"\n", " \" or you gave your bucket an invalid name!\"\n", " )\n", " except botocore.exceptions.ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"403\":\n", " print(f\"Hey! You don't have permission to access the bucket, {bucket}.\")\n", " elif e.response[\"Error\"][\"Code\"] == \"404\":\n", " print(f\"Hey! Your bucket, {bucket}, doesn't exist!\")\n", " else:\n", " raise\n", " else:\n", " permission = True\n", " return permission\n", "\n", "\n", "if check_bucket_permission(bucket):\n", " print(f\"Training input/output will be stored in: s3://{bucket}/{prefix}\")\n", "if check_bucket_permission(downloaded_data_bucket):\n", " print(f\"Downloaded training data will be read from s3://{downloaded_data_bucket}/{downloaded_data_prefix}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "s3 = boto3.client('s3')\n", "df = pd.DataFrame()\n", "\n", "def json_lines_to_json(s: str) -> str:\n", " # replace the first occurrence of '{'\n", " s = s.replace('{', '[{', 1)\n", "\n", " # replace the last occurrence of '}\n", " s = s.rsplit('}', 1)[0] + '}]'\n", "\n", " # now go in and replace all occurrences of '}' immediately followed\n", " # by newline with a '},'\n", " s = s.replace('}\\n', '},\\n')\n", "\n", " return s\n", "\n", "# Read JSON Lines\n", "for file in s3.list_objects(Bucket=downloaded_data_bucket, Prefix=downloaded_data_prefix)['Contents']:\n", " obj = s3.get_object(Bucket=downloaded_data_bucket, Key=file['Key'])['Body'].read().decode('utf-8') # get object\n", " json_obj = 
json.loads(json_lines_to_json(obj)) # convert and load json lines\n", " df_part = pd.json_normalize(json_obj) # normalize json array to df\n", " df = pd.concat([df, df_part], ignore_index=True) # merge to single df\n", " \n", "satcom_data = df.rename(columns={\"datetime\": \"timestamp\"})\n", "satcom_data.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Obtain and Inspect Example Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before training any models, it is important to inspect our data first. Perhaps there are some underlying patterns or structures that we could provide as \"hints\" to the model, or maybe there is some noise that we could pre-process away. The `satcom_data.head()` output above shows a sample of the raw data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Human beings are visual creatures, so let's take a look at a plot of the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "\n", "matplotlib.rcParams[\"figure.dpi\"] = 100\n", "\n", "satcom_data['fwdsnr'].plot(title=\"SNR Plot\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# drop any rows that have null or NaN fwdsnr to avoid model training failures\n", "satcom_data = satcom_data.dropna(subset=['fwdsnr'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Training\n", "\n", "***\n", "\n", "Next, we configure a SageMaker training job to train the Random Cut Forest (RCF) algorithm on the data." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Hyperparameters\n", "\n", "Particular to a SageMaker RCF training job are the following hyperparameters:\n", "\n", "* **`num_samples_per_tree`** - the number of randomly sampled data points sent to each tree. 
As a general rule, `1/num_samples_per_tree` should approximate the estimated ratio of anomalies to normal points in the dataset.\n", "* **`num_trees`** - the number of trees to create in the forest. Each tree learns a separate model from different samples of data. The full forest model uses the mean predicted anomaly score from each constituent tree.\n", "* **`feature_dim`** - the dimension of each data point.\n", "\n", "In addition to these RCF model hyperparameters, we provide additional parameters defining things like the EC2 instance type on which training will run, the S3 bucket containing the data, and the AWS access role. Note that:\n", "\n", "* Recommended instance types: `ml.m4`, `ml.c4`, or `ml.c5`\n", "* Current limitations:\n", " * The RCF algorithm does not take advantage of GPU hardware.\n", "\n", "The following training and deployment steps take 5-10 minutes for the sample dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker import RandomCutForest\n", "\n", "session = sagemaker.Session()\n", "\n", "# specify general training job information\n", "rcf = RandomCutForest(\n", " role=execution_role,\n", " instance_count=1,\n", " instance_type=\"ml.m4.xlarge\",\n", " data_location=f\"s3://{bucket}/{prefix}/\",\n", " output_path=f\"s3://{bucket}/{prefix}/output\",\n", " num_samples_per_tree=512,\n", " num_trees=50,\n", " base_job_name=f\"randomforest-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\",\n", ")\n", "\n", "# automatically upload the training data to S3 and run the training job\n", "rcf.fit(rcf.record_set(satcom_data['fwdsnr'].to_numpy().reshape(-1, 1)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you see the message\n", "\n", "> `===== Job Complete =====`\n", "\n", "at the bottom of the output logs, then training successfully completed and the output RCF model was stored in the specified output path. 
You can also view information about a training job, including its status, in the AWS SageMaker console. Just click on the \"Jobs\" tab and select the training job matching the name printed below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"Training job name: {rcf.latest_training_job.job_name}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Inference\n", "\n", "***\n", "\n", "A trained Random Cut Forest model does nothing on its own. We now want to use the model we computed to perform inference on data. In this case, it means computing anomaly scores from input time series data points.\n", "\n", "We create an inference endpoint using the SageMaker Python SDK `deploy()` function from the job we defined above. We can deploy for either real-time inference or serverless inference.\n", "\n", "The following step may take 5-10 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_name = f\"randomforest-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "# serverless_inference_config = sagemaker.serverless.serverless_inference_config.ServerlessInferenceConfig(memory_size_in_mb=1024, max_concurrency=5)\n", "# rcf_inference = rcf.deploy(serverless_inference_config=serverless_inference_config, endpoint_name=endpoint_name)\n", "rcf_inference = rcf.deploy(initial_instance_count=1, instance_type=\"ml.m4.xlarge\", endpoint_name=endpoint_name)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Congratulations! You now have a functioning SageMaker RCF inference endpoint. 
You can confirm the endpoint configuration and status (\"InService\") by navigating to the \"Endpoints\" tab in the AWS SageMaker console and selecting the endpoint matching the endpoint name printed below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"Endpoint name: {rcf_inference.endpoint}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Serialization/Deserialization\n", "\n", "We can pass data in a variety of formats to our inference endpoint. In this example we will demonstrate passing CSV-formatted data. Other available formats are JSON and RecordIO Protobuf. We make use of the SageMaker Python SDK `CSVSerializer` and `JSONDeserializer` classes when configuring the inference endpoint." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.serializers import CSVSerializer\n", "from sagemaker.deserializers import JSONDeserializer\n", "\n", "rcf_inference.serializer = CSVSerializer()\n", "rcf_inference.deserializer = JSONDeserializer()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's pass the training dataset, in CSV format, to the inference endpoint so we can automatically detect the anomalies we saw with our eyes in the plots above. Note that the serializer and deserializer will automatically take care of the datatype conversion from NumPy ndarrays.\n", "\n", "For starters, let's only pass in the first six data points so we can see what the output looks like." 
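] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "As a reference for what to expect, the deserialized response is a JSON document with one anomaly score per input record, shaped roughly like the following (the score values here are made up for illustration):\n", "\n", "```json\n", "{\"scores\": [{\"score\": 0.02}, {\"score\": 0.03}, {\"score\": 1.25}]}\n", "```\n", "\n", "Higher scores indicate points the model considers more anomalous."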
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "satcom_data_numpy = satcom_data['fwdsnr'].to_numpy().reshape(-1, 1)\n", "print(satcom_data_numpy[:6])\n", "results = rcf_inference.predict(\n", " satcom_data_numpy[:6], initial_args={\"ContentType\": \"text/csv\", \"Accept\": \"application/json\"}\n", ")\n", "print(results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Computing Anomaly Scores\n", "\n", "Now, let's compute and plot the anomaly scores from the entire dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = rcf_inference.predict(satcom_data_numpy)\n", "scores = [datum[\"score\"] for datum in results[\"scores\"]]\n", "\n", "# add scores to data frame and print first few values\n", "satcom_data[\"score\"] = pd.Series(scores, index=satcom_data.index)\n", "satcom_data.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, ax1 = plt.subplots()\n", "ax2 = ax1.twinx()\n", "\n", "#\n", "# *Try this out* - change `start` and `end` to zoom in on the\n", "# anomaly found earlier in this notebook\n", "#\n", "start, end = 0, len(satcom_data)\n", "satcom_data_subset = satcom_data[start:end]\n", "\n", "ax1.plot(satcom_data_subset[\"fwdsnr\"], color=\"C0\", alpha=0.8)\n", "ax2.plot(satcom_data_subset[\"score\"], color=\"C1\")\n", "\n", "ax1.grid(which=\"major\", axis=\"both\")\n", "\n", "ax1.set_ylabel(\"SNR\", color=\"C0\")\n", "ax2.set_ylabel(\"Anomaly Score\", color=\"C1\")\n", "\n", "ax1.tick_params(\"y\", colors=\"C0\")\n", "ax2.tick_params(\"y\", colors=\"C1\")\n", "\n", "ax1.set_ylim(min(satcom_data[\"fwdsnr\"]), 1.4 * max(satcom_data[\"fwdsnr\"]))\n", "ax2.set_ylim(min(scores), 1.4 * max(scores))\n", "fig.set_figwidth(10)\n", "fig.suptitle(\"Anomaly Score overlaid on SNR value\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Note that the anomaly score spikes where 
our eyeball-norm method suggests there is an anomalous data point, as well as in some places where our eyeballs are not as accurate.\n", "\n", "Below we print and plot any data points with scores greater than some number of standard deviations from the mean anomaly score." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "score_mean = satcom_data[\"score\"].mean()\n", "score_std = satcom_data[\"score\"].std()\n", "score_cutoff = score_mean + 2.85 * score_std\n", "\n", "anomalies = satcom_data_subset[satcom_data_subset[\"score\"] > score_cutoff]\n", "anomalies.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ax2.plot(anomalies.index, anomalies.score, \"ko\")\n", "fig" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "With the current hyperparameter choices, we see that the standard-deviation threshold, while able to capture anomalies, is rather sensitive to fine-grained perturbations and anomalous behavior. Adding trees to the SageMaker RCF model, or training on a larger dataset, could smooth out the results." 
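] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The cutoff rule itself is plain pandas, so it can be sanity-checked on synthetic scores. A minimal sketch (the score values below are made up; the `2.85` multiplier matches the cell above):\n", "\n", "```python\n", "import pandas as pd\n", "\n", "# 99 unremarkable scores plus one obvious outlier\n", "toy_scores = pd.Series([1.0] * 99 + [10.0])\n", "toy_cutoff = toy_scores.mean() + 2.85 * toy_scores.std()\n", "\n", "# only the outlier exceeds the cutoff\n", "toy_scores[toy_scores > toy_cutoff]\n", "```\n", "\n", "Note that with only a handful of points, a single outlier inflates the standard deviation enough that it may not clear a 2.85-sigma cutoff; this is another reason a larger dataset helps."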
] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Lastly, we write the anomalies to S3 in JSON Lines format." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Write Anomalies to S3\n", "destination = f\"{prefix}/anomalies/anomalies-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}.json\"\n", "\n", "json_buffer = io.StringIO()\n", "\n", "anomalies.to_json(json_buffer,\n", " orient=\"records\",\n", " lines=True)\n", "\n", "s3 = boto3.resource('s3')\n", "my_bucket = s3.Bucket(bucket)\n", "\n", "my_bucket.put_object(Key=destination, Body=json_buffer.getvalue())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Stop and Delete the Endpoint\n", "\n", "Finally, we should delete the endpoint before we close the notebook.\n", "\n", "To do so, execute the cell below. Alternatively, you can navigate to the \"Endpoints\" tab in the SageMaker console, select the endpoint with the name stored in the variable `endpoint_name`, and select \"Delete\" from the \"Actions\" dropdown menu." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker.Session().delete_endpoint(rcf_inference.endpoint)" ] } ], "metadata": { "celltoolbar": "Tags", "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "notice": "Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. 
A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.", "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }