Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0

# Train NCAD with IOT data

**Note**: This model needs a GPU to train efficiently. Training falls back to the CPU when CUDA is not available, but it will be slow.

## Table of Contents

1. Load model parameters
2. Load train and test CSVs
 * Load them into `TimeSeries` objects defined by the `ncad` library
3. Standardize data based on interquartile range
4. Inject anomalies
 * NCAD is a semi-supervised algorithm; synthetic anomalies are injected into the training set
5. Split the test dataset into validation and test sets
6. Construct NCADDataModule
7. Set up callbacks 
8. Train model
9. Perform inference

In [None]:
import sys 
# Make the project's source tree importable from this notebook (presumably where the
# `ncad` package imported below lives — confirm). The path is relative to the notebook's
# working directory, so the notebook must be run from its own folder.
sys.path.append('../../src/anomaly_detection_spatial_temporal_data/')

In [None]:
import os
import time
import json
from typing import List, Union, Optional, Tuple, Dict

from pathlib import Path, PosixPath

import pandas as pd
import numpy as np
import itertools

import re

import torch
from torch import nn

import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

import ncad
from ncad.ts import TimeSeries, TimeSeriesDataset
from ncad.ts import transforms as tr
from ncad.model import NCAD, NCADDataModule

import tqdm
import yaml

# Load model params

In [None]:
# YAML file holding the NCAD parameters read as `model_config[...]` throughout this notebook
# (path relative to the notebook's working directory).
model_config_file = "../../conf/base/parameters/ncad.yml"

In [None]:
# Parse the NCAD parameter file. yaml.safe_load (not yaml.load) is used so the file
# cannot instantiate arbitrary Python objects.
with open(model_config_file, "r", encoding="utf-8") as stream:
    try:
        model_config = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        # Fail loudly: every later cell reads `model_config`, so carrying on after a
        # parse error would only resurface later as a confusing NameError.
        raise RuntimeError(f"Could not parse model config file {model_config_file}") from exc
print(model_config)

# Load data

Load the processed CSVs into `TimeSeries` objects (one per sensor and data split)

In [None]:
def load_dataset(
    train_clean: pd.DataFrame,
    train_anom: pd.DataFrame,
    test: pd.DataFrame,
    sensor_cols: Union[List[str], str],
    label_col: str,
) -> Tuple:
    """Wrap each sensor column of the IoT frames into ncad ``TimeSeries`` objects.

    For every sensor three univariate series are built:

    * ``<sensor>_train_clean`` from ``train_clean`` (no labels) -> training dataset
    * ``<sensor>_train_anom`` from ``train_anom`` (labels from ``label_col``) -> training dataset
    * ``<sensor>_test`` from ``test`` (labels from ``label_col``) -> test dataset

    Args:
        train_clean: Training frame; only the sensor columns are read.
        train_anom: Training frame whose ``label_col`` column provides labels.
        test: Test frame whose ``label_col`` column provides labels.
        sensor_cols: Sensor column names, or one newline-separated string of names.
            Blank entries (e.g. produced by a trailing newline) are ignored.
        label_col: Name of the label column in ``train_anom`` and ``test``.

    Returns:
        ``(train_dataset, test_dataset)`` as ``TimeSeriesDataset`` objects.
    """
    if isinstance(sensor_cols, str):
        sensor_cols = sensor_cols.split("\n")
    # "" (or whitespace) is never a real column; without this filter a trailing
    # newline in the sensor list would raise KeyError below.
    sensor_cols = [sensor for sensor in sensor_cols if sensor.strip()]

    train_dataset = TimeSeriesDataset()
    test_dataset = TimeSeriesDataset()

    for sensor in sensor_cols:
        train_dataset.append(
            TimeSeries(
                values=train_clean[sensor].values,
                labels=None,
                item_id=f"{sensor}_train_clean",
            )
        )
        train_dataset.append(
            TimeSeries(
                values=train_anom[sensor].values,
                labels=train_anom[label_col].values,
                item_id=f"{sensor}_train_anom",
            )
        )
        test_dataset.append(
            TimeSeries(
                values=test[sensor].values,
                labels=test[label_col].values,
                item_id=f"{sensor}_test",
            )
        )

    return train_dataset, test_dataset

In [None]:
# Processed data splits (paths relative to the notebook's working directory).
train_clean = pd.read_csv("../../data/03_primary/iot/iot_ncad_train.csv")
train_anom = pd.read_csv("../../data/03_primary/iot/iot_ncad_train_anom.csv")
test = pd.read_csv("../../data/03_primary/iot/iot_ncad_test.csv")

