import boto3
import json
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import sys
from l4ecwcw import *
from io import StringIO
import matplotlib.ticker as mtick
# Mandatory to ensure text is rendered in SVG plots: with 'none', matplotlib
# keeps labels as text (instead of drawing glyphs as paths), which is what
# lets plot_single_diagnostic() swap the font family name in the SVG markup:
matplotlib.rcParams['svg.fonttype'] = 'none'
# Resolution used by plot_single_diagnostic() to convert the widget width and
# height (assumed to be pixels - confirm against the widget context) into a
# figure size in inches:
dpi = 100
# Making Lookout for Equipment client available to all methods in this Lambda:
l4e_client = boto3.client('lookoutequipment')
# S3 resource used by build_execution_summary() to download inference results:
s3 = boto3.resource('s3')
def get_execution_summary(event, context):
    """
    Entry point of the lambda function.

    Reads the scheduler name and the widget dimensions from the incoming
    event and delegates the rendering to build_execution_summary().

    PARAMS
    ======
        event: dict
            Must contain a 'scheduler_name' entry and a 'widgetContext'
            entry, the latter holding the 'width' and 'height' of the widget
        context: object
            Lambda context object (not used by this function)

    RETURNS
    =======
        widget_html: string
            The string produced by build_execution_summary(), meant to be
            displayed by a CloudWatch custom widget
    """
    # Keys are read in the same order as before so that a malformed event
    # fails on the same missing key:
    scheduler = event['scheduler_name']
    dimensions = event['widgetContext']

    return build_execution_summary(
        scheduler,
        dimensions['width'],
        dimensions['height']
    )
def build_execution_summary(scheduler_name, width, height):
    """
    Builds the widget content describing the last anomaly detected by a
    given inference scheduler.

    The results of every successful inference execution are downloaded from
    S3 (and cached under /tmp), concatenated, and the most recent row flagged
    as an anomaly (prediction == 1) is plotted as a horizontal bar chart of
    its 15 highest signal contributions.

    PARAMS
    ======
        scheduler_name: string
            Name of the inference scheduler to summarize
        width: number
            Width of the widget, forwarded to plot_single_diagnostic()
        height: number
            Height of the widget, forwarded to plot_single_diagnostic()

    RETURNS
    =======
        html: string
            The SVG markup returned by plot_single_diagnostic(), or a short
            HTML message when no anomaly (or no result at all) is available
    """
    # Message returned whenever there is nothing to plot.
    # NOTE(review): the original literal was split across lines (not valid
    # Python); the surrounding markup looked lost, <p> is an assumption.
    no_anomaly_html = '<p>No anomaly detected by this scheduler yet</p>'

    list_executions = list_inference_executions(
        scheduler_name,
        start_time=None,
        end_time=None,
        execution_status=None
    )

    # Loops through the executions summaries:
    results_json = []
    for execution_summary in list_executions:
        # We only request an output if the inference execution is a success:
        if execution_summary['Status'] != 'SUCCESS':
            continue

        # Download the JSON-line file locally. The file is kept in /tmp so
        # that later calls made by the same process can reuse it:
        bucket = execution_summary['CustomerResultObject']['Bucket']
        key = execution_summary['CustomerResultObject']['Key']
        current_timestamp = key.split('/')[-2]
        local_fname = os.path.join('/tmp', f'{scheduler_name}_{current_timestamp}.jsonl')
        if not os.path.exists(local_fname):
            content_object = s3.Object(bucket, key)
            file_content = content_object.get()['Body'].read().decode('utf-8')
            with open(local_fname, 'w', encoding='utf-8') as f:
                f.write(file_content)

        # Opens the file and accumulates its records. Each line is parsed
        # as JSON: this content comes from S3 and must never go through
        # eval(), which would execute arbitrary code found in the file.
        # Blank lines (e.g. a trailing newline) are skipped:
        with open(local_fname, 'r', encoding='utf-8') as f:
            results_json.extend(json.loads(line) for line in f if line.strip())

    # No successful execution (or only empty result files): there is no
    # 'timestamp' column to work with, so stop here:
    if not results_json:
        return no_anomaly_html

    # Build the final dataframes with all the results:
    results_df = pd.DataFrame(results_json)
    results_df['timestamp'] = pd.to_datetime(results_df['timestamp'])
    results_df = results_df.set_index('timestamp')
    results_df = results_df.sort_index()

    # Only keep the rows flagged as anomalies:
    expanded_results = expand_signal_diagnostics(results_df)
    expanded_results = expanded_results[expanded_results['prediction'] == 1]
    if expanded_results.shape[0] == 0:
        return no_anomaly_html

    # Last anomalous row, without its first column ('prediction'), turned
    # into a two-column (name, value) table:
    event_details = pd.DataFrame(expanded_results.iloc[-1, 1:]).reset_index()
    event_details.columns = ['name', 'value']

    # Keep the 15 highest values, then order them ascending so that the
    # highest one ends up at the top of the horizontal bar chart:
    event_details = event_details.sort_values(by='value', ascending=False)
    event_details = event_details.iloc[:15, :].reset_index(drop=True)
    event_details = event_details.sort_values(by='value', ascending=True)

    title = f'Last event detected at {expanded_results.index[-1]}'
    return plot_single_diagnostic(
        event_details,
        len(expanded_results.columns) - 1,
        title,
        width,
        height
    )
