# **re:Mars - Anomaly detection workshop** - From deep space to shop floor

<table>
    <tr>
        <td>
            <a href="https://www.amazon.com/Time-Analysis-AWS-forecasting-anomalies-ebook/dp/B09MMLLWDY">
                <img src="assets/book_cover.jpg" width="600px" />
            </a>
        </td>
        <td style="font-size: 14px">
            <p>
    In this second part of this workshop, we are going to use <b><a href="https://aws.amazon.com/lookout-for-equipment">Amazon Lookout for Equipment</a></b>. This service analyzes the data from the sensors on your equipment (e.g. pressure in a generator, flow rate of a compressor, revolutions per minute of fans), to automatically train a machine learning model based on just your data, for your equipment â€“ with no machine learning (ML) expertise required. Lookout for Equipment uses your unique ML model to analyze incoming sensor data in real-time and accurately identify early warning signs that could lead to machine failures. This means you can detect equipment abnormalities with speed and precision, quickly diagnose issues, take action to reduce expensive downtime, and reduce false alerts.
            </p>
            <p>
If you're interested about knowing more about this service, you can check out the <b><a href="https://www.amazon.com/Time-Analysis-AWS-forecasting-anomalies-ebook/dp/B09MMLLWDY">Time Series Analysis on AWS book</a></b>, written by one of the authors of this workshop. It contains 6 chapters dedicated to Amazon Lookout for Equipment and will give you solid foundation on how to setup an end-to-end anomaly detection pipeline:
            </p>
        </td>
    </tr>
</table>

# **Initialization**
---

### Notebook configuration update

In [None]:
!pip install --quiet --upgrade tqdm lookoutequipment

### Imports

In [None]:
import boto3
import config
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import sagemaker
import shutil
import sys
import zipfile

from botocore.client import ClientError
from datetime import datetime
from dateutil.relativedelta import relativedelta
from matplotlib.gridspec import GridSpec
from tqdm import tqdm

# SDK / toolbox for managing Lookout for Equipment API calls:
import lookoutequipment as lookout

### Parameters

In [None]:
# Loading configuration:
PREFIX_TRAINING  = config.PREFIX_TRAINING
PREFIX_LABEL     = config.PREFIX_LABEL
DATASET_NAME     = config.DATASET_NAME
MODEL_NAME       = config.MODEL_NAME
EQUIPMENT        = config.EQUIPMENT

BUCKET           = sagemaker.Session().default_bucket()
ROLE_ARN         = sagemaker.get_execution_role()
RAW_DATA         = os.path.join('..', 'dataset')
TMP_DATA         = os.path.join('..', 'data', 'interim')
PROCESSED_DATA   = os.path.join('..', 'data', 'processed')
LABEL_DATA       = os.path.join(PROCESSED_DATA, 'label-data')
TRAIN_DATA       = os.path.join(PROCESSED_DATA, 'training-data')
INFERENCE_DATA   = os.path.join(PROCESSED_DATA, 'inference-data')

os.makedirs(TMP_DATA,         exist_ok=True)
os.makedirs(RAW_DATA,         exist_ok=True)
os.makedirs(PROCESSED_DATA,   exist_ok=True)
os.makedirs(LABEL_DATA,       exist_ok=True)
os.makedirs(TRAIN_DATA,       exist_ok=True)
os.makedirs(INFERENCE_DATA,   exist_ok=True)

# AWS Look & Feel definition for Matplotlib
plt.style.use('../utils/aws_matplotlib_template.py')
prop_cycle = plt.rcParams['axes.prop_cycle']
colors = prop_cycle.by_key()['color']

%matplotlib inline
BUCKET

# **Dataset overview**
---
For this workshop, we are going to switch from space data to a multivariate water pump dataset. This dataset contains 50 sensors (flow rates, pressures, temperatures...) and several system failures are known. Let's first load and visualize the content of this dataset...

### **Loading dataset**

In [None]:
ARCHIVE_PATH = os.path.join(RAW_DATA, 'sensors.zip')
DEST_PATH    = os.path.join(TMP_DATA, 'sensors.csv')
DEST_DIR     = os.path.dirname(DEST_PATH)

