## Time Series Classification Using AWS Built-in TensorFlow Framework

`Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0`


This notebook uses the NASA turbofan jet engine maintenance data, which is available [here](https://ti.arc.nasa.gov/m/project/prognostic-repository/CMAPSSData.zip), to demonstrate `TensorFlow` modeling using the AWS pre-built TensorFlow image.  

The built-in framework is one of the ways to use SageMaker services:  

<img src="../../images/SageMaker_BIA_BIF_BYO.jpg" width="1200">  

[Source: AWS re:Invent 2020: Implementing MLOps practices with Amazon SageMaker](https://www.youtube.com/watch?v=8ZpE-9LnaJk)

Note that when you use the built-in TensorFlow image, you need to provide your own training script.  

The detailed steps are presented in the following sections.  


1. [Data preprocessing](#1-data-preprocessing)  
   - [Retrieve data](#retrieve-data-from-nasa-website)  
   - [Label data](#label-data)  
   - [Reshape data](#reshape-data)
2. [Create TensorFlow model](#2-create-tensorflow-model)  
   - [Create training script](#create-script)  
   - [Define TensorFlow model](#define-tensorflow-estimator)
3. [Train the model](#3-train-the-model)
4. [Deploy the model](#4-deploy-the-trained-model-to-an-endpoint)
5. [Invoke the endpoint](#5-invoke-the-endpoint)
6. [Delete the endpoint](#6-delete-the-endpoint)


Import libraries

In [None]:
import os
import zipfile
import urllib.request  # the request submodule must be imported explicitly for urlretrieve
import boto3

import numpy as np
import pandas as pd
import sagemaker
import sklearn
from sklearn import metrics
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow

In [None]:
# Get SageMaker service role
try:
    role = get_execution_role() # running from a SageMaker notebook
except ValueError:
    iam = boto3.client('iam') # running from a local machine
    role = iam.get_role(
        RoleName='AmazonSageMaker-ExecutionRole')['Role']['Arn']


In [None]:
#Use default bucket
sess = sagemaker.Session()
bucket = sess.default_bucket()  
# Use your own bucket
#bucket="<your bucket>"

### 1 Data preprocessing

### Retrieve Data from NASA website

In [None]:
# Get data from NASA website

data_folder = 'data'
if not os.path.exists(data_folder):
    os.makedirs(data_folder)
urllib.request.urlretrieve('https://ti.arc.nasa.gov/m/project/prognostic-repository/CMAPSSData.zip',
                           os.path.join(data_folder, 'CMAPSSData.zip'))

with zipfile.ZipFile(os.path.join(data_folder, 'CMAPSSData.zip'), "r") as zip_ref:
    zip_ref.extractall(data_folder)

columns = ['id', 'cycle', 'setting1', 'setting2', 'setting3', 's1', 's2', 's3', 's4', 's5', 's6',
           's7', 's8', 's9', 's10', 's11', 's12', 's13', 's14', 's15', 's16', 's17', 's18', 's19', 's20', 's21']


In [None]:
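# Read data. Columns 26 and 27 are empty artifacts of trailing delimiters and are dropped.
# Note: each iteration overwrites df, so only the last file (train_FD004.txt) is kept.
for i in range(1, 5):
    df = pd.read_csv(
        'data/train_FD{:03d}.txt'.format(i), delimiter=' ', header=None)
    df.drop(df.columns[[26, 27]], axis=1, inplace=True)
    df.columns = columns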


In [None]:
df.head()

### Label data  
Generate the target for the turbines. The data records the full life of each engine, so the remaining useful life (RUL) of a record is the number of cycles left before that engine's last recorded cycle.  
Records with an RUL of 14 cycles or fewer are labeled as 'failure' (1); all other records are labeled 0. Trained on these labels, the model can flag an engine failure up to 14 cycles in advance.  
The diagram depicts the approach.  
<img src="./img/label.png" width="800">
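
As a small illustration of the labeling rule, the sketch below applies it to a single made-up engine that runs for 20 cycles. The array names and the `threshold` variable are assumptions for this example only; the notebook itself applies the same rule with pandas in the next cell.

```python
import numpy as np

# Hypothetical engine whose last recorded cycle is cycle 20.
cycles = np.arange(1, 21)

# Remaining useful life: cycles left until the last recorded cycle.
rul = cycles.max() - cycles

# Same rule as the notebook: an RUL of 14 or fewer is labeled as failure (1).
threshold = 14
target = (rul <= threshold).astype(int)

print(rul[:6])     # RUL of the first six cycles
print(target[:6])  # labels of the first six cycles
```

Because the comparison is inclusive, cycles with an RUL of 0 through 14 are all labeled 1.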

In [None]:
# Data Labeling - generate target.
rul = pd.DataFrame(df.groupby('id')['cycle'].max()).reset_index()
rul.columns = ['id', 'max']
df = df.merge(rul, on=['id'], how='left')
df['RUL'] = df['max'] - df['cycle']
df.drop('max', axis=1, inplace=True)
df['target'] = df['RUL'].apply(lambda x: 1 if x <= 14 else 0)
df.head()


In [None]:
df.shape

Split training and testing data

In [None]:
trainID, testID = train_test_split(df.id.unique(), test_size=0.2)

In [None]:
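# Copy the slices so that the in-place scaling below does not trigger SettingWithCopyWarning
train = df[df['id'].isin(trainID)].copy()

test = df[df['id'].isin(testID)].copy()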

In [None]:
train.columns

Get numerical columns and use scaler to transform the data
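
The scaler is fit on the training engines only and then applied to both splits, so no statistics from the test engines leak into training. A minimal NumPy sketch of that idea, using made-up one-column data rather than the notebook's DataFrames:

```python
import numpy as np

# Made-up data: three training rows and one test row with a single feature.
train_x = np.array([[1.0], [2.0], [3.0]])
test_x = np.array([[4.0]])

# Statistics come from the training split only.
mean = train_x.mean(axis=0)
std = train_x.std(axis=0)  # population standard deviation, as StandardScaler uses

# The same training statistics are applied to both splits.
train_scaled = (train_x - mean) / std
test_scaled = (test_x - mean) / std

print(train_scaled.ravel(), test_scaled.ravel())
```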

In [None]:
scaler = StandardScaler()
num_cols = ['setting1', 'setting2', 'setting3', 's1', 's2', 's3',
            's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11', 's12', 's13', 's14',
            's15', 's16', 's17', 's18', 's19', 's20', 's21']
scaler.fit(train[num_cols])
train[num_cols] = scaler.transform(train[num_cols])
test[num_cols] = scaler.transform(test[num_cols])
train.head()

### Reshape data  
For time series analysis, the 2-dimensional data (rows are timestamps, columns are features) needs to be restructured as 3-dimensional data. For every sample, the sequence length defines how many time steps the sample contains. A single sample is a 2-D array of shape (sequence length, features). Sequence length is also called 'history size' or 'look-back window'.  

<img src="./img/inputdata.png" width="1000">
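
To make the target shape concrete, here is a minimal sketch on a made-up series of 6 time steps and 2 features. The names `series` and `seq_len` are assumptions for this example; the window count mirrors the `get_sequence` helper defined later in this section.

```python
import numpy as np

# Made-up 2-D series: 6 time steps (rows), 2 features (columns).
series = np.arange(12, dtype=np.float32).reshape(6, 2)
seq_len = 3  # look-back window, an assumed value for this sketch

# The window starting at row i covers rows i .. i + seq_len - 1.
windows = np.stack([series[i:i + seq_len]
                    for i in range(len(series) - seq_len)])

print(windows.shape)  # (samples, time steps, features)
```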

In [None]:
# For time series analysis
sequence_length = 30

In [None]:
# keep only the training engines that have more than sequence_length records
id_count1 = train.groupby("id").count()
id_count1 = id_count1.reset_index()
train_id = id_count1[id_count1['s1'] > sequence_length]
train = train[train['id'].isin(train_id['id'])]


In [None]:
# keep only the test engines that have more than sequence_length records
id_count2 = test.groupby("id").count()
id_count2 = id_count2.reset_index()
test_id = id_count2[id_count2['s1'] > sequence_length]
test = test[test['id'].isin(test_id['id'])]


The dataset contains multiple engines. To model them, each engine's data needs to be grouped and processed together. The following functions reshape the data of multiple engines. 

Target size indicates how far into the future the model predicts. If the target size is 0, the model uses the previous `sequence_length` time steps to predict the current target. 
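
The effect of the target size is easiest to see from the row indices involved. The helper below is a hypothetical illustration (it is not part of the notebook) that reproduces the index arithmetic of the single-step case in `reconstruct_data_singleID`:

```python
def window_label_indices(n_rows, history_size, target_size):
    """Return (window_rows, label_row) pairs for single-step labeling."""
    pairs = []
    for i in range(history_size, n_rows - target_size):
        window_rows = list(range(i - history_size, i))  # rows fed to the model
        label_row = i + target_size                     # row whose target is predicted
        pairs.append((window_rows, label_row))
    return pairs

# With target_size=0 the label is the row right after the window.
print(window_label_indices(6, 3, 0))
# With target_size=2 the label lies two rows further ahead.
print(window_label_indices(6, 3, 2))
```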

In [None]:
target_size=0
history_size=sequence_length

In [None]:
def reconstruct_data_singleID(dataset, target, start_index, end_index, history_size,
                              target_size, single_step=True):
  data = []
  labels = []

  start_index = start_index + history_size
  if end_index is None:
    end_index = len(dataset) - target_size

  for i in range(start_index, end_index):
    indices = range(i-history_size, i)
    data.append(dataset[indices])

    if single_step:
      labels.append([target[i+target_size]])
    else:
      labels.append([target[i:i+target_size]])

  return np.array(data), np.array(labels)


In [None]:
def reconstruct_data_mutipleID(raw_df):
    x_data = []
    y_data = []
    for id in raw_df['id'].unique():
        df = raw_df[raw_df['id'] == id]
        x_, y_ = reconstruct_data_singleID(df[num_cols].values, df['target'].values, 0, None, history_size,
                                           target_size, single_step=True)
        x_data.append(x_)
        y_data.append(y_)
    target = np.concatenate(y_data).astype(np.float32)
    fea_data = np.concatenate(x_data).astype(np.float32)
    return fea_data, target


In [None]:
x_train, y_train = reconstruct_data_mutipleID(train)

In [None]:
# function to reshape features into (samples, time steps, features)
def get_sequence(df, seq_length, feature_cols):
    data_array = df[feature_cols].values
    num_elements = data_array.shape[0]
    for start, stop in zip(range(0, num_elements-seq_length), range(seq_length, num_elements)):
        yield data_array[start:stop, :]


In [None]:
def get_targets(df, seq_length, target):
    data_array = df[target].values
    num_elements = data_array.shape[0]
    return data_array[seq_length:num_elements, :]


In [None]:
# generator for the sequences
seq_gen1 = (list(get_sequence(train[train['id'] == id], sequence_length, num_cols))
            for id in train['id'].unique())
# generate sequences and convert to numpy array
X_train = np.concatenate(list(seq_gen1)).astype(np.float32)
X_train.shape

In [None]:
# generate targets
target1 = [get_targets(train[train['id'] == id], sequence_length, ['target'])
              for id in train['id'].unique()]
y_train = np.concatenate(target1).astype(np.float32)
y_train.shape


In [None]:
seq_gen2 = (list(get_sequence(test[test['id'] == id], sequence_length, num_cols))
            for id in test['id'].unique())
# generate sequences and convert to numpy array
X_test = np.concatenate(list(seq_gen2)).astype(np.float32)
print(X_test.shape)
# generate targets
target2 = [get_targets(test[test['id'] == id], sequence_length, ['target'])
              for id in test['id'].unique()]
y_test = np.concatenate(target2).astype(np.float32)
print(y_test.shape)


In [None]:
from numpy import save
save('x_train.npy',X_train)
save('y_train.npy', y_train)
save('x_val.npy', X_test)
save('y_val.npy', y_test)


In [None]:
import numpy as np
x_test1 = np.load('x_val.npy')
y_test1 = np.load('y_val.npy')

In [None]:
y_test2=y_test1[-30:-10]
x_test2 = x_test1[-30:-10]

In [None]:
save('x_test1.npy',x_test1)
save('y_test1.npy', y_test1)
save('x_test2.npy', x_test2)
save('y_test2.npy', y_test2)

In [None]:
#Save data to S3
x_train_filename = 'train/x_train.npy'
y_train_filename = 'train/y_train.npy'
x_val_filename = 'train/x_val.npy'
y_val_filename = 'train/y_val.npy'
x_test1_filename = 'test/x_test1.npy'
y_test1_filename = 'test/y_test1.npy'
x_test2_filename = 'test/x_test2.npy'
y_test2_filename = 'test/y_test2.npy'

s3 = boto3.resource('s3')

s3.meta.client.upload_file('./x_train.npy', bucket, x_train_filename)
s3.meta.client.upload_file('./y_train.npy', bucket, y_train_filename)
s3.meta.client.upload_file('./x_val.npy', bucket, x_val_filename)
s3.meta.client.upload_file('./y_val.npy', bucket, y_val_filename)
s3.meta.client.upload_file('./x_test1.npy', bucket, x_test1_filename)
s3.meta.client.upload_file('./y_test1.npy', bucket, y_test1_filename)
s3.meta.client.upload_file('./x_test2.npy', bucket, x_test2_filename)
s3.meta.client.upload_file('./y_test2.npy', bucket, y_test2_filename)

### 2 Create TensorFlow Model

The `sagemaker.tensorflow.TensorFlow` estimator handles locating the script mode container, uploading your script to an S3 location, and creating a SageMaker training job. Let's call out a couple of important parameters here:

* `py_version` is set to `'py3'` to indicate that we are using script mode with built-in TensorFlow image.

* `distribution` is used to configure the distributed training setup. It's required only if you are doing distributed training either across a cluster of instances or across multiple GPUs. Here we are using parameter servers as the distributed training schema. SageMaker training jobs run on homogeneous clusters. To make parameter server more performant in the SageMaker setup, we run a parameter server on every instance in the cluster, so there is no need to specify the number of parameter servers to launch. Script mode also supports distributed training with [Horovod](https://github.com/horovod/horovod). You can find the full documentation on how to configure `distribution` [here](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/tensorflow#distributed-training). 
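
For comparison, the two `distribution` settings mentioned above can be written as plain dictionaries. This is a sketch only: the parameter-server form is the one used in this notebook, while the MPI form for Horovod and its `processes_per_host` value of 1 are assumptions that should be checked against the SDK documentation for your version.

```python
# Parameter-server setup, as used in this notebook.
ps_distribution = {"parameter_server": {"enabled": True}}

# Horovod runs over MPI. processes_per_host is usually matched to the number
# of GPUs per instance; 1 is an assumed value for a single-GPU instance.
horovod_distribution = {"mpi": {"enabled": True, "processes_per_host": 1}}

for name, cfg in [("parameter server", ps_distribution),
                  ("horovod", horovod_distribution)]:
    print(name, cfg)
```

Either dictionary would be passed as the `distribution` argument of the estimator.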

### Create script 

In [None]:
%%writefile tf_sdk_train.py
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Activation
import argparse
import os
import numpy as np
import pandas as pd
import json


def model(x_train, y_train, x_test, y_test):
    """Generate a simple model"""

    nb_features = x_train.shape[2]
    sequence_length = x_train.shape[1]
    nb_out = y_train.shape[1]
    
    model = Sequential()
    
    model.add(LSTM(
        input_shape=(sequence_length, nb_features),
        units=100,
        return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LSTM(
        units=50,
        return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(units=nb_out, activation='sigmoid'))

    model.compile(loss='binary_crossentropy',
                  optimizer='RMSProp', metrics=[tf.keras.metrics.AUC()])

    model.fit(x_train, y_train)
    model.evaluate(x_test, y_test)

    return model


def _load_training_data(base_dir):
    """Load training data"""
    x_train = np.load(os.path.join(base_dir, 'x_train.npy'))
    y_train = np.load(os.path.join(base_dir, 'y_train.npy'))
    return x_train, y_train


def _load_testing_data(base_dir):
    """Load testing data"""
    x_test = np.load(os.path.join(base_dir, 'x_val.npy'))
    y_test = np.load(os.path.join(base_dir, 'y_val.npy'))
    return x_test, y_test


def _parse_args():
    parser = argparse.ArgumentParser()

    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is an S3 path under the default bucket.
    parser.add_argument('--model_dir', type=str)
    parser.add_argument('--sm-model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAINING'))
    parser.add_argument('--hosts', type=list, default=json.loads(os.environ.get('SM_HOSTS')))
    parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))

    return parser.parse_known_args()


if __name__ == "__main__":
    args, unknown = _parse_args()

    train_data, train_labels = _load_training_data(args.train)
    eval_data, eval_labels = _load_testing_data(args.train)

    tf_classifier = model(train_data, train_labels, eval_data, eval_labels)

    if args.current_host == args.hosts[0]:
        # Save the model under version directory '000000001' in SM_MODEL_DIR;
        # SageMaker uploads the contents of that directory to S3 when training ends.
        tf_classifier.save(os.path.join(args.sm_model_dir, '000000001'))


### Define TensorFlow estimator

In [None]:
tf_estimator = TensorFlow(entry_point='tf_sdk_train.py',
                             role=role,
                             instance_count=2,
                             instance_type='ml.p3.2xlarge',
                             framework_version='2.1.0',
                             py_version='py3',
                             distribution={'parameter_server': {'enabled': True}})

In [None]:
training_data_uri=f's3://{bucket}/train'

### 3 Train the model

To start a training job, call `tf_estimator.fit(training_data_uri)`.

An S3 location is used here as the input. `fit` creates a default channel named `'training'`, which points to this S3 location. In the training script we can then access the training data from the location stored in `SM_CHANNEL_TRAINING`. `fit` accepts a couple other types of input as well. See the API doc [here](https://sagemaker.readthedocs.io/en/stable/estimators.html#sagemaker.estimator.EstimatorBase.fit) for details.
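
The mapping from channel name to environment variable follows a simple naming pattern. The helper below is a hypothetical illustration of that pattern, and the local path is simulated here so the snippet can run outside a training container:

```python
import os

def channel_env_var(channel_name):
    """Environment variable name for an input channel (hypothetical helper)."""
    return "SM_CHANNEL_" + channel_name.upper()

# Simulate the value the training container would provide for the default channel.
os.environ.setdefault(channel_env_var("training"), "/opt/ml/input/data/training")

train_dir = os.environ[channel_env_var("training")]
print(channel_env_var("training"), "->", train_dir)
```

If `fit` were given a dictionary with an additional channel, for example a hypothetical `'validation'` key, the script would read that data from `SM_CHANNEL_VALIDATION` in the same way.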

When training starts, the TensorFlow container executes the training script `tf_sdk_train.py`, passing `hyperparameters` and `model_dir` from the estimator as script arguments. Because we didn't define either in this example, no hyperparameters are passed, and `model_dir` defaults to `s3://<DEFAULT_BUCKET>/<TRAINING_JOB_NAME>`, so the script execution is as follows:
```bash
python tf_sdk_train.py --model_dir s3://<DEFAULT_BUCKET>/<TRAINING_JOB_NAME>
```
When training is complete, the training job uploads the saved model for TensorFlow Serving.
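
If hyperparameters were set on the estimator, they would reach the script as additional command-line flags. The sketch below shows how a script could pick them up with `argparse`; the `epochs` hyperparameter, the `--other` flag, and the bucket name are made up for illustration and are not defined in this notebook.

```python
import argparse

def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_dir", type=str)
    # 'epochs' is a hypothetical hyperparameter, not defined in this notebook.
    parser.add_argument("--epochs", type=int, default=1)
    # parse_known_args tolerates flags the script does not declare.
    return parser.parse_known_args(argv)

args, unknown = parse_args(
    ["--model_dir", "s3://example-bucket/job", "--epochs", "5", "--other", "x"])
print(args.model_dir, args.epochs, unknown)
```

Using `parse_known_args` rather than `parse_args` keeps the script from failing when the container passes arguments it does not use.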

In [None]:
tf_estimator.fit(training_data_uri)

### 4 Deploy the trained model to an endpoint

The `deploy()` method creates a SageMaker model, which is then deployed to an endpoint to serve prediction requests in real time. We will use the TensorFlow Serving container for the endpoint, because we trained with script mode. This serving container runs an implementation of a web server that is compatible with the SageMaker hosting protocol. The SageMaker documentation topic "Using your own inference code" explains how SageMaker runs inference containers.

Deploy the trained TensorFlow 2.1 model to an endpoint.

In [None]:
predictor = tf_estimator.deploy(
    initial_instance_count=1, instance_type='ml.p2.xlarge')


### 5 Invoke the endpoint

Let's use the held-out data loaded earlier (`x_test1`, `y_test1`) as input for inference.
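
The model defined in the training script ends in a single sigmoid unit, so each prediction is a one-element list holding a probability rather than a vector of class scores. A minimal sketch of turning such a response into class labels, using a made-up response body and an assumed threshold of 0.5:

```python
import json
import numpy as np

# Made-up response body in the TensorFlow Serving "predictions" format.
body = json.dumps({"predictions": [[0.03], [0.91], [0.48]]})

result = json.loads(body)
probs = np.array(result["predictions"]).ravel()

# The 0.5 threshold is an assumption; adjust it for the
# precision/recall trade-off the use case needs.
labels = (probs > 0.5).astype(int)
print(labels.tolist())
```

Taking `np.argmax` over a one-element prediction would always return 0, which is why a threshold is used here.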

### Invoke Endpoint: Option 1-using SDK

In [None]:
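predictions = predictor.predict(x_test1[:50])
for i in range(0, 50):
    # The model has a single sigmoid output, so threshold the probability
    # instead of taking argmax (which would always return 0)
    prediction = int(predictions['predictions'][i][0] > 0.5)
    label = int(y_test1[i][0])
    print('prediction is {}, label is {}, matched: {}'.format(prediction, label, prediction == label))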

In [None]:
endpoint_name = predictor.endpoint_name  # 'endpoint' was renamed to 'endpoint_name' in SageMaker Python SDK v2
endpoint_name

### Invoke Endpoint: Option 2-using Boto3

In [None]:
import json
import botocore
ENDPOINT_NAME = predictor.endpoint_name

config = botocore.config.Config(read_timeout=80)
runtime = boto3.client('runtime.sagemaker', config=config)
data = x_test1[0:50]
payload = json.dumps(data.tolist())

response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                   ContentType='application/json',
                                   Body=payload)
result = json.loads(response['Body'].read().decode())
res = result['predictions']

In [None]:
for i in range(0, 50):
    # Threshold the single sigmoid output; argmax over one value would always return 0
    prediction = int(result['predictions'][i][0] > 0.5)
    label = int(y_test1[i][0])
    print('prediction is {}, label is {}, matched: {}'.format(prediction, label, prediction == label))

### 6 Delete the endpoint

Let's delete the endpoint we just created to prevent incurring any extra costs.

In [None]:
sagemaker.Session().delete_endpoint(endpoint_name)