---
# CloudFormation template that makes AWS WAF logs stored in S3 queryable from Athena.
#
# It creates:
#   * a Glue database and a partition-projected external table (waf_logs_raw) over the log prefix,
#   * an Athena workgroup plus a set of named queries,
#   * a Lambda-backed custom resource that turns every named query in the workgroup into a view
#     (waf_detailed first, because the other queries select from it).
AWSTemplateFormatVersion: 2010-09-09

Parameters:
  # Used as the Glue database name and as the prefix of the Athena workgroup name.
  ProductName:
    Type: String
    Default: waflogs
  # Bucket holding the WAF logs; Athena query results are written under athenaOutput/ in the same bucket.
  WafLogsS3Bucket:
    Type: String
  # Inserted verbatim between the bucket name and the region in every S3 location below,
  # so it must start and end with "/".
  WafLogsS3Prefix:
    Type: String
    Default: /
  # Only interpolated into the Description of two named queries.
  SourceIPAddressSource:
    Type: String
    Default: SOURCE_IP

Resources:
  GlueDatabaseWAFLogs:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Ref "ProductName"
        LocationUri: !Sub 's3://${WafLogsS3Bucket}${WafLogsS3Prefix}${AWS::Region}/'

  GlueTableWAFLogs:
    DependsOn: GlueDatabaseWAFLogs
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref "ProductName"
      TableInput:
        Name: waf_logs_raw
        TableType: EXTERNAL_TABLE
        PartitionKeys:
          - Name: datehour
            Type: string
        Parameters:
          EXTERNAL: 'TRUE'
          # Partition projection: hourly partitions are derived from the S3 key layout
          # (yyyy/MM/dd/HH) instead of being registered in the catalog.
          projection.datehour.format: yyyy/MM/dd/HH
          projection.datehour.interval: '1'
          projection.datehour.interval.unit: HOURS
          projection.datehour.range: 2021/01/01/00,NOW
          projection.datehour.type: date
          projection.enabled: 'true'
          # "${!datehour}" escapes the placeholder so !Sub emits a literal ${datehour} for Athena.
          storage.location.template: !Sub 's3://${WafLogsS3Bucket}${WafLogsS3Prefix}${AWS::Region}/${!datehour}'
        StorageDescriptor:
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Location: !Sub 's3://${WafLogsS3Bucket}${WafLogsS3Prefix}${AWS::Region}/'
          SerdeInfo:
            SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
            Parameters:
              serialization.format: '1'
              #paths: action,formatVersion,httpRequest,httpSourceId,httpSourceName,labels,nonTerminatingMatchingRules,rateBasedRuleList,requestHeadersInserted,responseCodeSent,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId
          # Complex types are spelled out in full Hive syntax (array<struct<...>>); they follow the
          # AWS WAF log table definition from the Athena documentation.
          Columns:
            - Name: timestamp
              Type: bigint
            - Name: formatversion
              Type: int
            - Name: webaclid
              Type: string
            - Name: terminatingruleid
              Type: string
            - Name: terminatingruletype
              Type: string
            - Name: action
              Type: string
            - Name: terminatingrulematchdetails
              Type: 'array<struct<conditiontype:string,location:string,matcheddata:array<string>>>'
            - Name: httpsourcename
              Type: string
            - Name: httpsourceid
              Type: string
            - Name: rulegrouplist
              Type: 'array<struct<rulegroupid:string,terminatingrule:struct<ruleid:string,action:string,rulematchdetails:string>,nonterminatingmatchingrules:array<struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,location:string,matcheddata:array<string>>>>>,excludedrules:array<struct<exclusiontype:string,ruleid:string>>>>'
            - Name: ratebasedrulelist
              Type: 'array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>'
            - Name: nonterminatingmatchingrules
              Type: 'array<struct<ruleid:string,action:string>>'
            - Name: requestheadersinserted
              Type: string
            - Name: responsecodesent
              Type: string
            - Name: httprequest
              Type: 'struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string>'
            - Name: labels
              Type: 'array<struct<name:string>>'

  # Retained by CloudFormation on stack deletion; the custom-resource Lambda below removes it
  # (recursively) when it receives the Delete request.
  AthenaWorkGroup:
    DeletionPolicy: Retain
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: !Sub '${ProductName}-workgroup'
      State: ENABLED
      WorkGroupConfiguration:
        EnforceWorkGroupConfiguration: false
        ResultConfiguration:
          OutputLocation: !Sub 's3://${WafLogsS3Bucket}/athenaOutput/'
          EncryptionConfiguration:
            EncryptionOption: SSE_S3

  # Custom resource that invokes the Lambda to build the views.
  # The Lambda lists every named query in the workgroup when it runs and creates a view over
  # waf_logs_raw, so the table and all named queries must exist first; without these explicit
  # dependencies CloudFormation is free to invoke it before they have been created.
  BuildAthenaViewsCall:
    DependsOn:
      - AthenaQueryLambdaPolicy
      - GlueTableWAFLogs
      - AthenaNamedQueryByURIIP
      - AthenaNamedQueryByCountry
      - AthenaNamedQueryIPRep
      - AthenaNamedQueryIPAnon
      - AthenaNamedQueryBotControl
    Type: Custom::BuildAthenaViews
    Properties:
      ServiceToken: !GetAtt AthenaCreateViewsQueryLambda.Arn
      DetailedViewQueryId: !Ref AthenaNamedQueryIPDetailed

  AthenaQueryLambdaRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /

  AthenaQueryLambdaPolicy:
    Type: 'AWS::IAM::Policy'
    Properties:
      PolicyName: LocalPolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          # Run and inspect queries in the workgroup created by this stack.
          - Effect: Allow
            Action:
              - 'athena:GetQueryExecution'
              - 'athena:GetNamedQuery'
              - 'athena:ListNamedQueries'
              - 'athena:StartQueryExecution'
            Resource: !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"
          - Effect: Allow
            Action:
              - 'logs:CreateLogGroup'
              - 'logs:CreateLogStream'
              - 'logs:PutLogEvents'
            Resource: 'arn:aws:logs:*:*:*'
          # CREATE OR REPLACE VIEW is stored as a table in the Glue catalog.
          - Effect: Allow
            Action:
              - 'glue:Get*'
              - 'glue:Update*'
              - 'glue:CreateTable'
            Resource:
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default/*'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ProductName}'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ProductName}/*'
          # Needed by the Delete branch of the Lambda.
          - Effect: Allow
            Action:
              - 'athena:DeleteWorkGroup'
            Resource:
              - !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${ProductName}-workgroup"
          # Query results are written under athenaOutput/ in the log bucket.
          - Effect: Allow
            Action:
              - 's3:Get*'
              - 's3:Put*'
              - 's3:List*'
            Resource:
              - !Sub "arn:aws:s3:::${WafLogsS3Bucket}/athenaOutput/*"
              - !Sub "arn:aws:s3:::${WafLogsS3Bucket}"
      Roles:
        - !Ref AthenaQueryLambdaRole

  AthenaCreateViewsQueryLambda:
    Type: 'AWS::Lambda::Function'
    DependsOn: AthenaQueryLambdaPolicy
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W58
            reason: "Wildcard permissions for Athena needed"
          - id: W89
            reason: "VPC bound lambda is not appropiate for this use case"
          - id: W92
            reason: "Reserved concurrency is not appropiate for this use case"
    Properties:
      Runtime: python3.9
      Role: !GetAtt AthenaQueryLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 300
      Environment:
        Variables:
          s3BasePath: !Sub 's3://${WafLogsS3Bucket}/athenaOutput/'
          workGroupName: !Ref "AthenaWorkGroup"
          glueDatabase: !Ref "GlueDatabaseWAFLogs"
      Code:
        ZipFile: |
          import boto3
          import datetime
          import os
          import time
          import json
          import urllib3
          import botocore

          s3BasePath = os.environ['s3BasePath']
          workGroupName = os.environ['workGroupName']
          database = os.environ['glueDatabase']
          athena_client = boto3.client('athena')
          http = urllib3.PoolManager()
          responseData = {}

          def cfnrespond(event, context, responseStatus, responseData, physicalResourceId=None, noEcho=False):
              """PUT a custom-resource response to the pre-signed URL supplied by CloudFormation.

              responseStatus is "SUCCESS" or "FAILED". A falsy physicalResourceId falls back to
              the Lambda log stream name. Delivery errors are logged, never raised.
              """
              responseUrl = event['ResponseURL']
              responseBody = {
                  'Status': responseStatus,
                  'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
                  'PhysicalResourceId': physicalResourceId or context.log_stream_name,
                  'StackId': event['StackId'],
                  'RequestId': event['RequestId'],
                  'LogicalResourceId': event['LogicalResourceId'],
                  'NoEcho': noEcho,
                  'Data': responseData,
              }
              json_responseBody = json.dumps(responseBody)
              print("Response body:\n" + json_responseBody)
              # content-length must describe the bytes actually sent.
              encodedBody = json_responseBody.encode('utf-8')
              headers = {
                  'content-type' : '',
                  'content-length' : str(len(encodedBody))
              }
              try:
                  response = http.request('PUT', responseUrl, body=encodedBody, headers=headers)
                  # response.reason may be None; the numeric status always exists.
                  print("Status code: " + str(response.status))
              except Exception as e:
                  print("send(..) failed executing requests.put(..): " + str(e))

          def wait_for_queries_to_finish(executionIdList, deadline=None):
              """Poll Athena once per second until every query id has SUCCEEDED.

              Returns [] when all queries succeeded. Returns the ids still outstanding as soon
              as one query is FAILED/CANCELLED, or when time.time() passes `deadline` (epoch
              seconds; None disables the time limit). The caller's list is not modified.
              """
              pending = list(executionIdList)
              while pending:
                  # Iterate over a snapshot: removing from the list being iterated skips entries.
                  for eId in list(pending):
                      currentState = athena_client.get_query_execution(QueryExecutionId=eId)['QueryExecution']['Status']['State']
                      if currentState == 'SUCCEEDED':
                          pending.remove(eId)
                      elif currentState in ['FAILED', 'CANCELLED']:
                          return pending
                  if pending:
                      if deadline is not None and time.time() >= deadline:
                          return pending
                      time.sleep(1)
              return []

          def _delete_work_group(event, context):
              """Handle a Delete request: remove the workgroup and everything stored in it."""
              try:
                  athena_client.delete_work_group(
                      WorkGroup=workGroupName,
                      RecursiveDeleteOption=True
                  )
              except Exception as error:
                  print("delete_work_group failed: " + str(error))
                  cfnrespond(event, context, "FAILED", {}, "")
                  return ()
              cfnrespond(event, context, "SUCCESS", {}, "Graceful Delete")
              return ()

          def _create_views(event, context):
              """Handle Create/Update: build waf_detailed, then one view per other named query."""
              # Stop polling a little before the Lambda timeout so a response can still be sent.
              deadline = time.time() + context.get_remaining_time_in_millis() / 1000.0 - 15
              executionIdList = []
              # When True every named query is wrapped in CREATE OR REPLACE VIEW; when False the
              # query text would be executed as-is.
              transformQuery = True
              detailedViewQueryId = event['ResourceProperties']['DetailedViewQueryId']
              baseQueryString = athena_client.get_named_query(
                  NamedQueryId=detailedViewQueryId)['NamedQuery']['QueryString']
              queryString = "CREATE OR REPLACE VIEW waf_detailed AS " + baseQueryString
              try:
                  r = athena_client.start_query_execution(
                      QueryString=queryString,
                      QueryExecutionContext={
                          'Database': database,
                          'Catalog': 'AwsDataCatalog'},
                      WorkGroup=workGroupName
                  )
              except botocore.exceptions.ClientError as error:
                  print(error.response)
                  cfnrespond(event, context, "FAILED", {}, "")
                  return ()
              # waf_detailed must exist before the views that select from it are created.
              if wait_for_queries_to_finish([r['QueryExecutionId']], deadline) != []:
                  cfnrespond(event, context, "FAILED", responseData, "CreateViewQueriesFailed")
                  return ("QueriesFailed")
              # Get all named query IDs in WorkGroup
              namedQueries = athena_client.list_named_queries(
                  WorkGroup=workGroupName)['NamedQueryIds']
              for queryId in namedQueries:
                  queryResults = athena_client.get_named_query(
                      NamedQueryId=queryId
                  )['NamedQuery']
                  print(queryResults)
                  if queryResults['Name'] == 'waf_detailed':
                      continue
                  outputLocation = s3BasePath + queryResults['Name'].split('-')[-1] + '/'
                  if transformQuery:
                      queryString = "CREATE OR REPLACE VIEW " + '"' + database + '".' + queryResults['Name'].replace('-','_') + " AS " + queryResults['QueryString']
                  else:
                      queryString = queryResults['QueryString']
                  print("queryString")
                  print(queryString)
                  try:
                      r = athena_client.start_query_execution(
                          QueryString=queryString,
                          ResultConfiguration={
                              'OutputLocation': outputLocation,
                              'EncryptionConfiguration': {
                                  'EncryptionOption': 'SSE_S3',
                              }
                          },
                          WorkGroup=workGroupName
                      )
                      executionIdList.append(r['QueryExecutionId'])
                  except botocore.exceptions.ClientError as error:
                      print(error.response)
                      cfnrespond(event, context, "FAILED", {}, "")
                      return ()
              print(executionIdList)
              if wait_for_queries_to_finish(executionIdList, deadline) != []:
                  cfnrespond(event, context, "FAILED", responseData, "CreateViewQueriesFailed")
                  return ("QueriesFailed")
              cfnrespond(event, context, "SUCCESS", responseData, "CreateViewsSuccessful")

          def lambda_handler(event, context):
              """Custom-resource entry point; always reports an outcome to CloudFormation."""
              print(json.dumps(event))
              try:
                  if event['RequestType'] == 'Delete':
                      return _delete_work_group(event, context)
                  return _create_views(event, context)
              except Exception as error:
                  # Without a response CloudFormation keeps waiting until its own timeout.
                  print("Unhandled error: " + repr(error))
                  cfnrespond(event, context, "FAILED", {}, event.get('PhysicalResourceId', ""))
                  return ()

  # Base query behind the waf_detailed view: flattens the raw log record, derives account, region,
  # scope and web ACL name from the web ACL ARN, and buckets timestamps into 5-minute windows.
  # The ARN resource part is "<scope>/webacl/<name>/<id>", so the name is element 3.
  AthenaNamedQueryIPDetailed:
    DependsOn: AthenaWorkGroup
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref "GlueDatabaseWAFLogs"
      Description: Detailed and Formatted Core RBR Data
      Name: "waf_detailed"
      WorkGroup: !Ref "AthenaWorkGroup"
      QueryString: !Sub |
        SELECT
          tz_window
        , sourceip
        , COALESCE(NULLIF(args, ''), args) args
        , COALESCE(NULLIF(httpSourceName, ''), httpSourceName) httpSourceName
        , country
        , uri
        , labels
        , accountId
        , webACLName
        , method
        , requestId
        , ntRules
        , region
        , scope
        , terminatingRuleId
        , action
        , datehour
        FROM
          (
           SELECT
             httprequest.clientip sourceip
           , httprequest.country country
           , httprequest.uri uri
           , httprequest.args args
           , httprequest.httpMethod method
           , httprequest.requestId requestId
           , httpSourceName
           , transform(filter(httprequest.headers, (x) -> x.name = 'Host'),(x) -> x.value) as domainName
           , "split_part"(webaclId, ':', 5) accountId
           , "split"("split_part"(webaclId, ':', 6), '/', 4)[3] webACLName
           , "split_part"(webaclId, ':', 4) region
           , "split"("split_part"(webaclId, ':', 6), '/', 4)[1] scope
           , webaclId
           , "array_join"("transform"(nonTerminatingMatchingRules, (x) -> x.ruleId), ',') ntRules
           , concat("transform"("filter"(labels, (x) -> (x.name LIKE 'awswaf:managed:aws:%')), (x) -> "split"(x.name, 'awswaf:managed:aws:')[2]), "transform"("filter"(labels, (x) -> (NOT (x.name LIKE 'awswaf%'))), (x) -> x.name)) as labels
           , terminatingRuleId
           , "from_unixtime"(("floor"((timestamp / (1000 * 300))) * 300)) tz_window
           , action
           , datehour
           FROM
             "${GlueDatabaseWAFLogs}"."waf_logs_raw"
        )

  AthenaNamedQueryByURIIP:
    DependsOn: AthenaWorkGroup
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref "GlueDatabaseWAFLogs"
      Description: Count by URI then Source IP over time.
      Name: "URIRate"
      WorkGroup: !Ref "AthenaWorkGroup"
      QueryString: !Sub |
        SELECT
          "count"(sourceip) as count
        , tz_window
        , sourceip
        , uri
        FROM
          (
           SELECT *
           FROM
             "${GlueDatabaseWAFLogs}"."waf_detailed"
        )
        GROUP BY tz_window, sourceip, uri
        ORDER BY tz_window desc, count DESC

  AthenaNamedQueryByCountry:
    DependsOn: AthenaWorkGroup
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref "GlueDatabaseWAFLogs"
      Description: Count by Country then Source IP over time.
      Name: "CountryRate"
      WorkGroup: !Ref "AthenaWorkGroup"
      QueryString: !Sub |
        SELECT
          "count"(sourceip) as count
        , tz_window
        , sourceip
        , country
        FROM
          (
           SELECT *
           FROM
             "${GlueDatabaseWAFLogs}"."waf_detailed"
        )
        GROUP BY tz_window, sourceip, country
        ORDER BY tz_window desc, count DESC

  AthenaNamedQueryIPRep:
    DependsOn: AthenaWorkGroup
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref "GlueDatabaseWAFLogs"
      Description: !Sub "Identify Top Client IP to specific URI path based on header: ${SourceIPAddressSource}"
      Name: "SourceIPReputations"
      WorkGroup: !Ref "AthenaWorkGroup"
      QueryString: !Sub |
        SELECT
          reputation,
          count(sourceip) AS count,
          sourceip,
          uri,
          tz_window
        FROM (
          SELECT
            sourceip,
            uri,
            tz_window,
            ntRules,
            filter (labels, (x)->(x LIKE '%IPReputationList')) as reputation
          FROM "${GlueDatabaseWAFLogs}"."waf_detailed"
        )
        where reputation <> array []
        GROUP BY tz_window, sourceip, uri, reputation
        order by reputation, count desc;

  AthenaNamedQueryIPAnon:
    DependsOn: AthenaWorkGroup
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref "GlueDatabaseWAFLogs"
      Description: !Sub "Identify Top Client IP to specific URI path based on header: ${SourceIPAddressSource}"
      Name: "SourceIPAnonymousorHiddenOwner"
      WorkGroup: !Ref "AthenaWorkGroup"
      QueryString: !Sub |
        SELECT
          if(anonymous = array[], Null, array_join(anonymous, ','))as anonymous,
          count(sourceip) AS count,
          sourceip,
          uri,
          tz_window
        FROM (
          SELECT
            sourceip,
            uri,
            tz_window,
            filter( labels, x -> x LIKE '%anonymous-ip-list%') as anonymous
          FROM "${GlueDatabaseWAFLogs}"."waf_detailed")
        WHERE anonymous <> array []
        GROUP BY tz_window,sourceip,uri,anonymous
        order by anonymous desc, count;

  AthenaNamedQueryBotControl:
    DependsOn: AthenaWorkGroup
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref "GlueDatabaseWAFLogs"
      Description: !Sub "Identify Bot Traffic"
      Name: "BotControlMatch"
      WorkGroup: !Ref "AthenaWorkGroup"
      QueryString: !Sub |
        select
          IF((botSignal = ARRAY[]), null, "split"(botSignal[1], 'bot-control:')[2]) botSignal,
          IF((botCategory = ARRAY[]), null, "split"(botCategory[1], 'bot-control:')[2]) botCategory,
          IF((botName = ARRAY[]), null, "split"(botName[1], 'bot-control:')[2]) botName,
          count(sourceip) as count,
          tz_window,
          sourceip,
          uri
        from (
          select
            filter(botLabels, x -> split(x,':')[2] = 'signal') as botSignal,
            filter(botLabels, x -> split(x,':')[3] = 'category') as botCategory,
            filter(botLabels, x -> split(x,':')[3] = 'name') as botName,
            tz_window,
            sourceip,
            uri,
            datehour
          from (
            SELECT
              sourceip,
              tz_window,
              filter(labels, x -> x LIKE 'bot-control%') AS botLabels,
              action,
              labels,
              uri,
              datehour
            FROM "${GlueDatabaseWAFLogs}"."waf_detailed"
          )
          where botLabels <> array[]
        )
        Group By tz_window, sourceip, botSignal, botCategory, botName, uri