print("Extracting data archive")
zip_ref = zipfile.ZipFile(ARCHIVE_PATH, 'r')
zip_ref.extractall(DEST_DIR + '/')
zip_ref.close()

print("Loading known labels")
_ = shutil.copy(src=os.path.join(RAW_DATA, 'status.csv'), dst=os.path.join(TMP_DATA, 'status.csv'))

In [None]:
pump_df = pd.read_csv(DEST_PATH)
pump_df['Timestamp'] = pd.to_datetime(pump_df['Timestamp'], format='%Y-%m-%d %H:%M:%S')
pump_df = pump_df.set_index('Timestamp')

status_df = pd.read_csv(os.path.join(TMP_DATA, 'status.csv'))
status_df['Timestamp'] = pd.to_datetime(status_df['Timestamp'])
status_df = status_df.set_index('Timestamp')

pump_df['machine_status'] = status_df['machine_status']
del status_df

pump_df.head()

### **Dataset visualization**

In [None]:
plt.rcParams['hatch.linewidth'] = 0.5
plt.rcParams['lines.linewidth'] = 0.5

fig = plt.figure(figsize=(24,4))
ax1 = fig.add_subplot(1,1,1)
ax1.set_yticks([])
plot1 = ax1.plot(pump_df['sensor_00'], label='sensor_00', alpha=0.7)
ax1 = ax1.twinx()
ax1.grid(False)
ax1.set_yticks([])
plot2 = ax1.plot(pump_df['sensor_34'], label='sensor_34', color=colors[1], alpha=0.7)

ax2 = ax1.twinx()
plot3 = ax2.fill_between(
    x=pump_df.index, y1=0.0, y2=pump_df['machine_status'], 
    color=colors[5], linewidth=0.0, edgecolor='#000000', alpha=0.5, hatch="//////", 
    label='Broken pump'
)
ax2.grid(False)
ax2.set_yticks([])

labels = [plot1[0].get_label(), plot2[0].get_label(), plot3.get_label()]
plt.legend(handles=[plot1[0], plot2[0], plot3], labels=labels, loc='lower center', ncol=3, bbox_to_anchor=(0.5, -.3))
plt.title('Industrial pump sensor data')
plt.show()

On the preceding plot, you can see a couple signals (`sensor_00` in dark gray and `sensor_34` in light blue). Feel free to update the previous cell and rerun it to visualize other signals. The areas highlighted in red are time periods where system failures are known to happen.

Use the following cells to get a better view from the sensors available in this dataset:

In [None]:
for index, f in enumerate(list(pump_df.columns)[:3]):
    fig = plt.figure(figsize=(24,3))
    ax1 = fig.add_subplot(1,1,1)
    ax1.plot(pump_df[f], color=colors[index % len(colors)])
    ax1.set_title(f)
    
plt.show()

### **Preparing time series data**
We are going to generate a single CSV file for each sensor and put it into its own folder. Although Amazon Lookout for Equipment can detect your data structure by itself, this approach can be useful when you don't want to align the timestamps for every sensors (which you would have to do, should you want to build a single CSV file with all your sensors). In this case, the data is already nicely formatted, but this is a worthy trick that can save you a lot of data preparation hassle:

In [None]:
features = list(pump_df.columns)[:-1]

for tag in tqdm(features):
    os.makedirs(os.path.join(TRAIN_DATA, EQUIPMENT, tag), exist_ok=True)
    fname = os.path.join(TRAIN_DATA, EQUIPMENT, tag, 'tag_data.csv')
    tag_df = pump_df[[tag]]
    tag_df.to_csv(fname)
    
# Let's now load our training data and labels to Amazon S3, so that 
# Lookout for Equipment can access them to train and evaluate a model:
train_s3_path = f's3://{BUCKET}/{PREFIX_TRAINING}{EQUIPMENT}/'
!aws s3 cp --recursive $TRAIN_DATA/water-pump $train_s3_path

# **Data ingestion**
---

Let's double check the values of all the parameters that will be used to ingest some data into an existing Lookout for Equipment dataset:

In [None]:
ROLE_ARN, BUCKET, PREFIX_TRAINING + EQUIPMENT + '/', DATASET_NAME

