---
AWSTemplateFormatVersion: '2010-09-09'

Description: HumanReadableAlertsHtmlForLookoutForMetrics IAM Policies, and Lambda Function to deliver human readable alerts in HTML format from Lookout for Metrics.

# Both addresses are passed to the Lambda function as environment variables
# and are used to build the SES identity ARNs the role may send through.
Parameters:

  SenderEmailAddress:
    Type: String
    #Default: sender@your.domain
    Description: Enter the email address which appears as the sender of the alert emails.

  ReceiverEmailAddress:
    Type: String
    #Default: receiver@your.domain
    Description: Enter the email address where you wish to receive alerts.

Resources:

  # Role for Lambda
  # Grants: read access to Lookout for Metrics anomaly groups, ses:SendEmail on
  # the sender/receiver identities, and CloudWatch Logs write access.
  HumanReadableAlertHtmlLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: HumanReadableAlertHtmlLambdaRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: "HumanReadableAlertHtmlPolicyL4mSesLogs"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "lookoutmetrics:ListAnomalyGroupSummaries"
                  - "lookoutmetrics:ListAnomalyGroupTimeSeries"
                Resource: "*"
              - Effect: "Allow"
                Action: "ses:SendEmail"
                Resource:
                  - !Join
                    - ""
                    - - "arn:aws:ses:"
                      - !Ref AWS::Region
                      - ":"
                      - !Ref AWS::AccountId
                      - ":identity/"
                      - !Ref SenderEmailAddress
                  - !Join
                    - ""
                    - - "arn:aws:ses:"
                      - !Ref AWS::Region
                      - ":"
                      - !Ref AWS::AccountId
                      - ":identity/"
                      - !Ref ReceiverEmailAddress
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource:
                  - "arn:aws:logs:*:*:log-group:/aws/lambda/*"
                  - "arn:aws:logs:*:*:log-group:/aws/lambda/*:log-stream:*"

  # Lambda Function To Deliver Alerts
  HumanReadableAlertsHtmlLambdaFunction:
    DependsOn: HumanReadableAlertHtmlLambdaRole
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: HumanReadableAlertsHtml
      Role:
        Fn::GetAtt:
          - HumanReadableAlertHtmlLambdaRole
          - Arn
      # python3.8 is a deprecated Lambda runtime; use a currently supported one.
      Runtime: python3.12
      Handler: index.lambda_handler
      # The handler makes several paginated API calls plus an SES send, which
      # does not reliably fit in the 3 second default timeout.
      Timeout: 30
      Environment:
        Variables:
          sender_email_address: !Ref SenderEmailAddress
          receiver_email_address: !Ref ReceiverEmailAddress
      Code:
        ZipFile: |
          import os
          import re
          import json
          import html
          import datetime

          import boto3

          # ----

          # Get email addresses from environment variables
          # (fails at import time if the function is deployed without them)
          sender_email_address = os.environ["sender_email_address"]
          receiver_email_address = os.environ["receiver_email_address"]

          # Attributes shared by both HTML tables so they render with visible borders
          TABLE_ATTRIBUTES = 'border="1" cellpadding="4" style="border-collapse: collapse;"'

          # ----

          def parse_anomaly_detector_arn(anomaly_detector_arn):
              """Split an anomaly detector ARN into (region, account id, detector name).

              Raises ValueError when the string is not a Lookout for Metrics
              anomaly detector ARN.
              """
              re_result = re.match(
                  r"arn:aws:lookoutmetrics:([a-z0-9-]+):([0-9]+):AnomalyDetector:(.*)",
                  anomaly_detector_arn)
              if re_result is None:
                  raise ValueError(
                      "Unexpected anomaly detector ARN: %r" % (anomaly_detector_arn,))
              region_name, account_id, anomaly_detector_name = re_result.groups()
              return region_name, account_id, anomaly_detector_name


          def parse_l4m_timestamp(s):
              """Convert an L4M timestamp string such as 2021-01-01T00:00Z[UTC] to a datetime.

              The bracketed zone suffix is required by the pattern but is not
              used: the result is a naive datetime with minute resolution.
              Raises ValueError when the string does not match.
              """
              re_result = re.match(
                  r"([0-9]+)\-([0-9]+)\-([0-9]+)T([0-9]+):([0-9]+)Z\[(.+)\]", s)
              if re_result is None:
                  raise ValueError("Unexpected L4M timestamp: %r" % (s,))
              year, month, day, hour, minute = (
                  int(field) for field in re_result.groups()[:5])
              return datetime.datetime(year, month, day, hour, minute)


          def create_url_to_console(anomaly_detector_arn, anomaly_group_id):
              """Return the console URL of the anomaly group detail page."""
              region_name, _, _ = parse_anomaly_detector_arn(anomaly_detector_arn)
              return f"https://{region_name}.console.aws.amazon.com/lookoutmetrics/home?region={region_name}#{anomaly_detector_arn}/anomalies/anomaly/{anomaly_group_id}"


          def _iter_pages(api_call, **params):
              """Yield each response of a NextToken-paginated API call."""
              while True:
                  response = api_call(**params)
                  yield response
                  next_token = response.get("NextToken")
                  # An absent or empty token means this was the last page
                  if not next_token:
                      return
                  params["NextToken"] = next_token


          def _find_anomaly_group_id(client, anomaly_detector_arn, alert_timestamp, alert_metric_name):
              """Return the id of the first anomaly group matching the alert, or None."""
              pages = _iter_pages(
                  client.list_anomaly_group_summaries,
                  AnomalyDetectorArn=anomaly_detector_arn,
                  SensitivityThreshold=0,
                  MaxResults=100)
              for response in pages:
                  for anomaly_group in response["AnomalyGroupSummaryList"]:
                      if anomaly_group["PrimaryMetricName"] != alert_metric_name:
                          continue
                      start_time = parse_l4m_timestamp(anomaly_group["StartTime"])
                      end_time = parse_l4m_timestamp(anomaly_group["EndTime"])
                      # The alert belongs to the group whose time range contains it
                      if start_time <= alert_timestamp <= end_time:
                          return anomaly_group["AnomalyGroupId"]
              return None


          def lookup_anomaly_group_and_get_details(anomaly_detector_arn, alert_timestamp, alert_metric_name):
              """Find the anomaly group matching the alert and gather its time series.

              Returns (anomaly_group_id, relevant_time_series) where
              relevant_time_series is a list of the DimensionList entries
              returned for the group.
              Raises KeyError when no anomaly group matches the metric name
              and timestamp.
              """
              # The detector lives in the region encoded in its ARN
              region_name, _, _ = parse_anomaly_detector_arn(anomaly_detector_arn)
              lookoutmetrics_client = boto3.client(
                  "lookoutmetrics", region_name=region_name)

              anomaly_group_id = _find_anomaly_group_id(
                  lookoutmetrics_client, anomaly_detector_arn,
                  alert_timestamp, alert_metric_name)
              if not anomaly_group_id:
                  raise KeyError("AnomalyGroup not found")

              relevant_time_series = []
              pages = _iter_pages(
                  lookoutmetrics_client.list_anomaly_group_time_series,
                  AnomalyDetectorArn=anomaly_detector_arn,
                  AnomalyGroupId=anomaly_group_id,
                  MetricName=alert_metric_name,
                  MaxResults=100)
              for response in pages:
                  relevant_time_series.extend(
                      time_series["DimensionList"]
                      for time_series in response["TimeSeriesList"])

              return anomaly_group_id, relevant_time_series


          def create_email_contents(anomaly_detector_arn, alert_timestamp, alert_metric_name, anomaly_score, anomaly_group_id, relevant_time_series):
              """Compile the email subject and HTML body from the gathered information.

              Returns (subject, html_body). Every interpolated value is
              HTML-escaped. The time series table is omitted when
              relevant_time_series is empty.
              """
              timestamp_text = alert_timestamp.strftime("%Y-%m-%d %H:%M")

              # Format email subject
              subject = "L4M Alert - %s - %s" % (alert_metric_name, timestamp_text)

              summary_rows = [
                  ("Measure name :", alert_metric_name),
                  ("Timestamp :", timestamp_text),
                  ("Anomaly score :", "%.2f" % float(anomaly_score)),
                  ("Num relevant time series :", "%d" % len(relevant_time_series)),
              ]

              # Collect fragments and join once at the end
              parts = ["<html><body>\n"]

              # Table of summary
              parts.append("<table %s>\n" % TABLE_ATTRIBUTES)
              for label, value in summary_rows:
                  parts.append("<tr><td>%s</td><td>%s</td></tr>\n" % (
                      html.escape(label), html.escape(str(value))))
              parts.append("</table>\n")
              parts.append("<br>\n")

              # Table of relevant time series
              if relevant_time_series:
                  # Column order is taken from the first time series so that
                  # every row lists its dimensions consistently
                  dimension_names = [
                      dimension["DimensionName"]
                      for dimension in relevant_time_series[0]]

                  parts.append("<table %s>\n" % TABLE_ATTRIBUTES)
                  parts.append("<caption>List of relevant time series</caption>\n")

                  # Header row with dimension names
                  header_cells = "".join(
                      "<th>%s</th>" % html.escape(str(dimension_name))
                      for dimension_name in dimension_names)
                  parts.append("<tr>%s</tr>\n" % header_cells)

                  # Data rows with dimension values
                  for dimension_list in relevant_time_series:
                      value_by_name = {
                          dimension["DimensionName"]: dimension["DimensionValue"]
                          for dimension in dimension_list}
                      data_cells = "".join(
                          "<td>%s</td>" % html.escape(
                              str(value_by_name.get(dimension_name, "")))
                          for dimension_name in dimension_names)
                      parts.append("<tr>%s</tr>\n" % data_cells)

                  parts.append("</table>\n")
                  parts.append("<br>\n")

              # Direct link to anomaly group detail page
              console_url = create_url_to_console(anomaly_detector_arn, anomaly_group_id)
              parts.append(
                  '<a href="%s">Link to Lookout for Metrics console</a>\n'
                  % html.escape(console_url))

              parts.append("</body></html>")

              return subject, "".join(parts)


          def send_email(subject, html_body):
              """Send the HTML formatted email via SES from the sender to the receiver."""
              ses_client = boto3.client("ses")
              ses_client.send_email(
                  Source=sender_email_address,
                  Destination={"ToAddresses": [receiver_email_address]},
                  Message={
                      "Subject": {"Charset": "UTF-8", "Data": subject},
                      "Body": {"Html": {"Charset": "UTF-8", "Data": html_body}},
                  },
              )


          def lambda_handler(event, context):
              """Lambda entrypoint: turn an L4M alert event into an HTML email."""
              # Get necessary information from the lambda event
              anomaly_detector_arn = event["anomalyDetectorArn"]
              alert_timestamp = parse_l4m_timestamp(event["timestamp"])
              alert_metric_name = event["impactedMetric"]["metricName"]
              anomaly_score = event["anomalyScore"]

              # Lookup anomaly group by metric name and timestamp
              anomaly_group_id, relevant_time_series = lookup_anomaly_group_and_get_details(
                  anomaly_detector_arn, alert_timestamp, alert_metric_name)

              # Create email contents from the identified information
              subject, html_body = create_email_contents(
                  anomaly_detector_arn, alert_timestamp, alert_metric_name,
                  anomaly_score, anomaly_group_id, relevant_time_series)

              # Send email by SES
              send_email(subject, html_body)

              # Returning from Lambda function
              return {
                  'statusCode': 200,
                  'body': json.dumps('Message Delivered!')
              }