{ "cells": [ { "cell_type": "markdown", "id": "a261ae82", "metadata": {}, "source": [ "Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\n", "\n", "SPDX-License-Identifier: Apache-2.0" ] }, { "cell_type": "markdown", "id": "5eac047c", "metadata": {}, "source": [ "# Train NCAD with IOT data\n", "\n", "**Note**: This model requires GPUs to efficiently train. " ] }, { "cell_type": "markdown", "id": "f99d3bab", "metadata": {}, "source": [ "## Table of Contents\n", "\n", "1. Load model parameters\n", "2. Load train and test CSVs\n", " * Load them into `TimeSeries` objects defined by the `ncad` library\n", "3. Standardize data based on interquartile range\n", "4. Inject anomalies\n", " * NCAD is a semi-supervised algorithm, and adds samples with anomalies to the training set\n", " \n", "5. Split test dataset into val and test\n", "6. Construct NCADDataModule\n", "7. Set up callbacks \n", "8. Train model\n", "9. Perform inference" ] }, { "cell_type": "code", "execution_count": null, "id": "a11af65e", "metadata": {}, "outputs": [], "source": [ "import sys \n", "sys.path.append('../../src/anomaly_detection_spatial_temporal_data/')" ] }, { "cell_type": "code", "execution_count": null, "id": "deaa8d10", "metadata": {}, "outputs": [], "source": [ "import os\n", "import time\n", "import json\n", "from typing import List, Union, Optional, Tuple, Dict\n", "\n", "from pathlib import Path, PosixPath\n", "\n", "import pandas as pd\n", "import numpy as np\n", "import itertools\n", "\n", "import re\n", "\n", "import torch\n", "from torch import nn\n", "\n", "import pytorch_lightning as pl\n", "from pytorch_lightning import Trainer\n", "from pytorch_lightning.callbacks import ModelCheckpoint\n", "from pytorch_lightning.loggers import TensorBoardLogger\n", "\n", "import ncad\n", "from ncad.ts import TimeSeries, TimeSeriesDataset\n", "from ncad.ts import transforms as tr\n", "from ncad.model import NCAD, NCADDataModule\n", "\n", "import tqdm\n", "import yaml" ] }, { "cell_type": "markdown", "id": "23bd95c4", "metadata": {}, "source": [ "# Load model params" ] }, { "cell_type": "code", "execution_count": null, "id": "373de9f6", "metadata": {}, "outputs": [], "source": [ "model_config_file = \"../../conf/base/parameters/ncad.yml\"" ] }, { "cell_type": "code", "execution_count": null, "id": "fc4bfd4e", "metadata": {}, "outputs": [], "source": [ "with open(model_config_file, \"r\") as stream:\n", " try:\n", " model_config = yaml.safe_load(stream)\n", " print(model_config)\n", " except yaml.YAMLError as exc:\n", " print(exc)" ] }, { "cell_type": "markdown", "id": "d4768b08", "metadata": {}, "source": [ "# Load data\n", "\n", "Load processed CSVs into `TimeSeries` object" ] }, { "cell_type": "code", "execution_count": null, "id": "bc28772c", "metadata": {}, "outputs": [], "source": [ "def load_dataset(train_clean: pd.DataFrame, train_anom: pd.DataFrame, test: pd.DataFrame, sensor_cols: List[str], label_col: str) -> Tuple:\n", " if type(sensor_cols) == str:\n", " sensor_cols = sensor_cols.split(\"\\n\")\n", " train_dataset = TimeSeriesDataset()\n", " test_dataset = TimeSeriesDataset()\n", " \n", " \n", " for sensor in sensor_cols:\n", " train_dataset.append(\n", " TimeSeries(\n", " values=train_clean[sensor].values,\n", " labels=None,\n", " item_id=f\"{sensor}_train_clean\"\n", " )\n", " )\n", " \n", " train_dataset.append(\n", " TimeSeries(\n", " values=train_anom[sensor].values,\n", " labels=train_anom[label_col].values,\n", " item_id=f\"{sensor}_train_anom\"\n", " )\n", " )\n", " \n", " 
"        test_dataset.append(\n", "            TimeSeries(\n", "                values=test[sensor].values,\n", "                labels=test[label_col].values,\n", "                item_id=f\"{sensor}_test\"\n", "            )\n", "        )\n", "    \n", "    return train_dataset, test_dataset" ] },
{ "cell_type": "code", "execution_count": null, "id": "71ecad9c", "metadata": {}, "outputs": [], "source": [ "train_clean = pd.read_csv(\"../../data/03_primary/iot/iot_ncad_train.csv\")\n", "train_anom = pd.read_csv(\"../../data/03_primary/iot/iot_ncad_train_anom.csv\")\n", "test = pd.read_csv(\"../../data/03_primary/iot/iot_ncad_test.csv\")\n", "\n", "with open(\"../../data/03_primary/iot/iot_sensor_list_batadal.txt\", \"r\") as f:\n", "    sensors = f.read().split(\"\\n\")" ] },
{ "cell_type": "code", "execution_count": null, "id": "4333c4b2", "metadata": {}, "outputs": [], "source": [ "train_dataset, test_dataset = load_dataset(train_clean, train_anom, test, sensors, \"label\")" ] },
{ "cell_type": "markdown", "id": "90b9e436", "metadata": {}, "source": [ "# Standardize data\n", "\n", "Subtract the median, divide by the interquartile range" ] },
{ "cell_type": "code", "execution_count": null, "id": "c32887db", "metadata": {}, "outputs": [], "source": [ "def standardize_dataset(train_set: TimeSeriesDataset, test_set: TimeSeriesDataset) -> Tuple:\n", "    # Standardize TimeSeries values (subtract median, divide by interquartile range)\n", "    scaler = tr.TimeSeriesScaler(type=\"robust\")\n", "    train_set = TimeSeriesDataset(ncad.utils.take_n_cycle(scaler(train_set), len(train_set)))\n", "    test_set = TimeSeriesDataset(ncad.utils.take_n_cycle(scaler(test_set), len(test_set)))\n", "    # Channel count of each series, used later to size the encoder\n", "    ts_channels = train_set[0].shape[1]\n", "    return train_set, test_set, ts_channels" ] },
{ "cell_type": "code", "execution_count": null, "id": "1cd2b28f", "metadata": {}, "outputs": [], "source": [ "train_standard, test_standard, ts_channels = standardize_dataset(train_dataset, test_dataset)" ] },
{ "cell_type": "markdown", "id": "4b306b56", "metadata": {}, "source": [ "# Inject anomalies\n", "\n", "NCAD is a semi-supervised algorithm and includes functionality to inject synthetic anomalies into the training dataset" ] },
{ "cell_type": "code", "execution_count": null, "id": "37d82358", "metadata": {}, "outputs": [], "source": [ "def batadal_inject_anomalies(\n", "    dataset: TimeSeriesDataset,\n", "    injection_method: str,\n", "    ratio_injected_spikes: float,\n", ") -> TimeSeriesDataset:\n", "\n", "    # The dataset is transformed with a TimeSeriesTransform chosen by injection_method\n", "    if injection_method == \"None\":\n", "        return dataset\n", "    elif injection_method == \"local_outliers\":\n", "        if ratio_injected_spikes is None:\n", "            # Inject synthetic anomalies: LocalOutlier\n", "            ts_transform = tr.LocalOutlier(area_radius=500, num_spikes=360)\n", "        else:\n", "            ts_transform = tr.LocalOutlier(\n", "                area_radius=700,\n", "                num_spikes=ratio_injected_spikes,\n", "                spike_multiplier_range=(1.0, 3.0),\n", "                # direction_options = ['increase'],\n", "            )\n", "\n", "        # Generate many examples of injected time series\n", "        multiplier = 20\n", "        ts_transform_iterator = ts_transform(itertools.cycle(dataset))\n", "        dataset_transformed = ncad.utils.take_n_cycle(\n", "            ts_transform_iterator, multiplier * len(dataset)\n", "        )\n", "        dataset_transformed = TimeSeriesDataset(dataset_transformed)\n", "    else:\n", "        raise ValueError(f\"injection_method = {injection_method} not supported!\")\n", "\n", "    return dataset_transformed" ] },
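{ "cell_type": "markdown", "id": "8f2a9c10", "metadata": {}, "source": [ "As a quick sanity check, the next cell applies the configured injection to a single standardized series and counts the labeled points. This is a sketch that assumes `TimeSeries` exposes `labels` as an attribute and `TimeSeriesDataset` accepts a list of series, as the surrounding code suggests." ] },
{ "cell_type": "code", "execution_count": null, "id": "9e4b7d21", "metadata": {}, "outputs": [], "source": [ "# Apply the configured injection to one standardized series and inspect the result\n", "preview = batadal_inject_anomalies(\n", "    TimeSeriesDataset([train_standard[0]]),\n", "    model_config[\"injection_method\"],\n", "    model_config[\"ratio_injected_spikes\"]\n", ")\n", "print(f\"{len(preview)} series produced from 1 input\")\n", "labels = preview[0].labels\n", "if labels is not None:\n", "    print(\"labeled anomalous points in first output:\", int(np.sum(labels)))" ] },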
"%%time\n", "train_injected = batadal_inject_anomalies(\n", " train_standard, \n", " model_config[\"injection_method\"], \n", " model_config[\"ratio_injected_spikes\"]\n", ")" ] }, { "cell_type": "markdown", "id": "77af86dd", "metadata": {}, "source": [ "# Split Test into val/test" ] }, { "cell_type": "code", "execution_count": null, "id": "004806c8", "metadata": {}, "outputs": [], "source": [ "def split_test_into_val(test_set: TimeSeriesDataset, window_length: int, suspect_window_length: int) -> Tuple:\n", " # Split test dataset in two, half for validation and half for test\n", " _, validation_set, test_set = ncad.ts.split_train_val_test(\n", " data=test_set,\n", " val_portion=0.5, # No labels in training data that could serve for validation\n", " test_portion=0.5, # This dataset provides test set, so we don't need to take from training data\n", " split_method=\"past_future_with_warmup\",\n", " split_warmup_length=window_length - suspect_window_length,\n", " verbose=False,\n", " )\n", " \n", " return validation_set, test_set" ] }, { "cell_type": "code", "execution_count": null, "id": "06d18a60", "metadata": {}, "outputs": [], "source": [ "validation_split, test_split = split_test_into_val(\n", " test_standard, \n", " model_config[\"window_length\"],\n", " model_config[\"suspect_window_length\"]\n", ")" ] }, { "cell_type": "markdown", "id": "f1c3cce0", "metadata": {}, "source": [ "# Construct data module" ] }, { "cell_type": "code", "execution_count": null, "id": "2f5003cd", "metadata": {}, "outputs": [], "source": [ "def construct_data_module(\n", " train_set_transformed: TimeSeriesDataset, validation_set: TimeSeriesDataset, test_set: TimeSeriesDataset,\n", " window_length: int, suspect_window_length: int, \n", " num_series_in_train_batch: int,\n", " num_crops_per_series: int,\n", " stride_roll_pred_val_test: int,\n", " num_workers_loader: int\n", " \n", ") -> NCADDataModule:\n", " # Define DataModule for training with lighting (window cropping + window injection)#\n", " data_module = NCADDataModule(\n", " train_ts_dataset=train_set_transformed,\n", " validation_ts_dataset=validation_set,\n", " test_ts_dataset=test_set,\n", " window_length=window_length,\n", " suspect_window_length=suspect_window_length,\n", " num_series_in_train_batch=num_series_in_train_batch,\n", " num_crops_per_series=num_crops_per_series,\n", " label_reduction_method=\"any\",\n", " stride_val_and_test=stride_roll_pred_val_test,\n", " num_workers=num_workers_loader,\n", " )\n", " \n", " return data_module" ] }, { "cell_type": "code", "execution_count": null, "id": "fa5fb789", "metadata": {}, "outputs": [], "source": [ "ncad_data_module = construct_data_module(\n", " train_injected, validation_split, test_split,\n", " model_config[\"window_length\"], model_config[\"suspect_window_length\"],\n", " model_config[\"num_series_in_train_batch\"],\n", " model_config[\"num_crops_per_series\"],\n", " model_config[\"stride_roll_pred_val_test\"],\n", " model_config[\"num_workers_loader\"]\n", ")" ] }, { "cell_type": "markdown", "id": "77566fa9", "metadata": {}, "source": [ "# Set up call backs" ] }, { "cell_type": "code", "execution_count": null, "id": "4400a32b", "metadata": {}, "outputs": [], "source": [ "def set_up_callbacks(\n", " ## General\n", " model_dir: Union[str, PosixPath],\n", " log_dir: Union[str, PosixPath],\n", ") -> ModelCheckpoint:\n", " # Experiment name #\n", " time_now = time.strftime(\"%Y-%m-%d-%H%M%S\", time.localtime())\n", " exp_name = f\"batadal-{time_now}\"\n", "\n", " ### Training the model ###\n", 
"\n", " logger = TensorBoardLogger(save_dir=log_dir, name=exp_name)\n", "\n", " # Checkpoint callback, monitoring 'val_f1'\n", " checkpoint_cb = ModelCheckpoint(\n", " monitor=\"val_f1\",\n", " dirpath=model_dir,\n", " filename=\"ncad-model-\" + exp_name + \"-{epoch:02d}-{val_f1:.4f}\",\n", " save_top_k=1,\n", " mode=\"max\",\n", " )\n", " \n", " return checkpoint_cb, logger, exp_name" ] }, { "cell_type": "code", "execution_count": null, "id": "d891e0dc", "metadata": {}, "outputs": [], "source": [ "checkpointer, tb_logger, exp_name = set_up_callbacks(\n", " model_config[\"model_dir\"],\n", " model_config[\"log_dir\"]\n", ")" ] }, { "cell_type": "markdown", "id": "1107c00e", "metadata": {}, "source": [ "# Train model" ] }, { "cell_type": "code", "execution_count": null, "id": "2687fbc7", "metadata": {}, "outputs": [], "source": [ "def set_and_train_model(\n", " data_module: NCADDataModule, \n", " model_dir: Union[str, PosixPath],\n", " logger: TensorBoardLogger,\n", " epochs: int,\n", " limit_val_batches: float,\n", " num_sanity_val_steps: int,\n", " check_val_every_n_epoch: int,\n", " checkpoint_cb: ModelCheckpoint,\n", " ts_channels: int,\n", " window_length: int,\n", " suspect_window_length: int, \n", " tcn_kernel_size: int,\n", " tcn_layers: int,\n", " tcn_out_channels: int,\n", " tcn_maxpool_out_channels: int,\n", " embedding_rep_dim: int,\n", " normalize_embedding: bool,\n", " # hpars for classifier\n", " distance: str,\n", " classifier_threshold: float,\n", " threshold_grid_length_test: float,\n", " # hpars for optimizer\n", " learning_rate: float,\n", " # hpars for anomalizers\n", " coe_rate: float,\n", " mixup_rate: float,\n", " # hpars for validation and test\n", " stride_roll_pred_val_test: int,\n", " val_labels_adj: bool,\n", " test_labels_adj: bool,\n", " max_windows_unfold_batch: Optional[int] = 5000,\n", " \n", " evaluation_result_path: Optional[Union[str, PosixPath]] = None,\n", ") -> Tuple:\n", " if distance == \"cosine\":\n", " # For the contrastive approach, the cosine distance is used\n", " distance = ncad.model.distances.CosineDistance()\n", " elif distance == \"L2\":\n", " # For the contrastive approach, the L2 distance is used\n", " distance = ncad.model.distances.LpDistance(p=2)\n", " elif distance == \"non-contrastive\":\n", " # For the non-contrastive approach, the classifier is\n", " # a neural-net based on the embedding of the whole window\n", " distance = ncad.model.distances.BinaryOnX1(rep_dim=embedding_rep_dim, layers=1)\n", "\n", " # Instantiate model #\n", " model = NCAD(\n", " ts_channels=ts_channels,\n", " window_length=window_length,\n", " suspect_window_length=suspect_window_length,\n", " # hpars for encoder\n", " tcn_kernel_size=tcn_kernel_size,\n", " tcn_layers=tcn_layers,\n", " tcn_out_channels=tcn_out_channels,\n", " tcn_maxpool_out_channels=tcn_maxpool_out_channels,\n", " embedding_rep_dim=embedding_rep_dim,\n", " normalize_embedding=normalize_embedding,\n", " # hpars for classifier\n", " distance=distance,\n", " classification_loss=nn.BCELoss(),\n", " classifier_threshold=classifier_threshold,\n", " threshold_grid_length_test=threshold_grid_length_test,\n", " # hpars for anomalizers\n", " coe_rate=coe_rate,\n", " mixup_rate=mixup_rate,\n", " # hpars for validation and test\n", " stride_rolling_val_test=stride_roll_pred_val_test,\n", " val_labels_adj=val_labels_adj,\n", " test_labels_adj=test_labels_adj,\n", " max_windows_unfold_batch=max_windows_unfold_batch,\n", " # hpars for optimizer\n", " learning_rate=learning_rate,\n", " )\n", " \n", " 
"    trainer = Trainer(\n", "        gpus=1 if torch.cuda.is_available() else 0,\n", "        default_root_dir=model_dir,\n", "        logger=logger,\n", "        min_epochs=epochs,\n", "        max_epochs=epochs,\n", "        limit_val_batches=limit_val_batches,\n", "        num_sanity_val_steps=num_sanity_val_steps,\n", "        check_val_every_n_epoch=check_val_every_n_epoch,\n", "        callbacks=[checkpoint_cb],\n", "        auto_lr_find=False,\n", "    )\n", "    \n", "    trainer.fit(\n", "        model=model,\n", "        datamodule=data_module,\n", "    )\n", "    \n", "    # Metrics on validation and test data #\n", "    evaluation_result = trainer.test()\n", "    evaluation_result = evaluation_result[0]\n", "    classifier_threshold = evaluation_result[\"classifier_threshold\"]\n", "    \n", "    # Save evaluation results\n", "    if evaluation_result_path is not None:\n", "        path = evaluation_result_path\n", "        path = PosixPath(path).expanduser() if str(path).startswith(\"~\") else Path(path)\n", "        with open(path, \"w\") as f:\n", "            json.dump(evaluation_result, f, cls=ncad.utils.NpEncoder)\n", "\n", "    for key, value in evaluation_result.items():\n", "        print(f\"{key}={value}\")\n", "    \n", "    return model_dir, classifier_threshold\n" ] },
{ "cell_type": "code", "execution_count": null, "id": "eebda6ad", "metadata": {}, "outputs": [], "source": [ "_, classifier_threshold = set_and_train_model(\n", "    ncad_data_module,\n", "    model_config[\"model_dir\"],\n", "    tb_logger,\n", "    model_config[\"epochs\"],\n", "    model_config[\"limit_val_batches\"],\n", "    model_config[\"num_sanity_val_steps\"],\n", "    model_config[\"check_val_every_n_epoch\"],\n", "    checkpointer,\n", "    ts_channels,\n", "    model_config[\"window_length\"],\n", "    model_config[\"suspect_window_length\"],\n", "    model_config[\"tcn_kernel_size\"],\n", "    model_config[\"tcn_layers\"],\n", "    model_config[\"tcn_out_channels\"],\n", "    model_config[\"tcn_maxpool_out_channels\"],\n", "    model_config[\"embedding_rep_dim\"],\n", "    model_config[\"normalize_embedding\"],\n", "    # hpars for classifier\n", "    model_config[\"distance\"],\n", "    model_config[\"classifier_threshold\"],\n", "    model_config[\"threshold_grid_length_test\"],\n", "    # hpars for optimizer\n", "    model_config[\"learning_rate\"],\n", "    # hpars for anomalizers\n", "    model_config[\"coe_rate\"],\n", "    model_config[\"mixup_rate\"],\n", "    # hpars for validation and test\n", "    model_config[\"stride_roll_pred_val_test\"],\n", "    model_config[\"val_labels_adj\"],\n", "    model_config[\"test_labels_adj\"],\n", "    model_config[\"max_windows_unfold_batch\"],\n", "    model_config[\"evaluation_result_path\"],\n", ")" ] },
{ "cell_type": "markdown", "id": "933a8429", "metadata": {}, "source": [ "# Perform inference" ] },
{ "cell_type": "code", "execution_count": null, "id": "3f1befda", "metadata": {}, "outputs": [], "source": [ "def load_and_evaluate(\n", "    model_dir: Union[str, PosixPath],\n", "    exp_name: str,\n", "    data_set: TimeSeriesDataset,\n", "    stride: int,\n", "    classifier_threshold: float\n", "):\n", "    model_dir = PosixPath(model_dir).expanduser() if str(model_dir).startswith(\"~\") else Path(model_dir)\n", "    \n", "    # Load the top-performing checkpoint for this experiment\n", "    ckpt_file = [\n", "        file\n", "        for file in os.listdir(model_dir)\n", "        if (file.endswith(\".ckpt\") and file.startswith(\"ncad-model-\" + exp_name))\n", "    ]\n", "\n", "    # save_top_k=1 leaves one matching checkpoint per experiment; sort for determinism\n", "    ckpt_file = sorted(ckpt_file)[-1]\n", "    ckpt_path = model_dir / ckpt_file\n", "    model = NCAD.load_from_checkpoint(ckpt_path)\n", "\n",
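"    # tsdetect scores the series with rolling windows at the given stride, returning\n", "    # per-timestep averaged anomaly probabilities and binary votes at classifier_threshold\n",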
"    anomaly_probs_avg, anomaly_vote = model.tsdetect(data_set, stride, classifier_threshold)\n", "    \n", "    print(\"NCAD on the BATADAL dataset finished successfully!\")\n", "    \n", "    return anomaly_probs_avg, anomaly_vote" ] },
{ "cell_type": "code", "execution_count": null, "id": "abf0dd4a", "metadata": {}, "outputs": [], "source": [ "anomaly_probs_avg, anomaly_vote = load_and_evaluate(\n", "    model_config[\"model_dir\"],\n", "    exp_name,\n", "    test_standard,\n", "    model_config[\"stride_roll_pred_val_test\"],\n", "    classifier_threshold\n", ")" ] },
{ "cell_type": "code", "execution_count": null, "id": "af8f0806", "metadata": {}, "outputs": [], "source": [ "print(anomaly_probs_avg.shape, anomaly_vote.shape)" ] },
{ "cell_type": "markdown", "id": "31e2772b", "metadata": {}, "source": [ "# References\n", "\n", "Riccardo Taormina, Stefano Galelli, Nils Ole Tippenhauer, Elad Salomons, Avi Ostfeld, Demetrios G. Eliades, Mohsen Aghashahi, Raanju Sundararajan, Mohsen Pourahmadi, M. Katherine Banks, B. M. Brentan, Enrique Campbell, G. Lima, D. Manzi, D. Ayala-Cabrera, M. Herrera, I. Montalvo, J. Izquierdo, E. Luvizotto, Sarin E. Chandy, Amin Rasekh, Zachary A. Barker, Bruce Campbell, M. Ehsan Shafiee, Marcio Giacomoni, Nikolaos Gatsis, Ahmad Taha, Ahmed A. Abokifa, Kelsey Haddad, Cynthia S. Lo, Pratim Biswas, M. Fayzul K. Pasha, Bijay Kc, Saravanakumar Lakshmanan Somasundaram, Mashor Housh, and Ziv Ohar. \"The Battle of the Attack Detection Algorithms: Disclosing Cyber Attacks on Water Distribution Networks.\" Journal of Water Resources Planning and Management, 144(8), August 2018.\n", "\n", "Chris U. Carmona, François-Xavier Aubet, Valentin Flunkert, and Jan Gasthaus. \"Neural Contextual Anomaly Detection for Time Series.\" 2021. arXiv:2107.07702." ] },
{ "cell_type": "code", "execution_count": null, "id": "e3302c01", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "kedro-ncad-venv", "language": "python", "name": "kedro-ncad-venv" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 5 }