# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import os
import re
from collections import Counter

import boto3

# Variables

# Max number of findings to return. Although 50 can be returned without paginating,
# keeping this below 15 is a good idea to avoid Alexa size limit.
# Kept as a string: it is concatenated into speech text and converted with int() for the API.
MAXRESP = os.environ['MAXRESP']

# Comma separated list of region codes with NO spaces to include in flash briefing stats.
# GuardDuty must be enabled in declared regions.
FLASHREGIONS = os.environ['FLASHREGIONS']

# Spoken region name (lower-cased) -> AWS region code.
_REGION_IDS_BY_NAME = {
    "virginia": "us-east-1",
    "northern virginia": "us-east-1",
    "ohio": "us-east-2",
    "frankfurt": "eu-central-1",
    "california": "us-west-1",
    "northern california": "us-west-1",
    "oregon": "us-west-2",
    "london": "eu-west-2",
    "ireland": "eu-west-1",
    "singapore": "ap-southeast-1",
    "sydney": "ap-southeast-2",
    "canada": "ca-central-1",
    "central": "ca-central-1",
    "sao paulo": "sa-east-1",
    "seoul": "ap-northeast-2",
    "mumbai": "ap-south-1",
    "tokyo": "ap-northeast-1",
}

# AWS region code -> friendly name used in speech output.
_REGION_NAMES_BY_ID = {
    "us-east-1": "Virginia",
    "us-east-2": "Ohio",
    "us-west-1": "California",
    "eu-central-1": "Frankfurt",
    "us-west-2": "Oregon",
    "eu-west-2": "London",
    "ap-southeast-1": "Singapore",
    "ap-southeast-2": "Sydney",
    "eu-west-1": "Ireland",
    "ca-central-1": "Canada",
    "sa-east-1": "Sao Paulo",
    "ap-northeast-2": "Seoul",
    "ap-south-1": "Mumbai",
    "ap-northeast-1": "Tokyo",
}

# Spoken severity name (lower-cased) -> severity value range.
_SEVERITY_RANGES = {
    "low": {"MinSev": "0", "MaxSev": "3.9"},
    "medium": {"MinSev": "4", "MaxSev": "6.9"},
    "high": {"MinSev": "7", "MaxSev": "10"},
}
_DEFAULT_SEVERITY_RANGE = {"MinSev": "0", "MaxSev": "10"}

# Ordered (pattern, replacement) pairs applied by scruboutput(). The specific
# "instance i-..." / "against i-..." rules must run before the general
# instance-id rule, otherwise the general rule consumes the id first. The
# leading \b keeps "i-" inside ordinary words (e.g. "anti-virus") untouched.
_SCRUB_RULES = (
    (re.compile(r'[\[\]\"]'), ''),
    (re.compile(r'\binstance i-(\w+)'), 'instance'),
    (re.compile(r'\bagainst i-(\w+)'), 'against an EC2 instance'),
    (re.compile(r'\bi-(\w{1,17}\b)'), 'EC2 instance'),
    (re.compile(r'([0-9]+)(?:\.[0-9]+){3}'), 'IP host'),
)

# Strips anything that looks like a markup tag from speech text for card display.
_TAG_PATTERN = re.compile(r'(<([^>]+)>)')


def lambda_handler(event, context):
    """Route the incoming request based on type (LaunchRequest, IntentRequest, etc.).

    The JSON body of the request is provided in the event parameter.

    Returns the response dict produced by the matching handler, or None when
    the request type is not one of the three handled types.
    """
    print("event.session.application.applicationId=" +
          event['session']['application']['applicationId'])

    # Uncomment this if statement and populate with your skill's application ID to
    # prevent someone else from configuring a skill that sends requests to this
    # function.
    # if (event['session']['application']['applicationId'] !=
    #         "amzn1.echo-sdk-ams.app.[unique-value-here]"):
    #     raise ValueError("Invalid Application ID")

    if event['session']['new']:
        on_session_started({'requestId': event['request']['requestId']},
                           event['session'])

    if event['request']['type'] == "LaunchRequest":
        return on_launch(event['request'], event['session'])
    elif event['request']['type'] == "IntentRequest":
        return on_intent(event['request'], event['session'])
    elif event['request']['type'] == "SessionEndedRequest":
        return on_session_ended(event['request'], event['session'])