# One sensor column name per line. Blank lines (including the usual trailing newline
# at end of file) are dropped so they are not treated as a sensor named "".
with open("../../data/03_primary/iot/iot_sensor_list_batadal.txt", "r", encoding="utf-8") as f:
    sensors = [line for line in f.read().splitlines() if line.strip()]

In [None]:
# Build the per-sensor train/test TimeSeriesDatasets; "label" is the label column name.
train_dataset, test_dataset = load_dataset(
    train_clean=train_clean,
    train_anom=train_anom,
    test=test,
    sensor_cols=sensors,
    label_col="label",
)

# Standardize data

Subtract the median and divide by the interquartile range (robust scaling).

In [None]:
def standardize_dataset(train_set: TimeSeriesDataset, test_set: TimeSeriesDataset) -> Tuple:
    """Robust-scale every series of the training and test datasets.

    Both datasets go through the same ncad ``TimeSeriesScaler(type="robust")``
    instance (subtract the median, divide by the interquartile range; the
    statistics are presumably computed per series — confirm in ncad).

    Returns:
        ``(scaled_train, scaled_test, ts_channels)``, where ``ts_channels`` is
        ``shape[1]`` of the first scaled training series.
    """
    robust_scaler = tr.TimeSeriesScaler(type="robust")

    def _scaled(dataset):
        # The scaler yields transformed series lazily; take len(dataset) of them
        # and collect them into a new dataset.
        return TimeSeriesDataset(ncad.utils.take_n_cycle(robust_scaler(dataset), len(dataset)))

    scaled_train = _scaled(train_set)
    scaled_test = _scaled(test_set)
    channel_count = scaled_train[0].shape[1]
    return scaled_train, scaled_test, channel_count

In [None]:
# Robust-scale both datasets and record the number of channels per series.
train_standard, test_standard, ts_channels = standardize_dataset(
    train_set=train_dataset,
    test_set=test_dataset,
)

# Inject anomalies

NCAD is a semi-supervised algorithm, and the `ncad` library provides transforms that inject synthetic anomalies into the training set.

In [None]:
def batadal_inject_anomalies(
    dataset: TimeSeriesDataset,
    injection_method: Optional[str],
    ratio_injected_spikes: Optional[float],
    multiplier: int = 20,
) -> TimeSeriesDataset:
    """Augment a training dataset with synthetic anomalies.

    Args:
        dataset: The (standardized) training dataset.
        injection_method: ``"local_outliers"`` injects spikes with
            ``tr.LocalOutlier``. ``"None"`` (the string) or ``None`` (a YAML
            ``null``) returns ``dataset`` unchanged.
        ratio_injected_spikes: Forwarded as ``num_spikes`` to ``tr.LocalOutlier``
            (with ``area_radius=700`` and ``spike_multiplier_range=(1.0, 3.0)``).
            When ``None``, ``LocalOutlier(area_radius=500, num_spikes=360)`` is
            used instead.
        multiplier: How many transformed series to draw per input series; the
            result holds ``multiplier * len(dataset)`` series. Defaults to 20.

    Returns:
        ``dataset`` itself when no injection is requested, otherwise a new
        ``TimeSeriesDataset`` of transformed series.

    Raises:
        ValueError: If ``injection_method`` is not supported or ``multiplier`` < 1.
    """
    # Accept both the literal string "None" and a real None (YAML `null`).
    if injection_method is None or injection_method == "None":
        return dataset
    if injection_method != "local_outliers":
        raise ValueError(f"injection_method = {injection_method} not supported!")
    if multiplier < 1:
        raise ValueError(f"multiplier must be >= 1, got {multiplier}")

    if ratio_injected_spikes is None:
        ts_transform = tr.LocalOutlier(area_radius=500, num_spikes=360)
    else:
        ts_transform = tr.LocalOutlier(
            area_radius=700,
            num_spikes=ratio_injected_spikes,
            spike_multiplier_range=(1.0, 3.0),
        )

    # Cycle over the input so that each series is passed through the transform
    # `multiplier` times, producing `multiplier * len(dataset)` injected series.
    injected_series = ncad.utils.take_n_cycle(
        ts_transform(itertools.cycle(dataset)), multiplier * len(dataset)
    )
    return TimeSeriesDataset(injected_series)

