import boto3
import json
import matplotlib.pyplot as plt
import pandas as pd
# Shared boto3 clients, created once when this module is imported and reused
# by the helpers in this file that do not build their own client.
# Amazon Lookout for Equipment client (models, tags, inference executions):
l4e_client = boto3.client('lookoutequipment')
# Amazon CloudWatch client (dashboards):
cw_client = boto3.client('cloudwatch')
def create_button(action,
                  payload,
                  label,
                  display_mode='popup',
                  disabled=False):
    """
    This function creates an HTML button the user can interact with from
    a CloudWatch custom widget.

    The markup is made of an anchor styled as a button, immediately followed
    by a `cwdb-action` element: in a CloudWatch custom widget, this element
    attaches a behavior (here, calling a Lambda function) to the element that
    precedes it.

    Parameters:
        action (string):
            The ARN of a lambda function to call when clicking on this button
        payload (JSON-serializable object, typically a dict):
            The argument passed to the lambda function used as endpoint for
            this button. It is serialized with `json.dumps()` by this
            function, so a string that already contains JSON would be
            encoded a second time.
        label (string):
            The label to use on the button
        display_mode (string):
            Can either be `popup` (display the result in a popup) or `widget`
            (to replace the content of the current widget by the output of the
            Lambda function used as endpoint). Defaults to `popup`
        disabled (boolean):
            Set to True to display a disabled button

    Returns:
        string: an HTML string with the button to display
    """
    # Keep the boolean parameter untouched and derive the markup pieces in
    # dedicated locals:
    if disabled:
        button_style = 'awsui-button-disabled'
        disabled_attribute = ' disabled=""'
    else:
        button_style = 'awsui-button awsui-button-variant-primary'
        disabled_attribute = ''

    button = (
        f'<a class="btn {button_style}"{disabled_attribute}>{label}</a>\n'
        '<cwdb-action '
        'action="call" '
        f'endpoint="{action}" '
        f'display="{display_mode}">'
        f'{json.dumps(payload)}'
        '</cwdb-action>\n'
    )

    return button
def create_link(href, label, disabled=False):
    """
    Creates a simple button to redirect the user to a given link

    Parameters:
        href (string):
            The link to redirect the user to. When this is an empty string,
            no `href` attribute is added to the button.
        label (string):
            The label to use as the button label
        disabled (boolean):
            Set to True to display a disabled button. Defaults to False

    Returns:
        string: an HTML string with the button to display
    """
    # Only the attributes that are actually needed end up in the tag:
    attributes = []
    if href != '':
        attributes.append(f'href="{href}"')

    if disabled:
        attributes.append('class="btn awsui-button-disabled"')
        attributes.append('disabled=""')
    else:
        attributes.append('class="btn"')

    rendered_attributes = ' '.join(attributes)
    button = f'<a {rendered_attributes}>{label}</a>'

    return button
def get_dashboard_list(dashboard_name_prefix=None):
    """
    This function lists all the dashboards currently available in CloudWatch

    The CloudWatch `ListDashboards` API is paginated: every page is fetched
    (following `NextToken`) so that the returned list is complete.

    Parameters:
        dashboard_name_prefix (string):
            Filters out the dashboards that do not start with this prefix.
            Defaults to None (no filter)

    Returns:
        list of the names (strings) of the CloudWatch dashboards found
    """
    request = {}
    if dashboard_name_prefix is not None:
        request['DashboardNamePrefix'] = dashboard_name_prefix

    dashboard_entries = []
    while True:
        # Reuses the module-level CloudWatch client instead of creating a
        # new one at each call:
        response = cw_client.list_dashboards(**request)
        dashboard_entries.extend(
            dashboard['DashboardName']
            for dashboard in response['DashboardEntries']
        )

        # No token (or an empty one) means the last page was reached:
        next_token = response.get('NextToken')
        if not next_token:
            break
        request['NextToken'] = next_token

    return dashboard_entries
def dashboard_exists(dashboard_name):
    """
    Tells whether a CloudWatch dashboard with exactly this name exists

    Parameters:
        dashboard_name (string):
            Name of the dashboard to look for

    Returns:
        Boolean: True when a dashboard with this exact name is found, False
        otherwise.
    """
    # The lookup is a prefix search: it can also return dashboards whose
    # name merely starts with the requested one, hence the exact membership
    # test on the names it returns:
    return dashboard_name in get_dashboard_list(dashboard_name)
