import logging from typing import Any, MutableMapping, Optional import botocore from cloudformation_cli_python_lib import ( BaseHookHandlerRequest, HandlerErrorCode, Hook, HookInvocationPoint, OperationStatus, ProgressEvent, SessionProxy, exceptions, ) from .models import HookHandlerRequest, TypeConfigurationModel # Use this logger to forward log messages to CloudWatch Logs. LOG = logging.getLogger(__name__) TYPE_NAME = "AWSSamples::S3BucketEncrypt::Hook" LOG.setLevel(logging.INFO) hook = Hook(TYPE_NAME, TypeConfigurationModel) test_entrypoint = hook.test_entrypoint def _validate_s3_bucket_encryption(bucket: MutableMapping[str, Any], required_encryption_algorithm: str) -> ProgressEvent: status = None message = "" error_code = None print(f"BUCKET: {bucket}") if bucket: bucket_name = bucket.get("BucketName") bucket_encryption = bucket.get("BucketEncryption") if bucket_encryption: server_side_encryption_rules = bucket_encryption.get("ServerSideEncryptionConfiguration") if server_side_encryption_rules: for rule in server_side_encryption_rules: bucket_key_enabled = rule.get("BucketKeyEnabled") if bucket_key_enabled: server_side_encryption_by_default = rule.get("ServerSideEncryptionByDefault") encryption_algorithm = server_side_encryption_by_default.get("SSEAlgorithm") kms_key_id = server_side_encryption_by_default.get("KMSMasterKeyID") # "KMSMasterKeyID" is name of the property for an AWS::S3::Bucket if encryption_algorithm == required_encryption_algorithm: if encryption_algorithm == "aws:kms" and not kms_key_id: status = OperationStatus.FAILED message = f"KMS Key ID not set for bucket with name: f{bucket_name}" else: status = OperationStatus.SUCCESS message = f"Successfully invoked PreCreateHookHandler for AWS::S3::Bucket with name: {bucket_name}" else: status = OperationStatus.FAILED message = f"SSE Encryption Algorithm is incorrect for bucket with name: {bucket_name}" else: status = OperationStatus.FAILED message = f"Bucket key not enabled for bucket with name: {bucket_name}" if status == OperationStatus.FAILED: break else: status = OperationStatus.FAILED message = f"No SSE Encryption configurations for bucket with name: {bucket_name}" else: status = OperationStatus.FAILED message = f"Bucket Encryption not enabled for bucket with name: {bucket_name}" else: status = OperationStatus.FAILED message = "Resource properties for S3 Bucket target model are empty" if status == OperationStatus.FAILED: error_code = HandlerErrorCode.NonCompliant return ProgressEvent( status=status, message=message, errorCode=error_code ) @hook.handler(HookInvocationPoint.CREATE_PRE_PROVISION) @hook.handler(HookInvocationPoint.UPDATE_PRE_PROVISION) def pre_create_handler( session: Optional[SessionProxy], request: HookHandlerRequest, callback_context: MutableMapping[str, Any], type_configuration: TypeConfigurationModel ) -> ProgressEvent: target_name = request.hookContext.targetName if "AWS::S3::Bucket" == target_name: response = _validate_s3_bucket_encryption(request.hookContext.targetModel.get("resourceProperties"), type_configuration.encryptionAlgorithm) else: raise exceptions.InvalidRequest(f"Unknown target type: {target_name}") LOG.info(response) return response