In [None]:
%%time
train_injected = batadal_inject_anomalies(
 train_standard, 
 model_config["injection_method"], 
 model_config["ratio_injected_spikes"]
)

# Split test into validation/test

In [None]:
def split_test_into_val(
    test_set: TimeSeriesDataset,
    window_length: int,
    suspect_window_length: int,
    val_portion: float = 0.5,
) -> Tuple:
    """Split the labelled test dataset into a validation part and a test part.

    Uses ``ncad.ts.split_train_val_test`` with the ``"past_future_with_warmup"``
    method. Since the validation and test portions sum to 1, the training part it
    returns is presumably empty; it is discarded because training data comes from
    the separate training CSVs.

    Args:
        test_set: Labelled (standardized) test dataset to split.
        window_length: Model window length.
        suspect_window_length: Length of the suspect window at the end of each window.
        val_portion: Fraction of the data used for validation; the remaining
            ``1 - val_portion`` becomes the new test set. Defaults to 0.5.

    Returns:
        ``(validation_set, test_set)``.

    Raises:
        ValueError: If ``val_portion`` is not strictly between 0 and 1.
    """
    if not 0.0 < val_portion < 1.0:
        raise ValueError(f"val_portion must be in (0, 1), got {val_portion}")

    _, validation_split, test_split = ncad.ts.split_train_val_test(
        data=test_set,
        val_portion=val_portion,
        test_portion=1.0 - val_portion,
        split_method="past_future_with_warmup",
        # Warm-up of (window_length - suspect_window_length) steps, presumably so the
        # first windows of the later split have full context — confirm in ncad.
        split_warmup_length=window_length - suspect_window_length,
        verbose=False,
    )
    return validation_split, test_split

In [None]:
# Carve a validation set out of the standardized test data.
validation_split, test_split = split_test_into_val(
    test_set=test_standard,
    window_length=model_config["window_length"],
    suspect_window_length=model_config["suspect_window_length"],
)

# Construct data module

In [None]:
def construct_data_module(
    train_set_transformed: TimeSeriesDataset, validation_set: TimeSeriesDataset, test_set: TimeSeriesDataset,
    window_length: int, suspect_window_length: int,
    num_series_in_train_batch: int,
    num_crops_per_series: int,
    stride_roll_pred_val_test: int,
    num_workers_loader: int
) -> NCADDataModule:
    """Build the PyTorch Lightning ``NCADDataModule`` that feeds NCAD.

    The three datasets are handed over as the train/validation/test splits. The
    windowing settings (``window_length``, ``suspect_window_length``,
    ``num_series_in_train_batch``, ``num_crops_per_series``) and the
    validation/test stride are forwarded unchanged. Window labels are reduced
    with ``label_reduction_method="any"``.

    Returns:
        The configured ``NCADDataModule``.
    """
    dataset_kwargs = dict(
        train_ts_dataset=train_set_transformed,
        validation_ts_dataset=validation_set,
        test_ts_dataset=test_set,
    )
    window_kwargs = dict(
        window_length=window_length,
        suspect_window_length=suspect_window_length,
        num_series_in_train_batch=num_series_in_train_batch,
        num_crops_per_series=num_crops_per_series,
    )
    # "any": presumably a window is labelled anomalous if any of its points is — confirm in ncad.
    return NCADDataModule(
        **dataset_kwargs,
        **window_kwargs,
        label_reduction_method="any",
        stride_val_and_test=stride_roll_pred_val_test,
        num_workers=num_workers_loader,
    )

In [None]:
# Assemble the Lightning data module from the prepared splits and config settings.
ncad_data_module = construct_data_module(
    train_set_transformed=train_injected,
    validation_set=validation_split,
    test_set=test_split,
    window_length=model_config["window_length"],
    suspect_window_length=model_config["suspect_window_length"],
    num_series_in_train_batch=model_config["num_series_in_train_batch"],
    num_crops_per_series=model_config["num_crops_per_series"],
    stride_roll_pred_val_test=model_config["stride_roll_pred_val_test"],
    num_workers_loader=model_config["num_workers_loader"],
)

# Set up callbacks