def list_inference_executions(scheduler_name,
                              execution_status=None,
                              start_time=None,
                              end_time=None,
                              max_results=50):
    """
    Lists the past inference executions triggered by a given scheduler,
    following the pagination tokens until no more records are available.

    PARAMS
    ======
        scheduler_name: string
            Name of the inference scheduler, sent as InferenceSchedulerName
        execution_status: string (default: None)
            When provided, sent as the Status filter of the request
        start_time: pandas.DateTime (default: None)
            When provided, sent as the DataStartTimeAfter filter
        end_time: pandas.DateTime (default: None)
            When provided, sent as the DataEndTimeBefore filter
        max_results: integer (default: 50)
            Sent as MaxResults with every request. All the pages are
            collected, so the returned list is not capped by this value

    RETURNS
    =======
        summaries: list of dict
            The InferenceExecutionSummaries entries of every response,
            in the order they were received
    """
    # Mandatory fields first, then only the filters that were provided:
    request = {"MaxResults": max_results}
    request["InferenceSchedulerName"] = scheduler_name
    optional_filters = (
        ("Status", execution_status),
        ("DataStartTimeAfter", start_time),
        ("DataEndTimeBefore", end_time),
    )
    for field, value in optional_filters:
        if value is not None:
            request[field] = value

    # Keeps requesting pages for as long as the service hands back a token:
    summaries = []
    while True:
        response = l4e_client.list_inference_executions(**request)

        more_pages = "NextToken" in response
        if more_pages:
            request["NextToken"] = response["NextToken"]

        summaries.extend(response["InferenceExecutionSummaries"])
        if not more_pages:
            break

    return summaries
def expand_signal_diagnostics(results_df):
    """
    Expands the per-signal diagnostics of each anomalous row into columns.

    PARAMS
    ======
        results_df: pandas.DataFrame
            Inference results indexed by timestamp, with a 'prediction'
            column and, for the rows where prediction == 1, a 'diagnostics'
            entry holding a list of {'name': ..., 'value': ...} dicts

    RETURNS
    =======
        expanded_results: pandas.DataFrame
            A dataframe indexed by 'timestamp' with a 'prediction' column
            followed by one column per diagnostic name. Rows that are not
            anomalies have no value (NaN) in the diagnostic columns. An
            empty input yields an empty dataframe with the same layout.
    """
    expanded_rows = []
    for timestamp, row in results_df.iterrows():
        new_row = {'timestamp': timestamp, 'prediction': row['prediction']}

        # Diagnostics are only read for the rows flagged as anomalies:
        if row['prediction'] == 1:
            new_row.update(
                {diag['name']: diag['value'] for diag in row['diagnostics']}
            )

        expanded_rows.append(new_row)

    # Without any row, the dataframe would have no 'timestamp' column and
    # the indexing below would raise a KeyError: return an empty frame that
    # callers can still filter on 'prediction':
    if not expanded_rows:
        return pd.DataFrame(
            columns=['prediction'],
            index=pd.DatetimeIndex([], name='timestamp')
        )

    expanded_results = pd.DataFrame(expanded_rows)
    expanded_results['timestamp'] = pd.to_datetime(expanded_results['timestamp'])
    expanded_results = expanded_results.set_index('timestamp')

    return expanded_results
def plot_single_diagnostic(event_details, num_signals, title, width, height):
    """
    Plots the signal contributions of a single event as a horizontal bar
    chart and returns it as SVG markup.

    PARAMS
    ======
        event_details: pandas.DataFrame
            One row per signal, with a 'name' column (bar label) and a
            'value' column (bar length, formatted as a percentage of 1.0)
        num_signals: integer
            Total number of signals: 1 / num_signals is drawn as a vertical
            threshold line and passed to assign_color() for each bar
        title: string
            Title of the plot
        width: number
            Width of the figure, divided by the module-level dpi
        height: number
            Height of the figure, divided by the module-level dpi

    RETURNS
    =======
        svg: string
            SVG markup of the figure, where the 'DejaVu Sans' font name is
            replaced by 'Amazon Ember'
    """
    colors = set_aws_stylesheet()
    y_pos = np.arange(event_details.shape[0])
    values = list(event_details['value'])
    threshold = 1 / num_signals

    # One color per bar, in bar order. Building this through a dict keyed by
    # value would merge the signals sharing the same value (several signals
    # at 0.0 for instance) and shift the colors of the remaining bars:
    signal_color = [assign_color(v, threshold, colors) for v in values]

    # Same as np.max(y_pos) + 0.5 when there are bars, but also defined
    # when event_details has no row:
    y_top = len(values) - 0.5

    fig = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
    try:
        ax = fig.add_subplot(1, 1, 1)
        ax.barh(y_pos, event_details['value'], align='center', color=signal_color)
        ax.set_yticks(y_pos)
        ax.set_yticklabels(event_details['name'])
        ax.xaxis.set_major_formatter(mtick.PercentFormatter(1.0))

        # Add the values in each bar: a null value has no bar to be written
        # on, so its label is drawn in black instead of bold white:
        for i, v in enumerate(values):
            if v == 0:
                ax.text(0.0005, i, f'{v*100:.2f}%', color='#000000', verticalalignment='center')
            else:
                ax.text(0.0005, i, f'{v*100:.2f}%', color='#FFFFFF', fontweight='bold', verticalalignment='center')

        # Threshold marker: a dashed line on top of a wider translucent one:
        ax.vlines(x=threshold, ymin=-0.5, ymax=y_top, linestyle='--', linewidth=2.0, color=colors[0])
        ax.vlines(x=threshold, ymin=-0.5, ymax=y_top, linewidth=4.0, alpha=0.3, color=colors[0])

        ax.set_title(title)

        svg_io = StringIO()
        fig.savefig(svg_io, format="svg", bbox_inches='tight')
    finally:
        # pyplot keeps a reference to every figure it creates: close this
        # one so that figures do not pile up from one call to the next:
        plt.close(fig)

    return svg_io.getvalue().replace('DejaVu Sans', 'Amazon Ember')