# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 AWSTemplateFormatVersion: '2010-09-09' Parameters: Scope: Type: String Description: Enter WebACL Scope CLOUDFRONT or REGIONAL AllowedValues: [REGIONAL, CLOUDFRONT] WebACLName: Type: String Description: Enter WebACL name WebACLId: Type: String Description: Enter WebACL ID RateBasedRuleName: Type: String Description: Enter Rate Based Rule name CustomBlockPeriod: Type: Number Description: Enter custom block period for blocking the IP addresses in minutes. Minimum is 06 minutes MinValue: 6 Resources: CustomRBRLogBucket: Type: "AWS::S3::Bucket" Properties: BucketName: !Join - "-" - - "custom-rbr-log-bucket" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true LoggingConfiguration: DestinationBucketName: !Ref AccessLoggingBucket CustomRBRLogBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: Ref: CustomRBRLogBucket PolicyDocument: Statement: - Action: "s3:*" Condition: Bool: aws:SecureTransport: 'false' Effect: Deny Principal: "*" Resource: - !GetAtt CustomRBRLogBucket.Arn - !Join ["/", [!GetAtt CustomRBRLogBucket.Arn, "*"]] Sid: HttpsOnly Version: '2012-10-17' AccessLoggingBucket: Type: AWS::S3::Bucket Properties: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True Metadata: cfn_nag: rules_to_suppress: - id: W35 reason: "This bucket is an access logging bucket for another bucket and does not require access logging to be configured for it." AccessLoggingBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: Ref: AccessLoggingBucket PolicyDocument: Statement: - Action: "s3:*" Condition: Bool: aws:SecureTransport: 'false' Effect: Deny Principal: "*" Resource: - !GetAtt AccessLoggingBucket.Arn - !Join ["/", [!GetAtt AccessLoggingBucket.Arn, "*"]] Sid: HttpsOnly Version: '2012-10-17' IPv4IPset: Type: "AWS::WAFv2::IPSet" Properties: Name: !Join - "-" - - "IPv4-IPset" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Scope: !Ref Scope Description: "IPv4 IP set for custom rate based block rule" IPAddressVersion: "IPV4" Addresses: [] IPv6IPset: Type: "AWS::WAFv2::IPSet" Properties: Name: !Join - "-" - - "IPv6-IPset" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Scope: !Ref Scope Description: "IPv6 IP set for custom rate based block rule" IPAddressVersion: "IPV6" Addresses: [] CustomRBRLambdaFunction: Type: 'AWS::Lambda::Function' Properties: FunctionName: !Join - "-" - - "CustomRBRLambdaFunction" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Description: Lambda function containing the logic for custom RBR Handler: index.lambda_handler Role: !GetAtt LambdaRole.Arn Runtime: python3.9 Environment: Variables: SCOPE: !Ref Scope WEB_ACL_NAME: !Ref WebACLName WEB_ACL_ID: !Ref WebACLId RATE_BASED_RULE_NAME: !Ref RateBasedRuleName CUSTOM_BLOCK_PERIOD: !Ref CustomBlockPeriod CONFIG_LOG_BUCKET: !Ref CustomRBRLogBucket CONFIG_LOG_KEY: blocked_ips_list.json IP_SET_ID_CUSTOM_V4: !GetAtt IPv4IPset.Id IP_SET_NAME_CUSTOM_V4: !Select - "0" - !Split [ "|" , Ref: IPv4IPset] IP_SET_ID_CUSTOM_V6: !GetAtt IPv6IPset.Id IP_SET_NAME_CUSTOM_V6: !Select - "0" - !Split [ "|" , Ref: IPv6IPset] Code: ZipFile: | import json import boto3 import logging import datetime import os wafv2_client = boto3.client('wafv2') s3_client = boto3.client('s3') def update_custom_ipset_and_config(log, latest_ipv4_blocked_list, latest_ipv6_blocked_list): try: # update the custom v4 IP set ipv4_lock_token = get_lock_token( log, wafv2_client, os.getenv('IP_SET_ID_CUSTOM_V4'), os.getenv('IP_SET_NAME_CUSTOM_V4') ) update_ip_set( log, wafv2_client, os.getenv('IP_SET_ID_CUSTOM_V4'), list(latest_ipv4_blocked_list.keys()), ipv4_lock_token, os.getenv('IP_SET_NAME_CUSTOM_V4') ) # update the custom v6 IP set ipv6_lock_token = get_lock_token( log, wafv2_client, os.getenv('IP_SET_ID_CUSTOM_V6'), os.getenv('IP_SET_NAME_CUSTOM_V6') ) update_ip_set( log, wafv2_client, os.getenv('IP_SET_ID_CUSTOM_V6'), list(latest_ipv6_blocked_list.keys()), ipv6_lock_token, os.getenv('IP_SET_NAME_CUSTOM_V6') ) except Exception as e: # log error message log.error("[update_custom_ipset_and_config] " "Error updating custom ipset.") raise e try: # create json object of the latest custom config latest_custom_config = { 'IPv4': latest_ipv4_blocked_list, 'IPv6': latest_ipv6_blocked_list } byte_latest_custom_config = json.dumps(latest_custom_config).encode() # upload the config to s3 s3_client.put_object( Bucket=os.getenv('CONFIG_LOG_BUCKET'), Body=byte_latest_custom_config, Key=os.getenv('CONFIG_LOG_KEY') ) except Exception as e: # log error message log.error("[update_custom_ipset_and_config] " "Error uploading config to S3.") raise e def get_lock_token(log, wafv2_client, ip_set_id, name): try: ipv4_get_response = wafv2_client.get_ip_set( Scope=os.getenv('SCOPE'), Name=name, Id=ip_set_id ) return ipv4_get_response['LockToken'] except Exception as e: log.error(f"Error in get_lock_token: {e}") raise def update_ip_set(log, wafv2_client, ip_set_id, addresses, lock_token, name): try: wafv2_client.update_ip_set( Scope=os.getenv('SCOPE'), Name=name, Id=ip_set_id, Description='Last Update: ' + datetime.datetime.now(datetime.timezone.utc).strftime( "%Y-%m-%d %H:%M:%S %Z%z"), Addresses=addresses, LockToken=lock_token ) except Exception as e: log.error("Error in update_ip_set: {}".format(e)) raise def sync_ip_from_rbr_to_custom_ipset(log, rbr_managed_ip_list, custom_managed_ip_config): # Get the current timestamp in UTC format utc_now_timestamp = datetime.datetime.now( datetime.timezone.utc) # Convert the timestamp to string utc_now_timestamp_str = utc_now_timestamp.strftime( "%Y-%m-%d %H:%M:%S %Z%z") # Iterate over the managed IPs in the RBR list for managed_ip in rbr_managed_ip_list: # If the IP is already in the custom IP config if managed_ip in custom_managed_ip_config.keys(): # Get the timestamp when the IP was blocked in UTC format utc_blocked_at = datetime.datetime.strptime( custom_managed_ip_config[managed_ip], "%Y-%m-%d %H:%M:%S %Z%z").astimezone( datetime.timezone.utc) # Calculate the difference in minutes between now and when the IP # was blocked total_diff_min = ((utc_now_timestamp - utc_blocked_at) .total_seconds()) / 60 # If the difference is greater than block period, update the timestamp if round(total_diff_min) >= int(os.getenv('CUSTOM_BLOCK_PERIOD')): custom_managed_ip_config[managed_ip] = utc_now_timestamp_str # If the IP is not in the custom IP config, add it with the current # timestamp else: custom_managed_ip_config[managed_ip] = utc_now_timestamp_str # Create a new dictionary to store the latest blocked IPs latest_ip_blocked_list = {} # Iterate over the custom IP config for blocked_ip, blocked_at_str in custom_managed_ip_config.items(): # Get the timestamp when the IP was blocked in UTC format utc_blocked_at = datetime.datetime.strptime( custom_managed_ip_config[blocked_ip], "%Y-%m-%d %H:%M:%S %Z%z").astimezone(datetime.timezone.utc) # Calculate the difference in minutes between now and when the IP # was blocked total_diff_min = ((utc_now_timestamp - utc_blocked_at) .total_seconds()) / 60 # If the difference is less than the custom block period #then add it to the latest blocked IPs list if round(total_diff_min) < int(os.getenv('CUSTOM_BLOCK_PERIOD')): latest_ip_blocked_list[blocked_ip] = blocked_at_str return latest_ip_blocked_list def get_custom_config_file(log): try: # Get the custom config file from S3 s3_response = s3_client.get_object( Bucket=os.getenv('CONFIG_LOG_BUCKET'), Key=os.getenv('CONFIG_LOG_KEY') ) # Load the custom config file as a JSON object custom_managed_ip_config = json.loads( s3_response['Body'].read() ) except Exception as e: log.error("[get_custom_config_file] Error to get the custom config " "file from S3") log.error(e) # If there is an error, return an empty config custom_managed_ip_config = {'IPv4': {}, 'IPv6': {}} return custom_managed_ip_config def get_rbr_managed_ip_list(log): try: # Get the list of IPs blocked by the rate based rule wafv2_response = wafv2_client.get_rate_based_statement_managed_keys( Scope=os.getenv('SCOPE'), WebACLName=os.getenv('WEB_ACL_NAME'), WebACLId=os.getenv('WEB_ACL_ID'), RuleName=os.getenv('RATE_BASED_RULE_NAME') ) return wafv2_response except Exception as e: log.error("[get_rbr_managed_ip_list] " "Error to get the list of IP blocked by rate based rule") log.error(e) # If there is an error, raise the exception raise e def lambda_handler(event, context): log = logging.getLogger() try: # Set Log Level log.setLevel(logging.ERROR) # Get the list of IP blocked by rate based rule rbr_managed_list = get_rbr_managed_ip_list(log) # Get custom config file from S3 custom_managed_ip_config = get_custom_config_file(log) # Update IP from rate based rule list to custom list latest_ipv4_blocked_list = sync_ip_from_rbr_to_custom_ipset( log, rbr_managed_list['ManagedKeysIPV4']['Addresses'], custom_managed_ip_config['IPv4']) latest_ipv6_blocked_list = sync_ip_from_rbr_to_custom_ipset( log, rbr_managed_list['ManagedKeysIPV6']['Addresses'], custom_managed_ip_config['IPv6']) # Update latest blocked list to S3 and WAF IPset update_custom_ipset_and_config(log, latest_ipv4_blocked_list, latest_ipv6_blocked_list) return { 'statusCode': 200, 'body': json.dumps('Update Success!') } except Exception as e: log.error(e) return { 'statusCode': 500, 'body': e } Timeout: 10 Metadata: cfn_nag: rules_to_suppress: - id: W89 reason: There is no need to run this lambda in a VPC - id: W92 reason: There is no need for Reserved Concurrency LambdaRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyName: !Join - "-" - - "LambdaRolePolicy" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" PolicyDocument: Version: "2012-10-17" Statement: - Sid: "S3BucketPermissions" Effect: "Allow" Action: - "s3:PutObject" - "s3:GetObject" Resource: - !Sub 'arn:${AWS::Partition}:s3:::${CustomRBRLogBucket}/blocked_ips_list.json' - Sid: "WAFIPSetPermissions" Effect: "Allow" Action: - "wafv2:GetIPSet" - "wafv2:UpdateIPSet" Resource: - !GetAtt IPv6IPset.Arn - !GetAtt IPv4IPset.Arn - Sid: "WAFRBRPermissions" Effect: "Allow" Action: "wafv2:GetRateBasedStatementManagedKeys" Resource: !Sub - 'arn:${AWS::Partition}:wafv2:${AWS::Region}:${AWS::AccountId}:${WebACLSope}/webacl/${WebACLName}/${WebACLId}' - WebACLSope: !If [IsRegional, "regional", "cloudfront"] EventBridgeRule: Type: "AWS::Events::Rule" Properties: Name: !Join - "-" - - "EventBridgeRule" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" ScheduleExpression: "rate(1 minute)" State: "ENABLED" Targets: - Id: "CustomRBRLambdaFunction" Arn: !GetAtt CustomRBRLambdaFunction.Arn LambdaPermissionForEventBridge: Type: "AWS::Lambda::Permission" Properties: FunctionName: !Ref CustomRBRLambdaFunction Action: "lambda:InvokeFunction" Principal: "events.amazonaws.com" SourceArn: !GetAtt EventBridgeRule.Arn Outputs: IPv4IPsetName: Description: IPv4 IPSet for custom rate based block rule Value: !Select - "0" - !Split [ "|" , Ref: IPv4IPset] IPv6IPsetName: Description: IPv6 IPSet for custom rate based block rule Value: !Select - "0" - !Split [ "|" , Ref: IPv6IPset] Conditions: IsRegional: !Equals [!Ref Scope, "REGIONAL"]