AWSTemplateFormatVersion: "2010-09-09"
# Deploys a two-AZ VPC, an Amazon MWAA environment with its S3 buckets, a
# custom-resource Lambda that seeds the buckets from SourceBucket, and two
# AWS Glue jobs.
Parameters:
  SourceBucket:
    Type: String
    Description: The bucket name that includes the source data and scripts
    Default: aws-blogs-artifacts-public
  SourceBucketPrefix:
    Type: String
    Description: The key prefix (including the trailing "/") of the source data and scripts in SourceBucket
    Default: artifacts/BDB-1261/
  # NOTE(review): the placeholder token used by the published artifacts is not
  # recorded in this template; set this to match the token in the source files.
  DataBucketPlaceholder:
    Type: String
    Description: >-
      Token in the source DAGs, Glue scripts and data files that is replaced with the
      generated data bucket name when they are copied. Leave empty to copy the files unchanged.
    Default: ""
  EnvironmentName:
    Description: An environment name that is prefixed to resource names
    Type: String
    Default: MWAAEnvironment
  AirflowVersion:
    Description: The Apache Airflow version of the Amazon MWAA environment
    Type: String
    Default: "2.0.2"
  VpcCIDR:
    Description: The IP range (CIDR notation) for this VPC
    Type: String
    Default: 10.192.0.0/16
  PublicSubnet1CIDR:
    Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone
    Type: String
    Default: 10.192.10.0/24
  PublicSubnet2CIDR:
    Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone
    Type: String
    Default: 10.192.11.0/24
  PrivateSubnet1CIDR:
    Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone
    Type: String
    Default: 10.192.20.0/24
  PrivateSubnet2CIDR:
    Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone
    Type: String
    Default: 10.192.21.0/24
  MaxWorkerNodes:
    Description: The maximum number of workers that can run in the environment
    Type: Number
    Default: 2
    MinValue: 1
  DagProcessingLogs:
    Description: Log level for DagProcessingLogs
    Type: String
    Default: INFO
    AllowedValues: [CRITICAL, ERROR, WARNING, INFO, DEBUG]
  SchedulerLogsLevel:
    Description: Log level for SchedulerLogs
    Type: String
    Default: INFO
    AllowedValues: [CRITICAL, ERROR, WARNING, INFO, DEBUG]
  TaskLogsLevel:
    Description: Log level for TaskLogs
    Type: String
    Default: INFO
    AllowedValues: [CRITICAL, ERROR, WARNING, INFO, DEBUG]
  WorkerLogsLevel:
    Description: Log level for WorkerLogs
    Type: String
    Default: INFO
    AllowedValues: [CRITICAL, ERROR, WARNING, INFO, DEBUG]
  WebserverLogsLevel:
    Description: Log level for WebserverLogs
    Type: String
    Default: INFO
    AllowedValues: [CRITICAL, ERROR, WARNING, INFO, DEBUG]