In [None]:
def set_up_callbacks(
    ## General
    model_dir: Union[str, PosixPath],
    log_dir: Union[str, PosixPath],
    exp_prefix: str = "batadal",
) -> Tuple[ModelCheckpoint, TensorBoardLogger, str]:
    """Create the checkpoint callback, the TensorBoard logger and the run name.

    The experiment name is ``"<exp_prefix>-<YYYY-mm-dd-HHMMSS>"`` in local time.

    Args:
        model_dir: Directory the checkpoint callback writes ``.ckpt`` files to.
        log_dir: ``save_dir`` of the TensorBoard logger (the logger's ``name`` is
            the experiment name).
        exp_prefix: Prefix of the experiment name. Defaults to ``"batadal"``.

    Returns:
        ``(checkpoint_cb, logger, exp_name)``. The checkpoint callback monitors
        ``val_f1``, keeps only the single best (max) checkpoint, and names files
        ``"ncad-model-" + exp_name + "-{epoch:02d}-{val_f1:.4f}"``.
    """
    time_now = time.strftime("%Y-%m-%d-%H%M%S", time.localtime())
    exp_name = f"{exp_prefix}-{time_now}"

    logger = TensorBoardLogger(save_dir=log_dir, name=exp_name)

    # Keep the checkpoint with the highest validation F1. The filename template is a
    # plain string (not an f-string) so Lightning fills in {epoch} and {val_f1}.
    checkpoint_cb = ModelCheckpoint(
        monitor="val_f1",
        dirpath=model_dir,
        filename="ncad-model-" + exp_name + "-{epoch:02d}-{val_f1:.4f}",
        save_top_k=1,
        mode="max",
    )

    return checkpoint_cb, logger, exp_name

In [None]:
# Create the checkpoint callback, TensorBoard logger and experiment name for this run.
checkpointer, tb_logger, exp_name = set_up_callbacks(
    model_dir=model_config["model_dir"],
    log_dir=model_config["log_dir"],
)

# Train model

In [None]:
def _make_distance(distance, embedding_rep_dim: int):
    """Translate the config's ``distance`` name into an ncad distance module.

    Non-string values are returned unchanged, so an already-built distance module
    can still be passed straight through to ``NCAD``.

    Raises:
        ValueError: For an unrecognised distance name.
    """
    if not isinstance(distance, str):
        return distance
    if distance == "cosine":
        # Contrastive approach using the cosine distance.
        return ncad.model.distances.CosineDistance()
    if distance == "L2":
        # Contrastive approach using the L2 (Euclidean) distance.
        return ncad.model.distances.LpDistance(p=2)
    if distance == "non-contrastive":
        # Non-contrastive approach: the classifier is a neural net applied to the
        # embedding of the whole window.
        return ncad.model.distances.BinaryOnX1(rep_dim=embedding_rep_dim, layers=1)
    # Fail fast instead of handing NCAD an unrecognised string.
    raise ValueError(
        f"distance = {distance} not supported! "
        "Expected one of 'cosine', 'L2', 'non-contrastive'."
    )