In [None]:
lookout_dataset = lookout.LookoutEquipmentDataset(
    dataset_name=DATASET_NAME,
    component_root_dir=f'{TRAIN_DATA}/{EQUIPMENT}',
    access_role_arn=ROLE_ARN
)
lookout_dataset.create()
response = lookout_dataset.ingest_data(BUCKET, PREFIX_TRAINING + EQUIPMENT + '/')

We use the following cell to monitor the ingestion process. This process should take between **10 and 15 minutes** given the amount of data we have to ingest (~350 MB). During ingestion, Lookout for Equipment will prepare the data so that it can be processed by multiple algorithms (deep learning, statistical and traditional machine learning ones). It will also grade them to issue a data quality report:

In [None]:
lookout_dataset.poll_data_ingestion(sleep_time=60)

We created a **Lookout for Equipment dataset** and ingested the S3 data previously uploaded into this dataset.

Don't forget to checkout the [**Lookout for Equipment console**](https://console.aws.amazon.com/lookoutequipment/home), where you will be able to visualize the data quality report for your ingested data.

**Let's now train a model based on these data.**

# **Model training**
---
When training our model, we are going to define a train/evaluation split. Run the following cell to use the first 6 months for training purpose and the remaining data (~1.5 year) for evaluation (backtesting):

In [None]:
# Configuring time ranges:
training_start   = pd.to_datetime('2018-04-01 00:00:00')
training_end     = pd.to_datetime('2018-10-31 23:59:00')
evaluation_start = pd.to_datetime('2018-11-01 00:00:00')
evaluation_end   = pd.to_datetime('2020-04-30 23:59:00')

print(f'  Training period | from {training_start} to {training_end}')
print(f'Evaluation period | from {evaluation_start} to {evaluation_end}')

print(training_end - training_start)
print(evaluation_end - evaluation_start)

In [None]:
lookout_model = lookout.LookoutEquipmentModel(model_name=MODEL_NAME, dataset_name=DATASET_NAME)
lookout_model.set_time_periods(evaluation_start, evaluation_end, training_start, training_end)
lookout_model.set_target_sampling_rate(sampling_rate='PT5M')
lookout_model.train()

