# An Introduction to SageMaker Random Cut Forests

***Unsupervised anomaly detection on time series data using the Random Cut Forest algorithm.***

---

1. [Introduction](#Introduction)
1. [Setup](#Setup)
1. [Training](#Training)
1. [Inference](#Inference)

# Introduction
***

Amazon SageMaker Random Cut Forest (RCF) is an algorithm designed to detect anomalous data points within a dataset. Examples of anomalies worth detecting include an uncharacteristic spike in website activity, temperature data that diverges from its periodic behavior, or a change in public transit ridership that reflects a special event.

In this notebook, we will use the SageMaker RCF algorithm to train an RCF model on satellite communications (satcom) signal-to-noise ratio (SNR) values. We will then use this model to predict anomalous events by emitting an "anomaly score" for each data point. The main goals of this notebook are to learn how to:

* obtain, transform, and store data for use in Amazon SageMaker;
* create an Amazon SageMaker training job on a data set to produce an RCF model;
* use the RCF model to perform inference with an Amazon SageMaker endpoint.

The following are ***not*** goals of this notebook:

* deeply understand the RCF model;
* understand how the Amazon SageMaker RCF algorithm works.

If you would like to know more, please check out the [SageMaker RCF Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/randomcutforest.html).

# Setup

***

*This notebook was tested in Amazon SageMaker Studio on an ml.t3.medium instance with the Python 3 (Data Science) kernel.*

Our first step is to set up our AWS credentials so that Amazon SageMaker can store and access training data and model artifacts. We also need some data to inspect and to train upon.

## Select Amazon S3 Bucket

We first need to specify the locations where the original data is stored and where we will store our training data and trained model artifacts. ***This is the only cell of this notebook that you will need to edit.*** In particular, we need the following data:

* `bucket` - An S3 bucket accessible by this account.
* `prefix` - The location in the bucket where this notebook's input and output data will be stored. (The default value is sufficient.)
* `downloaded_data_bucket` - The S3 output bucket from pipeline 1.
* `downloaded_data_prefix` - The location in the bucket where the data is stored.

In [None]:
import boto3
import botocore
import sagemaker
import json
from time import gmtime, strftime
import pandas as pd
import io


bucket = sagemaker.Session().default_bucket() # Feel free to change to another bucket you have access to
prefix = "sagemaker/rcf-benchmarks"
execution_role = sagemaker.get_execution_role()
region = boto3.Session().region_name
print(f"Using the following role: {execution_role}")

# *** Edit the following bucket name and prefix to read the json lines part files *** 
downloaded_data_bucket = "BUCKET_NAME"
# To read multiple part files, specify the prefix leading to the files, ex. "year=2022/month=12/day=21/hour=16/"
downloaded_data_prefix = "BUCKET_PREFIX"

def check_bucket_permission(bucket):
    # check if the bucket exists
    permission = False
    try:
        boto3.Session().client("s3").head_bucket(Bucket=bucket)
    except botocore.exceptions.ParamValidationError as e:
        print(
            "Hey! You either forgot to specify your S3 bucket"
            " or you gave your bucket an invalid name!"
        )
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "403":
            print(f"Hey! You don't have permission to access the bucket, {bucket}.")
        elif e.response["Error"]["Code"] == "404":
            print(f"Hey! Your bucket, {bucket}, doesn't exist!")
        else:
            raise
    else:
        permission = True
    return permission


if check_bucket_permission(bucket):
    print(f"Training input/output will be stored in: s3://{bucket}/{prefix}")
if check_bucket_permission(downloaded_data_bucket):
    print(f"Downloaded training data will be read from s3://{downloaded_data_bucket}/{downloaded_data_prefix}")

In [None]:
s3 = boto3.client('s3')
df = pd.DataFrame()

def json_lines_to_json(s: str) -> str:
    # replace the first occurrence of '{'
    s = s.replace('{', '[{', 1)

    # replace the last occurrence of '}'
    s = s.rsplit('}', 1)[0] + '}]'

    # now go in and replace all occurrences of '}' immediately followed
    # by newline with a '},'
    s = s.replace('}\n', '},\n')

    return s

# Read JSON Lines
for file in s3.list_objects(Bucket=downloaded_data_bucket, Prefix=downloaded_data_prefix)['Contents']:
    obj = s3.get_object(Bucket=downloaded_data_bucket, Key=file['Key'])['Body'].read().decode('utf-8') # get object
    json_obj = json.loads(json_lines_to_json(obj)) # convert and load json lines
    df_part = pd.json_normalize(json_obj) # normalize json array to df
    df = pd.concat([df, df_part], ignore_index=True) # merge to single df

satcom_data = df.rename(columns={"datetime": "timestamp"})
satcom_data.head()
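
The string-replacement conversion above assumes every record ends with `}` followed by a newline. As a hedged alternative, JSON Lines text can also be parsed one line at a time with the standard `json` module. The sketch below is illustrative only: `parse_json_lines` is a hypothetical helper, and the sample records (including the `fwdsnr` values) are made up rather than taken from the pipeline output.

```python
import json


def parse_json_lines(text: str) -> list:
    """Parse JSON Lines text into a list of Python objects.

    Blank lines are skipped, so a trailing newline is harmless.
    """
    records = []
    for line in text.split("\n"):
        line = line.strip()
        if line:
            records.append(json.loads(line))
    return records


# Hypothetical sample in the same shape as one part file.
sample = (
    '{"datetime": "2022-12-21T16:00:00", "fwdsnr": 9.1}\n'
    '{"datetime": "2022-12-21T16:00:10", "fwdsnr": 8.7}\n'
)
records = parse_json_lines(sample)
print(len(records), records[0]["fwdsnr"])
```

A list produced this way can be handed to `pd.json_normalize` in the same way as the converted JSON array in the cell above.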

## Obtain and Inspect Example Data

Before training any models, it is important to inspect our data. Perhaps there are underlying patterns or structures that we could provide as "hints" to the model, or noise that we could pre-process away. The output of `satcom_data.head()` above shows the first few rows of the raw data.

Human beings are visual creatures, so let's take a look at a plot of the data.

In [None]:
%matplotlib inline

import matplotlib
import matplotlib.pyplot as plt

matplotlib.rcParams["figure.dpi"] = 100

satcom_data['fwdsnr'].plot(title="SNR Plot")

In [None]:
# drop any rows that have null or NaN fwdsnr to avoid model training failures
satcom_data = satcom_data.dropna(subset=['fwdsnr'])
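
To picture what this filtering step removes, here is a minimal sketch of the same idea without pandas. `drop_missing` and the sample rows are hypothetical and exist only for this illustration; the notebook itself relies on `dropna`.

```python
import math


def drop_missing(rows, key):
    """Keep only rows whose `key` value is present and not NaN."""
    kept = []
    for row in rows:
        value = row.get(key)
        if value is None:
            continue
        if isinstance(value, float) and math.isnan(value):
            continue
        kept.append(row)
    return kept


# Invented rows: two usable readings and three unusable ones.
rows = [
    {"timestamp": "t0", "fwdsnr": 9.1},
    {"timestamp": "t1", "fwdsnr": float("nan")},
    {"timestamp": "t2", "fwdsnr": None},
    {"timestamp": "t3"},
    {"timestamp": "t4", "fwdsnr": 8.7},
]
clean = drop_missing(rows, "fwdsnr")
print([row["timestamp"] for row in clean])
```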

# Training

***

Next, we configure a SageMaker training job to train the Random Cut Forest (RCF) algorithm on the data.

## Hyperparameters

Particular to a SageMaker RCF training job are the following hyperparameters:

* **`num_samples_per_tree`** - the number of randomly sampled data points sent to each tree. As a general rule, `1/num_samples_per_tree` should approximate the estimated ratio of anomalies to normal points in the dataset.
* **`num_trees`** - the number of trees to create in the forest. Each tree learns a separate model from different samples of data. The full forest model uses the mean of the anomaly scores predicted by its constituent trees.
* **`feature_dim`** - the dimension of each data point.
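
The rule of thumb for `num_samples_per_tree` is simple arithmetic, sketched below. Both helper functions are hypothetical, and the `max_samples=2048` cap is an assumption about the algorithm's upper bound that should be checked against the current SageMaker RCF documentation.

```python
def implied_anomaly_ratio(num_samples_per_tree: int) -> float:
    """Anomaly ratio implied by the 1/num_samples_per_tree rule of thumb."""
    if num_samples_per_tree < 1:
        raise ValueError("num_samples_per_tree must be at least 1")
    return 1.0 / num_samples_per_tree


def suggest_num_samples_per_tree(anomaly_ratio: float, max_samples: int = 2048) -> int:
    """Invert the rule of thumb, capped at max_samples.

    max_samples=2048 is an assumed upper bound, not a value from this notebook.
    """
    if not 0.0 < anomaly_ratio <= 1.0:
        raise ValueError("anomaly_ratio must be in (0, 1]")
    return max(1, min(max_samples, round(1.0 / anomaly_ratio)))


# 512 samples per tree corresponds to roughly 0.2% anomalies.
print(f"{implied_anomaly_ratio(512):.4%}")
# An estimated 0.5% anomaly rate suggests about 200 samples per tree.
print(suggest_num_samples_per_tree(0.005))
```

With the value of 512 used in this notebook, the rule of thumb corresponds to an expected anomaly ratio of about one point in 512.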

In addition to these RCF model hyperparameters, we provide additional parameters that define things like the EC2 instance type on which training will run, the S3 bucket containing the data, and the AWS access role. Note that:

* Recommended instance types: `ml.m4`, `ml.c4`, or `ml.c5`
* Current limitations:
    * The RCF algorithm does not take advantage of GPU hardware.

The following training and deployment steps take between 5 and 10 minutes for the sample dataset.

In [None]:
from sagemaker import RandomCutForest

session = sagemaker.Session()

# specify general training job information
rcf = RandomCutForest(
 role=execution_role,
 instance_count=1,
 instance_type="ml.m4.xlarge",
 data_location=f"s3://{bucket}/{prefix}/",
 output_path=f"s3://{bucket}/{prefix}/output",
 num_samples_per_tree=512,
 num_trees=50,
 base_job_name = f"randomforest-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
)

# automatically upload the training data to S3 and run the training job
rcf.fit(rcf.record_set(satcom_data['fwdsnr'].to_numpy().reshape(-1, 1)))

If you see the message

> `===== Job Complete =====`

at the bottom of the output logs, training completed successfully and the output RCF model was stored in the specified output path. You can also view information about a training job and its status in the Amazon SageMaker console. Just click on the "Jobs" tab and select the training job matching the training job name below:

In [None]:
print(f"Training job name: {rcf.latest_training_job.job_name}")

# Inference

***

A trained Random Cut Forest model does nothing on its own. We now want to use the model we computed to perform inference on data. In this case, it means computing anomaly scores from input time series data points.

We create an inference endpoint by calling the SageMaker Python SDK `deploy()` method on the estimator we trained above. We can deploy for either real-time inference or serverless inference.

The following step may take 5-10 minutes.

In [None]:
endpoint_name = f"randomforest-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
# serverless_inference_config = sagemaker.serverless.serverless_inference_config.ServerlessInferenceConfig(memory_size_in_mb=1024, max_concurrency=5)
# rcf_inference = rcf.deploy(serverless_inference_config=serverless_inference_config, endpoint_name = endpoint_name)
rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge", endpoint_name=endpoint_name)

Congratulations! You now have a functioning SageMaker RCF inference endpoint. You can confirm the endpoint configuration and status ("InService") by navigating to the "Endpoints" tab in the Amazon SageMaker console and selecting the endpoint matching the endpoint name below:

In [None]:
print(f"Endpoint name: {rcf_inference.endpoint_name}")

## Data Serialization/Deserialization

We can pass data in a variety of formats to our inference endpoint. In this example we will demonstrate passing CSV-formatted data. Other available formats are JSON and RecordIO Protobuf. We make use of the SageMaker Python SDK classes `CSVSerializer` and `JSONDeserializer` when configuring the inference endpoint.

In [9]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

rcf_inference.serializer = CSVSerializer()
rcf_inference.deserializer = JSONDeserializer()
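
To make the two formats concrete, the sketch below builds a CSV request body by hand and reads scores out of a JSON response shaped like the one this notebook parses later (`results["scores"]`, each entry carrying a `score`). It is an illustration under assumptions: `rows_to_csv` and `extract_scores` are hypothetical helpers, the numbers are invented, and the exact bytes produced by `CSVSerializer` may differ.

```python
import json


def rows_to_csv(rows) -> str:
    """Serialize rows of numbers as CSV text, one data point per line."""
    return "\n".join(",".join(str(value) for value in row) for row in rows)


def extract_scores(response_body: str) -> list:
    """Pull the anomaly scores out of a JSON response body."""
    payload = json.loads(response_body)
    return [datum["score"] for datum in payload["scores"]]


# Three one-dimensional data points (made-up SNR values).
request_body = rows_to_csv([[9.1], [8.7], [2.3]])
print(request_body)

# A mock response; the scores are invented for this example.
mock_response = '{"scores": [{"score": 0.9}, {"score": 1.0}, {"score": 2.4}]}'
print(extract_scores(mock_response))
```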

Let's pass the training dataset, in CSV format, to the inference endpoint so we can automatically detect the anomalies we saw with our eyes in the plots above. Note that the serializer and deserializer will automatically take care of the datatype conversion from NumPy ndarrays.

For starters, let's only pass in the first six datapoints so we can see what the output looks like.

In [None]:
satcom_data_numpy = satcom_data['fwdsnr'].to_numpy().reshape(-1, 1)
print(satcom_data_numpy[:6])
results = rcf_inference.predict(
 satcom_data_numpy[:6], initial_args={"ContentType": "text/csv", "Accept": "application/json"}
)
print(results)

## Computing Anomaly Scores

Now, let's compute and plot the anomaly scores from the entire dataset.

In [None]:
results = rcf_inference.predict(satcom_data_numpy)
scores = [datum["score"] for datum in results["scores"]]

# add scores to data frame and print first few values
satcom_data["score"] = pd.Series(scores, index=satcom_data.index)
satcom_data.head()
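
The cell above sends the whole dataset in a single request. If a dataset is too large for one request, one option is to send it in slices and concatenate the scores. The sketch below shows that pattern under assumptions: `predict_in_chunks` is a hypothetical helper, `fake_predict` is a stand-in for the endpoint call, and the chunk size is arbitrary.

```python
def predict_in_chunks(predict_fn, rows, chunk_size=1000):
    """Call predict_fn on consecutive slices of rows and join the scores.

    predict_fn is any callable that takes a slice of rows and returns a dict
    shaped like {"scores": [{"score": float}, ...]}.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    scores = []
    for start in range(0, len(rows), chunk_size):
        result = predict_fn(rows[start:start + chunk_size])
        scores.extend(datum["score"] for datum in result["scores"])
    return scores


def fake_predict(rows):
    """Stand-in for an endpoint call: scores each row by its absolute value."""
    return {"scores": [{"score": abs(row[0])} for row in rows]}


rows = [[float(i)] for i in range(-3, 4)]
print(predict_in_chunks(fake_predict, rows, chunk_size=3))
```

In this notebook, `predict_fn` could be `rcf_inference.predict` and `rows` could be `satcom_data_numpy`, since the helper only needs `len()` and slicing.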

In [None]:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

#
# *Try this out* - change `start` and `end` to zoom in on the
# anomaly found earlier in this notebook
#
start, end = 0, len(satcom_data)
satcom_data_subset = satcom_data[start:end]

ax1.plot(satcom_data_subset["fwdsnr"], color="C0", alpha=0.8)
ax2.plot(satcom_data_subset["score"], color="C1")

ax1.grid(which="major", axis="both")

ax1.set_ylabel("SNR", color="C0")
ax2.set_ylabel("Anomaly Score", color="C1")

ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")

ax1.set_ylim(min(satcom_data["fwdsnr"]), 1.4 * max(satcom_data["fwdsnr"]))
ax2.set_ylim(min(scores), 1.4 * max(scores))
fig.set_figwidth(10)
fig.suptitle("Anomaly Score overlaid on SNR value")

Note that the anomaly score spikes where our eyeball-norm method suggests there is an anomalous data point as well as in some places where our eyeballs are not as accurate.

Below we print and plot any data points with scores greater than some number of standard deviations from the mean anomaly score.

In [None]:
score_mean = satcom_data["score"].mean()
score_std = satcom_data["score"].std()
score_cutoff = score_mean + 2.85 * score_std

anomalies = satcom_data_subset[satcom_data_subset["score"] > score_cutoff]
anomalies.head()
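
The cutoff rule used above (mean plus a multiple of the standard deviation) can be reproduced on a small list with the standard library. This is a sketch with invented scores; `score_cutoff` and `flag_anomalies` are hypothetical helpers, and `statistics.stdev` is the sample standard deviation, which matches the default of the pandas `std()` call.

```python
import statistics


def score_cutoff(scores, num_std=2.85):
    """Return mean + num_std * sample standard deviation of the scores."""
    return statistics.mean(scores) + num_std * statistics.stdev(scores)


def flag_anomalies(scores, num_std=2.85):
    """Return the indices whose score exceeds the cutoff."""
    cutoff = score_cutoff(scores, num_std)
    return [i for i, score in enumerate(scores) if score > cutoff]


# Invented scores: nineteen ordinary points and one clear outlier at the end.
scores = [1.0, 1.1, 0.9, 1.0, 1.2, 0.8, 1.0, 1.1, 0.9, 1.0,
          1.0, 1.1, 0.9, 1.0, 1.2, 0.8, 1.0, 1.1, 0.9, 6.0]
print(round(score_cutoff(scores), 3), flag_anomalies(scores))
```

Lowering `num_std` flags more points and raising it flags fewer, so the multiplier is a tuning choice rather than a fixed property of the model.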

In [None]:
ax2.plot(anomalies.index, anomalies.score, "ko")
fig

With the current hyperparameter choices we see that the standard-deviation threshold, while able to capture anomalies, is rather sensitive to fine-grained perturbations and anomalous behavior. Adding trees to the SageMaker RCF model or using a larger data set could smooth out the results.
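
The intuition behind adding trees is that the forest averages per-tree scores, and the mean of more independent noisy estimates varies less. The simulation below is a toy illustration of that averaging effect only. It does not run the RCF algorithm, and the Gaussian per-tree noise model and all function names are assumptions made for this example.

```python
import random
import statistics


def forest_score(rng, true_score, num_trees, tree_noise=1.0):
    """Average num_trees noisy per-tree scores (toy model, not RCF itself)."""
    tree_scores = [rng.gauss(true_score, tree_noise) for _ in range(num_trees)]
    return statistics.mean(tree_scores)


def spread_of_forest_scores(num_trees, trials=500, seed=0):
    """Standard deviation of the averaged score across repeated trials."""
    rng = random.Random(seed)
    scores = [forest_score(rng, 1.0, num_trees) for _ in range(trials)]
    return statistics.stdev(scores)


spread_50 = spread_of_forest_scores(50)
spread_200 = spread_of_forest_scores(200)
print(f"50 trees: {spread_50:.3f}, 200 trees: {spread_200:.3f}")
```

Under this toy model, quadrupling the number of trees roughly halves the spread of the averaged score, at the cost of more computation per data point.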

Lastly, we write the anomalies to S3 in JSON Lines format.

In [None]:
# Write Anomalies to S3
destination = f"{prefix}/anomalies/anomalies-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}.json"

json_buffer = io.StringIO()

anomalies.to_json(json_buffer,
 orient="records",
 lines=True)

s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket)

my_bucket.put_object(Key=destination, Body=json_buffer.getvalue())
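
For reference, JSON Lines is simply one JSON object per line. The sketch below writes that format with the standard library. `write_json_lines` and the anomaly rows are hypothetical, and the exact text pandas emits (for example spacing or the trailing newline) may differ from this output.

```python
import io
import json


def write_json_lines(records, buffer) -> None:
    """Write each record as one JSON object per line."""
    for record in records:
        buffer.write(json.dumps(record))
        buffer.write("\n")


# Invented anomaly rows for illustration.
anomaly_rows = [
    {"timestamp": "2022-12-21T16:03:20", "fwdsnr": 2.3, "score": 2.4},
    {"timestamp": "2022-12-21T16:41:50", "fwdsnr": 1.9, "score": 2.7},
]
buffer = io.StringIO()
write_json_lines(anomaly_rows, buffer)
print(buffer.getvalue())
```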

## Stop and Delete the Endpoint

Finally, we should delete the endpoint before we close the notebook.

To do so, uncomment and execute the cell below. Alternatively, you can navigate to the "Endpoints" tab in the SageMaker console, select the endpoint whose name was printed above, and select "Delete" from the "Actions" dropdown menu.

In [16]:
# sagemaker.Session().delete_endpoint(rcf_inference.endpoint_name)