{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Training & Deploying a XGBoost model for Predicting Machine Failures(Predictive Maintainance)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook should be run after the Data Pre-Processing.ipynb has been run, to generate the curated train/test datasets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, we train a ML model to predict whether the machine failed or not based on system readings. We will train a XGBoost model, using Amazon SageMaker's built in algorithm. XGBoost can provide good results for multiple types of ML problems including classification, even when training samples are limited." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import numpy as np\n", "import pandas as pd\n", "import os\n", "import json\n", "import boto3\n", "import matplotlib.pyplot as plt\n", "\n", "sagemaker_session = sagemaker.Session()\n", "boto_session = boto3.session.Session()\n", "sm_client = boto_session.client(\"sagemaker\")\n", "sm_runtime = boto_session.client(\"sagemaker-runtime\")\n", "region = boto_session.region_name\n", "account = boto3.client('sts').get_caller_identity().get('Account')\n", "role = sagemaker.get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## IMPORTANT -\n", "#### Replace <> below with the bucket name created by the CloudFormation template. \n", "#### The bucket name is created with the format <-stack name->-<-eventsbucket->-<-############->\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_client = boto3.client('s3')\n", "response = s3_client.list_buckets()\n", "for bucketname in response['Buckets']:\n", " if \"eventsbucket\" in bucketname[\"Name\"]:\n", " print(bucketname[\"Name\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket = '<>' \n", "prefix = 'xgb-data'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up Paths and Directories" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Path to upload the trained model\n", "xgb_upload_location = os.path.join('s3://{}/{}'.format(bucket, 'xgb-model'))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Retrieve the XGBoost container image from ECR\n", "region = sagemaker_session.boto_region_name\n", "container= sagemaker.image_uris.retrieve('xgboost', region, '0.90-1')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Upload the training and test data to S3\n", "train_channel = prefix + '/train'\n", "validation_channel = prefix + '/validation'\n", "\n", "sagemaker_session.upload_data(path='training_data', bucket=bucket, key_prefix=train_channel)\n", "sagemaker_session.upload_data(path='test_data', bucket=bucket, key_prefix=validation_channel)\n", "\n", "s3_train_channel = sagemaker.inputs.TrainingInput('s3://{}/{}'.format(bucket, train_channel), content_type ='csv')\n", "s3_valid_channel = sagemaker.inputs.TrainingInput('s3://{}/{}'.format(bucket, validation_channel), content_type ='csv')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

Model Training" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We train a SageMaker provided XGBoost model using default hyperparameters and SageMaker Training Job. The overall time for this training job to complete is approximtely 5 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "xgb = sagemaker.estimator.Estimator(container,\n", " role, \n", " instance_count=1, \n", " instance_type='ml.c4.4xlarge',\n", " output_path=xgb_upload_location,\n", " sagemaker_session=sagemaker_session)\n", "xgb.set_hyperparameters(max_depth=5,\n", " eta=0.2,\n", " gamma=4,\n", " min_child_weight=6,\n", " subsample=0.8,\n", " silent=0,\n", " objective='binary:hinge',\n", " num_round=100)\n", "\n", "xgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

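{ "cell_type": "markdown", "metadata": {}, "source": [ "The default hyperparameters above could also be tuned before deployment. The cell below is a minimal sketch using SageMaker automatic model tuning (`HyperparameterTuner`); the search ranges and job counts are illustrative assumptions, not part of the original solution. Uncomment the `fit` call to launch a tuning job (it incurs additional training time and cost)." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter\n", "\n", "# Illustrative search ranges for two influential XGBoost hyperparameters\n", "hyperparameter_ranges = {\n", "    'eta': ContinuousParameter(0.1, 0.5),\n", "    'max_depth': IntegerParameter(3, 9)\n", "}\n", "\n", "tuner = HyperparameterTuner(estimator=xgb,\n", "                            objective_metric_name='validation:error',\n", "                            objective_type='Minimize',\n", "                            hyperparameter_ranges=hyperparameter_ranges,\n", "                            max_jobs=4,\n", "                            max_parallel_jobs=2)\n", "\n", "# tuner.fit({'train': s3_train_channel, 'validation': s3_valid_channel})" ] },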
Model deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_url =xgb.model_data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resource_name = \"Predictive-Maintainance-XgBoost\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_name = resource_name.format(\"Model\")\n", "create_model_response = sm_client.create_model(\n", " ModelName=model_name,\n", " ExecutionRoleArn=role,\n", " PrimaryContainer={\n", " \"Image\": container,\n", " \"ModelDataUrl\": model_url,\n", " },\n", ")\n", "\n", "print(f\"Created Model: {create_model_response['ModelArn']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we create Endpoint Config including Asynch s3 output location, SNS and concurrency configuration. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_config_name = resource_name.format(\"EndpointConfig\")\n", "create_endpoint_config_response = sm_client.create_endpoint_config(\n", " EndpointConfigName=endpoint_config_name,\n", " ProductionVariants=[\n", " {\n", " \"VariantName\": \"variant1\",\n", " \"ModelName\": model_name,\n", " \"InstanceType\": \"ml.m5.xlarge\",\n", " \"InitialInstanceCount\": 1,\n", " }\n", " ],\n", " AsyncInferenceConfig={\n", " \"OutputConfig\": {\n", " \"S3OutputPath\": f\"s3://{bucket}/{prefix}/output\",\n", " #Specify Amazon SNS topics\n", " \"NotificationConfig\": {\n", " \"SuccessTopic\": f\"arn:aws:sns:{region}:{account}:async-success\",\n", " \"ErrorTopic\": f\"arn:aws:sns:{region}:{account}:async-error\",\n", " \n", " }\n", " },\n", " \"ClientConfig\": {\"MaxConcurrentInvocationsPerInstance\": 4},\n", " },\n", ")\n", "print(f\"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then create asynchronous endpoint, using the endpoint configuration created above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_name = resource_name.format(\"Endpoint\")\n", "create_endpoint_response = sm_client.create_endpoint(\n", " EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name\n", ")\n", "print(f\"Created Endpoint: {create_endpoint_response['EndpointArn']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = sm_client.get_waiter(\"endpoint_in_service\")\n", "print(\"Waiting for endpoint creation...\")\n", "waiter.wait(EndpointName=endpoint_name)\n", "resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "print(f\"Endpoint Status: {resp['EndpointStatus']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

Model Evaluation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Drop the label column(Machine failure) from the dataset and export as test file locally." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_set = pd.read_csv('fulldataset.csv')\n", "resp = test_set['0']\n", "test_set = test_set.drop(columns = ['0'])\n", "test_set.to_csv('test.csv', index =False, header = False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload the local test file S3 test channel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_channel = prefix + '/test/input'\n", "s3_test_file = sagemaker_session.upload_data('test.csv', bucket=bucket, key_prefix=test_channel)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Invoke the asynchronous endpoint with the entire test file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=endpoint_name, InputLocation=s3_test_file, ContentType=\"csv\"\n", ")\n", "output_location = response[\"OutputLocation\"]\n", "print(f\"OutputLocation: {output_location}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the asynchronous endpoint processes the request, download the generated inference file locally\n", "\n", "Note: Make sure that the endpoint has finished processing and the inference file has been generated before running this step. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_dir = './data/inference'\n", "if not os.path.exists(data_dir):\n", " os.makedirs(data_dir)\n", "\n", "!aws s3 cp $output_location $data_dir'/test.csv.out'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Metrics\n", "\n", "Next lets extract some model metrics. Let's start with Precision and Recall.\n", "\n", "Precision is a measure of the number of false positives. High precision score indicates low false positives. Within the context of predictive maintenance, high false positives may imply that time and resources are spent in performing maintenance on a machine part when it is not required, resulting in avoidable cost.\n", "\n", "Recall on the other hand is a messure of false negatives. High recall score indicates low false negatives. Within the context of predictive maintenance, low false negatives may imply that machine parts in need of maintenance are correctly identified and flagged. This helps in troubleshooting and performing maintaince on a machine part, before it breaks down and therefore reduce associated downtime and cost." 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Once the asynchronous endpoint has processed the request, download the generated inference file locally.\n", "\n", "Note: Make sure that the endpoint has finished processing and the inference file has been generated before running this step." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_dir = './data/inference'\n", "if not os.path.exists(data_dir):\n", "    os.makedirs(data_dir)\n", "\n", "!aws s3 cp $output_location $data_dir'/test.csv.out'" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Model Metrics\n", "\n", "Next, let's extract some model metrics, starting with precision and recall.\n", "\n", "Precision is a measure of false positives: a high precision score indicates few false positives. In the context of predictive maintenance, false positives imply that time and resources are spent performing maintenance on a machine part when it is not required, resulting in avoidable cost.\n", "\n", "Recall, on the other hand, is a measure of false negatives: a high recall score indicates few false negatives. In the context of predictive maintenance, few false negatives imply that machine parts in need of maintenance are correctly identified and flagged. This helps in troubleshooting and performing maintenance on a machine part before it breaks down, reducing the associated downtime and cost." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load the predictions (one comma-separated line) and round them to 0/1 labels\n", "y_test = pd.read_csv(os.path.join(data_dir, 'test.csv.out'), header=None)\n", "y_vals = np.round(y_test.T.values)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import accuracy_score, confusion_matrix, classification_report" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Model Accuracy = {} %\".format(accuracy_score(resp.values, y_vals)*100))" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(classification_report(resp.values, y_vals))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's plot a confusion matrix for the actual and predicted labels." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot_confusion_matrix(y_true, y_pred, classes,\n", "                          normalize=False,\n", "                          title=None,\n", "                          cmap=plt.cm.Blues):\n", "    \"\"\"\n", "    Print and plot the confusion matrix.\n", "    Normalization can be applied by setting `normalize=True`.\n", "    \"\"\"\n", "    if not title:\n", "        if normalize:\n", "            title = 'Normalized confusion matrix'\n", "        else:\n", "            title = 'Confusion matrix, without normalization'\n", "\n", "    # Compute the confusion matrix\n", "    cm = confusion_matrix(y_true, y_pred)\n", "    if normalize:\n", "        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]\n", "        print(\"Normalized confusion matrix\")\n", "    else:\n", "        print('Confusion matrix, without normalization')\n", "\n", "    print(cm)\n", "\n", "    fig, ax = plt.subplots(figsize=(8, 8))\n", "    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)\n", "    ax.figure.colorbar(im, ax=ax)\n", "    # Show all ticks and label them with the class names\n", "    ax.set(xticks=np.arange(cm.shape[1]),\n", "           yticks=np.arange(cm.shape[0]),\n", "           xticklabels=classes, yticklabels=classes)\n", "    plt.tick_params(labelsize=15)\n", "    plt.xlabel('Predicted label', fontsize=18)\n", "    plt.ylabel('True label', fontsize=18)\n", "    plt.title(title, fontsize=18)\n", "    # Rotate the tick labels and set their alignment\n", "    plt.setp(ax.get_xticklabels(), rotation=45, ha=\"right\",\n", "             rotation_mode=\"anchor\")\n", "\n", "    # Annotate each cell with its count (or rate, if normalized)\n", "    fmt = '.2f' if normalize else 'd'\n", "    thresh = cm.max() / 2.\n", "    for i in range(cm.shape[0]):\n", "        for j in range(cm.shape[1]):\n", "            ax.text(j, i, format(cm[i, j], fmt),\n", "                    ha=\"center\", va=\"center\", fontsize=20,\n", "                    color=\"white\" if cm[i, j] > thresh else \"black\")\n", "    fig.tight_layout()\n", "    return ax\n", "\n", "\n", "np.set_printoptions(precision=2)\n", "\n", "# Plot the (non-normalized) confusion matrix\n", "plot_confusion_matrix(resp.values, y_vals, classes=['Normal', 'Failure'],\n", "                      title='Confusion matrix, without normalization')\n", "plt.show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "For nearly 10,000 observations in the dataset, our model correctly identified 281 cases where a part was faulty. However, there were 58 cases where it failed to identify a faulty part and 16 cases where a part was incorrectly identified as faulty. In real-world use cases, several iterations of feature engineering, algorithm selection, model training, optimization, and evaluation may be required to achieve an ML model that performs according to the desired business outcomes.\n", "\n", "For this post, we are now ready to use the deployed model to predict, on incoming streaming data, which machine parts require maintenance. Continue back to the blog and follow the next steps to complete the build and perform end-to-end testing of the solution." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Make sure to delete the resources created by this notebook (especially the SageMaker endpoint) once you are done testing the solution, to avoid unexpected charges." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sm_client.delete_endpoint(EndpointName=endpoint_name)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sm_client.delete_model(ModelName=model_name)" ] }
], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }