Parameters: PlaybookIAMRole: Type: String Resources: PlaybookGatherAppResourceAlarm: Type: "AWS::SSM::Document" Properties: DocumentType: Automation Name: Playbook-Gather-Resources Content: schemaVersion: '0.3' assumeRole: "{{AutomationAssumeRole}}" parameters: AlarmARN: description: (Required) The Alarm ARN triggering incident. type: String AutomationAssumeRole: type: String default: !Ref PlaybookIAMRole description: (Optional) The ARN of the role that allows Automation to perform the actions on your behalf. outputs: - Gather_Resources_For_Alarm.Resources mainSteps: - name: Gather_Resources_For_Alarm action: aws:executeScript description: Gather AWS resources related to the Alarm, based on it's Tag outputs: - Name: Resources Selector: $.Payload.ApplicationStackResources Type: String inputs: Runtime: python3.6 Handler: handler InputPayload: CloudWatchAlarmARN: '{{AlarmARN}}' Script: |- import json import re from datetime import datetime import boto3 import os def arn_deconstruct(arn): # arn:aws:cloudwatch:us-east-1:754323466686:alarm:mysecretword-canary-alarm arnlist = arn.split(":") service=arnlist[2] region=arnlist[3] accountid=arnlist[4] servicetype=arnlist[5] name=arnlist[6] return { "Service": service, "Region": region, "AccountId": accountid, "Type": servicetype, "Name": name } def locate_alarm_source(alarm): cwclient = boto3.client('cloudwatch', region_name = alarm['Region'] ) alarm_source = {} alarm_detail = cwclient.describe_alarms(AlarmNames=[alarm['Name']]) if len(alarm_detail['MetricAlarms']) > 0: metric_alarm = alarm_detail['MetricAlarms'][0] namespace = metric_alarm['Namespace'] # Condition if NameSpace is CloudWatch Syntetics if namespace == 'CloudWatchSynthetics': if 'Dimensions' in metric_alarm: dimensions = metric_alarm['Dimensions'] for i in dimensions: if i['Name'] == 'CanaryName': source_name = i['Value'] alarm_source['Type'] = namespace alarm_source['Name'] = source_name alarm_source['Region'] = alarm['Region'] alarm_source['AccountId'] = alarm['AccountId'] result = alarm_source return result # #Condition for CompositeAlarms # if len(alarm_detail['CompositeAlarms']) > 0: def locate_canary_endpoint(canaryname,region): result = None synclient = boto3.client('synthetics', region_name = region ) res = synclient.get_canary(Name=canaryname) canary = res['Canary'] if 'Tags' in canary: if 'TargetEndpoint' in canary['Tags']: target_endpoint = canary['Tags']['TargetEndpoint'] result = target_endpoint return result def locate_app_tag_value(resource): result = None if resource['Type'] == 'CloudWatchSynthetics': synclient = boto3.client('synthetics', region_name = resource['Region'] ) res = synclient.get_canary(Name=resource['Name']) canary = res['Canary'] if 'Tags' in canary: if 'Application' in canary['Tags']: apptag_val = canary['Tags']['Application'] result = apptag_val return result def locate_app_resources_by_tag(tag,region): result = None # Search CloufFormation Stacks for tag cfnclient = boto3.client('cloudformation', region_name = region ) list = cfnclient.list_stacks(StackStatusFilter=['CREATE_COMPLETE','ROLLBACK_COMPLETE','UPDATE_COMPLETE','UPDATE_ROLLBACK_COMPLETE','IMPORT_COMPLETE','IMPORT_ROLLBACK_COMPLETE'] ) for stack in list['StackSummaries']: app_resources_list = [] stack_name = stack['StackName'] stack_details = cfnclient.describe_stacks(StackName=stack_name) stack_info = stack_details['Stacks'][0] if 'Tags' in stack_info: for t in stack_info['Tags']: if t['Key'] == 'Application' and t['Value'] == tag: app_stack_name = stack_info['StackName'] app_resources = cfnclient.describe_stack_resources(StackName=app_stack_name) for resource in app_resources['StackResources']: app_resources_list.append( { 'PhysicalResourceId' : resource['PhysicalResourceId'], 'Type': resource['ResourceType'] } ) result = app_resources_list return result def handler(event, context): result = {} arn = event['CloudWatchAlarmARN'] alarm = arn_deconstruct(arn) # Locate tag from CloudWatch Alarm alarm_source = locate_alarm_source(alarm) # Identify Alarm Source tag_value = locate_app_tag_value(alarm_source) #Identify tag from source if alarm_source['Type'] == 'CloudWatchSynthetics': endpoint = locate_canary_endpoint(alarm_source['Name'],alarm_source['Region']) result['CanaryEndpoint'] = endpoint # Locate cloudformation with tag resources = locate_app_resources_by_tag(tag_value,alarm['Region']) result['ApplicationStackResources'] = json.dumps(resources) return result