def set_and_train_model(
    data_module: NCADDataModule,
    model_dir: Union[str, PosixPath],
    logger: TensorBoardLogger,
    epochs: int,
    limit_val_batches: float,
    num_sanity_val_steps: int,
    check_val_every_n_epoch: int,
    checkpoint_cb: ModelCheckpoint,
    ts_channels: int,
    window_length: int,
    suspect_window_length: int,
    tcn_kernel_size: int,
    tcn_layers: int,
    tcn_out_channels: int,
    tcn_maxpool_out_channels: int,
    embedding_rep_dim: int,
    normalize_embedding: bool,
    # hpars for classifier
    distance: str,
    classifier_threshold: float,
    threshold_grid_length_test: float,
    # hpars for optimizer
    learning_rate: float,
    # hpars for anomalizers
    coe_rate: float,
    mixup_rate: float,
    # hpars for validation and test
    stride_roll_pred_val_test: int,
    val_labels_adj: bool,
    test_labels_adj: bool,
    max_windows_unfold_batch: Optional[int] = 5000,

    evaluation_result_path: Optional[Union[str, PosixPath]] = None,
) -> Tuple:
    """Build an NCAD model, train it with Lightning, then run the test loop.

    Args:
        data_module: Data module providing the train/validation/test loaders.
        model_dir: ``default_root_dir`` of the Trainer; also returned unchanged.
        logger: Lightning logger for the run.
        epochs: Used for both ``min_epochs`` and ``max_epochs``, so exactly this
            many epochs are trained.
        limit_val_batches, num_sanity_val_steps, check_val_every_n_epoch:
            Forwarded to ``pytorch_lightning.Trainer``.
        checkpoint_cb: The only callback given to the Trainer.
        distance: ``"cosine"``, ``"L2"`` or ``"non-contrastive"`` (see
            ``_make_distance``); a non-string is passed to NCAD unchanged.
        ts_channels ... max_windows_unfold_batch: NCAD hyper-parameters,
            forwarded to the ``NCAD`` constructor. The classification loss is
            fixed to ``nn.BCELoss()``.
        evaluation_result_path: If given, the test metrics dict is written there
            as JSON (a leading ``~`` is expanded).

    Returns:
        ``(model_dir, classifier_threshold)``, where ``classifier_threshold`` is
        the value reported in the test results, not the input value.

    Raises:
        ValueError: If ``distance`` is an unsupported name.
    """
    distance_module = _make_distance(distance, embedding_rep_dim)

    model = NCAD(
        ts_channels=ts_channels,
        window_length=window_length,
        suspect_window_length=suspect_window_length,
        # hpars for encoder
        tcn_kernel_size=tcn_kernel_size,
        tcn_layers=tcn_layers,
        tcn_out_channels=tcn_out_channels,
        tcn_maxpool_out_channels=tcn_maxpool_out_channels,
        embedding_rep_dim=embedding_rep_dim,
        normalize_embedding=normalize_embedding,
        # hpars for classifier
        distance=distance_module,
        classification_loss=nn.BCELoss(),
        classifier_threshold=classifier_threshold,
        threshold_grid_length_test=threshold_grid_length_test,
        # hpars for anomalizers
        coe_rate=coe_rate,
        mixup_rate=mixup_rate,
        # hpars for validation and test
        stride_rolling_val_test=stride_roll_pred_val_test,
        val_labels_adj=val_labels_adj,
        test_labels_adj=test_labels_adj,
        max_windows_unfold_batch=max_windows_unfold_batch,
        # hpars for optimizer
        learning_rate=learning_rate,
    )

    trainer = Trainer(
        gpus=1 if torch.cuda.is_available() else 0,
        default_root_dir=model_dir,
        logger=logger,
        min_epochs=epochs,
        max_epochs=epochs,
        limit_val_batches=limit_val_batches,
        # Use the caller's setting; this was previously hard-coded to 1, ignoring the parameter.
        num_sanity_val_steps=num_sanity_val_steps,
        check_val_every_n_epoch=check_val_every_n_epoch,
        callbacks=[checkpoint_cb],
        auto_lr_find=False,
    )

    trainer.fit(model=model, datamodule=data_module)

    # Lightning's test() returns a list of metric dicts (one per test dataloader);
    # this data module is expected to have a single test loader.
    evaluation_result = trainer.test()[0]
    # The test loop reports the classifier threshold it used; return that value
    # (presumably tuned over `threshold_grid_length_test` — confirm in ncad).
    classifier_threshold = evaluation_result["classifier_threshold"]

    if evaluation_result_path is not None:
        result_path = Path(evaluation_result_path).expanduser()
        with open(result_path, "w", encoding="utf-8") as f:
            # NpEncoder presumably handles numpy scalars/arrays json can't encode.
            json.dump(evaluation_result, f, cls=ncad.utils.NpEncoder)

    for key, value in evaluation_result.items():
        print(f"{key}={value}")

    return model_dir, classifier_threshold


In [None]:
# Train NCAD and keep the classifier threshold reported by the test loop.
# Keyword arguments guard against silently mis-ordering any of the 28 parameters.
_, classifier_threshold = set_and_train_model(
    data_module=ncad_data_module,
    model_dir=model_config["model_dir"],
    logger=tb_logger,
    epochs=model_config["epochs"],
    limit_val_batches=model_config["limit_val_batches"],
    num_sanity_val_steps=model_config["num_sanity_val_steps"],
    check_val_every_n_epoch=model_config["check_val_every_n_epoch"],
    checkpoint_cb=checkpointer,
    ts_channels=ts_channels,
    window_length=model_config["window_length"],
    suspect_window_length=model_config["suspect_window_length"],
    tcn_kernel_size=model_config["tcn_kernel_size"],
    tcn_layers=model_config["tcn_layers"],
    tcn_out_channels=model_config["tcn_out_channels"],
    tcn_maxpool_out_channels=model_config["tcn_maxpool_out_channels"],
    embedding_rep_dim=model_config["embedding_rep_dim"],
    normalize_embedding=model_config["normalize_embedding"],
    # classifier
    distance=model_config["distance"],
    classifier_threshold=model_config["classifier_threshold"],
    threshold_grid_length_test=model_config["threshold_grid_length_test"],
    # optimizer
    learning_rate=model_config["learning_rate"],
    # anomalizers
    coe_rate=model_config["coe_rate"],
    mixup_rate=model_config["mixup_rate"],
    # validation and test
    stride_roll_pred_val_test=model_config["stride_roll_pred_val_test"],
    val_labels_adj=model_config["val_labels_adj"],
    test_labels_adj=model_config["test_labels_adj"],
    max_windows_unfold_batch=model_config["max_windows_unfold_batch"],
    evaluation_result_path=model_config["evaluation_result_path"],
)

