# **Amazon Lookout for Equipment** - Demonstration on an anonymized compressor dataset
*Part 1: Data preparation*

## Initialization
---
This repository is initially structured as follows:
```
/lookout-equipment-demo/getting_started/
|
├── dataset/ <<< Original dataset <<<
| ├── labels.csv
| ├── tags_description.csv
| ├── timeranges.txt
| └── timeseries.zip
|
├── notebooks/
| ├── 1_data_preparation.ipynb <<< This notebook <<<
| ├── 2_dataset_creation.ipynb
| ├── 3_model_training.ipynb
| ├── 4_model_evaluation.ipynb
| ├── 5_inference_scheduling.ipynb
| └── config.py
|
└── utils/
  ├── aws_matplotlib_light.py
  └── lookout_equipment_utils.py
```

### Notebook configuration update

In [None]:
!pip install --quiet --upgrade tqdm tsia

### Imports
**Note:** Update the content of the **config.py** file **before** running the following cell

In [None]:
import boto3
import config
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import shutil
import sys
import tsia

from botocore.client import ClientError
from tqdm import tqdm

In [None]:
sys.path.append('../utils')
import lookout_equipment_utils as lookout

### Parameters

In [None]:
BUCKET = config.BUCKET
PREFIX_TRAINING = config.PREFIX_TRAINING
PREFIX_LABEL = config.PREFIX_LABEL
RAW_DATA = os.path.join('..', 'dataset')
DATA = os.path.join('..', 'data')
LABEL_DATA = os.path.join(DATA, 'labelled-data')
TRAIN_DATA = os.path.join(DATA, 'training-data', 'expander')

os.makedirs(DATA, exist_ok=True)
os.makedirs(LABEL_DATA, exist_ok=True)
os.makedirs(TRAIN_DATA, exist_ok=True)

Let's first check that the bucket name is defined, that the bucket exists and that this notebook can access it. If this notebook does not have access to the S3 bucket, you will have to update the permissions of the IAM role it uses:

In [None]:
if BUCKET == '<>':
    raise Exception('Please update your Amazon S3 bucket name in the config.py file located in the notebooks/ directory of this repository and restart the kernel for this notebook.')

else:
    # Check access to the configured bucket:
    try:
        s3_resource = boto3.resource('s3')
        s3_resource.meta.client.head_bucket(Bucket=BUCKET)
        print(f'Bucket "{BUCKET}" exists')

    # Expose error reason:
    except ClientError as error:
        error_code = int(error.response['Error']['Code'])
        if error_code == 403:
            raise Exception(f'Bucket "{BUCKET}" is private: access is forbidden!')

        elif error_code == 404:
            raise Exception(f'Bucket "{BUCKET}" does not exist!')

        # Re-raise any other error instead of silently ignoring it:
        else:
            raise
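
The error handling above boils down to mapping the status code reported by `head_bucket` to a readable message. A minimal, AWS-free sketch of that mapping is shown below so it can be tried without credentials. `describe_head_bucket_error` is a hypothetical helper, and the input is assumed to have the same shape as `error.response` in the cell above (`{'Error': {'Code': '403'}}`):

```python
def describe_head_bucket_error(error_response: dict, bucket: str) -> str:
    """Turn a botocore-style head_bucket error response into a readable message.

    Assumption: error_response looks like ClientError.response, i.e.
    {'Error': {'Code': '<HTTP status code as a string>'}}.
    """
    code = str(error_response.get('Error', {}).get('Code', ''))
    if code == '403':
        return f'Bucket "{bucket}" is private: access is forbidden!'
    if code == '404':
        return f'Bucket "{bucket}" does not exist!'
    # Anything else is reported as unexpected rather than ignored:
    return f'Unexpected error {code or "(no code)"} while accessing bucket "{bucket}"'


# Usage with hand-written responses (no AWS call is made here):
print(describe_head_bucket_error({'Error': {'Code': '404'}}, 'my-bucket'))
print(describe_head_bucket_error({'Error': {'Code': '403'}}, 'my-bucket'))
```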

### AWS Look & Feel definition for Matplotlib

In [None]:
%matplotlib inline

# Load style sheet:
plt.style.use('../utils/aws_matplotlib_light.py')

# Get colors from custom AWS palette:
prop_cycle = plt.rcParams['axes.prop_cycle']
colors = prop_cycle.by_key()['color']

## Loading datasets of interest
---
### Analysis time ranges
The dataset provided with this repository covers most of 2015 (January to November), with some known anomaly periods appearing both at the beginning and at the end of this period. Using the following training / evaluation split will allow Lookout for Equipment to have labelled periods on both sides of the split date:

In [None]:
timeranges_fname = os.path.join(DATA, 'timeranges.txt')
shutil.copyfile(os.path.join(RAW_DATA, 'timeranges.txt'), timeranges_fname)

# Read one timestamp per line, stripping line endings (this also works
# when the last line has no trailing newline):
with open(timeranges_fname, 'r') as f:
    timeranges = [line.strip() for line in f if line.strip()]

training_start = pd.to_datetime(timeranges[0])
training_end = pd.to_datetime(timeranges[1])
evaluation_start = pd.to_datetime(timeranges[2])
evaluation_end = pd.to_datetime(timeranges[3])

print(f'Training period: from {training_start} to {training_end}')
print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')
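
Since the whole split relies on four timestamps read from a plain text file, a quick sanity check can catch a malformed file early. The sketch below is hypothetical (`parse_timeranges` is not part of the repository): it assumes the file holds one ISO-8601 timestamp per line, in the order training start, training end, evaluation start, evaluation end, and it only uses the standard library:

```python
from datetime import datetime


def parse_timeranges(text: str):
    """Parse four ISO-8601 timestamps (one per line) and check their ordering."""
    # Strip line endings and ignore blank lines:
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if len(lines) != 4:
        raise ValueError(f'Expected 4 timestamps, got {len(lines)}')

    training_start, training_end, evaluation_start, evaluation_end = (
        datetime.fromisoformat(line) for line in lines
    )

    # Assumption: training comes first and does not overlap the evaluation period.
    if not (training_start < training_end <= evaluation_start < evaluation_end):
        raise ValueError('Time ranges are not in chronological order')

    return training_start, training_end, evaluation_start, evaluation_end


# Usage with made-up dates (the actual ones come from dataset/timeranges.txt):
sample = '2015-01-01 00:00:00\n2015-08-31 23:59:00\n2015-09-01 00:00:00\n2015-11-30 23:59:00'
print(parse_timeranges(sample))
```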

### Labels
Historical maintenance time ranges are recorded in a CSV file with two columns containing the *start time* and *end time* of each range:

In [None]:
labels_fname = os.path.join(LABEL_DATA, 'labels.csv')
shutil.copyfile(os.path.join(RAW_DATA, 'labels.csv'), labels_fname)

# The label file has no header: name the columns, then parse them as timestamps:
labels_df = pd.read_csv(labels_fname, header=None, names=['start', 'end'])
labels_df['start'] = pd.to_datetime(labels_df['start'])
labels_df['end'] = pd.to_datetime(labels_df['end'])
labels_df.head()
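
To double-check that labelled periods exist on both sides of the split date, each label range can be assigned to the training or the evaluation period. A minimal sketch on made-up ranges, assuming the same two-column layout as `labels.csv` and a hypothetical split date; in the notebook, `labels_df` and `evaluation_start` would be used instead:

```python
import io

import pandas as pd

# Made-up label ranges with the same layout as labels.csv (no header, start/end):
csv_text = """2015-01-10 08:00:00,2015-01-12 18:00:00
2015-03-02 00:00:00,2015-03-03 12:00:00
2015-10-20 06:00:00,2015-10-25 00:00:00
"""
labels = pd.read_csv(io.StringIO(csv_text), header=None, names=['start', 'end'])
labels['start'] = pd.to_datetime(labels['start'])
labels['end'] = pd.to_datetime(labels['end'])

# Hypothetical split date standing in for evaluation_start:
split_date = pd.Timestamp('2015-09-01')

# A range is on the training side if it ends before the split date; ranges
# straddling the split date are counted on the evaluation side:
labels['period'] = (labels['end'] < split_date).map({True: 'training', False: 'evaluation'})
print(labels['period'].value_counts())
```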

### Time series
The raw time series is stored as a zipped Parquet file; let's extract it:

In [None]:
timeseries_fname = os.path.join(RAW_DATA, 'timeseries.zip')
!unzip -o $timeseries_fname -d $DATA/training-data
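
If the `unzip` command is not available on the instance, the same extraction can be done with Python's standard `zipfile` module. The sketch below is an alternative to the shell command, not what the notebook runs; `extract_archive` is a hypothetical helper, and the usage example builds a throwaway archive in a temporary directory:

```python
import os
import tempfile
import zipfile


def extract_archive(zip_path: str, dest_dir: str) -> list:
    """Extract every member of zip_path into dest_dir, overwriting existing files like `unzip -o`."""
    os.makedirs(dest_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        return archive.namelist()


# Usage on a throwaway archive containing a placeholder file:
with tempfile.TemporaryDirectory() as tmp:
    demo_zip = os.path.join(tmp, 'timeseries.zip')
    with zipfile.ZipFile(demo_zip, 'w') as archive:
        archive.writestr('expander.parquet', b'placeholder bytes')
    members = extract_archive(demo_zip, os.path.join(tmp, 'training-data'))
    found = os.path.isfile(os.path.join(tmp, 'training-data', 'expander.parquet'))

print(members, found)
```

In the notebook, the equivalent call would be `extract_archive(timeseries_fname, os.path.join(DATA, 'training-data'))`.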

The dataframe stored there has 122 tags and 480,886 rows, ranging from *January 1st* to *November 30, 2015*:

In [None]:
all_tags_fname = os.path.join(DATA, 'training-data', 'expander.parquet')
table = pq.read_table(all_tags_fname)
all_tags_df = table.to_pandas()
del table

print(all_tags_df.shape)
all_tags_df.head()
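
The sizes quoted in this notebook can be cross-checked with a bit of arithmetic: 480,886 rows for 122 tags gives the 58,668,092 data points mentioned in the conclusion, and a gap-free 1-minute grid from January 1st 00:00 to November 30, 2015 23:59 would hold 480,960 timestamps. The exact start and end times are an assumption here; if the data does cover that full span, the difference corresponds to timestamps absent from the file:

```python
from datetime import datetime, timedelta

num_rows, num_tags = 480_886, 122  # figures reported above for expander.parquet

# Total number of values stored in the dataframe:
print(f'{num_rows * num_tags:,} data points')

# Assumption: the data spans Jan 1st 00:00 to Nov 30th 23:59 (inclusive) at a 1-minute rate.
first, last = datetime(2015, 1, 1, 0, 0), datetime(2015, 11, 30, 23, 59)
expected_rows = (last - first) // timedelta(minutes=1) + 1
print(f'{expected_rows:,} expected timestamps, {expected_rows - num_rows:,} missing')
```

In the notebook itself, `all_tags_df.shape`, `all_tags_df.index.min()` and `all_tags_df.index.max()` give the actual figures.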

### Tags description

This dataset comes with a tag description file including:

* `Tag`: the tag name as it is recorded by the customer in their historian system (for instance the [Honeywell process history database](https://www.honeywellprocess.com/en-US/explore/products/advanced-applications/uniformance/Pages/uniformance-phd.aspx))
* `UOM`: the unit of measure for the recorded signal
* `Subsystem`: an ID linked to the part of the asset this sensor is attached to

In [None]:
tags_description_fname = os.path.join(RAW_DATA, 'tags_description.csv')
tags_description_df = pd.read_csv(tags_description_fname)
tags_description_df.head()

Let's extract the list of features from this table, sorted by unit of measure so that tags sharing the same unit end up next to each other:

In [None]:
features = list(tags_description_df.sort_values(by='UOM')['Tag'])
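
The per-unit grouping that the loop in the next section builds can also be obtained directly with a pandas `groupby`. A sketch on a small made-up tag table (the tag names, units and subsystems below are illustrative only); the `demo_` names avoid overwriting the notebook's own variables:

```python
import pandas as pd

# Made-up excerpt with the same columns as tags_description.csv:
tags = pd.DataFrame({
    'Tag': ['signal-001', 'signal-002', 'signal-003', 'signal-004'],
    'UOM': ['BAR', 'DEG C', 'BAR', 'RPM'],
    'Subsystem': ['subsystem-01', 'subsystem-01', 'subsystem-02', 'subsystem-02'],
})

# Features sorted by unit of measure; a stable sort keeps the original
# tag order inside each unit:
demo_features = list(tags.sort_values(by='UOM', kind='mergesort')['Tag'])

# One list of tags per unit of measure (groupby sorts the keys, and rows
# with a missing UOM are dropped by default):
demo_groups = tags.groupby('UOM')['Tag'].apply(list).to_dict()

print(demo_features)
print(demo_groups)
```

Because both the sort and the `groupby` order units alphabetically, concatenating the groups in key order gives back the sorted feature list, which is what the strip chart grouping relies on.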

## Dataset overview
---

Build a list of dataframes, one per feature (this will be useful for visualization purposes). An early event in the year skews the data: we remove that part for visualization purposes only (hence the `start` and `end` range definition below), but will keep the data starting January 1st in the training period later on.

In [None]:
start = pd.to_datetime('2015-04-05 00:00:00')
end = evaluation_end

df_list = []
feature_groups = dict()
for f in features:
    # Get the unit of measure for the current feature:
    uom = str(list(tags_description_df.loc[tags_description_df['Tag'] == f, 'UOM'])[0])

    # Add the feature to its unit-of-measure group, creating the group if needed:
    feature_groups.setdefault(uom, []).append(f)

    # Add the dataframe to the list, replacing missing values by 0:
    current_df = all_tags_df.loc[start:end, [f]]
    current_df = current_df.fillna(0.0)
    df_list.append(current_df)
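
Note that `.loc[start:end]` on a `DatetimeIndex` is label-based and includes both bounds, so the rows stamped exactly at `start` and `end` are kept. A small illustration on a made-up daily index:

```python
import numpy as np
import pandas as pd

# Made-up daily series with one missing value:
demo_index = pd.date_range('2015-04-01', periods=10, freq='D')
demo_df = pd.DataFrame({'signal': np.arange(10, dtype=float)}, index=demo_index)
demo_df.iloc[4, 0] = np.nan  # value for 2015-04-05

# Label-based slicing: both 2015-04-03 and 2015-04-07 are included.
window = demo_df.loc[pd.Timestamp('2015-04-03'):pd.Timestamp('2015-04-07'), ['signal']]

# Missing values are replaced by 0, as in the loop above:
window = window.fillna(0.0)
print(window)
```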

In [None]:
tag = 'signal-028'
tag_df = all_tags_df.loc[start:end, [tag]]
tag_df.columns = ['Value']

fig, axes = lookout.plot_timeseries(
 tag_df, 
 tag, 
 fig_width=20, 
 tag_split=evaluation_start, 
 labels_df=labels_df
)

Run the following two cells only on **instances with high memory** (at least **ml.m5.xlarge**): on smaller instances, the images generated by **matplotlib** are too large to be displayed:

In [None]:
fig = tsia.plot.plot_multivariate_timeseries(
 timeseries_list=df_list,
 tags_list=features,
 split_date=evaluation_start,
 tags_description_df=tags_description_df,
 tags_grouping_key='UOM',
 num_cols=4,
 col_size=5
)

In [None]:
%%time
# Discretize each signal in 3 bins:
array = tsia.markov.discretize_multivariate(df_list)

# Group the signals based on their unit of measure (UOM), inserting an
# empty (NaN) separator row after each group:
num_timesteps = array.shape[1]
separator = np.full(shape=(1, num_timesteps), fill_value=np.nan)
grouped_array = []
signal_list = []
current_row = 0
for uom in feature_groups.keys():
    num_features = len(feature_groups[uom])
    # Rows current_row to current_row + num_features - 1 belong to this group:
    signal_list = signal_list + features[current_row:current_row + num_features]
    signal_list.append(uom)
    grouped_array.append(array[current_row:current_row + num_features])
    grouped_array.append(separator)
    current_row += num_features
grouped_array = np.concatenate(grouped_array)

# Plot the strip chart:
tsia.plot.plot_timeseries_strip_chart(
    grouped_array,
    signal_list=signal_list,
    fig_width=20,
    dates=df_list[0].index.to_pydatetime(),
    day_interval=2
)
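
The exact binning strategy used by `tsia.markov.discretize_multivariate` is not detailed here. As an illustration only, the sketch below discretizes each signal into 3 bins using its own terciles (an assumed strategy, which may differ from tsia's), producing the same kind of 2-D array as above: one row per signal, one column per timestep:

```python
import numpy as np


def discretize_terciles(signals: np.ndarray, n_bins: int = 3) -> np.ndarray:
    """Map each row of a (num_signals, num_timesteps) array to integer bins 0..n_bins-1.

    Assumption: bins are quantile-based and computed independently for each signal.
    """
    binned = np.empty(signals.shape, dtype=int)
    for i, row in enumerate(signals):
        # Inner quantile edges, e.g. the 1/3 and 2/3 quantiles for 3 bins:
        edges = np.quantile(row, np.linspace(0, 1, n_bins + 1)[1:-1])
        # np.digitize returns 0 below the first edge and n_bins - 1 above the last one:
        binned[i] = np.digitize(row, edges)
    return binned


# Usage on two made-up signals of 6 timesteps:
demo_signals = np.array([
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
    [5.0, 5.0, 1.0, 1.0, 3.0, 3.0],
])
print(discretize_terciles(demo_signals))
```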

## Building and uploading the dataset
---
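We will structure our S3 bucket like this (the bucket name is an example: the actual bucket and prefixes are the `BUCKET`, `PREFIX_TRAINING` and `PREFIX_LABEL` values defined in **config.py**):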
```
s3://sagemaker-lookout-equipment-demo/
|
├── training-data/
| |
| ├── subsystem-01
| | └── subsystem-01.csv
| |
| ├── subsystem-02
| | └── subsystem-02.csv
| |
| ├── ...
| |
| └── subsystem-24
| └── subsystem-24.csv
|
└── labelled-data/
  └── labels.csv
```

In [None]:
# Process each subsystem one by one:
components = list(tags_description_df['Subsystem'].unique())
progress_bar = tqdm(components)
for component in progress_bar:
    progress_bar.set_description(f'Component {component}')
    progress_bar.refresh()

    # Check if the CSV file already exists and do not overwrite it:
    component_tags_fname = os.path.join(TRAIN_DATA, f'{component}', f'{component}.csv')
    if not os.path.exists(component_tags_fname):
        # Build the dataframe with all the signal time series for the current subsystem:
        component_tags_list = list(tags_description_df[tags_description_df['Subsystem'] == component]['Tag'])
        component_tags_df = all_tags_df[component_tags_list]
        component_tags_df = component_tags_df.reset_index()
        component_tags_df['Timestamp'] = component_tags_df['Timestamp'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f')

        # Save to disk:
        os.makedirs(os.path.join(TRAIN_DATA, f'{component}'), exist_ok=True)
        component_tags_df.to_csv(component_tags_fname, index=False)
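
Each subsystem CSV holds a `Timestamp` column followed by one column per tag, with timestamps written as `%Y-%m-%dT%H:%M:%S.%f` strings. The sketch below reproduces that layout on a made-up two-tag subsystem and writes it to a temporary directory; the tag names and values are illustrative only, and the index is assumed to be named `Timestamp`, as the cell above implies:

```python
import os
import tempfile

import pandas as pd

# Made-up data for one subsystem, indexed by a DatetimeIndex named "Timestamp":
demo_index = pd.date_range('2015-01-01 00:00:00', periods=3, freq='min', name='Timestamp')
subsystem_df = pd.DataFrame(
    {'signal-001': [1.0, 1.5, 2.0], 'signal-002': [10.0, 11.0, 12.0]}, index=demo_index
)

# Same transformation as in the loop above: move the index to a column, format it:
csv_df = subsystem_df.reset_index()
csv_df['Timestamp'] = csv_df['Timestamp'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f')

with tempfile.TemporaryDirectory() as tmp:
    fname = os.path.join(tmp, 'subsystem-01', 'subsystem-01.csv')
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    csv_df.to_csv(fname, index=False)
    with open(fname) as f:
        csv_lines = f.read().splitlines()

print('\n'.join(csv_lines))
```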

In [None]:
# Uploading training dataset to S3:
training_src_dir = TRAIN_DATA
training_s3_dest_path = f's3://{BUCKET}/{PREFIX_TRAINING}'
!aws s3 cp --recursive $training_src_dir $training_s3_dest_path

In [None]:
# Uploading label dataset to S3:
label_src_fname = os.path.join(LABEL_DATA, 'labels.csv')
label_s3_dest_path = f's3://{BUCKET}/{PREFIX_LABEL}labels.csv'
!aws s3 cp $label_src_fname $label_s3_dest_path
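
The two `aws s3 cp` commands above could also be expressed in Python with boto3. The sketch below is an assumed alternative, not what the notebook runs: `s3_upload_plan` is a hypothetical helper that maps every local file under a directory to an S3 key, mirroring what `cp --recursive` does. It assumes the prefix ends with a `/`, as the label upload cell suggests for `PREFIX_LABEL`. The actual upload with boto3's `upload_file` is left commented out so the example runs without AWS credentials:

```python
import os
import tempfile


def s3_upload_plan(local_dir: str, prefix: str) -> list:
    """Return (local_path, s3_key) pairs for every file under local_dir, sorted by key."""
    plan = []
    for root, _, files in os.walk(local_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # S3 keys always use forward slashes, whatever the local OS:
            relative = os.path.relpath(local_path, local_dir).replace(os.sep, '/')
            plan.append((local_path, prefix + relative))
    return sorted(plan, key=lambda pair: pair[1])


# Usage on a made-up local layout mirroring TRAIN_DATA:
with tempfile.TemporaryDirectory() as tmp:
    for component in ['subsystem-01', 'subsystem-02']:
        os.makedirs(os.path.join(tmp, component))
        open(os.path.join(tmp, component, f'{component}.csv'), 'w').close()
    upload_keys = [key for _, key in s3_upload_plan(tmp, 'training-data/')]

print(upload_keys)

# With boto3 and credentials available, each pair could then be uploaded with:
# s3_client = boto3.client('s3')
# for local_path, key in s3_upload_plan(TRAIN_DATA, PREFIX_TRAINING):
#     s3_client.upload_file(local_path, BUCKET, key)
```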

## Conclusion
---
At this stage, we have built:
* A single Parquet dataset that contains all the historical data for all the tags provided by the customer: this represents **58,668,092** data points (480,886 timestamps for **122 tags**) at a **1-minute** sampling rate.
* **24 individual CSV files** (1 for each subsystem, each subsystem can contain several timeseries) filed in their respective subsystem directories

Looking at the plot for **signal-028** above, we are going to try to predict the event that happens in **November 2015**: to achieve this, we will use a training set spanning from **January 2015** to **August 2015** and we will test on **September 2015** to **November 2015**.