def build_dashboard_list(client=None):
    """
    Query CloudWatch to list all existing dashboards in the current account

    The CloudWatch `ListDashboards` API is paginated: every page is fetched
    (following `NextToken`) so that the returned list is complete.

    Parameters:
        client (boto3.Client):
            A boto3 client to query the CloudWatch service. Defaults to None,
            in which case the module-level CloudWatch client is used

    Returns:
        dashboard_entries (list): a list with the name of all the dashboards
        found in this account
    """
    if client is None:
        client = cw_client

    request = {}
    dashboard_entries = []
    while True:
        response = client.list_dashboards(**request)
        dashboard_entries.extend(
            dashboard['DashboardName']
            for dashboard in response['DashboardEntries']
        )

        # No token (or an empty one) means the last page was reached:
        next_token = response.get('NextToken')
        if not next_token:
            break
        request['NextToken'] = next_token

    return dashboard_entries
def get_model_tags(model_name):
    """
    Collects the tags attached to a Lookout for Equipment model into a
    key/value dictionary.

    Parameters:
        model_name (string):
            Name of the model to read the tags from

    Returns:
        tags (Dict): a dictionary mapping each tag key to its value for the
        model passed as argument
    """
    # Tags are listed by resource ARN, so the model ARN is resolved first:
    model_description = l4e_client.describe_model(ModelName=model_name)
    tags_response = l4e_client.list_tags_for_resource(
        ResourceArn=model_description['ModelArn']
    )

    return {entry['Key']: entry['Value'] for entry in tags_response['Tags']}
def set_aws_stylesheet():
    """
    Activates the AWS light background matplotlib style sheet and exposes
    its color palette.

    The style sheet is read from `/opt/python/aws_color_branding_light.mpl`
    (NOTE(review): presumably shipped in a Lambda layer - confirm).

    Returns:
        colors (List): the colors of the `axes.prop_cycle` defined by this
        style sheet
    """
    plt.style.use('/opt/python/aws_color_branding_light.mpl')

    # The palette is the color cycle matplotlib uses for successive plots:
    return plt.rcParams['axes.prop_cycle'].by_key()['color']
def assign_color(value, threshold, colors):
    """
    Picks one of two colors of a palette depending on whether a value
    exceeds a threshold.

    Parameters:
        value (float):
            The value to consider
        threshold (float):
            The threshold to compare the value with
        colors (list of strings):
            A list of color definitions with at least 10 entries (the
            entries at index 5 and 9 are used)

    Returns:
        string: `colors[5]` when the value is strictly greater than the
        threshold and `colors[9]` otherwise (expected to be respectively
        red and green in the AWS palette - confirm against the style sheet)
    """
    return colors[5] if value > threshold else colors[9]
def get_matching_s3_keys(bucket, prefix='', suffix=''):
    """
    Generator yielding the keys of an S3 bucket that match a prefix and a
    suffix.

    A message is printed for each listing page that contains no object.

    Parameters:
        bucket (string):
            Name of the S3 bucket
        prefix (string or tuple of strings):
            Only yield keys that start with this prefix (optional). A single
            string is also sent to S3 so that the filtering happens on the
            service side.
        suffix (string):
            Only yield keys that end with this suffix (optional)

    Yields:
        string: each matching key
    """
    s3_client = boto3.client('s3')
    request = {'Bucket': bucket}

    # S3 can only filter on a single prefix: a tuple of prefixes is handled
    # on the client side by the startswith() test below.
    if isinstance(prefix, str):
        request['Prefix'] = prefix

    while True:
        page = s3_client.list_objects_v2(**request)

        # 'Contents' is missing from the response when nothing is listed:
        try:
            for entry in page['Contents']:
                object_key = entry['Key']
                if object_key.startswith(prefix) and object_key.endswith(suffix):
                    yield object_key
        except KeyError:
            print(f'No object found in s3://{bucket}/{prefix}/')

        # Listings are paginated: keep going as long as the service hands
        # back a continuation token.
        if 'NextContinuationToken' not in page:
            break
        request['ContinuationToken'] = page['NextContinuationToken']
def convert_ranges(ranges_df, start_date, end_date, default_freq='1min'):
    """
    This method expands a list of ranges into a pandas.DataFrame with a
    datetime index

    Parameters:
        ranges_df (pandas.DataFrame):
            A dataframe whose first two columns are the start and end
            timestamps of each event (any other column is ignored)
        start_date (string or datetime-like):
            First timestamp of the generated index
        end_date (string or datetime-like):
            Last timestamp of the generated index
        default_freq (string):
            The default frequency to generate the time range for. This will
            be used to generate the DateTimeIndex of the output

    Returns:
        pandas.DataFrame: a dataframe with a DateTimeIndex spanning from
        `start_date` to `end_date`. It has a single column named "Label"
        where a value of 1.0 corresponds to the presence of an event (labels
        or anomalies) and 0.0 to its absence.
    """
    range_index = pd.date_range(
        start=start_date,
        end=end_date,
        freq=default_freq
    )
    range_data = pd.DataFrame(index=range_index)
    range_data.loc[:, 'Label'] = 0.0

    # Plain tuples give a purely positional access to the first two columns,
    # whatever their names are (integer keys on a label-indexed row are
    # deprecated in pandas):
    for event in ranges_df.itertuples(index=False, name=None):
        event_start = event[0]
        event_end = event[1]

        # Label-based slicing on a DateTimeIndex includes both bounds:
        range_data.loc[event_start:event_end, 'Label'] = 1.0

    return range_data