Resources:
  #####################################################################################################################
  # CREATE VPC
  #####################################################################################################################
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VpcCIDR
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      InternetGatewayId: !Ref InternetGateway
      VpcId: !Ref VPC

  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      CidrBlock: !Ref PublicSubnet1CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Public Subnet (AZ1)"

  PublicSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [ 1, !GetAZs '' ]
      CidrBlock: !Ref PublicSubnet2CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Public Subnet (AZ2)"

  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      CidrBlock: !Ref PrivateSubnet1CIDR
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Subnet (AZ1)"

  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [ 1, !GetAZs '' ]
      CidrBlock: !Ref PrivateSubnet2CIDR
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Subnet (AZ2)"

  NatGateway1EIP:
    Type: AWS::EC2::EIP
    DependsOn: InternetGatewayAttachment
    Properties:
      Domain: vpc

  NatGateway2EIP:
    Type: AWS::EC2::EIP
    DependsOn: InternetGatewayAttachment
    Properties:
      Domain: vpc

  NatGateway1:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NatGateway1EIP.AllocationId
      SubnetId: !Ref PublicSubnet1

  NatGateway2:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NatGateway2EIP.AllocationId
      SubnetId: !Ref PublicSubnet2

  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Public Routes"

  DefaultPublicRoute:
    Type: AWS::EC2::Route
    DependsOn: InternetGatewayAttachment
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  PublicSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet1

  PublicSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet2

  PrivateRouteTable1:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Routes (AZ1)"

  # Each private subnet reaches the internet through the NAT gateway in its own AZ.
  DefaultPrivateRoute1:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable1
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGateway1

  PrivateSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable1
      SubnetId: !Ref PrivateSubnet1

  PrivateRouteTable2:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Routes (AZ2)"

  DefaultPrivateRoute2:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable2
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGateway2

  PrivateSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable2
      SubnetId: !Ref PrivateSubnet2

  # Holds the Airflow DAGs (under "dags/") read by the MWAA environment.
  EnvironmentBucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256

  # Holds the Glue job scripts and the raw/processed data.
  DataBucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256

  #####################################################################################################################
  # CREATE MWAA
  #####################################################################################################################
  MwaaEnvironment:
    Type: AWS::MWAA::Environment
    DependsOn: MwaaExecutionPolicy
    Properties:
      Name: !Sub "${AWS::StackName}-MwaaEnvironment"
      SourceBucketArn: !GetAtt EnvironmentBucket.Arn
      ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn
      AirflowVersion: !Ref AirflowVersion
      DagS3Path: dags
      NetworkConfiguration:
        SecurityGroupIds:
          - !GetAtt SecurityGroup.GroupId
        SubnetIds:
          - !Ref PrivateSubnet1
          - !Ref PrivateSubnet2
      WebserverAccessMode: PUBLIC_ONLY
      MaxWorkers: !Ref MaxWorkerNodes
      LoggingConfiguration:
        DagProcessingLogs:
          LogLevel: !Ref DagProcessingLogs
          Enabled: true
        SchedulerLogs:
          LogLevel: !Ref SchedulerLogsLevel
          Enabled: true
        TaskLogs:
          LogLevel: !Ref TaskLogsLevel
          Enabled: true
        WorkerLogs:
          LogLevel: !Ref WorkerLogsLevel
          Enabled: true
        WebserverLogs:
          LogLevel: !Ref WebserverLogsLevel
          Enabled: true

  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      VpcId: !Ref VPC
      GroupDescription: !Sub "Security Group for Amazon MWAA Environment ${AWS::StackName}-MwaaEnvironment"
      GroupName: !Sub "airflow-security-group-${AWS::StackName}-MwaaEnvironment"

  # Self-referencing rule: members of the group may reach each other on all protocols.
  SecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref SecurityGroup
      IpProtocol: "-1"
      SourceSecurityGroupId: !Ref SecurityGroup

  SecurityGroupEgress:
    Type: AWS::EC2::SecurityGroupEgress
    Properties:
      GroupId: !Ref SecurityGroup
      IpProtocol: "-1"
      CidrIp: "0.0.0.0/0"

  MwaaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - airflow-env.amazonaws.com
                - airflow.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: "/service-role/"

  MwaaExecutionPolicy:
    DependsOn: EnvironmentBucket
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Roles:
        - !Ref MwaaExecutionRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          # Must name the environment actually created above. GetAtt is not usable here
          # because MwaaEnvironment depends on this policy (circular reference).
          - Effect: Allow
            Action: airflow:PublishMetrics
            Resource:
              - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${AWS::StackName}-MwaaEnvironment"
          - Effect: Deny
            Action: s3:ListAllMyBuckets
            Resource:
              - !Sub "${EnvironmentBucket.Arn}"
              - !Sub "${EnvironmentBucket.Arn}/*"
          - Effect: Allow
            Action:
              - "s3:GetObject*"
              - "s3:GetBucket*"
              - "s3:List*"
            Resource:
              - !Sub "${EnvironmentBucket.Arn}"
              - !Sub "${EnvironmentBucket.Arn}/*"
          - Effect: Allow
            Action:
              - logs:CreateLogStream
              - logs:CreateLogGroup
              - logs:PutLogEvents
              - logs:GetLogEvents
              - logs:GetLogRecord
              - logs:GetLogGroupFields
              - logs:GetQueryResults
              - logs:DescribeLogGroups
            Resource:
              - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*"
          - Effect: Allow
            Action: cloudwatch:PutMetricData
            Resource: "*"
          - Effect: Allow
            Action:
              - sqs:ChangeMessageVisibility
              - sqs:DeleteMessage
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:ReceiveMessage
              - sqs:SendMessage
            Resource:
              - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*"
          - Effect: Allow
            Action:
              - kms:Decrypt
              - kms:DescribeKey
              - "kms:GenerateDataKey*"
              - kms:Encrypt
            NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*"
            Condition:
              StringLike:
                "kms:ViaService":
                  - !Sub "sqs.${AWS::Region}.amazonaws.com"
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:DeleteObject
              - s3:ListBucket
            Resource:
              - !GetAtt DataBucket.Arn
              - !Sub "${DataBucket.Arn}/*"
          # Job names must match the AWS::Glue::Job resources defined below.
          - Effect: Allow
            Action:
              - glue:GetJob
              - glue:GetJobRun
              - glue:GetJobRuns
              - glue:StartJobRun
              - glue:ListJobs
            Resource:
              - !Sub "arn:aws:glue:*:${AWS::AccountId}:job/glue-csv-to-parquet"
              - !Sub "arn:aws:glue:*:${AWS::AccountId}:job/glue-transform"

  AWSLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub AWSLambdaExecutionRole-${AWS::StackName}
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
      Path: "/"
      Policies:
        - PolicyName: !Sub AWSLambda-CW-${AWS::StackName}
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
        - PolicyName: !Sub AWSLambda-S3-${AWS::StackName}
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Read/write access to the buckets created by this stack.
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:ListBucket
                  - s3:DeleteObject
                Resource:
                  - !Sub arn:aws:s3:::${DataBucket}
                  - !Sub arn:aws:s3:::${DataBucket}/*
                  - !Sub arn:aws:s3:::${EnvironmentBucket}
                  - !Sub arn:aws:s3:::${EnvironmentBucket}/*
              # The external source bucket is only read from.
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                Resource:
                  - !Sub arn:aws:s3:::${SourceBucket}
                  - !Sub arn:aws:s3:::${SourceBucket}/*

  # Seeds EnvironmentBucket and DataBucket from SourceBucket/SourceBucketPrefix.
  # NOTE(review): Delete is a no-op, so the (versioned) buckets are not emptied; CloudFormation
  # cannot delete non-empty buckets, so stack deletion may fail until they are emptied manually.
  S3CustomResource:
    Type: Custom::S3CustomResource
    Properties:
      ServiceToken: !GetAtt AWSLambdaFunction.Arn
      databucketname: !Ref DataBucket
      airflowbucketname: !Ref EnvironmentBucket
      sourcebucketname: !Ref SourceBucket
      sourcebucketprefix: !Ref SourceBucketPrefix
      placeholder: !Ref DataBucketPlaceholder

  AWSLambdaFunction:
    Type: "AWS::Lambda::Function"
    Properties:
      Description: "Custom resource for S3 setup"
      FunctionName: !Sub 'lambda-custom-resource-${AWS::StackName}'
      Handler: index.handler
      Role: !GetAtt AWSLambdaExecutionRole.Arn
      MemorySize: 128
      Timeout: 300
      Runtime: python3.12
      Code:
        ZipFile: |
          """Custom resource that seeds the stack's buckets with the blog artifacts.

          Each artifact is read from the source bucket prefix, the placeholder
          token is replaced with the newly created data bucket name, and the
          result is written to the target bucket for MWAA to orchestrate.
          """
          import boto3
          import cfnresponse

          s3 = boto3.resource('s3')

          # Keys (relative to the source prefix) copied to the MWAA environment bucket.
          AIRFLOW_BUCKET_FILES = [
              'dags/run-glue-jobs.py',
              'dags/run-simple-dag.py',
              'dags/extract_metadata.py'
          ]
          # Keys (relative to the source prefix) copied to the data bucket.
          DATA_BUCKET_FILES = [
              'glue-csv-to-parquet.py',
              'glue-transform.py',
              'raw/customer_activity.csv'
          ]


          def copy_with_substitution(source_bucket, prefix, target_bucket, keys,
                                     placeholder, value):
              """Copy prefix+key from source_bucket to key in target_bucket.

              Objects are decoded as UTF-8 text and every occurrence of
              placeholder is replaced with value. An empty placeholder skips
              substitution: str.replace('', value) would insert value between
              every character and corrupt the file.
              """
              for key in keys:
                  body = s3.Object(source_bucket, prefix + key).get()['Body']
                  content = body.read().decode('UTF-8')
                  if placeholder:
                      content = content.replace(placeholder, value)
                  s3.Object(target_bucket, key).put(Body=content)


          def handler(event, context):
              """CloudFormation custom resource entry point.

              Create/Update copy the artifacts; Delete does nothing. A SUCCESS
              or FAILED response is sent on every path (property lookups are
              inside the try) so CloudFormation is not left waiting.
              """
              response_data = {}
              try:
                  if event['RequestType'] != 'Delete':
                      props = event['ResourceProperties']
                      source_bucket_name = props['sourcebucketname']
                      source_bucket_prefix = props['sourcebucketprefix']
                      data_bucket_name = props['databucketname']
                      placeholder = props.get('placeholder', '')
                      copy_with_substitution(
                          source_bucket_name, source_bucket_prefix,
                          props['airflowbucketname'], AIRFLOW_BUCKET_FILES,
                          placeholder, data_bucket_name)
                      copy_with_substitution(
                          source_bucket_name, source_bucket_prefix,
                          data_bucket_name, DATA_BUCKET_FILES,
                          placeholder, data_bucket_name)
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
              except Exception as e:
                  response_data['Data'] = str(e)
                  cfnresponse.send(event, context, cfnresponse.FAILED, response_data)

  # NOTE(review): IAM role names are account-wide and Glue job names are per-region, so these
  # fixed names prevent deploying two copies of this stack side by side. The job names are also
  # referenced by MwaaExecutionPolicy (and presumably by the DAGs), so they are left unchanged.
  AWSGlueETLAirflowRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: AWSGlueETLAirflowRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: AWSGlueETLAirflowPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # s3:ListBucket only applies to the bucket ARN, so both ARNs are listed.
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:PutObject"
                  - "s3:ListBucket"
                  - "s3:DeleteObject"
                Resource:
                  - !GetAtt DataBucket.Arn
                  - !Sub "${DataBucket.Arn}/*"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Path: "/"

  # The job scripts are uploaded by S3CustomResource, hence the DependsOn.
  gluecsvtoparquet:
    Type: AWS::Glue::Job
    DependsOn: S3CustomResource
    Properties:
      Command:
        Name: glueetl
        ScriptLocation: !Sub s3://${DataBucket}/glue-csv-to-parquet.py
      ExecutionProperty:
        MaxConcurrentRuns: 2
      GlueVersion: "2.0"
      MaxRetries: 0
      Name: glue-csv-to-parquet
      Role: !Ref AWSGlueETLAirflowRole

  gluetransform:
    Type: AWS::Glue::Job
    DependsOn: S3CustomResource
    Properties:
      Command:
        Name: glueetl
        ScriptLocation: !Sub s3://${DataBucket}/glue-transform.py
      ExecutionProperty:
        MaxConcurrentRuns: 2
      GlueVersion: "2.0"
      MaxRetries: 0
      Name: glue-transform
      Role: !Ref AWSGlueETLAirflowRole

Outputs:
  VPC:
    Description: A reference to the created VPC
    Value: !Ref VPC
  PublicSubnets:
    Description: A list of the public subnets
    Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]]
  PrivateSubnets:
    Description: A list of the private subnets
    Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]]
  PublicSubnet1:
    Description: A reference to the public subnet in the 1st Availability Zone
    Value: !Ref PublicSubnet1
  PublicSubnet2:
    Description: A reference to the public subnet in the 2nd Availability Zone
    Value: !Ref PublicSubnet2
  PrivateSubnet1:
    Description: A reference to the private subnet in the 1st Availability Zone
    Value: !Ref PrivateSubnet1
  PrivateSubnet2:
    Description: A reference to the private subnet in the 2nd Availability Zone
    Value: !Ref PrivateSubnet2
  SecurityGroupIngress:
    Description: ID of the self-referencing inbound rule on the MWAA security group
    Value: !Ref SecurityGroupIngress
  MwaaApacheAirflowUI:
    Description: MWAA Environment
    Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"
  DataBucket:
    Value: !Ref DataBucket
  EnvironmentBucket:
    Value: !Ref EnvironmentBucket