def on_session_started(session_started_request, session):
    """Called when the session starts; only logs the request and session ids."""
    print("on_session_started requestId=" + session_started_request['requestId'] +
          ", sessionId=" + session['sessionId'])


def on_launch(launch_request, session):
    """Called when the user launches the skill without specifying what they want."""
    print("on_launch requestId=" + launch_request['requestId'] +
          ", sessionId=" + session['sessionId'])
    # Dispatch to skill's launch
    return get_welcome_response()


def on_intent(intent_request, session):
    """Called when the user specifies an intent for this skill.

    Raises:
        ValueError: if the intent name is not one this skill handles.
    """
    print("on_intent requestId=" + intent_request['requestId'] +
          ", sessionId=" + session['sessionId'])

    intent = intent_request['intent']
    intent_name = intent_request['intent']['name']

    # Dispatch intent handlers
    if intent_name == "FlashBriefing":
        return get_flash_briefing(intent, session)
    elif intent_name == "SetRegion":
        # NOTE(review): set_region_in_session is not defined in the visible
        # source -- confirm it exists, otherwise this branch raises NameError.
        return set_region_in_session(intent, session)
    elif intent_name == "ListFindings":
        return list_findings(intent, session)
    elif intent_name == "ListStats":
        return list_stats(intent, session)
    elif intent_name == "AMAZON.CancelIntent" or intent_name == "AMAZON.StopIntent":
        return handle_session_end_request()
    elif intent_name == "AMAZON.HelpIntent":
        return get_help()
    else:
        raise ValueError("Invalid intent")


def on_session_ended(session_ended_request, session):
    """Called when the user ends the session.

    Is not called when the skill returns should_end_session=true.
    """
    print("on_session_ended requestId=" + session_ended_request['requestId'] +
          ", sessionId=" + session['sessionId'])


# --------------- Functions that control the skill's behavior ------------------

# Initial welcome
def get_welcome_response():
    """Build the greeting returned when the skill is launched without an intent."""
    session_attributes = {}
    card_title = "Ask GuardDuty Welcome"
    should_end_session = False
    speech_output = (
        "Welcome to Ask GuardDuty. To get started, you can get"
        " global GuardDuty finding statistics by saying, get flash briefing."
        " For additional information, you can say, Help."
    )
    reprompt_text = (
        "Are you still there? For regional statistics, you can say for example,"
        " get statistics for Virginia."
        " Or, get high severity findings for Oregon. You can also get"
        " global statistics by saying, Get flash briefing."
        " For additional information, you can say, Help."
    )
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))


def create_selected_region(selected_region):
    """Wrap the spoken region in the dict stored as session attributes."""
    return {"selectedRegion": selected_region}


def _slot_value(intent, slot_name):
    """Return the value of slot ``slot_name`` in ``intent``, or None.

    None is returned when the intent has no slots, the slot is missing, or the
    slot carries no 'value' key, so callers never hit a KeyError.
    """
    slot = (intent.get('slots') or {}).get(slot_name) or {}
    return slot.get('value')


# Return region ID
def get_region_id(selectedRegion):
    """Translate a spoken region name into ``{"regionId": <region code>}``.

    Matching ignores case and surrounding whitespace. Anything that is not a
    known name (including None) yields ``{"regionId": "Unknown"}``.
    """
    if not isinstance(selectedRegion, str):
        return {"regionId": "Unknown"}
    region_id = _REGION_IDS_BY_NAME.get(selectedRegion.strip().lower(), "Unknown")
    return {"regionId": region_id}


# Return region friendly name
def get_region_name(region_id):
    """Translate a region code into ``{"regionName": <friendly name>}``.

    Unknown region codes yield an empty name.
    """
    region_name = ""
    if isinstance(region_id, str):
        region_name = _REGION_NAMES_BY_ID.get(region_id, "")
    return {"regionName": region_name}