def plot_ranges(range_df, range_title, color, ax):
    """
    Draws the "Label" series of a range (labelled or predicted events) as a
    line in a dedicated ax, meant to be positioned under the timeseries data.
    Ticks are removed from both axes and the title is written as the label
    of the x axis.

    Parameters:
        range_df (pandas.DataFrame):
            A DataFrame that must contain at least a DateTimeIndex and a
            column called "Label"
        range_title (string):
            Title of the ax containing this range
        color (string):
            A string used as the color of the plotted line
        ax (matplotlib.pyplot.Axis):
            The ax in which to render the range plot
    """
    labels = range_df['Label']
    ax.plot(labels, color=color)

    # This strip only shows where events are: no tick on either axis.
    axes = ax.axes
    axes.get_xaxis().set_ticks([])
    axes.get_yaxis().set_ticks([])

    ax.set_xlabel(range_title, fontsize=12)
def expand_results(df):
    """
    Expands the content of the diagnostics column of a results dataframe
    into one column per tag and one row per minute of each event.

    Parameters:
        df (pandas.DataFrame):
            A dataframe with one row per event and at least three columns:
            `start` and `end` (the boundaries of the event) and
            `diagnostics`, a collection of records exposing a `name` and a
            `value` field

    Returns:
        pandas.DataFrame: a dataframe with a DateTimeIndex covering every
        minute between the start and the end of each event (both included).
        There is one column per diagnostic name, shortened to the part that
        follows the last backslash, holding the value reported for the
        event the row belongs to. An empty dataframe is returned when `df`
        contains no event.
    """
    # First pass: one flat record per event, exposing each diagnostic as
    # its own field next to the start, end and prediction fields.
    events = []
    for _, row in df.iterrows():
        diagnostics = pd.DataFrame(row['diagnostics'])
        event = {'start': row['start'], 'end': row['end'], 'prediction': 1.0}
        event.update(zip(diagnostics['name'], diagnostics['value']))
        events.append(event)

    # Nothing to expand: pd.concat() would fail on an empty list below.
    if not events:
        return pd.DataFrame()

    expanded_results = pd.DataFrame(events)

    # The first three columns are start, end and prediction: the others are
    # the diagnostics, keyed here by the last part of their name.
    tags_list = {
        column.split('\\')[-1]: column
        for column in expanded_results.columns[3:]
    }

    # Second pass: repeat the values of each event for every minute of it.
    df_list = []
    for _, event in expanded_results.iterrows():
        new_index = pd.date_range(
            start=event['start'],
            end=event['end'],
            freq='1min'
        )
        new_df = pd.DataFrame(index=new_index)
        for tag, column in tags_list.items():
            new_df[tag] = event[column]
        df_list.append(new_df)

    return pd.concat(df_list, axis='index')
def list_inference_executions(scheduler_name,
                              execution_status=None,
                              start_time=None,
                              end_time=None,
                              max_results=50):
    """
    Lists the past inference executions triggered by a given scheduler.
    All the pages returned by the service are gathered.

    Parameters:
        scheduler_name (string):
            Name of the inference scheduler to list the executions of
        execution_status (string):
            Only keep the executions with a given status. Defaults to None
            (no filter)
        start_time (pandas.DateTime):
            Sent as `DataStartTimeAfter` to filter out the executions that
            happened before start_time. Defaults to None (no filter)
        end_time (pandas.DateTime):
            Sent as `DataEndTimeBefore` to filter out the executions that
            happened after end_time. Defaults to None (no filter)
        max_results (integer):
            Sent as `MaxResults` with each request. As every page is
            fetched, the returned list can hold more items than this value.
            Defaults to 50

    Returns:
        list of dict: the summaries of all the inference executions found,
        with the attributes of each execution stored in a python dictionary
    """
    # Mandatory arguments first, then only the filters that were provided:
    request = {
        "MaxResults": max_results,
        "InferenceSchedulerName": scheduler_name,
    }
    optional_filters = {
        "Status": execution_status,
        'DataStartTimeAfter': start_time,
        'DataEndTimeBefore': end_time,
    }
    request.update({
        name: value
        for name, value in optional_filters.items()
        if value is not None
    })

    # Follows the pagination tokens until the last page is reached:
    summaries = []
    while True:
        response = l4e_client.list_inference_executions(**request)
        summaries.extend(response["InferenceExecutionSummaries"])

        if "NextToken" not in response:
            break
        request["NextToken"] = response["NextToken"]

    return summaries