# Automate detection of abnormal equipment behavior and review predictions with human in the loop using Amazon Lookout for Equipment and Amazon A2I

In this notebook, we show how to set up Amazon Lookout for Equipment to train an abnormal behavior detection model on a wind turbine dataset for predictive maintenance. We then set up a human-in-the-loop workflow with Amazon A2I to review the predictions, augment the dataset, and retrain the model.

To get started with Amazon Lookout for Equipment, we create a dataset, ingest data, train a model, and run inference by setting up a scheduler. After these steps, we show how to quickly set up a human review process using Amazon A2I and retrain your model with augmented or human-reviewed datasets. We walk you through the following steps:
1.	Creating a dataset in Amazon Lookout for Equipment
2.	Ingesting data into the Amazon Lookout for Equipment dataset
3.	Training a model in Amazon Lookout for Equipment
4.	Running diagnostics on the trained model
5.	Creating an inference scheduler in Amazon Lookout for Equipment to send a simulated stream of real-time requests.
6.	Setting up an Amazon A2I private human loop  and reviewing the predictions from Amazon Lookout for Equipment.
7.	Retraining your Amazon Lookout for Equipment model based on augmented datasets from Amazon A2I.

**Note:** 
1. Before you get started, make sure you have downloaded the open source wind turbine dataset from Engie and saved it in a designated S3 path. If you haven't done this, go through the `1_data_preparation.ipynb` notebook first.

2. We provide pre-generated labels for our example dataset, so you can run this notebook directly after running the `1_data_preparation.ipynb` notebook.

3. In general, the open source wind turbine dataset doesn't come with known date ranges during which the turbine behaved abnormally. This is a common issue for many of our customers. To learn how to generate labels, also go through the `2_discover_anomaly_labels.ipynb` notebook.

## Prerequisites

1. Ensure that the IAM role attached to your SageMaker notebook instance has permission to use Lookout for Equipment. You can enable this by adding the Lookout for Equipment policy to that role. To find the role, go to SageMaker Console --> Notebook Instances --> click your notebook name --> scroll down to IAM and click the IAM role.

2. Next, add "lookoutequipment.amazonaws.com" to the trust policy of the IAM role attached to your notebook instance, as shown below:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "lookoutequipment.amazonaws.com",
          "s3.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```
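
If you have the trust policy document at hand (for example, copied from the IAM console), a quick check can confirm that the Lookout for Equipment service principal is present. The following is a minimal sketch that works on the policy JSON only; the helper name `trusted_services` is hypothetical and is not part of any AWS SDK.

```python
import json


def trusted_services(trust_policy: dict) -> set:
    """Return the service principals allowed to call sts:AssumeRole."""
    services = set()
    for statement in trust_policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        if "sts:AssumeRole" not in actions:
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):
            continue  # e.g. a wildcard principal, which names no service
        names = principal.get("Service", [])
        if isinstance(names, str):
            names = [names]
        services.update(names)
    return services


# The trust policy from the prerequisites above:
policy = json.loads("""
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "lookoutequipment.amazonaws.com",
          "s3.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
""")

print("lookoutequipment.amazonaws.com" in trusted_services(policy))
```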


## Setup environment

In [None]:
%%sh
pip -q install --upgrade pip
pip -q install --upgrade awscli boto3 sagemaker smart_open
pip -q install tqdm

In [None]:
import boto3
import datetime
import os
import pandas as pd
import pprint
import sagemaker
from sagemaker import get_execution_role
from sagemaker.s3 import S3Uploader, S3Downloader
import s3fs
import sys
import time
import uuid
import warnings

# Helper functions for managing Lookout for Equipment API calls:
sys.path.append('../../getting_started/utils')
import lookout_equipment_utils as lookout

In [None]:
# Uncomment the lines below to display all rows and columns of a dataframe.
# Note that this can be resource intensive for large dataframes.
#import pandas as pd
#pd.set_option('display.max_rows', None)
#pd.set_option('display.max_columns', None)
#pd.set_option('display.width', None)
#pd.set_option('display.max_colwidth', None)

In [None]:
REGION_NAME = 'us-east-1'
BUCKET = '<your-Amazon-S3-bucket-name>'
PREFIX = 'wind-turbine'

ROLE_ARN = sagemaker.get_execution_role()

TURBINE_ID = 'R80711'
TRAIN_DATA = f's3://{BUCKET}/{PREFIX}/training_data/{TURBINE_ID}'
LABEL_DATA = f's3://{BUCKET}/{PREFIX}/labelled_data/{TURBINE_ID}'

DATASET_NAME = 'wind-turbine-dataset'
MODEL_NAME = 'wind-turbine-model'

# Local working folders used later in this notebook for the inference files
# and the A2I labels. The notebook does not define these paths, so the values
# below are assumptions: adjust them to your own folder layout.
INFER_DATA_A2I = os.path.join('..', 'data', 'wind-turbine', 'inference-a2i')
A2I_LABEL_DATA = os.path.join('..', 'data', 'wind-turbine', 'augmented-labelled-data')

## Load and view data

In [None]:
df = pd.read_csv(f'{TRAIN_DATA}/telemetry.csv', index_col = 'Timestamp')
df.head()

In [None]:
df.shape

In [None]:
labels = pd.read_csv('labels/labels.csv', header=None)
labels.head()

In [None]:
labels.shape

### Create the Dataset Component Map

In [None]:
DATASET_COMPONENT_FIELDS_MAP = dict()
DATASET_COMPONENT_FIELDS_MAP[TURBINE_ID] = df.reset_index().columns.to_list()

### Create L4E Dataset

In [None]:
lookout_dataset = lookout.LookoutEquipmentDataset(
    dataset_name=DATASET_NAME,
    component_fields_map=DATASET_COMPONENT_FIELDS_MAP,
    region_name=REGION_NAME,
    access_role_arn=ROLE_ARN
)

# The schema is stored as a JSON string: parse it rather than calling eval()
import json
pp = pprint.PrettyPrinter(depth=5)
pp.pprint(json.loads(lookout_dataset.dataset_schema))

In [None]:
lookout_dataset.create()

### Ingest data into L4E dataset

In [None]:
response = lookout_dataset.ingest_data(BUCKET, f'{PREFIX}/training_data/')

In [None]:
# Get the ingestion job ID and status:
data_ingestion_job_id = response['JobId']
data_ingestion_status = response['Status']

# Wait until ingestion completes:
print("=====Polling Data Ingestion Status=====\n")
lookout_client = lookout.get_client(region_name=REGION_NAME)
print(str(pd.to_datetime(datetime.datetime.now()))[:19], "| ", data_ingestion_status)

while data_ingestion_status == 'IN_PROGRESS':
    time.sleep(60)
    describe_data_ingestion_job_response = lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)
    data_ingestion_status = describe_data_ingestion_job_response['Status']
    print(str(pd.to_datetime(datetime.datetime.now()))[:19], "| ", data_ingestion_status)
    
print("\n=====End of Polling Data Ingestion Status=====")

In [None]:
describe_data_ingestion_job_response

## Train L4E Model

### Split train and test data

In [None]:
train_ratio = 0.8
train_split = int(len(df.index)*train_ratio)

def change_date_format(timestamp):
    # The parameter is not named `datetime`, to avoid shadowing the module
    return pd.to_datetime(timestamp).strftime("%Y-%m-%d %H:%M:%S")

training_start   = pd.to_datetime(df.index[0])
training_end     = pd.to_datetime(df.index[train_split])
evaluation_start = pd.to_datetime(df.index[train_split+1])
evaluation_end   = pd.to_datetime(df.index[-1])

print(f'Training period: from {training_start} to {training_end}')
print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')
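
The split above is chronological: the first 80% of the rows form the training period and the remainder forms the evaluation period. The same idea can be sketched with the standard library only. The function name `split_periods` and the sample timestamps are hypothetical and only illustrate the boundary arithmetic.

```python
from datetime import datetime, timedelta


def split_periods(timestamps, train_ratio=0.8):
    """Return (training_start, training_end, evaluation_start, evaluation_end)."""
    if len(timestamps) < 3:
        raise ValueError("need at least three timestamps to split")
    ordered = sorted(timestamps)
    split = int(len(ordered) * train_ratio)
    # Clamp the split so that ordered[split + 1] always exists
    split = min(max(split, 0), len(ordered) - 2)
    return ordered[0], ordered[split], ordered[split + 1], ordered[-1]


# Ten readings taken 10 minutes apart:
base = datetime(2020, 1, 1)
stamps = [base + timedelta(minutes=10 * i) for i in range(10)]

training_start, training_end, evaluation_start, evaluation_end = split_periods(stamps)
print(f'Training period: from {training_start} to {training_end}')
print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')
```

Because the evaluation period starts strictly after the training period ends, no reading is used for both.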

### Prepare labels
For this notebook example, we use the existing labels available for our dataset. To learn how to create your own labels for your dataset, refer to `2_discover_anomaly_labels.ipynb`.

In [None]:
df_labels = pd.read_csv('labels/labels.csv', header=None, parse_dates=True)
df_labels[0] = [pd.to_datetime(x).strftime("%Y-%m-%dT%H:%M:%S.%f") for x in df_labels[0]]
df_labels[1] = [pd.to_datetime(x).strftime("%Y-%m-%dT%H:%M:%S.%f") for x in df_labels[1]]
df_labels

In [None]:
df_labels.to_csv('labels/labels_reviewed.csv', header=None, index=None)

In [None]:
!aws s3 cp labels/labels_reviewed.csv s3://$BUCKET/$PREFIX/labelled_data/labels.csv

### Setup Training Config

In [None]:
# Prepare the model parameters:
lookout_model = lookout.LookoutEquipmentModel(model_name=MODEL_NAME,
                                              dataset_name=DATASET_NAME,
                                              region_name=REGION_NAME)

# Set the training / evaluation split date:
lookout_model.set_time_periods(evaluation_start,
                               evaluation_end,
                               training_start,
                               training_end)

# Set the label data location:
lookout_model.set_label_data(bucket=BUCKET, 
                             prefix=PREFIX+'/labelled_data/',
                             access_role_arn=ROLE_ARN)

# This sets up the rate the service will resample the data before 
# training:
lookout_model.set_target_sampling_rate(sampling_rate='PT10M')
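
The sampling rate `'PT10M'` is an ISO 8601 duration meaning 10 minutes. Later in this notebook, the scheduler frequency has to match this rate, so it can help to convert such strings to a number. The sketch below handles only the time part of a duration (hours, minutes, seconds), not the full ISO 8601 grammar; `duration_to_seconds` is a hypothetical helper name.

```python
import re

# Matches durations such as PT10M, PT1H, PT1H30M or PT30S
_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def duration_to_seconds(duration: str) -> int:
    """Convert a time-only ISO 8601 duration to seconds."""
    match = _DURATION.match(duration)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {duration!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds


print(duration_to_seconds('PT10M') // 60)  # sampling rate in minutes
```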

### Train model

In [None]:
# Actually create the model and train it:
lookout_model.train()

#### The step below makes this notebook poll until training completes (about 2.5 hours)

In [None]:
# Run this only if you want this notebook to wait here till the training is complete
lookout_model.poll_model_training()

### Get diagnostics for the trained model

In [None]:
MODEL_NAME

In [None]:
lookout_client = lookout.get_client(region_name=REGION_NAME)
describe_model_response = lookout_client.describe_model(ModelName=MODEL_NAME)
list(describe_model_response.keys())

In [None]:
describe_model_response['Status']

In [None]:
LookoutDiagnostics = lookout.LookoutEquipmentAnalysis(model_name=MODEL_NAME, tags_df=df, region_name=REGION_NAME)
LookoutDiagnostics.set_time_periods(evaluation_start, evaluation_end, training_start, training_end)
predicted_ranges = LookoutDiagnostics.get_predictions()
labels_fname = os.path.join(LABEL_DATA, 'labels.csv')
labeled_ranges = LookoutDiagnostics.get_labels(labels_fname)

In [None]:
labeled_ranges

#### Model diagnostics with feature contribution (the percentage each feature contributed to a detected anomaly)

In [None]:
list_d = []
for rec in predicted_ranges['diagnostics']:
    list_d.append(pd.DataFrame.from_dict(rec).set_index('name'))
diagnostics_df_ = pd.concat(list_d, axis=1).T.reset_index(drop=True)
diagnostics_df = pd.concat([predicted_ranges[['start','end']],diagnostics_df_], axis=1)
diagnostics_df

### Display Anomaly Events

In [None]:
def build_labels_df(df, predicted_ranges, labeled_ranges):
    labels_df = pd.DataFrame(index=pd.to_datetime(df.index))
    labels_df['true'] = 0
    labels_df['predicted'] = 0
    
    mask = labels_df.index >= evaluation_start
    labels_df = labels_df.loc[mask, :]
    
    for row in labeled_ranges.iterrows():
        s = pd.to_datetime(row[1]['start'])
        e = pd.to_datetime(row[1]['end'])
        labels_df.loc[s:e,'true'] = 1
    
    for row in predicted_ranges.iterrows():
        s = pd.to_datetime(row[1]['start'])
        e = pd.to_datetime(row[1]['end'])
        labels_df.loc[s:e,'predicted'] = 1
    
    return labels_df

labels_df = build_labels_df(df, predicted_ranges, labeled_ranges)
labels_df

In [None]:
c_ = []
for row in labeled_ranges.iterrows():
    s = pd.to_datetime(row[1]['start'])
    e = pd.to_datetime(row[1]['end'])
    a = labels_df.loc[s:e,:].index
    b = labels_df.loc[labels_df.sum(axis=1) == 2].index
    c = set(a).intersection(set(b))
    if c:
        c_.append(1)

print('Total abnormal events detected: ', len(c_))
print('Total abnormal events in the evaluation period: ', len(labeled_ranges.loc[labeled_ranges['start']>=evaluation_start,:]))
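
The count above treats a labeled event as detected when at least one predicted anomaly overlaps it. A simplified sketch of that overlap test on plain `(start, end)` tuples follows. It ignores the sampled index and the evaluation-period filter used in the cell above, and `count_detected_events` is a hypothetical name.

```python
from datetime import datetime


def count_detected_events(labeled, predicted):
    """Count labeled ranges that overlap at least one predicted range.

    Both arguments are lists of (start, end) tuples; ranges are treated
    as closed intervals.
    """
    detected = 0
    for l_start, l_end in labeled:
        if any(p_start <= l_end and l_start <= p_end
               for p_start, p_end in predicted):
            detected += 1
    return detected


def day(n):
    return datetime(2020, 1, n)


labeled = [(day(1), day(3)), (day(10), day(12))]
predicted = [(day(2), day(4)), (day(20), day(21))]
print('Total abnormal events detected: ', count_detected_events(labeled, predicted))
```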

In [None]:
PREDICTIONS_FNAME = 'predictions.csv'
labels_df.to_csv(f's3://{BUCKET}/{PREFIX}/labelled_data/{PREDICTIONS_FNAME}')

## Run inference on the L4E model

### Create the inference scheduler
The CreateInferenceScheduler API creates a scheduler **and** starts it, so you start incurring costs right away. However, you can stop and start an existing scheduler at will (see the "Stop Inference Scheduler" section later in this notebook):

In [None]:
ROLE_ARN = sagemaker.get_execution_role()

# Name of the inference scheduler you want to create
INFERENCE_SCHEDULER_NAME = 'wind-turbine-inference-scheduler'

# Name of the model on which you want to create this inference scheduler
MODEL_NAME_FOR_CREATING_INFERENCE_SCHEDULER = MODEL_NAME

# Mandatory parameters:
INFERENCE_DATA_SOURCE_BUCKET = BUCKET
INFERENCE_DATA_SOURCE_PREFIX = f'{PREFIX}/inference-a2i/input/'
INFERENCE_DATA_OUTPUT_BUCKET = BUCKET
INFERENCE_DATA_OUTPUT_PREFIX = f'{PREFIX}/inference-a2i/output/'
ROLE_ARN_FOR_INFERENCE = ROLE_ARN
DATA_UPLOAD_FREQUENCY = 'PT10M'

In [None]:
DATA_DELAY_OFFSET_IN_MINUTES = None
INPUT_TIMEZONE_OFFSET = '+00:00'
COMPONENT_TIMESTAMP_DELIMITER = '_'
TIMESTAMP_FORMAT = 'yyyyMMddHHmmss'
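
With these settings, the input files written later in this notebook are named `<component><delimiter><timestamp>.csv`, where the timestamp follows `yyyyMMddHHmmss`. A short sketch of how such a name can be built in Python is shown here. It assumes that `%Y%m%d%H%M%S` is the `strftime` equivalent of that format, and `inference_file_name` is a hypothetical helper.

```python
from datetime import datetime


def inference_file_name(component: str, when: datetime, delimiter: str = '_') -> str:
    """Build an input file name such as R80711_20210304102000.csv."""
    return f"{component}{delimiter}{when.strftime('%Y%m%d%H%M%S')}.csv"


print(inference_file_name('R80711', datetime(2021, 3, 4, 10, 20, 0)))
# → R80711_20210304102000.csv
```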

In [None]:
scheduler = lookout.LookoutEquipmentScheduler(
    scheduler_name=INFERENCE_SCHEDULER_NAME,
    model_name=MODEL_NAME_FOR_CREATING_INFERENCE_SCHEDULER,
    region_name=REGION_NAME
)

scheduler_params = {
    'input_bucket': INFERENCE_DATA_SOURCE_BUCKET,
    'input_prefix': INFERENCE_DATA_SOURCE_PREFIX,
    'output_bucket': INFERENCE_DATA_OUTPUT_BUCKET,
    'output_prefix': INFERENCE_DATA_OUTPUT_PREFIX,
    'role_arn': ROLE_ARN_FOR_INFERENCE,
    'upload_frequency': DATA_UPLOAD_FREQUENCY,
    'delay_offset': DATA_DELAY_OFFSET_IN_MINUTES,
    'timezone_offset': INPUT_TIMEZONE_OFFSET,
    'component_delimiter': COMPONENT_TIMESTAMP_DELIMITER,
    'timestamp_format': TIMESTAMP_FORMAT
}

scheduler.set_parameters(**scheduler_params)

### Prepare the inference data
---
Let's prepare and send some data in the S3 input location our scheduler will monitor:

In [None]:
# Let's load all our original signals:
# This is the same telemetry file that was loaded into `df` earlier:
all_tags_fname = f'{TRAIN_DATA}/telemetry.csv'
all_tags_df = pd.read_csv(all_tags_fname)
all_tags_df['Timestamp']= pd.to_datetime(all_tags_df['Timestamp'])
all_tags_df = all_tags_df.set_index('Timestamp')
all_tags_df.head()

In [None]:
all_tags_df.index.max()

#### To build our sample inference dataset, we will extract the last few minutes of the evaluation period of the original time series:
Specifically, we will create 3 CSV files for our turbine, with file timestamps 10 minutes apart (matching the scheduler frequency). These are all stored in S3 in the inference-a2i folder.

In [None]:
# How many sequences do we want to extract:
num_sequences = 3

# The scheduling frequency in minutes: this **MUST** match the
# resampling rate used to train the model:
frequency = 10
# Getting a better range for more data points
duration = 40

# Local folder where the inference input files are written:
INFERENCE_INPUT = os.path.join(INFER_DATA_A2I, 'input')
os.makedirs(INFERENCE_INPUT, exist_ok=True)

# Round the current time down to the previous multiple of `frequency`
# minutes, so the file timestamps line up with the scheduler runs:
now = datetime.datetime.now()
print(now)
now = now - datetime.timedelta(
    minutes=now.minute % frequency,
    seconds=now.second,
    microseconds=now.microsecond
)

# Loop through each sequence:
start = all_tags_df.index.max() - datetime.timedelta(minutes=duration * num_sequences)
signals = list(df.columns)
sequences = []
for i in range(num_sequences):
    print("num seq i: " + str(i))
    end = start + datetime.timedelta(minutes=duration)

    # Timestamp of the scheduler run that will pick up this file:
    tm = now + datetime.timedelta(minutes=frequency * i)
    current_timestamp = tm.strftime('%Y%m%d%H%M%S')

    # Extract the signals for this particular time range:
    print(f'Extracting data from {start} to {end}:')
    signals_df = all_tags_df.loc[start:end, signals].copy()

    # Reset the index to match the time at which the scheduler
    # will run inference:
    signals_df.index = pd.date_range(
        start=tm,
        periods=signals_df.shape[0],
        freq='2min'
    )
    signals_df.index.name = 'Timestamp'
    signals_df = signals_df.reset_index()
    signals_df['Timestamp'] = pd.to_datetime(signals_df['Timestamp'], errors='coerce')

    # IMPORTANT STEP - keep every sequence so the A2I display UI can
    # show the signals for reference later:
    sequences.append(signals_df)

    # Export this file in CSV format:
    component_fname = os.path.join(INFERENCE_INPUT, f'{TURBINE_ID}_{current_timestamp}.csv')
    print("creating inference input files: " + component_fname)
    signals_df.to_csv(component_fname, index=None)

    start = start + datetime.timedelta(minutes=duration)

sig_full_df = pd.concat(sequences, ignore_index=True)

# Upload the whole folder to S3, in the input location:
!aws s3 cp --recursive --quiet $INFERENCE_INPUT s3://$BUCKET/$PREFIX/inference-a2i/input


In [None]:
sig_full_df
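
The file timestamps above are obtained by rounding the current time down to a multiple of the scheduler frequency and then stepping forward one frequency per file. That arithmetic can be isolated as follows. This is a sketch that assumes the frequency divides 60 evenly (as 10 does); both function names are hypothetical.

```python
from datetime import datetime, timedelta


def floor_to_frequency(moment: datetime, frequency_minutes: int) -> datetime:
    """Round `moment` down to the previous multiple of `frequency_minutes`."""
    return moment - timedelta(
        minutes=moment.minute % frequency_minutes,
        seconds=moment.second,
        microseconds=moment.microsecond,
    )


def scheduled_timestamps(now: datetime, frequency_minutes: int, count: int) -> list:
    """Return `count` timestamps, one per scheduler run, starting at the floor of `now`."""
    first = floor_to_frequency(now, frequency_minutes)
    return [first + timedelta(minutes=frequency_minutes * i) for i in range(count)]


for stamp in scheduled_timestamps(datetime(2021, 3, 4, 10, 27, 45), 10, 3):
    print(stamp.strftime('%Y%m%d%H%M%S'))
# → 20210304102000
# → 20210304103000
# → 20210304104000
```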

In [None]:
# Now that we've prepared the data, create the scheduler by running:
create_scheduler_response = scheduler.create()

## Get inference results
---

### List inference executions

**Let's now wait 5-15 minutes to give the scheduler time to run its first inferences.** Once the wait is over, we can use the ListInferenceExecutions API for our current inference scheduler. The only mandatory parameter is the scheduler name.

You can also choose a time period for which to query inference executions. If you don't specify one, all executions for the inference scheduler are listed. To specify a time range, you can do this:

```python
START_TIME_FOR_INFERENCE_EXECUTIONS = datetime.datetime(2010,1,3,0,0,0)
END_TIME_FOR_INFERENCE_EXECUTIONS = datetime.datetime(2010,1,5,0,0,0)
```

Which means the executions after `2010-01-03 00:00:00` and before `2010-01-05 00:00:00` will be listed.

You can also query for executions in a particular status. The allowed statuses are `IN_PROGRESS`, `SUCCESS` and `FAILED`.
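
Once the execution summaries are available, a quick tally by status shows how many runs succeeded. The sketch below works on a hand-written list of dictionaries. The sample values are made up, and only the `Status` and `ScheduledStartTime` keys are taken from the summaries used in this notebook.

```python
from collections import Counter
from datetime import datetime


def summarize_statuses(summaries):
    """Count inference executions per status."""
    return Counter(summary['Status'] for summary in summaries)


# Hypothetical sample shaped like the execution summaries used in this notebook:
sample = [
    {'ScheduledStartTime': datetime(2021, 3, 4, 10, 20), 'Status': 'SUCCESS'},
    {'ScheduledStartTime': datetime(2021, 3, 4, 10, 30), 'Status': 'FAILED'},
    {'ScheduledStartTime': datetime(2021, 3, 4, 10, 40), 'Status': 'SUCCESS'},
]

for status, count in sorted(summarize_statuses(sample).items()):
    print(f'{status}: {count}')
```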

In [None]:
START_TIME_FOR_INFERENCE_EXECUTIONS = None
END_TIME_FOR_INFERENCE_EXECUTIONS = None
EXECUTION_STATUS = None

execution_summaries = []

while len(execution_summaries) == 0:
    execution_summaries = scheduler.list_inference_executions(
        start_time=START_TIME_FOR_INFERENCE_EXECUTIONS,
        end_time=END_TIME_FOR_INFERENCE_EXECUTIONS,
        execution_status=EXECUTION_STATUS
    )
    if len(execution_summaries) == 0:
        print('WAITING FOR THE FIRST INFERENCE EXECUTION')
        time.sleep(60)
        
    else:
        print('FIRST INFERENCE EXECUTED\n')
        break
            
execution_summaries

### Get actual prediction results

After each successful inference, a results file is written to the output location of your bucket, in a new folder per inference execution. The cell below locates each file through the `CustomerResultObject` field of the execution summary, parses it as JSON, and displays the content:

In [None]:
# If not installed at the beginning of the notebook, run this
#!pip install smart_open

In [None]:
import json
import smart_open

results = []
for execution_summary in execution_summaries:
    print("Checking inference for " + str(execution_summary['ScheduledStartTime']) + " with status " + execution_summary['Status'])
    if execution_summary['Status'] == 'SUCCESS':
        bucket = execution_summary['CustomerResultObject']['Bucket']
        key = execution_summary['CustomerResultObject']['Key']
        fname = f's3://{bucket}/{key}'
        # smart_open.open replaces the deprecated smart_open.smart_open:
        with smart_open.open(fname, 'r') as file:
            data = json.load(file)
        results.append(pd.DataFrame([data]))

# Assemble the results into a DataFrame:
if results:
    results_df = pd.concat(results, axis='index')
    results_df.columns = ['Timestamp', 'Predictions']
    results_df['Timestamp'] = pd.to_datetime(results_df['Timestamp'], errors='coerce')
    results_df = results_df.set_index('Timestamp')
else:
    # Keep results_df a DataFrame so the following cells still run:
    results_df = pd.DataFrame(columns=['Predictions'],
                              index=pd.DatetimeIndex([], name='Timestamp'))
    print('No successful inference results yet, please try again..')

results_df

In [None]:
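# Make sure the local output folder exists before writing to it:
os.makedirs(os.path.join(INFER_DATA_A2I, 'output'), exist_ok=True)
results_df.to_csv(os.path.join(INFER_DATA_A2I, 'output', 'results.csv'))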
results_df

### Stop Inference Scheduler
Let's stop the inference scheduler, as we won't need it for the remaining steps. In your own solution, however, the inference scheduler should keep running so that real-time inference for your equipment continues.

In [None]:
scheduler.stop(wait=True)

In [None]:
# Delete the scheduler only if you don't need it anymore:
scheduler.delete()

# A2I activities start here
Now that inference has run, let's set up a UI to review the inference results and correct them, so we can send the corrections back to L4E to retrain the model. Follow the steps below.

### Initialize handlers

In [None]:
timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
# Amazon SageMaker client
sagemaker_client = boto3.client('sagemaker')

# Amazon Augmented AI (A2I) client
a2i = boto3.client('sagemaker-a2i-runtime')

# Amazon S3 client 
s3 = boto3.client('s3')

# Flow definition name - this value is unique per account and region. You can also provide your own value here.
flowDefinitionName = 'fd-l4e-' + timestamp

# Task UI name - this value is unique per account and region. You can also provide your own value here.
taskUIName = 'ui-l4e-' + timestamp

# Flow definition outputs: an S3 bucket in the current region that receives
# the A2I results (replace the placeholder with your own bucket name)
a2ibucket = '<your-A2I-output-bucket-name>'
OUTPUT_PATH = 's3://' + a2ibucket + '/' + PREFIX + '/a2i-results'

role = get_execution_role()
print("RoleArn: {}".format(role))
WORKTEAM_ARN = '<workteam-arn>'

### Create the human task UI
Create a human task UI resource by providing a UI template in Liquid HTML. You can download this template and customize it. The template is rendered to the human workers whenever a human loop is required. For over 70 pre-built UIs, see https://github.com/aws-samples/amazon-a2i-sample-task-uis. But first, let's declare some variables that we need in the next set of steps.

In [None]:
# We customized the tabular template for our notebook as below
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.5.0/Chart.min.js"></script>

<crowd-form>
   <div style="margin-left: 40px">
        <h1>Instructions</h1>
        <p>Please review the equipment sensor inference inputs, and make corrections to anomaly predictions from the Lookout for Equipment Model.</p>
    
   <div>
      <h3>Equipment Sensor Readings</h3>
      <div style="width:75%;">
        <canvas id="canvas"></canvas>
      </div>
   </div>
    <br>
    <h3>Select the correct equipment status below</h3>
    <p>0 means the equipment is fine. 1 means the equipment is faulty or is in the process of wearing down</p>
    <table>
    <tr>
        <th>Start</th>
        <th>End</th>
        <th>Predicted Anomaly</th>
        <th>Corrected Start</th>
        <th>Corrected End</th>
        <th>Corrected Status</th>
        <th>Comments</th>
    </tr>
    {% for pair in task.input.anomaly %}

        <tr>
          <td><crowd-text-area name="startts-{{ forloop.index }}" value="{{ pair.startts }}" rows="2"></crowd-text-area></td>
          <td><crowd-text-area name="endts-{{ forloop.index }}" value="{{ pair.endts }}" rows="2"></crowd-text-area></td>
          <td><crowd-text-area name="ano-{{ forloop.index }}" value="{{ pair.ano }}"></crowd-text-area></td>     
          <td>
          <p>
            <input type="text" name="TrueStart{{ forloop.index }}" value="{{ pair.startts }}" style="height:50%; width:100%" />
            </p>
            </td>
            <td>
            <p>
            <input type="text" name="TrueEnd{{ forloop.index }}" value="{{ pair.endts }}" style="height:50%; width:100%" />
            </p>
            </td>
            <td>
            <p>
            <crowd-radio-group>
                <crowd-radio-button name="faulty-{{forloop.index}}" value="1">1-Faulty</crowd-radio-button><br>
                <crowd-radio-button name="good-{{forloop.index}}" value="0">0-Good</crowd-radio-button>
            </crowd-radio-group>
            </p>
           </td>
           <td>
            <p>
            <input type="text" name="Comments{{ forloop.index }}" placeholder="Explain why you changed the value" style="height:50%; width:80%"/>
            </p>
           </td>
        </tr>
      {% endfor %}
    </table>
    <br>
    </div>
</crowd-form>

<script>
window.chartColors = {
  red: 'rgb(255, 99, 132)',
  orange: 'rgb(255, 159, 64)',
  yellow: 'rgb(255, 205, 86)',
  green: 'rgb(75, 192, 192)',
  blue: 'rgb(54, 162, 235)',
  purple: 'rgb(153, 102, 255)',
  grey: 'rgb(231,233,237)'
};

var signal = "{{task.input.signal | to_json}}";
var reactive_power = [];
var wind_speed_1 = [];
var outdoor_temp = [];
var grid_frequency = [];
var pitch_angle = [];
var timestamps = [];


var config = {
  type: 'line',
  data: {
    labels: timestamps,
    datasets: [{
      label: "Reactive Power",
      backgroundColor: window.chartColors.red,
      borderColor: window.chartColors.red,
      data: reactive_power,
      fill: false,
    }, {
      label: "Wind Speed 1",
      fill: false,
      backgroundColor: window.chartColors.blue,
      borderColor: window.chartColors.blue,
      data: wind_speed_1,
    }, {
      label: "Outdoor Temp",
      fill: false,
      backgroundColor: window.chartColors.orange,
      borderColor: window.chartColors.orange,
      data: outdoor_temp,
    }, {
      label: "Grid Frequency",
      fill: false,
      backgroundColor: window.chartColors.green,
      borderColor: window.chartColors.green,
      data: grid_frequency,
    }, {
      label: "Pitch Angle",
      fill: false,
      backgroundColor: window.chartColors.purple,
      borderColor: window.chartColors.purple,
      data: pitch_angle,
    }         
              ]
  },
  options: {
    responsive: true,
    title:{
      display:true,
      text:'Equipment Sensor Readings Line Chart'
    },
    tooltips: {
      mode: 'index',
      intersect: false,
    },
   hover: {
      mode: 'nearest',
      intersect: true
    },
    scales: {
      xAxes: [{
        display: true,
        scaleLabel: {
          display: true,
          labelString: 'Timestamp'
        }
      }],
      yAxes: [{
        display: true,
        scaleLabel: {
          display: true,
        },
      }]
    }
  }
};

document.addEventListener('all-crowd-elements-ready', populateChart);

function populateChart() {
  
  try {
    
    var obj = JSON.parse(signal.replace(/&quot;/g,'"'));
    
    for (i = 0; i < obj.length; i++) {
      timestamps.push(obj[i].timestamp);
      reactive_power.push(obj[i].reactive_power);
      wind_speed_1.push(obj[i].wind_speed_1);
      outdoor_temp.push(obj[i].outdoor_temp);
      grid_frequency.push(obj[i].grid_frequency);
      pitch_angle.push(obj[i].pitch_angle);
    }
    
  } catch (error) {
    console.error(error);
  }
    
  var ctx = document.getElementById("canvas").getContext("2d");
  var myLine = new Chart(ctx, config);
}
  
</script>
"""

In [None]:
def create_task_ui():
    '''
    Creates a Human Task UI resource.
    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker_client.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

In [None]:
role = get_execution_role()
print("RoleArn: {}".format(role))

In [None]:
create_workflow_definition_response = sagemaker_client.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn=role,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Review the contents and select correct values as indicated",
            "TaskTitle": "Equipment Condition Review"
        },
        OutputConfig={
            "S3OutputPath" : OUTPUT_PATH
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
for x in range(60):
    describeFlowDefinitionResponse = sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)
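
The polling loop above can be generalized into a small helper that also stops early on a failure status. This is a sketch: `wait_for_status` is a hypothetical helper, and the demonstration feeds it a canned sequence of statuses rather than calling `describe_flow_definition`.

```python
import time


def wait_for_status(get_status, target="Active", failed=("Failed",),
                    interval_seconds=2, max_attempts=60, sleep=time.sleep):
    """Poll get_status() until it returns `target`.

    Returns True when the target status is reached and False when the
    attempts run out. Raises RuntimeError on a status listed in `failed`.
    """
    for _ in range(max_attempts):
        status = get_status()
        if status == target:
            return True
        if status in failed:
            raise RuntimeError(f"Resource entered status {status!r}")
        sleep(interval_seconds)
    return False


# Demonstration with canned statuses instead of a real API call:
statuses = iter(["Initializing", "Initializing", "Active"])
print(wait_for_status(lambda: next(statuses), sleep=lambda seconds: None))
# → True
```

In this notebook, `get_status` could be a lambda that returns `sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)['FlowDefinitionStatus']`.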

# Sending predictions to Amazon A2I human loops

In [None]:
a2i_sig_full_df = sig_full_df.reset_index()

In [None]:
NUM_TO_REVIEW = 5 # number of line items to review
# Never ask for more rows than the dataframe contains:
num_rows = min(NUM_TO_REVIEW, len(a2i_sig_full_df))
dftimestamp = a2i_sig_full_df['Timestamp'].astype(str).to_list()
dfsig001 = a2i_sig_full_df['Q_avg'].astype(str).to_list()
dfsig002 = a2i_sig_full_df['Ws1_avg'].astype(str).to_list()
dfsig003 = a2i_sig_full_df['Ot_avg'].astype(str).to_list()
dfsig004 = a2i_sig_full_df['Nf_avg'].astype(str).to_list()
dfsig046 = a2i_sig_full_df['Ba_avg'].astype(str).to_list()
# The keys below must match the field names read by the chart in the task UI template:
sig_list = [
    {
        'timestamp': dftimestamp[x],
        'reactive_power': dfsig001[x],
        'wind_speed_1': dfsig002[x],
        'outdoor_temp': dfsig003[x],
        'grid_frequency': dfsig004[x],
        'pitch_angle': dfsig046[x]
    }
    for x in range(num_rows)
]
sig_list

In [None]:
# Keep an independent copy: the next cell modifies results_df in place
old_results_df = results_df.copy()

In [None]:
# Execute this cell only once after each inference call
results_df.reset_index(inplace=True)

In [None]:
results_df['StartTimestamp'] = results_df['Timestamp'] - datetime.timedelta(minutes=frequency*12)
results_df['EndTimestamp'] = results_df['Timestamp'] + datetime.timedelta(minutes=frequency*12)

In [None]:
#results_df = results_df.drop(['index'], axis=1)
results_df

In [None]:
dfstartts = results_df['StartTimestamp'].astype(str).to_list()
dfendts = results_df['EndTimestamp'].astype(str).to_list()
dfano = results_df['Predictions'].to_list()
ano_list = [{'startts': dfstartts[x], 'endts': dfendts[x], 'ano': dfano[x]} for x in range(len(results_df))]
ano_list

In [None]:
ip_content = {"signal": sig_list,
             'anomaly': ano_list
             }
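
Before starting the human loop, it can be useful to confirm that the input content serializes to JSON and stays reasonably small. The sketch below does both. The `max_bytes` default is an assumption for illustration and should be checked against the current A2I limits, and `build_input_content` is a hypothetical helper.

```python
import json


def build_input_content(signal_rows, anomaly_rows, max_bytes=3 * 1024 * 1024):
    """Serialize the A2I task input and reject payloads above `max_bytes`.

    The default limit is an assumption, not a value taken from this notebook.
    """
    payload = json.dumps({'signal': signal_rows, 'anomaly': anomaly_rows})
    size = len(payload.encode('utf-8'))
    if size > max_bytes:
        raise ValueError(f'input content is {size} bytes, above the {max_bytes} byte limit')
    return payload


# Hypothetical rows shaped like sig_list and ano_list above:
signal_rows = [{'timestamp': '2021-03-04 10:20:00', 'reactive_power': '12.5'}]
anomaly_rows = [{'startts': '2021-03-04 08:20:00', 'endts': '2021-03-04 12:20:00', 'ano': 0}]

content = build_input_content(signal_rows, anomaly_rows)
print(len(content.encode('utf-8')), 'bytes')
```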

# Start the human review

In [None]:
import json
humanLoopName = str(uuid.uuid4())

start_loop_response = a2i.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(ip_content)
            }
        )


In [None]:
completed_human_loops = []
resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)
print(f'HumanLoop Name: {humanLoopName}')
print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
# .get() avoids a KeyError when the output is not available yet:
print(f'HumanLoop Output Destination: {resp.get("HumanLoopOutput")}')
print('\n')
   
      
if resp["HumanLoopStatus"] == "Completed":
    completed_human_loops.append(resp)

# Login link to navigate to the private workforce portal

In [None]:
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

In [None]:
completed_human_loops = []
resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)
print(f'HumanLoop Name: {humanLoopName}')
print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
# .get() avoids a KeyError when the output is not available yet:
print(f'HumanLoop Output Destination: {resp.get("HumanLoopOutput")}')
print('\n')
   
      
if resp["HumanLoopStatus"] == "Completed":
    completed_human_loops.append(resp)

# Evaluating the results

When the labeling work is complete, your results should be available in the S3 output path specified in the human review workflow definition. 
The human answers are returned and saved in the JSON file.

In [None]:
import re
import pprint

pp = pprint.PrettyPrinter(indent=4)
json_output = ''
for resp in completed_human_loops:
    splitted_string = re.split('s3://' + a2ibucket  + '/', resp['HumanLoopOutput']['OutputS3Uri'])
    print(splitted_string[1])
    output_bucket_key = splitted_string[1]
    response = s3.get_object(Bucket=a2ibucket, Key=output_bucket_key)
    content = response["Body"].read()
    json_output = json.loads(content)
    pp.pprint(json_output)
    print('\n')
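
The cell above uses a regular expression to strip the bucket name from the output URI. An alternative that does not need to know the bucket name in advance is to parse the URI with the standard library. This is a sketch: `split_s3_uri` is a hypothetical helper and the sample URI is made up.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str):
    """Split an s3://bucket/key URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError(f'not an S3 URI: {uri!r}')
    return parsed.netloc, parsed.path.lstrip('/')


bucket, key = split_s3_uri('s3://my-bucket/wind-turbine/a2i-results/output.json')
print(bucket)  # → my-bucket
print(key)     # → wind-turbine/a2i-results/output.json
```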

## Retrain L4E based on A2I correction
Now we'll take the A2I output, preprocess it and send it back to L4E for retraining our model based on the user corrections

In [None]:
labels_df = pd.read_csv(os.path.join(LABEL_DATA, 'labels.csv'), header=None)
labels_df[0] = pd.to_datetime(labels_df[0])
labels_df[1] = pd.to_datetime(labels_df[1])
labels_df.columns = ['start', 'end']
labels_df.tail()

In [None]:
a2i_lbl_df = pd.DataFrame()

### Update Labels with new date ranges

In [None]:
faulty = False
a2i_lbl_df = labels_df
x = json_output['humanAnswers'][0]
row_df = pd.DataFrame(columns=['rownr'])
tslist = {}

# Let's first check if the users mark equipment as faulty and if so get those row numbers into a dataframe            
for i in json_output['humanAnswers']:
    print("checking equipment review...")
    x = i['answerContent']
    for idx, key in enumerate(x):
        if "faulty" in key:
            if str(x.get(key)).split(':')[1].lstrip().strip('}') == "True": # faulty equipment selected
                faulty = True
                row_df.loc[len(row_df.index)] = [key.split('-')[1]]
                print("found faulty equipment in row: " + key.split('-')[1])


# Now we will get the date ranges for the faulty choices                     
for idx,k in row_df.iterrows():
    x = json_output['humanAnswers'][0]
    strchk = "TrueStart"+k['rownr']
    endchk = "TrueEnd"+k['rownr']
    for i in x['answerContent']:
        if i == strchk:
            tslist[i] = x['answerContent'].get(i)
        if i == endchk:
            tslist[i] = x['answerContent'].get(i)

            
# And finally let's add it to our new a2i labels dataset
for idx,k in row_df.iterrows():
    x = json_output['humanAnswers'][0]
    strchk = "TrueStart"+k['rownr']
    endchk = "TrueEnd"+k['rownr']
    a2i_lbl_df.loc[len(a2i_lbl_df.index)] = [tslist[strchk], tslist[endchk]]
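
The three loops above can also be expressed as a single pass over the answers. The sketch below is not the notebook's method: it assumes each `answerContent` holds keys of the form `faulty-<row>`, `TrueStart<row>` and `TrueEnd<row>` (as the code above implies) and that a selected checkbox is serialized as `{"on": True}`, which is an assumption about the worker template. The function name and the sample payload are hypothetical.

```python
from datetime import datetime


def extract_fault_ranges(human_answers):
    """Return (row, start, end) tuples for every row a reviewer marked as faulty."""
    ranges = []
    for answer in human_answers:
        content = answer.get("answerContent", {})
        for key, value in content.items():
            if not key.startswith("faulty-"):
                continue
            # Assumed checkbox shape: {"on": True}; a bare boolean is also accepted
            selected = value.get("on") if isinstance(value, dict) else value
            if selected is not True:
                continue
            row = key.split("-", 1)[1]
            start = content.get(f"TrueStart{row}")
            end = content.get(f"TrueEnd{row}")
            if start is None or end is None:
                continue  # row flagged as faulty but no date range supplied
            # Parse both timestamps to validate them and reject inverted ranges
            if datetime.fromisoformat(start) > datetime.fromisoformat(end):
                raise ValueError(f"Row {row}: start is after end")
            ranges.append((row, start, end))
    return ranges


# Hypothetical payload, for illustration only
sample_answers = [{
    "answerContent": {
        "faulty-1": {"on": True},
        "TrueStart1": "2021-04-05 20:30:00",
        "TrueEnd1": "2021-04-05 21:30:00",
        "faulty-2": {"on": False},
    }
}]
print(extract_fault_ranges(sample_answers))
```

Unlike the cell above, which looks up date ranges only in the first entry of `humanAnswers`, this sketch reads each reviewer's own `answerContent`, which matters if more than one worker reviews the same task.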

### Don't execute the steps below if no new label was added

In [None]:
# Updated Labels after A2I results are included
a2i_lbl_df

In [None]:
a2i_label_src_fname = os.path.join(A2I_LABEL_DATA, 'labels.csv')
a2i_lbl_df.to_csv(a2i_label_src_fname, header=None, index=False)

In [None]:
# Uploading label dataset to S3:
a2i_label_s3_dest_path = f's3://{BUCKET}/{PREFIX}/augmented-labelled-data/labels.csv'
!aws s3 cp $a2i_label_src_fname $a2i_label_s3_dest_path

### Update the training dataset with new measurements
We will now update our original training dataset with the new measurement range, based on what we got back from A2I.

In [None]:
turbine_id = 'R80711'
file = '../data/wind-turbine/final/training-data/'+turbine_id+'/'+turbine_id+'.csv'
newdf = pd.read_csv(file, index_col='Timestamp')
newdf.head()

In [None]:
# Inspect the shape only; do not reassign newdf, which is still needed as a dataframe below
newdf.shape

In [None]:
sig_full_df = sig_full_df.set_index('Timestamp')

In [None]:
sig_full_df

In [None]:
sig_full_df.shape

In [None]:
tm = pd.to_datetime('2021-04-05 20:30:00')
print(tm)
new_index = pd.date_range(
        start=tm,
        periods=sig_full_df.shape[0], 
        freq='10min'
        )
sig_full_df.index = new_index
sig_full_df.index.name = 'Timestamp'
sig_full_df = sig_full_df.reset_index()
sig_full_df['Timestamp'] = pd.to_datetime(sig_full_df['Timestamp'], errors='coerce')

In [None]:
sig_full_df

In [None]:
# Append the original training data with the new measurements that we simulated before we ran our inference. We should be updating this only 
# if A2I reviews tagged faulty equipment
newdf = newdf.reset_index()
newdf = pd.concat([newdf,sig_full_df])
newdf.head()

In [None]:
newdf.tail()

In [None]:
newdf = newdf.set_index('Timestamp')

**Note:** As we can see above, 15 rows were appended to the end of the training dataset. Now let's create a CSV file and copy the data to the training channel in S3.

In [None]:
TRAIN_DATA_AUGMENTED = os.path.join(TRAIN_DATA,'augmented')
os.makedirs(TRAIN_DATA_AUGMENTED, exist_ok=True)
# Write into the same local folder that is synced to S3 below
newdf.to_csv(os.path.join(TRAIN_DATA_AUGMENTED, turbine_id + '.csv'))
!aws s3 sync $TRAIN_DATA_AUGMENTED s3://$BUCKET/$PREFIX/training_data/augmented

In [None]:
# Update the component map for the augmented dataset. The A2I updates should not change the
# dataset structure, but we rebuild the map just in case.
DATASET_COMPONENT_FIELDS_MAP = dict()
for subsystem in components:
    # Exact match: the original substring tests would also accept partial names such as "aug"
    if subsystem == "augmented":
        subsystem = turbine_id
        print("sub: " + subsystem)
        subsystem_tags = ['Timestamp']
        for root, _, files in os.walk(f'{TRAIN_DATA}/{subsystem}'):
            for file in files:
                print("file: " + file)
                fname = os.path.join(root, file)
                current_subsystem_df = pd.read_csv(fname, nrows=1)
                subsystem_tags = subsystem_tags + current_subsystem_df.columns.tolist()[1:]

            DATASET_COMPONENT_FIELDS_MAP.update({subsystem: subsystem_tags})    

In [None]:
DATASET_COMPONENT_FIELDS_MAP
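
To illustrate what the component map contains, here is a minimal standalone sketch that builds one from CSV headers. It assumes a layout of one subdirectory per component, each holding CSV files whose first column is `Timestamp`; the function name `build_component_fields_map` and the temporary demo data are hypothetical, and this is not the notebook's own helper.

```python
import csv
import os
import tempfile


def build_component_fields_map(root_dir, timestamp_col="Timestamp"):
    """Map each component folder to [timestamp_col, tag1, tag2, ...]."""
    fields_map = {}
    for component in sorted(os.listdir(root_dir)):
        component_dir = os.path.join(root_dir, component)
        # Skip hidden folders such as .ipynb_checkpoints and plain files
        if component.startswith(".") or not os.path.isdir(component_dir):
            continue
        tags = [timestamp_col]
        for fname in sorted(os.listdir(component_dir)):
            if not fname.endswith(".csv"):
                continue
            # Only the header row is needed to discover the tag names
            with open(os.path.join(component_dir, fname), newline="") as f:
                header = next(csv.reader(f), [])
            tags += [col for col in header if col not in tags]
        fields_map[component] = tags
    return fields_map


# Demo on throwaway data (component and tag names are made up)
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "R80711"))
    os.makedirs(os.path.join(tmp, ".ipynb_checkpoints"))
    with open(os.path.join(tmp, "R80711", "R80711.csv"), "w", newline="") as f:
        csv.writer(f).writerows([["Timestamp", "tag_a", "tag_b"],
                                 ["2021-04-05 20:30:00", 1.0, 2.0]])
    demo_map = build_component_fields_map(tmp)

print(demo_map)
```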

### Create the augmented dataset

In [None]:
ROLE_ARN = sagemaker.get_execution_role()
# REGION_NAME = boto3.session.Session().region_name

DATASET_NAME = 'wind-turbine-dataset-augmented'
MODEL_NAME = 'wind-turbine-model-augmented'


lookout_dataset = lookout.LookoutEquipmentDataset(
    dataset_name=DATASET_NAME,
    component_fields_map=DATASET_COMPONENT_FIELDS_MAP,
    region_name=REGION_NAME,
    access_role_arn=ROLE_ARN
)

pp = pprint.PrettyPrinter(depth=5)
pp.pprint(eval(lookout_dataset.dataset_schema))

In [None]:
lookout_dataset.create()

### Ingest augmented data into L4E

In [None]:
response = lookout_dataset.ingest_data(BUCKET, f'{PREFIX}/training_data/augmented/')

In [None]:
# Get the ingestion job ID and status:
data_ingestion_job_id = response['JobId']
data_ingestion_status = response['Status']

# Wait until ingestion completes:
print("=====Polling Data Ingestion Status=====\n")
lookout_client = lookout.get_client(region_name=REGION_NAME)
print(str(pd.to_datetime(datetime.datetime.now()))[:19], "| ", data_ingestion_status)

while data_ingestion_status == 'IN_PROGRESS':
    time.sleep(60)
    describe_data_ingestion_job_response = lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)
    data_ingestion_status = describe_data_ingestion_job_response['Status']
    print(str(pd.to_datetime(datetime.datetime.now()))[:19], "| ", data_ingestion_status)
    
print("\n=====End of Polling Data Ingestion Status=====")

In [None]:
describe_data_ingestion_job_response
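
The polling loop above runs until the job leaves the `IN_PROGRESS` state, with no upper bound on the wait. A minimal sketch of the same pattern with a timeout is shown below. The helper `poll_until_done` and the fake status sequence are hypothetical; in the notebook, `get_status` would wrap the `describe_data_ingestion_job` call.

```python
import time


def poll_until_done(get_status, in_progress="IN_PROGRESS",
                    interval_s=60, timeout_s=3600, sleep=time.sleep):
    """Call get_status() until it stops returning in_progress, or time out."""
    waited = 0
    status = get_status()
    while status == in_progress:
        if waited >= timeout_s:
            raise TimeoutError(f"Still {in_progress} after {waited} seconds")
        sleep(interval_s)
        waited += interval_s
        status = get_status()
    return status


# Demo with a fake status source so the example runs without AWS access
fake_statuses = iter(["IN_PROGRESS", "IN_PROGRESS", "SUCCESS"])
final_status = poll_until_done(lambda: next(fake_statuses),
                               interval_s=1, sleep=lambda _: None)
print(final_status)
```

Passing `sleep` as a parameter keeps the helper easy to test; in real use the default `time.sleep` applies.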

### Update the time ranges for training and evaluation of augmented dataset
When data is sampled and the model retrained continuously, there will be no gaps (or only minimal gaps) in the time ranges between the previous training run and the current augmented training run. However, in our wind turbine example the dataset was last recorded in 2018, so we select the training and evaluation periods as shown below. In an operational setting, choose a time period that leaves you a back-test window for evaluation while still providing adequate data for training.

In [None]:
newdf.index = pd.to_datetime(newdf.index)

In [None]:
# Time ranges for augmented training: an 80/20 chronological split of the augmented data,
# with the evaluation end set to the last timestamp of the new data points

train_ratio = 0.8
# Compute the split position on the same dataframe that is indexed below
train_split = int(len(newdf.index)*train_ratio)

   
training_start   = pd.to_datetime(newdf.index[0])
training_end     = pd.to_datetime(newdf.index[train_split])
evaluation_start = pd.to_datetime(newdf.index[train_split+1])
evaluation_end   = pd.to_datetime(newdf.index.max())
    

print(f'Training period: from {training_start} to {training_end}')
print(f'Evaluation period: from {evaluation_start} to {evaluation_end}')

print('Dataset used:', DATASET_NAME)
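
The split logic above can be isolated into a small function for reuse or testing. The following is a minimal sketch using only the standard library; the function name `chronological_split` and the generated timestamps are hypothetical. Note that it places the boundary slightly differently from the cell above: training ends at the last timestamp before the split position rather than at the split position itself.

```python
from datetime import datetime, timedelta


def chronological_split(timestamps, train_ratio=0.8):
    """Return training and evaluation boundaries for time-ordered data."""
    ordered = sorted(timestamps)
    if len(ordered) < 2:
        raise ValueError("Need at least two timestamps to split")
    split = int(len(ordered) * train_ratio)
    # Keep at least one point on each side of the boundary
    split = min(max(split, 1), len(ordered) - 1)
    return {
        "training_start": ordered[0],
        "training_end": ordered[split - 1],
        "evaluation_start": ordered[split],
        "evaluation_end": ordered[-1],
    }


# Ten made-up samples at a 10-minute rate
start = datetime(2021, 4, 5, 20, 30)
stamps = [start + timedelta(minutes=10 * i) for i in range(10)]
periods = chronological_split(stamps)
for name, value in periods.items():
    print(f"{name}: {value}")
```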

In [None]:
REGION_NAME

### Finally, retrain L4E on the augmented dataset

In [None]:
# Prepare the model parameters:
lookout_model = lookout.LookoutEquipmentModel(model_name=MODEL_NAME,
                                              dataset_name=DATASET_NAME,
                                              region_name=REGION_NAME)

# Set the training / evaluation split date:
lookout_model.set_time_periods(evaluation_start,
                               evaluation_end,
                               training_start,
                               training_end)

# Set the label data location:
lookout_model.set_label_data(bucket=BUCKET, 
                             prefix=f'{PREFIX}/augmented-labelled-data/',
                             access_role_arn=ROLE_ARN)

# Set the rate at which the service resamples the data before
# training (PT10M = one sample every 10 minutes):
lookout_model.set_target_sampling_rate(sampling_rate='PT10M')

In [None]:
# Actually create the model and train it:
lookout_model.train()

In [None]:
lookout_model.poll_model_training()

## End of notebook