def _totals_by_severity_name(count_by_severity):
    """Sum finding counts per severity name (Low/Medium/High).

    ``count_by_severity`` maps a severity value to a count. Several values can
    fall in the same band, so they are merged to avoid announcing the same
    band more than once. First-seen order of the bands is preserved.
    """
    totals = Counter()
    for sevlevel, count in count_by_severity.items():
        totals[getsevname(sevlevel)['SeverityName']] += count
    return totals


def _get_severity_counts(region_name):
    """Return the CountBySeverity mapping for a region, or None if unavailable.

    getstats() returns a list when the region has no detector, which makes the
    key lookups fail; that case is reported as None instead of raising.
    """
    try:
        return getstats(region_name=region_name)['FindingStatistics']['CountBySeverity']
    except (TypeError, KeyError):
        return None


# Get GuardDuty Flash Briefing
def get_flash_briefing(intent, session):
    """Build the response summarising findings across the FLASHREGIONS regions."""
    session_attributes = {}
    card_title = "Ask GuardDuty Flash Briefing"
    should_end_session = False

    # Fetch once: every call to getflashbrief() queries GuardDuty in each region.
    flashglobal, flashregion = getflashbrief()

    if flashglobal:
        speech_output = (
            " Here is your GuardDuty flash briefing."
            " Globally, there are, " + str(flashglobal) + " findings."
            " Here are the regional finding statistics: " + str(flashregion) + "."
        )
    else:
        speech_output = (
            "There are no current GuardDuty findings for the selected AWS regions."
            " You can generate samples in the console and GuardDuty will"
            " populate your current list with one sample finding for each supported type. "
        )
    reprompt_text = (
        "Are you still there? For regional statistics, you can say for example,"
        " get statistics for Virginia."
        " Or, get high severity findings for Oregon."
        " For additional information, you can say, Help."
    )
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))


# Get finding details for region X and severity Y
def list_findings(intent, session):
    """Build the response listing findings for the spoken region and severity.

    Reads the 'selectedRegion' and (optional) 'SevName' slots from ``intent``.
    A missing or unrecognised region produces a prompt asking the user to
    confirm the region; a missing severity means "all severities".
    """
    card_title = "Ask GuardDuty Finding Details"
    should_end_session = False
    selected_region = _slot_value(intent, 'selectedRegion')
    session_attributes = create_selected_region(selected_region)
    region_name = get_region_id(selected_region)['regionId']

    # Try to catch unknown or unsupported region
    if region_name == 'Unknown':
        speech_output = (
            "I'm not sure which AWS region you would like me to access."
            " Please confirm the selected region is valid and GuardDuty is enabled."
        )
        reprompt_text = "Please confirm the selected region is valid and GuardDuty is enabled."
        return build_response(session_attributes, build_speechlet_response(
            card_title, speech_output, reprompt_text, should_end_session))

    # getsevvalue() falls back to the full range when the slot is absent.
    min_sev = getsevvalue(_slot_value(intent, 'SevName'))['MinSev']

    # In case GD is not enabled for selected region: getfindings() then returns
    # a list and the key lookup fails.
    try:
        gdfindings = getfindings(minsev=min_sev, region_name=region_name)['Findings']
    except (TypeError, KeyError):
        gdfindings = None

    if gdfindings is None:
        speech_output = (
            "There was a problem retrieving the information. Please confirm GuardDuty"
            " is enabled in the " + selected_region + " region."
        )
    elif gdfindings:
        sgdfindings = []
        for finding in gdfindings:
            sevname = getsevname(str(finding['Severity']))['SeverityName']
            description = str(finding['Title'])
            sgdfindings.append(
                "Severity, " + sevname + ", " + "Count, " +
                str(finding['Service']['Count']) + ", " + description)
        # Clean up output
        findings = scruboutput(inputtxt=str(sgdfindings))
        speech_output = (
            "Here are up to " + MAXRESP + " GuardDuty findings for, " +
            selected_region + ", with minimum severity " + str(min_sev) + ". " +
            str(findings) + ""
        )
    else:
        speech_output = (
            "There are no current GuardDuty findings for, " + selected_region +
            ", with minimum severity " + str(min_sev) + "."
        )

    reprompt_text = (
        "Are you still there? You can get GuardDuty"
        " finding details by saying for example, get high severity findings for Oregon."
        " You can also get "
        " global statistics by saying, Get flash briefing."
        " For additional information, you can say, Help."
    )
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))


# Get statistics by region.
def list_stats(intent, session):
    """Build the response giving finding counts per severity for the spoken region.

    Reads the 'selectedRegion' slot from ``intent``. A missing or unrecognised
    region produces a prompt asking the user to confirm the region.
    """
    card_title = "Ask GuardDuty Finding Statistics"
    should_end_session = False
    selected_region = _slot_value(intent, 'selectedRegion')
    session_attributes = create_selected_region(selected_region)
    region_name = get_region_id(selected_region)['regionId']
    reprompt_text = (
        "Are you still there? You can get GuardDuty"
        " finding details by saying, get high severity findings for Virginia."
        " Or you can get global statistics by saying,"
        " Get flash briefing. For additional information, you can say, Help."
    )

    if region_name == 'Unknown':
        speech_output = (
            "I'm not sure which AWS region you would like me to access."
            " Please confirm the selected region is valid and GuardDuty is enabled."
        )
        return build_response(session_attributes, build_speechlet_response(
            card_title, speech_output, reprompt_text, should_end_session))

    # None means the statistics could not be read (e.g. no detector in region).
    gdstats = _get_severity_counts(region_name)

    if gdstats is None:
        speech_output = (
            "There was a problem retrieving the information. Please confirm GuardDuty"
            " is enabled in the " + selected_region + " region."
        )
    elif gdstats:
        sgdstats = [
            " " + str(count) + " " + sevname + " severity"
            for sevname, count in _totals_by_severity_name(gdstats).items()
        ]
        # Clean up output
        stats = scruboutput(inputtxt=str(sgdstats))
        speech_output = (
            " In " + selected_region + ", there are currently, " + str(stats) +
            " findings. "
        )
    else:
        speech_output = (
            "There are no current findings in " + selected_region + "."
            " You can generate samples in the console and GuardDuty will"
            " populate your current list with one sample finding for each supported type. "
        )

    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))


# Get Help
def get_help():
    """Build the help response describing the skill and its current configuration."""
    session_attributes = {}
    card_title = "Ask GuardDuty Help"
    should_end_session = False
    speech_output = (
        "Welcome to Ask GuardDuty. Amazon GuardDuty is a managed"
        " threat detection service that continuously monitors for"
        " malicious or unauthorized behavior to help you protect your AWS accounts and workloads."
        " GuardDuty generates findings when it detects unexpected and potentially malicious activity"
        " in your AWS environment. To get started, you can get"
        " global GuardDuty finding statistics by saying, get flash briefing. I am currently configured"
        " to return flash briefing information for the following AWS regions: " + getflashregions() + "."
        " You can also get finding statistics for a region by saying for example, get statistics for Oregon."
        " I can retrieve information for other AWS regions where GuardDuty is enabled."
        " You can get GuardDuty finding details by saying for example,"
        " get high severity findings for California."
        " I am currently configured to return up to " + MAXRESP + " findings in a response."
        " Each GuardDuty finding has an assigned severity level and value that can help you determine your "
        " response to a potential security issue that is highlighted by a finding. The value of the severity "
        " can fall within the 0.1 to 8.9 range. High severity findings fall within the 7.0 to 8.9 range,"
        " medium severity falls within the 4.0 to 6.9 range and low severity falls within the 0.1 to 3.9 range."
        " You can generate samples in the console and GuardDuty will"
        " populate your current list with one sample finding for each supported type."
        " Finally, make sure GuardDuty is enabled in AWS regions you would like me to access."
    )
    reprompt_text = (
        "Are you still there? To get started, you can say, get statistics for Virginia."
        " You can also get"
        " global statistics by saying, get flash briefing."
        " For additional information, you can say, Help."
    )
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))


# Return GuardDuty detector Id for region
def getdetectorid(region_name):
    """Return the first GuardDuty detector id listed for ``region_name``.

    Returns an empty list when the region lists no detectors, so callers can
    test the result for truthiness.
    """
    gdclient = boto3.client('guardduty', region_name=region_name)
    detector_ids = gdclient.list_detectors()['DetectorIds']
    return detector_ids[0] if detector_ids else []


