# Training & Deploying an XGBoost Model for Predicting Machine Failures (Predictive Maintenance)

Run this notebook after `Data Pre-Processing.ipynb`, which generates the curated train/test datasets.

In this notebook, we train an ML model to predict whether a machine fails, based on system readings. We train an XGBoost model using Amazon SageMaker's built-in XGBoost algorithm. XGBoost can provide good results for many types of ML problems, including classification, even when training samples are limited.

### Import libraries

In [None]:
import sagemaker
import numpy as np
import pandas as pd
import os
import json
import boto3
import matplotlib.pyplot as plt

sagemaker_session = sagemaker.Session()
boto_session = boto3.session.Session()
sm_client = boto_session.client("sagemaker")
sm_runtime = boto_session.client("sagemaker-runtime")
region = boto_session.region_name
account = boto3.client('sts').get_caller_identity().get('Account')
role = sagemaker.get_execution_role()

## IMPORTANT
#### Replace `<>` below with the name of the bucket created by the CloudFormation template.
#### The bucket name has the format `<stack name>-eventsbucket-<############>`. The next cell lists the matching buckets in your account.



In [None]:
# List the buckets in this account whose names contain "eventsbucket"
s3_client = boto3.client('s3')
response = s3_client.list_buckets()
for bucketname in response['Buckets']:
    if "eventsbucket" in bucketname["Name"]:
        print(bucketname["Name"])
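
Instead of pasting the name by hand, you could select the bucket programmatically. The sketch below assumes that exactly one bucket in the account contains `eventsbucket` in its name, which is the same substring the cell above searches for. The helper name `find_events_bucket` is hypothetical. When zero or several buckets match, it raises an error instead of guessing.

```python
def find_events_bucket(list_buckets_response, marker="eventsbucket"):
    """Return the single bucket name that contains `marker`.

    `list_buckets_response` is the dict returned by boto3's
    `s3_client.list_buckets()`; only its "Buckets" -> "Name" fields are read.
    """
    matches = [
        b["Name"]
        for b in list_buckets_response.get("Buckets", [])
        if marker in b["Name"]
    ]
    if len(matches) != 1:
        # Refuse to guess: zero or multiple matches need a manual choice.
        raise ValueError(
            f"Expected exactly one bucket containing {marker!r}, found: {matches}"
        )
    return matches[0]


# Usage with a stand-in response (in the notebook, pass `response` from the cell above).
example_response = {
    "Buckets": [{"Name": "mystack-eventsbucket-abc123"}, {"Name": "other-bucket"}]
}
print(find_events_bucket(example_response))  # mystack-eventsbucket-abc123
```

In the notebook, `bucket = find_events_bucket(response)` could then replace the `'<>'` placeholder.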

In [None]:
bucket = '<>'  # replace <> with the name of your events bucket
prefix = 'xgb-data'

### Set up Paths and Directories

In [None]:
# S3 path where the trained model artifact is written
xgb_upload_location = 's3://{}/{}'.format(bucket, 'xgb-model')

In [None]:
# Retrieve the URI of the built-in XGBoost (version 0.90-1) container image from ECR
region = sagemaker_session.boto_region_name
container = sagemaker.image_uris.retrieve('xgboost', region, '0.90-1')

In [None]:
# Upload the training data and the test data (used as the validation channel) to S3
train_channel = prefix + '/train'
validation_channel = prefix + '/validation'

sagemaker_session.upload_data(path='training_data', bucket=bucket, key_prefix=train_channel)
sagemaker_session.upload_data(path='test_data', bucket=bucket, key_prefix=validation_channel)

# Point each training channel at its S3 prefix; the files are CSV
s3_train_channel = sagemaker.inputs.TrainingInput('s3://{}/{}'.format(bucket, train_channel), content_type='csv')
s3_valid_channel = sagemaker.inputs.TrainingInput('s3://{}/{}'.format(bucket, validation_channel), content_type='csv')
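
If you regenerate the files in `training_data` or `test_data` yourself, note that SageMaker's built-in XGBoost expects CSV training input to have the label in the first column and no header row. The sketch below shows one way to write a DataFrame in that layout. The function `write_xgboost_csv` and the toy column names are illustrative assumptions and are not part of the pre-processing notebook.

```python
import os
import tempfile

import pandas as pd


def write_xgboost_csv(df, label_col, path):
    """Write `df` as CSV with `label_col` first, no header row, and no index.

    This is the layout SageMaker's built-in XGBoost expects for CSV training data.
    """
    ordered = df[[label_col] + [c for c in df.columns if c != label_col]]
    ordered.to_csv(path, header=False, index=False)


# Usage with a toy frame whose label is not in the first column.
toy = pd.DataFrame({"feat_a": [1.0, 2.0], "label": [0, 1], "feat_b": [3.0, 4.0]})
with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "train.csv")
    write_xgboost_csv(toy, "label", out_path)
    with open(out_path) as f:
        written = f.read().splitlines()
print(written)  # ['0,1.0,3.0', '1,2.0,4.0']
```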

## Model Training

We train SageMaker's built-in XGBoost algorithm in a SageMaker training job, using the hyperparameters set below. The training job takes approximately 5 minutes to complete.

In [None]:
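%%time
xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    instance_count=1,
                                    instance_type='ml.c4.4xlarge',
                                    output_path=xgb_upload_location,
                                    sagemaker_session=sagemaker_session)
xgb.set_hyperparameters(max_depth=5,               # maximum depth of each tree
                        eta=0.2,                   # learning rate (step-size shrinkage)
                        gamma=4,                   # minimum loss reduction required to split a node
                        min_child_weight=6,        # minimum sum of instance weights in a child
                        subsample=0.8,             # fraction of rows sampled for each tree
                        silent=0,                  # 0 prints running messages
                        objective='binary:hinge',  # hinge loss: predicts 0/1 classes, not probabilities
                        num_round=100)             # number of boosting rounds

# Train on the train channel and report metrics on the validation channel
xgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})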

## Model Deployment

In [None]:
model_url = xgb.model_data  # S3 URI of the trained model artifact (model.tar.gz)

In [None]:
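# Note: this string has no "{}" placeholder, so resource_name.format(...) below returns it
# unchanged, and the model, endpoint config, and endpoint all share this name. This is
# allowed because SageMaker names only need to be unique within each resource type.
resource_name = "Predictive-Maintainance-XgBoost"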

In [None]:
model_name = resource_name.format("Model")
create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        "Image": container,
        "ModelDataUrl": model_url,
    },
)

print(f"Created Model: {create_model_response['ModelArn']}")

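Next, we create an endpoint configuration that specifies the asynchronous inference settings: the S3 output location, the Amazon SNS notification topics, and the maximum number of concurrent invocations per instance.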

In [None]:
endpoint_config_name = resource_name.format("EndpointConfig")
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.m5.xlarge",
            "InitialInstanceCount": 1,
        }
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            # S3 prefix where the endpoint writes inference results
            "S3OutputPath": f"s3://{bucket}/{prefix}/output",
            # Amazon SNS topics notified when a request succeeds or fails
            "NotificationConfig": {
                "SuccessTopic": f"arn:aws:sns:{region}:{account}:async-success",
                "ErrorTopic": f"arn:aws:sns:{region}:{account}:async-error",
            },
        },
        # Maximum number of requests each instance processes concurrently
        "ClientConfig": {"MaxConcurrentInvocationsPerInstance": 4},
    },
)
print(f"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}")

We then create the asynchronous endpoint from the endpoint configuration created above.

In [None]:
endpoint_name = resource_name.format("Endpoint")
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")

In [None]:
waiter = sm_client.get_waiter("endpoint_in_service")
print("Waiting for endpoint creation...")
waiter.wait(EndpointName=endpoint_name)
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
print(f"Endpoint Status: {resp['EndpointStatus']}")

## Model Evaluation

Drop the label column (`Machine failure`) from the dataset and save the remaining features locally as a test file.

In [None]:
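test_set = pd.read_csv('fulldataset.csv')
# Column '0' holds the "Machine failure" label; keep it as the ground truth
resp = test_set['0']
test_set = test_set.drop(columns=['0'])
# Write the features only, with no header and no index column
test_set.to_csv('test.csv', index=False, header=False)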

Upload the local test file to the S3 test channel.

In [None]:
test_channel = prefix + '/test/input'
s3_test_file = sagemaker_session.upload_data('test.csv', bucket=bucket, key_prefix=test_channel)

Invoke the asynchronous endpoint with the entire test file. The response includes the S3 location where the inference results will be written.

In [None]:
response = sm_runtime.invoke_endpoint_async(
    EndpointName=endpoint_name, InputLocation=s3_test_file, ContentType="csv"
)
output_location = response["OutputLocation"]
print(f"OutputLocation: {output_location}")

Once the asynchronous endpoint has processed the request, download the generated inference file locally.

Note: Make sure that the endpoint has finished processing and the inference file has been generated before running this step. 
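
Instead of checking by hand, you could block until the output object appears. The minimal sketch below assumes the `s3_client` created earlier and the `output_location` returned by `invoke_endpoint_async`. The helper names are hypothetical, and `get_waiter("object_exists")` is boto3's standard S3 waiter. If the request fails, no object is written to `OutputLocation`, and the waiter eventually raises `botocore.exceptions.WaiterError`. As an alternative, the SNS success and error topics configured on the endpoint can notify you when processing finishes.

```python
from urllib.parse import urlparse


def parse_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def wait_for_s3_object(s3_client, uri, delay_seconds=10, max_attempts=60):
    """Block until the object at `uri` exists.

    Uses boto3's built-in `object_exists` waiter, which polls HeadObject
    every `delay_seconds`, up to `max_attempts` times.
    """
    bucket_name, key = parse_s3_uri(uri)
    waiter = s3_client.get_waiter("object_exists")
    waiter.wait(
        Bucket=bucket_name,
        Key=key,
        WaiterConfig={"Delay": delay_seconds, "MaxAttempts": max_attempts},
    )


# In the notebook: wait_for_s3_object(s3_client, output_location)
```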

In [None]:
data_dir = './data/inference'
os.makedirs(data_dir, exist_ok=True)

!aws s3 cp $output_location $data_dir'/test.csv.out'

## Model Metrics

Next, let's compute some model metrics, starting with precision and recall.

Precision, TP / (TP + FP), measures the fraction of predicted failures that are actual failures. A high precision score indicates few false positives. In predictive maintenance, many false positives mean that time and resources are spent maintaining machine parts that do not need it, which results in avoidable cost.

Recall, TP / (TP + FN), measures the fraction of actual failures that the model detects. A high recall score indicates few false negatives. In predictive maintenance, few false negatives mean that machine parts in need of maintenance are correctly identified and flagged. Maintenance can then be performed before a part breaks down, which reduces the associated downtime and cost.
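
To make these definitions concrete, the minimal pure-Python sketch below counts true positives, false positives, and false negatives for the failure class (label `1`) and derives precision and recall from those counts. The function name and toy labels are illustrative. In the notebook, `sklearn.metrics.precision_score` and `sklearn.metrics.recall_score` compute the same quantities.

```python
def precision_recall(y_true, y_pred, positive=1):
    """Return (precision, recall) for the `positive` class."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    # Return 0.0 instead of dividing by zero when a denominator is empty.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


# Toy labels: 1 = failure, 0 = normal (TP = 2, FP = 1, FN = 2).
toy_true = [1, 1, 1, 0, 0, 0, 0, 1]
toy_pred = [1, 0, 0, 0, 1, 0, 0, 1]
print(precision_recall(toy_true, toy_pred))  # precision 2/3, recall 0.5
```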

In [None]:
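y_test = pd.read_csv(os.path.join(data_dir, 'test.csv.out'), header=None)
# The transpose turns a single comma-separated row of predictions into a column.
# ravel() then flattens the result to 1-D (this also works if predictions are one per line).
# With objective='binary:hinge' the values are already 0/1, so rounding is only a safeguard.
y_vals = np.round(y_test.T.values).ravel()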

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

In [None]:
print("Model Accuracy = {} %".format(accuracy_score(resp.values, y_vals)*100))

In [None]:
print(classification_report(resp.values, y_vals))

Next, let's plot a confusion matrix of the actual versus predicted labels.
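
When reading the plot, note that rows correspond to true labels and columns correspond to predicted labels. This is the layout that `sklearn.metrics.confusion_matrix` uses. The minimal numpy sketch below reproduces that layout for 0/1 labels, with `0` as Normal and `1` as Failure, matching the class order passed to the plotting function below. The helper name and toy labels are illustrative.

```python
import numpy as np


def binary_confusion_matrix(y_true, y_pred):
    """Return [[TN, FP], [FN, TP]] for 0/1 labels.

    Rows are true labels and columns are predicted labels.
    """
    y_true = np.asarray(y_true).ravel().astype(int)
    y_pred = np.asarray(y_pred).ravel().astype(int)
    cm = np.zeros((2, 2), dtype=int)
    # np.add.at accumulates correctly even when (row, col) pairs repeat.
    np.add.at(cm, (y_true, y_pred), 1)
    return cm


toy_true = [1, 1, 1, 0, 0, 0, 0, 1]
toy_pred = [1, 0, 0, 0, 1, 0, 0, 1]
toy_cm = binary_confusion_matrix(toy_true, toy_pred)
print(toy_cm.tolist())  # [[3, 1], [2, 2]]
```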

In [None]:
def plot_confusion_matrix(y_true, y_pred, classes,
                          normalize=False,
                          title=None,
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    `classes` must be listed in sorted label order (here 0 = Normal, 1 = Failure).
    """

    if not title:
        if normalize:
            title = 'Normalized confusion matrix'
        else:
            title = 'Confusion matrix, without normalization'

    # Compute the confusion matrix (rows = true labels, columns = predicted labels)
    cm = confusion_matrix(y_true, y_pred)
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    fig, ax = plt.subplots(figsize = (8, 8))
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    # Show every tick and label it with the corresponding class name
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=classes, yticklabels=classes)
    plt.tick_params(labelsize=15)  
    plt.xlabel('Predicted label', fontsize=18)
    plt.ylabel('True label',fontsize =18)
    plt.title(title, fontsize=18)
    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Loop over data dimensions and create text annotations.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center", fontsize=20,
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    return ax


np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plot_confusion_matrix(resp.values, y_vals, classes=['Normal', 'Failure'],
                      title='Confusion matrix, without normalization')

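# Plot normalized confusion matrix
plot_confusion_matrix(resp.values, y_vals, classes=['Normal', 'Failure'], normalize=True,
                      title='Normalized confusion matrix')

plt.show()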

Across the nearly 10,000 observations in the dataset, the model correctly identified 281 cases where a part was faulty. However, it missed 58 faulty parts (false negatives) and incorrectly flagged 16 parts as faulty (false positives). In real-world use cases, several iterations of feature engineering, algorithm selection, model training, optimization, and evaluation may be required to arrive at an ML model that meets the desired business outcomes.
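
With Failure as the positive class, these counts give TP = 281, FN = 58, and FP = 16. Substituting them into the precision and recall definitions above gives the following values. They are derived arithmetically from the quoted counts, not from a new run:

$$
\begin{aligned}
\text{Precision} &= \frac{TP}{TP + FP} = \frac{281}{281 + 16} = \frac{281}{297} \approx 0.946 \\
\text{Recall} &= \frac{TP}{TP + FN} = \frac{281}{281 + 58} = \frac{281}{339} \approx 0.829 \\
F_1 &= \frac{2\,TP}{2\,TP + FP + FN} = \frac{562}{636} \approx 0.884
\end{aligned}
$$

In other words, about 95% of the parts the model flagged were actually faulty, and the model caught about 83% of the faulty parts.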

For this post, we are now ready to use the deployed model to predict which machine parts require maintenance from incoming streaming data. Return to the blog post and follow the next steps to complete the build and test the solution end to end.

## Cleanup

Once you are done testing the solution, make sure to delete the resources created by this notebook (especially the SageMaker endpoint) to avoid unexpected charges.

In [None]:
response = sm_client.delete_endpoint(EndpointName=endpoint_name)

In [None]:
response = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

In [None]:
response = sm_client.delete_model(ModelName=model_name)