# Perform inference

In [None]:
def load_and_evaluate(
    model_dir: Union[str, PosixPath],
    exp_name: str,
    data_set: TimeSeriesDataset,
    stride: int,
    classifier_threshold: float
):
    """Load this experiment's saved NCAD checkpoint and run detection on a dataset.

    Args:
        model_dir: Directory containing the ``.ckpt`` files (a leading ``~`` is expanded).
        exp_name: Experiment name; checkpoints are matched by the file-name
            prefix ``"ncad-model-" + exp_name`` and the ``.ckpt`` suffix.
        data_set: Dataset passed to ``NCAD.tsdetect``.
        stride: Stride passed to ``NCAD.tsdetect``.
        classifier_threshold: Threshold passed to ``NCAD.tsdetect``.

    Returns:
        ``(anomaly_probs_avg, anomaly_vote)`` as returned by ``NCAD.tsdetect``.

    Raises:
        FileNotFoundError: If no matching checkpoint exists in ``model_dir``.
    """
    model_dir = Path(model_dir).expanduser()
    prefix = "ncad-model-" + exp_name

    # os.listdir order is arbitrary; sort so the choice is deterministic. With a
    # save_top_k=1 checkpoint callback there is normally a single match.
    ckpt_files = sorted(
        name
        for name in os.listdir(model_dir)
        if name.endswith(".ckpt") and name.startswith(prefix)
    )
    if not ckpt_files:
        raise FileNotFoundError(f"No checkpoint matching '{prefix}*.ckpt' found in {model_dir}")

    ckpt_path = model_dir / ckpt_files[-1]
    model = NCAD.load_from_checkpoint(ckpt_path)

    anomaly_probs_avg, anomaly_vote = model.tsdetect(data_set, stride, classifier_threshold)

    print("NCAD on batadal dataset finished successfully!")

    return anomaly_probs_avg, anomaly_vote

In [None]:
# Run the trained model over the full standardized test data.
anomaly_probs_avg, anomaly_vote = load_and_evaluate(
    model_dir=model_config["model_dir"],
    exp_name=exp_name,
    data_set=test_standard,
    stride=model_config["stride_roll_pred_val_test"],
    classifier_threshold=classifier_threshold,
)

In [None]:
# Show the shapes of the two arrays returned by NCAD.tsdetect.
result_shapes = (anomaly_probs_avg.shape, anomaly_vote.shape)
print(*result_shapes)

# References
Riccardo Taormina and Stefano Galelli and Nils Ole Tippenhauer and Elad Salomons and Avi Ostfeld and Demetrios G. Eliades and Mohsen Aghashahi and Raanju Sundararajan and Mohsen Pourahmadi and M. Katherine Banks and B. M. Brentan and Enrique Campbell and G. Lima and D. Manzi and D. Ayala-Cabrera and M. Herrera and I. Montalvo and J. Izquierdo and E. Luvizotto and Sarin E. Chandy and Amin Rasekh and Zachary A. Barker and Bruce Campbell and M. Ehsan Shafiee and Marcio Giacomoni and Nikolaos Gatsis and Ahmad Taha and Ahmed A. Abokifa and Kelsey Haddad and Cynthia S. Lo and Pratim Biswas and M. Fayzul K. Pasha and Bijay Kc and Saravanakumar Lakshmanan Somasundaram and Mashor Housh and Ziv Ohar; "The Battle Of The Attack Detection Algorithms: Disclosing Cyber Attacks On Water Distribution Networks." Journal of Water Resources Planning and Management, 144 (8), August 2018

Chris U. Carmona, François-Xavier Aubet, Valentin Flunkert, and Jan Gasthaus. 2021. Neural Contextual Anomaly Detection for Time Series.