# Return finding severity name based on value
def getsevname(sevlevel):
    """Map a numeric severity value to ``{"SeverityName": "Low"|"Medium"|"High"}``.

    Bands follow the ranges announced in the help text: below 4 is Low, 4 up
    to (but not including) 7 is Medium, 7 and above is High.

    Raises:
        ValueError: if ``sevlevel`` cannot be converted to a float.
    """
    level = float(sevlevel)
    if level < 4:
        return {"SeverityName": "Low"}
    if level < 7:
        return {"SeverityName": "Medium"}
    return {"SeverityName": "High"}


# Return finding value range based on severity
def getsevvalue(sevname):
    """Map a spoken severity name to its ``{"MinSev": ..., "MaxSev": ...}`` range.

    Matching ignores case and surrounding whitespace. Any other input,
    including None, yields the full range 0 to 10.
    """
    if isinstance(sevname, str):
        key = sevname.strip().lower()
        return dict(_SEVERITY_RANGES.get(key, _DEFAULT_SEVERITY_RANGE))
    return dict(_DEFAULT_SEVERITY_RANGE)


# List regions declared in FLASHREGIONS by name
def getflashregions():
    """Return the friendly names of the FLASHREGIONS regions as display text.

    Always returns a string, because the caller concatenates the result into
    the help speech.
    """
    try:
        flashregions = [
            get_region_name(region_id.strip())['regionName']
            for region_id in FLASHREGIONS.split(",")
        ]
        # Clean up output
        return scruboutput(inputtxt=str(flashregions))
    except (IndexError, NameError):
        return ""


# Redact stuff from findings and clean up output for card display
def scruboutput(inputtxt):
    """Return ``inputtxt`` cleaned up for speech and card display.

    Replaces '&' with 'and', removes square brackets and double quotes,
    replaces EC2 instance ids with generic wording, and replaces dotted-quad
    IP addresses with 'IP host'.
    """
    response = inputtxt.replace('&', 'and')
    for pattern, replacement in _SCRUB_RULES:
        response = pattern.sub(replacement, response)
    return response


# Return GuardDuty Flash Briefing
def getflashbrief():
    """Aggregate finding statistics over every region listed in FLASHREGIONS.

    Returns:
        A ``(global_text, regional_text)`` tuple of cleaned-up strings.
        ``global_text`` is empty when no region reported any finding.
    """
    # set the target regions to aggregate finding stats
    targ_regions = [r.strip() for r in FLASHREGIONS.split(",") if r.strip()]
    flashregion = []
    global_totals = Counter()

    for region_id in targ_regions:
        # Fall back to the region code when no friendly name is known.
        region_label = get_region_name(region_id)['regionName'] or region_id
        counts = _get_severity_counts(region_id)
        if counts is None:
            # Statistics could not be read for this region; keep going so the
            # remaining regions are still reported.
            flashregion.append(" GuardDuty statistics are not available for the " +
                               str(region_label) + " region.")
        elif counts:
            flashregion.append("Findings for " + str(region_label) + " region")
            region_totals = _totals_by_severity_name(counts)
            # Sum total findings across regions declared in FLASHREGIONS
            global_totals.update(region_totals)
            for sevname, count in region_totals.items():
                flashregion.append("" + str(count) + " " + sevname + " severity")
        else:
            flashregion.append(" There are no current findings in the " +
                               str(region_label) + " region.")

    flashglobal = [
        "" + str(count) + " " + sevname + " severity"
        for sevname, count in global_totals.items()
    ]

    # Clean up output
    fglobal = scruboutput(inputtxt=str(flashglobal))
    fregion = scruboutput(inputtxt=str(flashregion))
    return fglobal, fregion


