""" -*- coding: utf-8 -*- # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. """ from aws_cdk import ( aws_iam as iam, aws_ec2 as ec2, aws_s3 as s3, aws_s3_deployment as s3deploy, aws_mwaa as mwaa, aws_kms as kms, aws_ecs as ecs, Stack, CfnOutput, Tags ) from aws_cdk.aws_ecr_assets import DockerImageAsset, NetworkMode from constructs import Construct from .common import constants class MWAAOrchestrationStack(Stack): def __init__(self, scope: Construct, construct_id: str,vpc,env,**kwargs) -> None: super().__init__(scope, construct_id, **kwargs) # Create S3 Bucket for storing dags s3_tags = { 'app': f"{self.stack_name}" } dags_bucket = s3.Bucket( self, "mwaa-dags", versioned=True, block_public_access=s3.BlockPublicAccess.BLOCK_ALL ) for tag in s3_tags: Tags.of(dags_bucket).add(tag, s3_tags[tag]) # Deploy dags s3deploy.BucketDeployment(self, "DeployDAG", sources=[s3deploy.Source.asset("./mwaa/dags/")], destination_bucket=dags_bucket, destination_key_prefix="airflow/dags/", prune=False, retain_on_delete=False ) # Deploy requirements s3deploy.BucketDeployment(self, "DeployRequirements", sources=[s3deploy.Source.asset("./mwaa/requirements/")], destination_bucket=dags_bucket, destination_key_prefix="airflow/requirements/", prune=False, retain_on_delete=False ) dags_bucket_arn = dags_bucket.bucket_arn # Create MWAA IAM Policies and Roles, copied from MWAA documentation site # After destroy remove cloudwatch log groups, S3 bucket and verify KMS key is removed. mwaa_policy_document = iam.PolicyDocument( statements=[ iam.PolicyStatement( actions=["airflow:PublishMetrics"], effect=iam.Effect.ALLOW, resources=[f"arn:aws:airflow:{self.region}:{self.account}:environment/{self.stack_name}"], ), iam.PolicyStatement( actions=[ "s3:ListAllMyBuckets" ], effect=iam.Effect.DENY, resources=[ f"{dags_bucket_arn}/*", f"{dags_bucket_arn}" ], ), iam.PolicyStatement( actions=[ "s3:*" ], effect=iam.Effect.ALLOW, resources=[ f"{dags_bucket_arn}/*", f"{dags_bucket_arn}" ], ), iam.PolicyStatement( actions=[ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], effect=iam.Effect.ALLOW, resources=[f"arn:aws:logs:{self.region}:{self.account}:log-group:airflow-{self.stack_name}-*"], ), iam.PolicyStatement( actions=[ "logs:DescribeLogGroups" ], effect=iam.Effect.ALLOW, resources=["*"], ), iam.PolicyStatement( actions=[ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], effect=iam.Effect.ALLOW, resources=[f"arn:aws:sqs:{self.region}:*:airflow-celery-*"], ), iam.PolicyStatement( actions=[ "ecs:RunTask", "ecs:DescribeTasks", "ecs:RegisterTaskDefinition", "ecs:DescribeTaskDefinition", "ecs:ListTasks", "ecs:CreateCluster", "ecs:CreateTaskSet", "ecs:DeleteCluster", "ecs:DeleteTaskDefinitions", "ecs:StartTask", "ecs:DescribeClusters", "ecs:DeregisterTaskDefinition" ], effect=iam.Effect.ALLOW, resources=[ "*" ], ), iam.PolicyStatement( actions=[ "iam:PassRole" ], effect=iam.Effect.ALLOW, resources=[ "*" ], conditions= { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } }, ), iam.PolicyStatement( actions=[ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt", "kms:PutKeyPolicy" ], effect=iam.Effect.ALLOW, resources=["*"], conditions={ "StringEquals": { "kms:ViaService": [ f"sqs.{self.region}.amazonaws.com", f"s3.{self.region}.amazonaws.com", ] } }, ), ] ) mwaa_service_role = iam.Role( self, "mwaa-service-role", assumed_by=iam.CompositePrincipal( iam.ServicePrincipal("airflow.amazonaws.com"), iam.ServicePrincipal("airflow-env.amazonaws.com"), iam.ServicePrincipal("ecs-tasks.amazonaws.com"), ), inline_policies={"CDKmwaaPolicyDocument": mwaa_policy_document}, path="/service-role/" ) # Create MWAA Security Group and get networking info security_group = ec2.SecurityGroup( self, id = "mwaa-sg", vpc = vpc, security_group_name = "mwaa-sg" ) security_group_id = security_group.security_group_id security_group.connections.allow_internally(ec2.Port.all_traffic(),"MWAA") subnets = [subnet.subnet_id for subnet in vpc.private_subnets] network_configuration = mwaa.CfnEnvironment.NetworkConfigurationProperty( security_group_ids=[security_group_id], subnet_ids=subnets, ) # **OPTIONAL** Configure specific MWAA settings - you can externalise these if you want logging_configuration = mwaa.CfnEnvironment.LoggingConfigurationProperty( dag_processing_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty( enabled=True, log_level="INFO" ), task_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty( enabled=True, log_level="INFO" ), worker_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty( enabled=True, log_level="INFO" ), scheduler_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty( enabled=True, log_level="INFO" ), webserver_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty( enabled=True, log_level="INFO" ) ) options = { 'core.load_default_connections': False, 'core.load_examples': False, 'webserver.dag_default_view': 'tree', 'webserver.dag_orientation': 'TB' } tags = { "app": f"{self.stack_name}" } # **OPTIONAL** Create KMS key that MWAA will use for encryption kms_mwaa_policy_document = iam.PolicyDocument( statements=[ iam.PolicyStatement( actions=[ "kms:Create*", "kms:Describe*", "kms:Enable*", "kms:List*", "kms:Put*", "kms:Decrypt*", "kms:Update*", "kms:Revoke*", "kms:Disable*", "kms:Get*", "kms:Delete*", "kms:ScheduleKeyDeletion", "kms:GenerateDataKey*", "kms:CancelKeyDeletion" ], principals=[ iam.AccountRootPrincipal(), # Optional: # iam.ArnPrincipal(f"arn:aws:sts::{self.account}:assumed-role/AWSReservedSSO_rest_of_SSO_account"), ], resources=["*"]), iam.PolicyStatement( actions=[ "kms:Decrypt*", "kms:Describe*", "kms:GenerateDataKey*", "kms:Encrypt*", "kms:ReEncrypt*", "kms:PutKeyPolicy" ], effect=iam.Effect.ALLOW, resources=["*"], principals=[iam.ServicePrincipal("logs.amazonaws.com", region=f"{self.region}")], conditions={"ArnLike": {"kms:EncryptionContext:aws:logs:arn": f"arn:aws:logs:{self.region}:{self.account}:*"}}, ), ] ) key = kms.Key( self, f"{self.stack_name}Key", enable_key_rotation=True, policy=kms_mwaa_policy_document ) key.add_alias(f"alias/{self.stack_name}Key") # Create ECR spark_image_assest = DockerImageAsset(self, "SparkImage", directory='./infra/spark_image/', network_mode=NetworkMode.HOST ) # ECS Task execution Role ecs_tasks_service_role = iam.Role( self, "ecs-tasks-service-role", assumed_by=iam.CompositePrincipal( iam.ServicePrincipal("ecs-tasks.amazonaws.com") ), path="/service-role/" ) # Create MWAA environment using all the info above mwaa_configuration_options = { 'cdk.cluster_name': constants.CLUSTER_NAME, 'core.default_timezone': 'utc', 'cdk.spark_image':spark_image_assest.image_uri, 'cdk.task_role':ecs_tasks_service_role.role_arn, 'cdk.stack_name':self.stack_name, 'cdk.vpc': vpc.vpc_id, 'cdk.subnet_id':','.join([subnet.subnet_id for subnet in vpc.private_subnets]), 'cdk.security_group': vpc.vpc_default_security_group } managed_airflow = mwaa.CfnEnvironment( scope=self, id='airflow-test-environment', name=f"{self.stack_name}", airflow_version='2.4.3', dag_s3_path="airflow/dags", environment_class='mw1.small', execution_role_arn=mwaa_service_role.role_arn, kms_key=key.key_arn, logging_configuration=logging_configuration, max_workers=5, network_configuration=network_configuration, requirements_s3_path="airflow/requirements/requirements.txt", source_bucket_arn=dags_bucket_arn, webserver_access_mode='PUBLIC_ONLY', airflow_configuration_options=mwaa_configuration_options ) managed_airflow.add_override('Properties.AirflowConfigurationOptions', options) managed_airflow.add_override('Properties.Tags', tags) ecs_tasks_service_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AmazonECSTaskExecutionRolePolicy")) CfnOutput( self, id="SparkImageURI", value=spark_image_assest.image_uri, description="Spark Container image URI" )