{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# **Amazon Lookout for Equipment** - Demonstration on an anonymized compressor dataset\n",
"*Part 1: Data preparation*"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialization\n",
"---\n",
"This repository is initially structured as follow:\n",
"```\n",
"/lookout-equipment-demo/getting_started/\n",
"|\n",
"├── dataset/ <<< Original dataset <<<\n",
"| ├── labels.csv\n",
"| ├── tags_description.csv\n",
"| ├── timeranges.txt\n",
"| └── timeseries.zip\n",
"|\n",
"├── notebooks/\n",
"| ├── 1_data_preparation.ipynb <<< This notebook <<<\n",
"| ├── 2_dataset_creation.ipynb\n",
"| ├── 3_model_training.ipynb\n",
"| ├── 4_model_evaluation.ipynb\n",
"| ├── 5_inference_scheduling.ipynb\n",
"| └── config.py\n",
"|\n",
"└── utils/\n",
" ├── aws_matplotlib_light.py\n",
" └── lookout_equipment_utils.py\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Notebook configuration update"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install --quiet --upgrade tqdm tsia"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Imports\n",
"**Note:** Update the content of the **config.py** file **before** running the following cell"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import config\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"import os\n",
"import pandas as pd\n",
"import pyarrow as pa\n",
"import pyarrow.parquet as pq\n",
"import shutil\n",
"import sys\n",
"import tsia\n",
"\n",
"from botocore.client import ClientError\n",
"from tqdm import tqdm"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sys.path.append('../utils')\n",
"import lookout_equipment_utils as lookout"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"BUCKET = config.BUCKET\n",
"PREFIX_TRAINING = config.PREFIX_TRAINING\n",
"PREFIX_LABEL = config.PREFIX_LABEL\n",
"RAW_DATA = os.path.join('..', 'dataset')\n",
"DATA = os.path.join('..', 'data')\n",
"LABEL_DATA = os.path.join(DATA, 'labelled-data')\n",
"TRAIN_DATA = os.path.join(DATA, 'training-data', 'expander')\n",
"\n",
"os.makedirs(DATA, exist_ok=True)\n",
"os.makedirs(LABEL_DATA, exist_ok=True)\n",
"os.makedirs(TRAIN_DATA, exist_ok=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's first check if the bucket name is defined, if it exists and if we have access to it from this notebook. If this notebook does not have access to the S3 bucket, you will have to update its permission:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if BUCKET == '<>':\n",
" raise Exception('Please update your Amazon S3 bucket name in the config.py file located at the root of this repository and restart the kernel for this notebook.')\n",
" \n",
"else:\n",
" # Check access to the configured bucket:\n",
" try:\n",
" s3_resource = boto3.resource('s3')\n",
" s3_resource.meta.client.head_bucket(Bucket=BUCKET)\n",
" print(f'Bucket \"{BUCKET}\" exists')\n",
" \n",
" # Expose error reason:\n",
" except ClientError as error:\n",
" error_code = int(error.response['Error']['Code'])\n",
" if error_code == 403:\n",
" raise Exception(f'Bucket \"{BUCKET}\" is private: access is forbidden!')\n",
" \n",
" elif error_code == 404:\n",
" raise Exception(f'Bucket \"{BUCKET}\" does not exist!')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### AWS Look & Feel definition for Matplotlib"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"\n",
"# Load style sheet:\n",
"plt.style.use('../utils/aws_matplotlib_light.py')\n",
"\n",
"# Get colors from custom AWS palette:\n",
"prop_cycle = plt.rcParams['axes.prop_cycle']\n",
"colors = prop_cycle.by_key()['color']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading datasets of interest\n",
"---\n",
"### Analysis time ranges\n",
"The dataset provided with this repository is one year long with some known anomaly periods appearing both at the beginning and at the end of the year. Using the following training / evaluation split, will allow Lookout for Equipment to have labelled periods on both side of the split date:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"timeranges_fname = os.path.join(DATA, 'timeranges.txt')\n",
"shutil.copyfile(os.path.join(RAW_DATA, 'timeranges.txt'), timeranges_fname)\n",
"with open(timeranges_fname, 'r') as f:\n",
" timeranges = f.readlines()\n",
" \n",
"training_start = pd.to_datetime(timeranges[0][:-1])\n",
"training_end = pd.to_datetime(timeranges[1][:-1])\n",
"evaluation_start = pd.to_datetime(timeranges[2][:-1])\n",
"evaluation_end = pd.to_datetime(timeranges[3][:-1])\n",
"\n",
"print(f'Training period: from {training_start} to {training_end}')\n",
"print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Labels\n",
"Historical maintenance record time ranges are recorded in a CSV files with two columns containing *start time* and *end time* of each range:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"labels_fname = os.path.join(LABEL_DATA, 'labels.csv')\n",
"shutil.copyfile(os.path.join(RAW_DATA, 'labels.csv'), labels_fname)\n",
"labels_df = pd.read_csv(os.path.join(LABEL_DATA, 'labels.csv'), header=None)\n",
"labels_df[0] = pd.to_datetime(labels_df[0])\n",
"labels_df[1] = pd.to_datetime(labels_df[1])\n",
"labels_df.columns = ['start', 'end']\n",
"labels_df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Time series\n",
"The raw timeseries is a zipped parquet file, let's deflate it:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"timeseries_fname = os.path.join(RAW_DATA, 'timeseries.zip')\n",
"!unzip -o $timeseries_fname -d $DATA/training-data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dataframe stored there has 122 tags and 480,886 rows, ranging from *January 1st* to *November 30, 2015*:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_tags_fname = os.path.join(DATA, 'training-data', 'expander.parquet')\n",
"table = pq.read_table(all_tags_fname)\n",
"all_tags_df = table.to_pandas()\n",
"del table\n",
"\n",
"print(all_tags_df.shape)\n",
"all_tags_df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tags description"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This dataset comes with a tag description file including:\n",
"\n",
"* `Tag`: the tag name as it is recorded by the customer in his historian system (for instance the [Honeywell process history database](https://www.honeywellprocess.com/en-US/explore/products/advanced-applications/uniformance/Pages/uniformance-phd.aspx))\n",
"* `UOM`: the unit of measure for the recorded signal\n",
"* `Subsystem`: an ID linked to the part of the asset this sensor is attached to"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tags_description_fname = os.path.join(RAW_DATA, 'tags_description.csv')\n",
"tags_description_df = pd.read_csv(tags_description_fname)\n",
"tags_description_df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's extract a list of features from this table: we group them by unit of measure for more convenience:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"features = list(tags_description_df.sort_values(by='UOM')['Tag'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dataset overview\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Build a list of dataframes, one per feature (this will be useful for visualizations purpose). An early event in the year skews the data: we remove that part for visualization purpose only (hence the `start` and `end` range definition below), but will keep the period starting January 1st as a training period later on."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start = pd.to_datetime('2015-04-05 00:00:00')\n",
"end = evaluation_end\n",
"\n",
"df_list = []\n",
"feature_groups = dict()\n",
"for f in features:\n",
" # Get the unit of measure for the current feature:\n",
" uom = str(list(tags_description_df.loc[tags_description_df['Tag'] == f, 'UOM'])[0])\n",
" \n",
" # We have already some features in this group, add it:\n",
" if uom in feature_groups.keys():\n",
" feature_groups.update({uom: feature_groups[uom] + [f]})\n",
" \n",
" # Otherwise, create this group:\n",
" else:\n",
" feature_groups.update({uom: [f]})\n",
" \n",
" # Add the dataframe to the list:\n",
" current_df = all_tags_df.loc[start:end, [f]]\n",
" current_df = current_df.replace(np.nan, 0.0)\n",
" df_list.append(current_df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tag = 'signal-028'\n",
"tag_df = all_tags_df.loc[start:end, [tag]]\n",
"tag_df.columns = ['Value']\n",
"\n",
"fig, axes = lookout.plot_timeseries(\n",
" tag_df, \n",
" tag, \n",
" fig_width=20, \n",
" tag_split=evaluation_start, \n",
" labels_df=labels_df\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the following two cells only on **instances with high memory** (at least **ml.m5.xlarge**): on smaller instances, the image generated by **matplotlib** are too large to be displayed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fig = tsia.plot.plot_multivariate_timeseries(\n",
" timeseries_list=df_list,\n",
" tags_list=features,\n",
" split_date=evaluation_start,\n",
" tags_description_df=tags_description_df,\n",
" tags_grouping_key='UOM',\n",
" num_cols=4,\n",
" col_size=5\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"# Discretize each signal in 3 bins:\n",
"array = tsia.markov.discretize_multivariate(df_list)\n",
"\n",
"# Grouping the signals based on their unit of measure (UOM):\n",
"num_timesteps = array.shape[1]\n",
"separator = np.zeros(shape=(1, num_timesteps))\n",
"separator = np.where(separator==0, np.nan, separator)\n",
"grouped_array = []\n",
"signal_list = []\n",
"current_row = 0\n",
"for uom in feature_groups.keys():\n",
" num_features = len(feature_groups[uom])\n",
" signal_list = signal_list + features[current_row:current_row + num_features + 1]\n",
" signal_list.append(uom)\n",
" grouped_array.append(array[current_row:current_row + num_features + 1])\n",
" grouped_array.append(separator)\n",
" current_row += num_features\n",
"grouped_array = np.concatenate(grouped_array)\n",
"\n",
"# Plot the strip chart:\n",
"tsia.plot.plot_timeseries_strip_chart(\n",
" grouped_array, \n",
" signal_list=signal_list,\n",
" fig_width=20,\n",
" dates=df_list[0].index.to_pydatetime(),\n",
" day_interval=2\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Building and uploading the dataset\n",
"---\n",
"We will structure our S3 bucket like this:\n",
"```\n",
"s3://sagemaker-lookout-equipment-demo/\n",
"|\n",
"├── training-data/\n",
"| |\n",
"| ├── subsystem-01\n",
"| | └── subsystem-01.csv\n",
"| |\n",
"| ├── subsystem-02\n",
"| | └── subsystem-02.csv\n",
"| |\n",
"| ├── ...\n",
"| |\n",
"| └── subsystem-24\n",
"| └── subsystem-24.csv\n",
"|\n",
"└── labelled-data/\n",
" └── labels.csv\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Process each subsystem one by one:\n",
"components = list(tags_description_df['Subsystem'].unique())\n",
"progress_bar = tqdm(components)\n",
"for component in progress_bar:\n",
" progress_bar.set_description(f'Component {component}')\n",
" progress_bar.refresh()\n",
" \n",
" # Check if CSV file already exist and do not overwrite it:\n",
" component_tags_fname = os.path.join(TRAIN_DATA, f'{component}', f'{component}.csv')\n",
" if not os.path.exists(component_tags_fname):\n",
" # Build the dataframe with all the signal timeseries for the current subsystem:\n",
" component_tags_list = list(tags_description_df[tags_description_df['Subsystem'] == component]['Tag'])\n",
" component_tags_df = all_tags_df[component_tags_list]\n",
" component_tags_df = component_tags_df.reset_index()\n",
" component_tags_df['Timestamp'] = component_tags_df['Timestamp'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f')\n",
" \n",
" # Save to disk:\n",
" os.makedirs(os.path.join(TRAIN_DATA, f'{component}'), exist_ok=True)\n",
" component_tags_df.to_csv(component_tags_fname, index=None)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uploading training dataset to S3:\n",
"training_src_dir = TRAIN_DATA\n",
"training_s3_dest_path = f's3://{BUCKET}/{PREFIX_TRAINING}'\n",
"!aws s3 cp --recursive $training_src_dir $training_s3_dest_path"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uploading label dataset to S3:\n",
"label_src_fname = os.path.join(LABEL_DATA, 'labels.csv')\n",
"label_s3_dest_path = f's3://{BUCKET}/{PREFIX_LABEL}labels.csv'\n",
"!aws s3 cp $label_src_fname $label_s3_dest_path"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion\n",
"---\n",
"At this stage, we have built:\n",
"* A single Parquet dataset that contains all the historical data for all tags provided by the customer: this is **58,668,092** at a **1 minute** sampling rate for **122 tags**.\n",
"* **24 individual CSV files** (1 for each subsystem, each subsystem can contain several timeseries) filed in their respective subsystem directories\n",
"\n",
"Looking at the plot for **signal-028** above, we are going to try and predict the event that happens on **November 2015**: to achieve this, we will use a training set spanning from **January 2015** to **August 2015** and we will test on **September 2015** to **November 2015**."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}