# Return statistics for region
def getstats(region_name):
    """Return the GuardDuty get_findings_statistics response for ``region_name``.

    Only non-archived findings are counted, grouped by severity. Returns an
    empty list when the region has no detector.
    """
    gdclient = boto3.client('guardduty', region_name=region_name)
    detector_id = getdetectorid(region_name)
    if not detector_id:
        return []
    return gdclient.get_findings_statistics(
        DetectorId=detector_id,
        FindingCriteria={
            'Criterion': {
                'service.archived': {
                    'Eq': ['false'],
                },
                'severity': {
                    'Gte': 0,
                },
            }
        },
        FindingStatisticTypes=[
            'COUNT_BY_SEVERITY',
        ],
    )


# Return findings with minimum severity
def listfindings(minsev, region_name):
    """Return the GuardDuty list_findings response for ``region_name``.

    Lists at most MAXRESP non-archived findings whose severity is at least
    ``minsev``, sorted by ascending severity. Returns an empty list when the
    region has no detector.
    """
    gdclient = boto3.client('guardduty', region_name=region_name)
    detector_id = getdetectorid(region_name)
    if not detector_id:
        return []
    return gdclient.list_findings(
        DetectorId=detector_id,
        FindingCriteria={
            'Criterion': {
                'service.archived': {
                    'Eq': ['false'],
                },
                'severity': {
                    'Gte': int(minsev),
                },
            }
        },
        MaxResults=int(MAXRESP),
        SortCriteria={
            'AttributeName': 'severity',
            'OrderBy': 'ASC',
        },
    )


# Return finding details
def getfindings(minsev, region_name):
    """Return the GuardDuty get_findings response for ``region_name``.

    Looks up the finding ids with listfindings() and then fetches their
    details. Returns an empty list when the region has no detector, and
    ``{'Findings': []}`` when no finding matches.
    """
    gdclient = boto3.client('guardduty', region_name=region_name)
    detector_id = getdetectorid(region_name)
    if not detector_id:
        return []
    gresponse = listfindings(minsev, region_name)
    # The detector could disappear between the two lookups; treat as "no detector".
    if not gresponse:
        return []
    finding_ids = gresponse['FindingIds']
    if not finding_ids:
        # Nothing to fetch; skip the extra API round trip.
        return {'Findings': []}
    return gdclient.get_findings(
        DetectorId=detector_id,
        FindingIds=finding_ids,
        SortCriteria={
            'AttributeName': 'severity',
            'OrderBy': 'ASC',
        },
    )


def handle_session_end_request():
    """Build the empty response that ends the session and exits the skill."""
    card_title = "Session Ended"
    speech_output = ""
    # Setting this to true ends the session and exits the skill.
    should_end_session = True
    # Setting reprompt_text to None signifies that we do not want to reprompt
    # the user.
    return build_response({}, build_speechlet_response(
        card_title, speech_output, None, should_end_session))


# --------------- Helpers that build all of the responses ----------------------

def build_speechlet_response(title, output, reprompt_text, should_end_session):
    """Build the 'response' part of the skill reply.

    Args:
        title: card title.
        output: speech text; markup tags are stripped for the card text.
        reprompt_text: reprompt speech, or None to send no reprompt.
        should_end_session: whether the session ends after this response.
    """
    cleanout = _TAG_PATTERN.sub('', output)
    speechlet = {
        'outputSpeech': {
            'type': 'SSML',
            'ssml': output
        },
        'card': {
            'type': 'Standard',
            'title': title,
            'text': cleanout,
            'image': {
                'smallImageUrl': 'https://s3.amazonaws.com/awsiammedia/public/sample/GuardDutyStatisticsFindings/guarddutysmall.png',
                'largeImageUrl': 'https://s3.amazonaws.com/awsiammedia/public/sample/GuardDutyStatisticsFindings/guarddutylarge.png'
            }
        },
        'shouldEndSession': should_end_session
    }
    # Leave the reprompt out entirely rather than sending a null SSML string.
    if reprompt_text is not None:
        speechlet['reprompt'] = {
            'outputSpeech': {
                'type': 'SSML',
                'ssml': reprompt_text
            }
        }
    return speechlet


def build_response(session_attributes, speechlet_response):
    """Wrap a speechlet response and session attributes in the reply envelope."""
    return {
        'version': '1.0',
        'sessionAttributes': session_attributes,
        'response': speechlet_response
    }