{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Building and Registering our ML Model\n", "\n", "**SageMaker Studio Kernel**: Data Science\n", "\n", "In this exercise you will do:\n", " - Upload the dataset\n", " - Train your ML model using Pytorch\n", " - Compute the thresholds, used by the application, to classify the predictions as anomalies or normal behavior\n", " - Register the model in the model registry\n", "\n", "\n", "### Upload the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import numpy as np\n", "import glob\n", "from sagemaker.pytorch.estimator import PyTorch\n", "\n", "role = sagemaker.get_execution_role()\n", "sagemaker_session=sagemaker.Session()\n", "bucket_name = sagemaker_session.default_bucket()\n", "\n", "prefix='wind_turbine_anomaly'\n", "data_files = glob.glob('data/*.npy')\n", "\n", "train_input = \"s3://%s/%s/data\" % (bucket_name, prefix)\n", "\n", "for f in data_files:\n", " sagemaker_session.upload_data(f, key_prefix=\"%s/data\" % prefix)\n", "n_features = np.load(data_files[0]).shape[1]\n", "\n", "print(train_input)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Training script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile wind_turbine.py\n", "import argparse\n", "import glob\n", "import numpy as np\n", "import os\n", "import time\n", "import torch\n", "import torch.nn as nn\n", "import torch.optim as optim\n", "from torch.autograd import Variable\n", "from sklearn.model_selection import KFold\n", "\n", "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", "\n", "def create_model(n_features, dropout=0): \n", " return torch.nn.Sequential(\n", " torch.nn.Conv2d(n_features, 32, kernel_size=2, padding=1),\n", " torch.nn.ReLU(),\n", " torch.nn.Dropout(dropout),\n", " torch.nn.Conv2d(32, 64, kernel_size=2, padding=1),\n", " torch.nn.ReLU(),\n", " torch.nn.Dropout(dropout),\n", " torch.nn.Conv2d(64, 128, kernel_size=2, padding=2),\n", " torch.nn.ReLU(),\n", " torch.nn.ConvTranspose2d(128, 64, kernel_size=2, padding=2),\n", " torch.nn.ReLU(),\n", " torch.nn.Dropout(dropout),\n", " torch.nn.ConvTranspose2d(64, 32, kernel_size=2, padding=1),\n", " torch.nn.ReLU(),\n", " torch.nn.Dropout(dropout),\n", " torch.nn.ConvTranspose2d(32, n_features, kernel_size=2, padding=1),\n", " ) \n", "\n", "def load_data(data_dir):\n", " input_files = glob.glob(os.path.join(data_dir, '*.npy'))\n", " data = [np.load(i) for i in input_files]\n", " return np.vstack(data) \n", "\n", "def train_epoch(optimizer, criterion, epoch, model, train_dataloader, test_dataloader):\n", " train_loss = 0.0 \n", " test_loss = 0.0 \n", " model.train()\n", " for x_train, y_train in train_dataloader:\n", " # clearing the Gradients of the model parameters\n", " optimizer.zero_grad()\n", " # prediction for training and validation set \n", " output_train = model(x_train) \n", " loss_train = criterion(output_train, y_train)\n", " \n", " # computing the updated weights of all the model parameters\n", " # statistics\n", " train_loss += loss_train.item()\n", " loss_train.backward()\n", " optimizer.step() \n", " model.eval()\n", " for x_test, y_test in test_dataloader: \n", " output_test = model(x_test.float())\n", " loss_test = criterion(output_test, y_test)\n", " # statistics\n", " test_loss += loss_test.item() \n", " \n", " return train_loss, test_loss\n", "\n", "def model_fn(model_dir):\n", " model = torch.load(os.path.join(model_dir, \"model.pth\"))\n", " model = model.to(device)\n", " model.eval()\n", " return model\n", "\n", "def predict_fn(input_data, model): \n", " with torch.no_grad():\n", " return model(input_data.float().to(device))\n", "\n", "def train(args):\n", " best_of_the_best = (0,-1)\n", " best_loss = 10000000\n", " num_epochs = args.num_epochs\n", " batch_size = args.batch_size \n", " \n", " X = load_data(args.train)\n", " criterion = nn.MSELoss() \n", " kf = KFold(n_splits=args.k_fold_splits, shuffle=True)\n", " \n", " for i, indexes in enumerate(kf.split(X)):\n", " # skip other Ks if fixed was informed\n", " if args.k_index_only >= 0 and args.k_index_only != i: continue\n", " \n", " train_index, test_index = indexes\n", " print(\"Test dataset proportion: %.02f%%\" % (len(test_index)/len(train_index) * 100))\n", " X_train, X_test = X[train_index], X[test_index]\n", " X_train = torch.from_numpy(X_train).float().to(device)\n", " X_test = torch.from_numpy(X_test).float().to(device)\n", "\n", " train_dataset = torch.utils.data.TensorDataset(X_train, X_train)\n", " train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)\n", " test_dataset = torch.utils.data.TensorDataset(X_test, X_test)\n", " test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)\n", "\n", " model = create_model(args.num_features, args.dropout_rate)\n", " model = model.to(device)\n", "\n", " optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)\n", " # Instantiate model\n", " # Training loop\n", " for epoch in range(num_epochs):\n", " start_time = time.time()\n", " train_loss, test_loss = train_epoch( optimizer, criterion, epoch, model, train_dataloader, test_dataloader)\n", " elapsed_time = (time.time() - start_time)\n", " print(\"k=%d; epoch=%d; train_loss=%.3f; test_loss=%.3f; elapsed_time=%.3fs\" % (i, epoch, train_loss, test_loss, elapsed_time))\n", " if test_loss < best_loss: \n", " torch.save(model.state_dict(), os.path.join(args.output_data_dir,'model_state.pth'))\n", " best_loss = test_loss\n", " if best_loss < best_of_the_best[0]:\n", " best_of_the_best = (best_loss, i)\n", " print(\"\\nBest model: best_mse=%f;\" % best_loss)\n", " model = create_model(args.num_features, args.dropout_rate)\n", " model.load_state_dict( torch.load(os.path.join(args.output_data_dir, \"model_state.pth\")) ) \n", " torch.save(model, os.path.join(args.model_dir, \"model.pth\"))\n", "\n", "if __name__ == '__main__':\n", " nn.DataParallel\n", " parser = argparse.ArgumentParser()\n", "\n", " # Hyperparameters are described here. In this simple example we are just including one hyperparameter. \n", " parser.add_argument('--k_fold_splits', type=int, default=6)\n", " parser.add_argument('--k_index_only', type=int, default=-1)\n", " parser.add_argument('--batch_size', type=int, default=16)\n", " parser.add_argument('--num_epochs', type=int, default=10)\n", " parser.add_argument('--num_features', type=int, default=8)\n", " parser.add_argument('--learning_rate', type=float, default=0.003)\n", " parser.add_argument('--dropout_rate', type=float, default=0.0)\n", "\n", " # Sagemaker specific arguments. Defaults are set in the environment variables.\n", " parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])\n", " parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])\n", " parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])\n", " parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])\n", "\n", " args = parser.parse_args()\n", " train(args)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training our model with SageMaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Use the buit-in PyTorch container\n", "estimator = PyTorch(\n", " 'wind_turbine.py', \n", " framework_version='1.6.0',\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " instance_type='ml.p3.2xlarge',\n", " #instance_type='local_gpu', \n", " instance_count=1,\n", " py_version='py3', \n", " hyperparameters={\n", " 'k_fold_splits': 6,\n", " 'k_index_only': 3, # after running some experiments with this dataset, it makes sense to fix it\n", " 'num_epochs': 200,\n", " 'batch_size': 256,\n", " 'learning_rate': 0.0001,\n", " 'dropout_rate': 0.001,\n", " 'num_features': n_features\n", " },\n", " metric_definitions=[\n", " {'Name': 'train_loss:mse', 'Regex': ' train_loss=(\\S+);'},\n", " {'Name': 'test_loss:mse', 'Regex': ' test_loss=(\\S+);'}\n", " ]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.fit({'train': train_input})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compute the threshold based on MAE" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformer = estimator.transformer(\n", " instance_count=1, \n", " instance_type='ml.m5.xlarge', \n", " output_path=\"s3://%s/%s/output\" % (bucket_name, prefix),\n", " accept='application/x-npy',\n", " max_payload=20,\n", " strategy='MultiRecord',\n", " assemble_with='Line'\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# To start a transform job:\n", "transformer.transform(train_input, content_type='application/x-npy')\n", "# Then wait until transform job is completed\n", "transformer.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session.download_data(bucket=bucket_name, key_prefix='wind_turbine_anomaly/output/', path='data/preds/')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Compute MAE & the thresholds" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import glob\n", "\n", "x_inputs = np.vstack([np.load(i) for i in data_files])\n", "y_preds = np.vstack([np.load(i) for i in glob.glob('data/preds/*.out')])\n", "\n", "n_samples,n_features,n_rows,n_cols = x_inputs.shape\n", "\n", "x_inputs = x_inputs.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))\n", "y_preds = y_preds.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))\n", "\n", "mae_loss = np.mean(np.abs(y_preds - x_inputs), axis=1).transpose((1,0))\n", "mae_loss[np.isnan(mae_loss)] = 0\n", "\n", "thresholds = np.mean(mae_loss, axis=1)\n", "print(\",\".join(thresholds.astype(str)), thresholds.shape)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Register the model in the registry" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Specify the model source\n", "from sagemaker.pytorch.estimator import PyTorchModel\n", "\n", "model_package_group_turbine=\"modelPackageGroupTurbine\"\n", "\n", "model_url = '%s%s/output/model.tar.gz' % (estimator.output_path, estimator.latest_training_job.name)\n", "\n", "env= {\n", " \"batch_size\": \"256\",\n", " \"dropout_rate\": \"0.001\",\n", " \"learning_rate\": \"0.001\",\n", " \"kernel_size\": \"2\"\n", "}\n", "\n", "#https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#create-a-pytorchmodel-object\n", "model = PyTorchModel(\n", " model_url,\n", " role,\n", " 'wind_turbine.py', \n", " py_version='py3', \n", " env=env,\n", " sagemaker_session=sagemaker_session,\n", " framework_version='1.6.0',\n", " name=estimator.latest_training_job.name\n", ")\n", "\n", "model_package = model.register(\n", " content_types=[\"*\"],\n", " response_types=[\"application/json\"],\n", " inference_instances=[\"ml.p3.2xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " description=\"Turbine anomaly detection\",\n", " approval_status=\"PendingManualApproval\",\n", " model_package_group_name=model_package_group_turbine\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "At this stage, our model is trained, and registered in the registry. The model is ready to be approved. Let's verify it was correctly pushed to the registry:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "boto3_sm = boto3.client('sagemaker')\n", "\n", "boto3_sm.describe_model_package_group(ModelPackageGroupName=model_package_group_turbine)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3_sm.list_model_packages(ModelPackageGroupName=model_package_group_turbine)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.4" }, "vscode": { "interpreter": { "hash": "38f4662eb4bddf4b0838614596a2477c08d568b0320d708549fe50dd483441aa" } } }, "nbformat": 4, "nbformat_minor": 4 }