{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data preparation\n", "\n", "**SageMaker Studio Kernel**: Data Science\n", "\n", "The challenge we're trying to address here is to detect anomalies in the components of a Wind Turbine. Each wind turbine has many sensors that reads data like:\n", " - Internal & external temperature\n", " - Wind speed\n", " - Rotor speed\n", " - Air pressure\n", " - Voltage (or current) in the generator\n", " - Vibration in the GearBox (using an IMU -> Accelerometer + Gyroscope)\n", "\n", "So, depending on the types of the anomalies we want to detect, we need to select one or more features and then prepare a dataset that 'explains' the anomalies. We are interested in three types of anomalies:\n", " - Rotor speed (when the rotor is not in an expected speed)\n", " - Produced voltage (when the generator is not producing the expected voltage)\n", " - Gearbox vibration (when the vibration of the gearbox is far from the expected)\n", " \n", "All these three anomalies (or violations) depend on many variables while the turbine is working. Thus, in order to address that, let's use a ML model called [Autoencoder](https://en.wikipedia.org/wiki/Autoencoder), with correlated features. This model is unsupervised. It learns the latent representation of the dataset and tries to predict (regression) the same tensor given as input. The strategy then is to use a dataset collected from a normal turbine (without anomalies). The model will then learn **'what is a normal turbine'**. When the sensors readings of a malfunctioning turbine is used as input, the model will not be able to rebuild the input, predicting something with a high error and detected as an anomaly.\n", "\n", "The sequence of the sensors readings can be seen as a time-series dataset and therefore we observe a high correlation between neighbour samples. We can explore this by reformatting the data as a multidimensional tensor. We'll create a temporal encoding of six features in 10x10 steps of 250ms each. 250ms is the interval computed using 5 samples (the time interval between each sample is ~50ms). It means that we will create a tensor with a shape of 6x10x10.\n", "\n", "![Tensor](../imgs/tensor.png)\n", "\n", "In the tensor above, each color is a different feature, encoded in 100 (10x10) timesteps (from the current reading to the past in a sliding window).\n", "\n", "Let's start preparing our dataset, then." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install this lib to improve data visualization" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "!pip install -U matplotlib seaborn==0.11.1 pywavelets==1.1.1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the raw data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p data\n", "!curl https://aws-ml-blog.s3.amazonaws.com/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/dataset_wind_turbine.csv.gz -o data/dataset_wind.csv.gz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Preparing the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "%config InlineBackend.figure_format='retina'\n", "import pandas as pd\n", "import math\n", "import matplotlib.pyplot as plt\n", "import pywt\n", "import numpy as np\n", "from datetime import datetime" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Helper functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def euler_from_quaternion(x, y, z, w):\n", " \"\"\"\n", " Convert a quaternion into euler angles (roll, pitch, yaw)\n", " roll is rotation around x in radians (counterclockwise)\n", " pitch is rotation around y in radians (counterclockwise)\n", " yaw is rotation around z in radians (counterclockwise)\n", " \"\"\"\n", " t0 = +2.0 * (w * x + y * z)\n", " t1 = +1.0 - 2.0 * (x * x + y * y)\n", " roll_x = math.atan2(t0, t1)\n", "\n", " t2 = +2.0 * (w * y - z * x)\n", " t2 = +1.0 if t2 > +1.0 else t2\n", " t2 = -1.0 if t2 < -1.0 else t2\n", " pitch_y = math.asin(t2)\n", "\n", " t3 = +2.0 * (w * z + x * y)\n", " t4 = +1.0 - 2.0 * (y * y + z * z)\n", " yaw_z = math.atan2(t3, t4)\n", "\n", " return roll_x, pitch_y, yaw_z # in radians" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wavelet_denoise(data, wavelet, noise_sigma):\n", " '''Filter accelerometer data using wavelet denoising\n", "\n", " Modification of F. Blanco-Silva's code at: https://goo.gl/gOQwy5\n", " '''\n", " \n", " wavelet = pywt.Wavelet(wavelet)\n", " levels = min(5, (np.floor(np.log2(data.shape[0]))).astype(int))\n", " \n", " # Francisco's code used wavedec2 for image data\n", " wavelet_coeffs = pywt.wavedec(data, wavelet, level=levels)\n", " threshold = noise_sigma*np.sqrt(2*np.log2(data.size))\n", "\n", " new_wavelet_coeffs = map(lambda x: pywt.threshold(x, threshold, mode='soft'), wavelet_coeffs)\n", "\n", " return pywt.waverec(list(new_wavelet_coeffs), wavelet)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Preparing the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parser = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%f+00:00')\n", "df = pd.read_csv('data/dataset_wind.csv.gz', compression=\"gzip\", sep=',', low_memory=False, parse_dates=[ 'eventTime'], date_parser=parser)\n", "\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Features:\n", " - **nanoId**: id of the edge device that collected the data\n", " - **turbineId**: id of the turbine that produced this data\n", " - **arduino_timestamp**: timestamp of the arduino that was operating this turbine\n", " - **nanoFreemem**: amount of free memory in bytes\n", " - **eventTime**: timestamp of the row\n", " - **rps**: rotation of the rotor in Rotations Per Second\n", " - **voltage**: voltage produced by the generator in milivolts\n", " - **qw, qx, qy, qz**: quaternion angular acceleration\n", " - **gx, gy, gz**: gravity acceleration\n", " - **ax, ay, az**: linear acceleration\n", " - **gearboxtemp**: internal temperature\n", " - **ambtemp**: external temperature\n", " - **humidity**: air humidity\n", " - **pressure**: air pressure\n", " - **gas**: air quality\n", " - **wind_speed_rps**: wind speed in Rotations Per Second" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('now converting quat to euler...')\n", "roll,pitch,yaw = [], [], []\n", "for idx, row in df.iterrows():\n", " r,p,y = euler_from_quaternion(row['qx'], row['qy'], row['qz'], row['qw'])\n", " roll.append(r)\n", " pitch.append(p)\n", " yaw.append(y)\n", "df['roll'] = roll\n", "df['pitch'] = pitch\n", "df['yaw'] = yaw" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## we will select the following features to prepare our dataset\n", "## with these features we have parameters for vibration, rotation and voltage\n", "quat=['qx', 'qy', 'qz', 'qw']\n", "rot=['wind_speed_rps', 'rps']\n", "volt=['voltage']\n", "features = quat + rot + volt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ploting the vibration data, just to have an idea" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[quat[:3]].iloc[1910:2000].plot(figsize=(20,10))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now, plot the rotation of the turbine and the wind speed in RPS" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[rot].iloc[1910:2000].plot(figsize=(20,10))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Finally, plot the voltage readings" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[volt].iloc[1910:2000].plot(figsize=(20,10))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Cleaning and normalizing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "df_train = df.copy()\n", "\n", "# select the features\n", "features = ['roll', 'pitch', 'yaw', 'wind_speed_rps', 'rps', 'voltage']\n", "\n", "# get the std for denoising\n", "raw_std = df_train[features].std()\n", "for f in features:\n", " df_train[f] = wavelet_denoise(df_train[f].values, 'db6', raw_std[f])#[:-1]\n", "\n", "# normalize\n", "training_std = df_train[features].std()\n", "training_mean = df_train[features].mean()\n", "df_train = (df_train[features] - training_mean) / training_std\n", "\n", "print(\"raw_std = np.array([\")\n", "for k in raw_std.keys(): print(\" %f, # %s\" % (raw_std[k], k))\n", "print(\"])\")\n", "\n", "print(\"mean = np.array([\")\n", "for k in training_mean.keys(): print(\" %f, # %s\" % (training_mean[k], k))\n", "print(\"])\")\n", "print(\"std = np.array([\")\n", "for k in training_std.keys(): print(\" %f, # %s\" % (training_std[k], k))\n", "print(\"])\")\n", "\n", "print(\"Number of training samples:\", len(df_train))\n", "df_train.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Alright, this is our dataset. Let's just plot the original vs the prepared data\n", "**Original Data**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[features][:2000].plot(figsize=(20,10))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Denoised & Normalized Data**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_train[:2000].plot(figsize=(20,10))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import seaborn as sns\n", "corr = df_train.corr()\n", "\n", "f, ax = plt.subplots(figsize=(15, 8))\n", "sns.heatmap(corr, annot=True, fmt=\"f\",\n", " xticklabels=corr.columns.values,\n", " yticklabels=corr.columns.values,\n", " ax=ax)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The correlation between rps and voltage is high, but remember that we need both to detect anomalies" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_dataset(X, time_steps=1, step=1):\n", " Xs = []\n", " for i in range(0, len(X) - time_steps, step):\n", " v = X.iloc[i:(i + time_steps)].values\n", " Xs.append(v)\n", " return np.array(Xs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "INTERVAL = 5 # seconds\n", "TIME_STEPS = 20 * INTERVAL # 50ms -> seg: 50ms * 20\n", "STEP = 10\n", "n_cols = len(df_train.columns)\n", "X = create_dataset(df_train, TIME_STEPS, STEP)\n", "X = np.nan_to_num(X, copy=True, nan=0.0, posinf=None, neginf=None)\n", "\n", "X = np.transpose(X, (0, 2, 1)).reshape(X.shape[0], n_cols, 10, 10)\n", "\n", "X.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## We need to split the array in chunks of at most 5MB\n", "!rm -rf data/*.npy\n", "for i,x in enumerate(np.array_split(X, 60)):\n", " np.save('data/wind_turbine_%02d.npy' % i, x)" ] } ], "metadata": { "kernelspec": { "display_name": ".env", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.4 (default, Sep 29 2022, 09:38:30) \n[Clang 14.0.0 (clang-1400.0.29.102)]" }, "vscode": { "interpreter": { "hash": "7afa42a84cdccb65309963e1bb5252b23462aabc18ffc1fd32db85a162cf9cb7" } } }, "nbformat": 4, "nbformat_minor": 4 }