The following method encapsulate a call to the [**DescribeModel**](https://docs.aws.amazon.com/lookout-for-equipment/latest/ug/API_DescribeModel.html) API and collect the model progress by looking at the `Status` field retrieved from this call. This training should take around 25-30 minutes to run:

In [None]:
lookout_model.poll_model_training(sleep_time=60)

In this section, we use the dataset created earlier and trained an Amazon Lookout for Equipment model.

From here, we will **extract the evaluation data** for this model and use it to perform further analysis on the model results.

# **Model evaluation**
---

After the model is trained, we can extract the backtesting results and visualize the anomalies detected by Lookout for Equipment over the evaluation period. Although evaluating your model is optional (you don't need to do this to deploy and use the model), this section will give you some pointers on how to post-process and visualize the data provided by Amazon Lookout for Equipment:

In [None]:
LookoutDiagnostics = lookout.LookoutEquipmentAnalysis(model_name=MODEL_NAME, tags_df=pump_df)
LookoutDiagnostics.set_time_periods(evaluation_start, evaluation_end, training_start, training_end)
predicted_ranges = LookoutDiagnostics.get_predictions()

In [None]:
tags_list = list(pump_df.columns)[:-1]
custom_colors = {'labels': colors[9], 'predictions': colors[5]}
    
TSViz = lookout.plot.TimeSeriesVisualization(
    timeseries_df=pump_df,
    data_format='tabular'
)
TSViz.add_signal([tags_list[0]])
TSViz.add_predictions([predicted_ranges])
TSViz.add_train_test_split(evaluation_start)
fig, axis = TSViz.plot(fig_width=24, colors=custom_colors)

# **Helper functions**
---
The following functions are used to build the post-processing visualizations:

In [None]:
def plot_detected_events(group_id, start, end, signal):
    custom_colors = {
        'labels': colors[9],
        'predictions': colors[5]
    }
    
    TSViz = lookout.plot.TimeSeriesVisualization(
        timeseries_df=pump_df.loc[start:end, :],
        data_format='tabular'
    )
    TSViz.add_signal([signal])
    TSViz.add_predictions([predicted_ranges])
    TSViz.legend_format = {
        'loc': 'upper right',
        'framealpha': 0.4,
        'ncol': 3
    }
    fig, axis = TSViz.plot(fig_width=24, colors=custom_colors)

    for ax in axis:
        ax.set_xlim((start, end))

def plot_signal_importance_evolution(group_id, plot_start, plot_end, signal):
    fig = plt.figure(figsize=(24,10))
    gs = GridSpec(nrows=4, ncols=1, height_ratios=[0.5, 0.3, 0.1, 0.6])
    df = expanded_results_v3.loc[plot_start:plot_end, :].copy()

    ax0 = fig.add_subplot(gs[0])
    ax0.plot(pump_df.loc[plot_start:plot_end, signal], color=colors[9], linewidth=1.0, label=signal)
    ax0.legend(loc='upper right', fontsize=12)
    ax0.set_xlim((plot_start, evaluation_end))

    ax1 = fig.add_subplot(gs[1])
    ax1.plot(predictions_df.rolling(60*24).sum(), label='Number of daily\nevent detected')
    ax1.legend(loc='upper left', fontsize=12)
    ax1.set_xlim((plot_start, evaluation_end))

    ax3 = fig.add_subplot(gs[2])
    plot_ranges(predictions_df, 'Detected events', colors[5], ax3)
    ax3.set_xlim((plot_start, evaluation_end))

    bar_width = 1.0
    num_top_signals = 5
    ax4 = fig.add_subplot(gs[3])
    bottom_values = np.zeros((len(df.index),))
    current_tags_list = list(df.sum().sort_values(ascending=False).head(num_top_signals).index)
    for tag in current_tags_list:
        plt.bar(x=df.index, height=df[tag], bottom=bottom_values, alpha=0.8, width=bar_width, label=tag.split('\\')[0])
        bottom_values += df[tag].values

    all_other_tags = [t for t in df.columns if t not in current_tags_list]
    all_other_tags_contrib = df[all_other_tags].sum(axis='columns')
    plt.bar(x=df.index, height=all_other_tags_contrib, bottom=bottom_values, alpha=0.8, width=bar_width, label=f'All the others\n({len(all_other_tags)} signals)', color='#CCCCCC')

    ax4.legend(loc='lower center', ncol=4, bbox_to_anchor=(0.5, -0.40))
    ax4.set_xlabel('Signal importance evolution', fontsize=12)
    ax4.set_xlim((plot_start, evaluation_end))

    plt.show()
    
    return current_tags_list

def plot_top_signals_time_series(group_id, plot_start, plot_end, top_tags_list):
    fig = plt.figure(figsize=(24,6 * len(top_tags_list)))
    gs = GridSpec(nrows=2 * len(top_tags_list), ncols=1, height_ratios=[1.0, 0.2] *len(top_tags_list))

    for index, signal in enumerate(top_tags_list):
        ax = fig.add_subplot(gs[index*2])
        ax.plot(pump_df.loc[plot_start:plot_end, signal], color=colors[index * 2], linewidth=1.0)
        ax.set_title(f'Signal: {signal}')

        ax = fig.add_subplot(gs[index*2 + 1])
        plot_ranges(predictions_df, '', colors[5], ax)
        ax.set_xlim((plot_start, plot_end))

    plt.show()

def plot_histograms(group_id):
    fig = TSViz.plot_histograms(freq='60min', fig_width=24, start=training_start, end=evaluation_end, top_n=4)

def plot_ranges(range_df, range_title, color, ax):
    ax.plot(range_df['Label'], color=color)
    ax.fill_between(range_df.index, 
                    y1=range_df['Label'], 
                    y2=0, 
                    alpha=0.1, 
                    color=color)
    ax.axes.get_xaxis().set_ticks([])
    ax.axes.get_yaxis().set_ticks([])
    ax.set_xlabel(range_title, fontsize=12)

# **Extracting additional insights**
---

In [None]:
predicted_ranges

In [None]:
predicted_ranges['duration'] = pd.to_datetime(predicted_ranges['end']) - pd.to_datetime(predicted_ranges['start'])
predicted_ranges['duration'] = predicted_ranges['duration'].dt.total_seconds() / 3600
predictions_df = TSViz._convert_ranges(predicted_ranges, default_freq='1min')

expanded_results = []
for index, row in predicted_ranges.iterrows():
    new_row = dict()
    new_row.update({'start': row['start']})
    new_row.update({'end': row['end']})
    new_row.update({'prediction': 1.0})
    
    diagnostics = pd.DataFrame(row['diagnostics'])
    diagnostics = dict(zip(diagnostics['name'], diagnostics['value']))
    new_row = {**new_row, **diagnostics}
        
    expanded_results.append(new_row)
    
expanded_results = pd.DataFrame(expanded_results)

df_list = []
for index, row in expanded_results.iterrows():
    new_index = pd.date_range(start=row['start'], end=row['end'], freq='5T')
    new_df = pd.DataFrame(index=new_index)
    
    for tag in tags_list:
        new_df[tag] = row[f'{tag}\\{tag}']
        
    df_list.append(new_df)
    
expanded_results_v2 = pd.concat(df_list, axis='index')
expanded_results_v2 = expanded_results_v2.reindex(predictions_df.index)

freq = '1D'
expanded_results_v3 = expanded_results_v2.resample(freq).mean()
expanded_results_v3 = expanded_results_v3.replace(to_replace=np.nan, value=0.0)

In [None]:
plot_start = evaluation_start
plot_end = evaluation_end

In [None]:
plot_detected_events('event_group', plot_start, plot_end, tags_list[0])

In [None]:
top_tags_list = plot_signal_importance_evolution('event_group', plot_start, plot_end, tags_list[0])

In [None]:
plot_top_signals_time_series('event_group', plot_start, plot_end, top_tags_list)

In [None]:
plot_histograms('event_group')

# **Conclusion and Call to Action**
---

In this second part, you learned how to use a managed service, Amazon Lookout for Equipment, to train an anomaly detection model on a water pump multivariate time series dataset.

You also learn how to further post-process and interpret the raw results from such anomaly detection models.

As a next step, we recommend that you add the known failure periods and retrain a new model to see the impact of the forewarning time. Note that training time will be closer to 1 hour in this case.
* If you want to know how to point to a label file when training a model, check out the [**SDK Documentation**](https://amazon-lookout-for-equipment-sdk.readthedocs.io/en/latest/generated/src.lookoutequipment.model.LookoutEquipmentModel.html#src.lookoutequipment.model.LookoutEquipmentModel)
* Use the following cell to prepare the label data accordingly: the following cell compresses the machine status column (one row per timestamp) into a shorter table where each row can be associated to a time range.

In [None]:
range_df = pump_df[['machine_status']].copy()
range_df['BROKEN'] = False
range_df.loc[range_df['machine_status'] == 1.0, 'BROKEN'] = True

range_df['Next Status'] = range_df['BROKEN'].shift(-1)
range_df['Start Range'] = (range_df['BROKEN'] == False) & (range_df['Next Status'] == True)
range_df['End Range'] = (range_df['BROKEN'] == True) & (range_df['Next Status'] == False)
range_df.iloc[0,3] = range_df.iloc[0,1]
range_df = range_df[(range_df['Start Range'] == True) | (range_df['End Range'] == True)]

labels_df = pd.DataFrame(columns=['start', 'end'])
for index, row in range_df.iterrows():
    if row['Start Range']:
        start = index

    if row['End Range']:
        end = index
        labels_df = labels_df.append({
            'start': start + relativedelta(hours=-12),
            'end': end + relativedelta(hours=+12)
        }, ignore_index=True)
        
labels_df['start'] = pd.to_datetime(labels_df['start'])
labels_df['end'] = pd.to_datetime(labels_df['end'])
labels_df['start'] = labels_df['start'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f')
labels_df['end'] = labels_df['end'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f')

labels_fname = os.path.join(LABEL_DATA, 'labels.csv')
labels_df.to_csv(labels_fname, header=None, index=None)
        
labels_df