{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Training and compiling our ML Model\n", "\n", "**SageMaker Studio Kernel**: Data Science\n", "\n", "In this exercise you will do:\n", " - Create/Run a Model Building Pipeline using Pytorch and [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)\n", " - Compute the thresholds, used by the applicatio to classify the predictions as anomalies or normal behavior\n", "\n", "\n", "The following diagram shows all the steps we're going to execute: \n", "![Pipeline](../../imgs/ggv2_lab2_train_pipeline.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Part 1/3 - Setup\n", "Here we'll import some libraries and define some variables. You can also take a look on the scripts that were previously created for preparing the data and training our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import numpy as np\n", "import sagemaker\n", "import glob\n", "import os\n", "import boto3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name='<>'\n", "\n", "s3_client = boto3.client('s3')\n", "sm_client = boto3.client('sagemaker')\n", "\n", "project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']\n", "bucket_name = 'sagemaker-wind-turbine-farm-%s' % project_id\n", "\n", "prefix='wind_turbine_anomaly'\n", "sagemaker_session=sagemaker.Session(default_bucket=bucket_name)\n", "role = sagemaker.get_execution_role()\n", "print('Project name: %s' % project_name)\n", "print('Project id: %s' % project_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get the dataset and upload it to an S3 bucket\n", "This bucket will be the input path of the data prep step of our ML Pipeline\n", "\n", "The below cell checks if the data is already downloaded.\n", "# \n", "\n", "if not, then it downloads the data. Otherwise it skips the downloading." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os.path\n", "\n", "if os.path.isfile('../data/dataset_wind.csv.gz') is not True:\n", " print('file not present, downloading now..')\n", " os.system(\"mkdir -p ../data\")\n", " os.system(\"curl https://aws-ml-blog.s3.amazonaws.com/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/dataset_wind_turbine.csv.gz -o ../data/dataset_wind.csv.gz\")\n", " print('download complete !!')\n", "else:\n", " print('file already present, no need to download again !')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Download the \n", "# clean the buckets first\n", "s3_client.delete_object(Bucket=bucket_name, Key='%s/data/' % prefix)\n", "s3_client.delete_object(Bucket=bucket_name, Key='wind_turbine_anomaly/output')\n", "\n", "input_data = sagemaker_session.upload_data('../data/dataset_wind.csv.gz', key_prefix=\"%s/data\" % prefix )\n", "print(input_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize the training script & the preprocessing script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## This script was created to express what we saw in the previous exercise.\n", "## It will get the raw data from the turbine sensors, select some features, \n", "## denoise, normalize, encode and reshape it as a 6x10x10 tensor\n", "## This script is the entrypoint of the first step of the ML Pipelie: Data preparation\n", "!pygmentize preprocessing.py" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## This is the training/prediction script, used by the training step of \n", "## our ML Pipeline. In this step, a SageMaker Training Job will run this \n", "## script to build the model. Then, in the batch transform step,\n", "## the same script will be used again to load the trained model\n", "## and rebuild (predict) all the training samples. These predictions\n", "## will then be used to compute MAE and the thresholds, for detecting anomalies\n", "!pygmentize wind_turbine.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating our ML Pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Input parameters of the pipeline\n", "These input parameters can be overriden later if you want. The final pipeline is like a function f(x), where you reuse many times to train/retrain your model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from sagemaker.workflow.steps import CacheConfig\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "## By enabling cache, if you run this pipeline again, without changing the input \n", "## parameters it will skip the training part and reuse the previous trained model\n", "cache_config = CacheConfig(enable_caching=True, expire_after=\"30d\")\n", "ts = time.strftime('%Y-%m-%d-%H-%M-%S')\n", "\n", "# Data prep\n", "processing_instance_type = ParameterString( # instance type for data preparation\n", " name=\"ProcessingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "processing_instance_count = ParameterInteger( # number of instances used for data preparation\n", " name=\"ProcessingInstanceCount\",\n", " default_value=1\n", ")\n", "\n", "# Training\n", "training_instance_type = ParameterString( # instance type for training the model\n", " name=\"TrainingInstanceType\",\n", " default_value=\"ml.g4dn.xlarge\"\n", ")\n", "training_instance_count = ParameterInteger( # number of instances used to train your model\n", " name=\"TrainingInstanceCount\",\n", " default_value=1 # wind_turbine.py supports only 1 instance\n", ")\n", "\n", "# Batch prediction\n", "transform_instance_type = ParameterString( # instance type for batch transform jobs\n", " name=\"TransformInstanceType\",\n", " default_value=\"ml.c5.xlarge\"\n", ")\n", "transform_instance_count = ParameterInteger( # number of instances used for batch prediction\n", " name=\"TransformInstanceCount\",\n", " default_value=2\n", ")\n", "\n", "# Dataset input data: S3 path\n", "input_data = ParameterString(\n", " name=\"InputData\",\n", " default_value=input_data,\n", ")\n", "\n", "# Batch prediction output: S3 path\n", "output_batch_data = ParameterString(\n", " name=\"OutputBatchData\",\n", " default_value=\"s3://%s/%s/output\" % (bucket_name, prefix),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Defining the steps of our pipeline\n", "\n", "### Step 1/4 - Preprocess the raw data to clean, denoise and normalize" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", "\n", "sts_client = boto3.client('sts')\n", "account_id = sts_client.get_caller_identity().get('Account')\n", "region = boto3.session.Session().region_name\n", "\n", "script_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " role=role,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " max_runtime_in_seconds=7200,\n", ")\n", "\n", "step_process = ProcessingStep(\n", " name=\"WindTurbineDataPreprocess\",\n", " code='preprocessing.py', ## this is the script defined above\n", " processor=script_processor,\n", " inputs=[\n", " ProcessingInput(source=input_data, destination='/opt/ml/processing/input')\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name='train_data', source='/opt/ml/processing/train'),\n", " ProcessingOutput(output_name='statistics', source='/opt/ml/processing/statistics')\n", " ],\n", " job_arguments=['--num-dataset-splits', '20']\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 2/4 - Training with a Pytorch Estimator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First we create the SageMaker Estimator" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.pytorch.estimator import PyTorch\n", "framework_version='1.6.0'\n", "py_version='py3'\n", "estimator = PyTorch(\n", " 'wind_turbine.py', ## This is the script (wind_turbine.py) defined above\n", " framework_version=framework_version,\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " instance_type=training_instance_type,\n", " instance_count=training_instance_count,\n", " py_version=py_version, \n", " hyperparameters={\n", " 'k_fold_splits': 6,\n", " 'k_index_only': 3, # after running some experiments with this dataset, it makes sense to fix it\n", " 'num_epochs': 200,\n", " 'batch_size': 256,\n", " 'learning_rate': 0.0001,\n", " 'dropout_rate': 0.001\n", " },\n", " metric_definitions=[\n", " {'Name': 'train_loss:mse', 'Regex': ' train_loss=(\\S+);'},\n", " {'Name': 'test_loss:mse', 'Regex': ' test_loss=(\\S+);'}\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Then, we define the training step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import TrainingInput\n", "from sagemaker.workflow.steps import TrainingStep\n", "\n", "step_train = TrainingStep(\n", " name=\"WindTurbineAnomalyTrain\",\n", " estimator=estimator,\n", " inputs={\"train\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train_data\"].S3Output.S3Uri,\n", " content_type=\"application/x-npy\"\n", " )},\n", " cache_config=cache_config\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 3/4 - Create a new model in the SageMaker Models Catalog\n", "This step will transform the results of your Training Job into a real Model. After that, you'll be able to deploy and invoke your model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.steps import CreateModelStep\n", "from sagemaker.model import Model\n", "from sagemaker.inputs import CreateModelInput\n", "\n", "model = Model(\n", " image_uri=sagemaker.image_uris.retrieve(\n", " framework=\"pytorch\", # we are using the SageMaker pre-built PyTorch inference image\n", " region=sagemaker_session.boto_session.region_name,\n", " version=framework_version,\n", " py_version=py_version,\n", " instance_type=training_instance_type,\n", " image_scope='inference'\n", " ),\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " sagemaker_session=sagemaker_session,\n", " role=role\n", ")\n", "step_create_model = CreateModelStep(\n", " name=\"WindTurbineAnomalyCreateModel\",\n", " model=model,\n", " inputs=CreateModelInput(\n", " instance_type=transform_instance_type\n", " )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 4/4 - Run a batch transform job to get all the predictions\n", "The predictions will then be used to compute the Thresholds. Only the training samples will be used in this step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.steps import TransformStep\n", "from sagemaker.inputs import TransformInput\n", "from sagemaker.transformer import Transformer\n", "\n", "step_transform = TransformStep(\n", " name=\"WindTurbineAnomalyTransform\",\n", " transformer=Transformer(\n", " model_name=step_create_model.properties.ModelName,\n", " instance_type=transform_instance_type,\n", " instance_count=transform_instance_count,\n", " output_path=output_batch_data,\n", " accept='application/x-npy',\n", " max_payload=20,\n", " strategy='MultiRecord',\n", " assemble_with='Line'\n", " ),\n", " inputs=TransformInput(data=step_process.properties.ProcessingOutputConfig.Outputs[\"train_data\"].S3Output.S3Uri, content_type=\"application/x-npy\")\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now that we have created all the requireds steps, its time to create our pipeline\n", "This code will create a physical (with ARN) resource that will execute all the steps defined abov" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from botocore.exceptions import ClientError, ValidationError\n", "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "# NOTE:\n", "# condition steps have issues in service so we go straight to step_register\n", "pipeline_name = \"WindTurbineAnomalyTrain-%s\" % ts\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_type,\n", " processing_instance_count, \n", " training_instance_type,\n", " training_instance_count, \n", " transform_instance_type,\n", " transform_instance_count, \n", " input_data,\n", " output_batch_data\n", " ],\n", " steps=[step_process, step_train, step_create_model, step_transform],\n", " sagemaker_session=sagemaker_session,\n", ")\n", "\n", "try:\n", " response = pipeline.create(role_arn=role)\n", "except ClientError as e:\n", " error = e.response[\"Error\"]\n", " if error[\"Code\"] == \"ValidationError\" and \"Pipeline names must be unique within\" in error[\"Message\"]:\n", " print(error[\"Message\"])\n", " response = pipeline.describe()\n", " else:\n", " raise\n", "\n", " ## The following code put some tags that will be tracked by SageMaker Studio\n", "pipeline_arn = response[\"PipelineArn\"]\n", "sm_client = boto3.client('sagemaker')\n", "sm_client.add_tags(\n", " ResourceArn=pipeline_arn,\n", " Tags=[\n", " {'Key': 'sagemaker:project-name', 'Value': project_name },\n", " {'Key': 'sagemaker:project-id', 'Value': project_id }\n", " ]\n", ")\n", "print(pipeline_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Part 2/3 - Now its time to execute our pipeline. After kicking-off the pipeline, you can open SageMaker Studio and go to your project -> Pipelines to see execution\n", "It takes ~17mins to complete the whole pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "start_response = pipeline.start(parameters={\n", " # 'TrainingInstanceType': 'ml.c5.xlarge', # uncomment this line if your account doesn't support g4 instances. It will take 20x more to finish.\n", " 'TransformInstanceType': 'ml.c5.xlarge'\n", "})\n", "\n", "pipeline_execution_arn = start_response.arn\n", "print(pipeline_execution_arn)\n", "\n", "while True:\n", " resp = sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)\n", " if resp['PipelineExecutionStatus'] == 'Executing':\n", " print('Running...')\n", " else:\n", " print(resp['PipelineExecutionStatus'], pipeline_execution_arn)\n", " break\n", " time.sleep(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Part 3/3 - Compute the threshold based on MAE" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the predictions & Compute MAE/thresholds" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session.download_data(bucket=bucket_name, key_prefix='wind_turbine_anomaly/output/', path='../data/preds/')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import sagemaker\n", "\n", "sm_client = boto3.client('sagemaker')\n", "sagemaker_session = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "\n", "execution_id = pipeline_execution_arn.split('/')[-1]\n", "training_jobs = sm_client.list_training_jobs(NameContains=execution_id, StatusEquals='Completed')['TrainingJobSummaries']\n", "\n", "assert(len(training_jobs) == 1) # it must have exactly one training job\n", "training_job_name=training_jobs[0]['TrainingJobName']\n", "\n", "# We will recreate the estimator, based on the training job\n", "estimator = sagemaker.estimator.Estimator.attach(\n", " training_job_name=training_job_name, \n", " sagemaker_session=sagemaker_session\n", ")\n", "input_data = sm_client.describe_training_job(TrainingJobName=training_job_name)\n", "input_data = input_data['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri']\n", "\n", "tokens = input_data.split('/', 3)\n", "sagemaker_session.download_data(bucket=tokens[2], key_prefix=tokens[3], path='../data/input/')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import glob\n", "\n", "x_inputs = np.vstack([np.load(i) for i in glob.glob('../data/input/*.npy')])\n", "y_preds = np.vstack([np.load(i) for i in glob.glob('../data/preds/*.out')])\n", "\n", "n_samples,n_features,n_rows,n_cols = x_inputs.shape\n", "\n", "x_inputs = x_inputs.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))\n", "y_preds = y_preds.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))\n", "\n", "mae_loss = np.mean(np.abs(y_preds - x_inputs), axis=1).transpose((1,0))\n", "mae_loss[np.isnan(mae_loss)] = 0\n", "\n", "thresholds = np.mean(mae_loss, axis=1)\n", "np.save('../statistics/thresholds.npy', thresholds)\n", "print(\",\".join(thresholds.astype(str)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Store variables to use in the next exercise" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator_output_path = estimator.output_path\n", "\n", "%store project_id\n", "%store estimator_output_path\n", "%store training_job_name\n", "%store n_features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alright! Now, You can start the next Exercise which is on Package and deploy the model using SageMaker Edge Manager.\n", "\n", "\n", "We use SageMaker Edge Manager to create a model package. But when it comes to deploying, there are two ways you can deploy the model. Click on the relevant exercise for your workshop to proceed further:\n", "\n", "1. [__Using IoT Jobs__](../03-Package-Deploy/iot-jobs/03-package-deploy-using-iot-jobs.ipynb)\n", "2. [__Using IoT Greengrass V2__](../03-Package-Deploy/greengrass-v2/03-package-using-ggv2.ipynb)\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }