AWSTemplateFormatVersion: '2010-09-09' Description: Central template for Data Engineering Immersion Day # Note - if running this manually in Isengard, you can specify arbitrary values # for all of the parameters *except* the module Id and version which are needed # to pull assets properly from S3, *and* you should leave the SNS topic blank # so that, based on Conditions in this template, CloudFormation will avoid # subscribing resources to a non-existent SNS topic. Parameters: EEAPIBase: Description: "The Event Engine API Base URL in the form https://hostname:port/api" Type: String EEAPIToken: Description: "The API Token that the module uses for authentication" Type: String EEEventId: Description: "ID of the specific Event" Type: String EELifecycleTopicArn: Description: "ARN of the Event Engine Module lifecycle SNS topic" Type: String EEModuleId: Description: "Your module ID" Type: String EEModuleVersion: Description: "Your Module Version" Type: String EEAssetsBucket: Description: "Artifacts Bucket" Type: String LatestAmiId: Type: 'AWS::SSM::Parameter::Value' Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2' SourceBucket: Description: S3 bucket which contains the source object Default: aws-dataengineering-day.workshop.aws Type: String SourceKey: Description: S3 Key which contains the source object Default: dmslambda_v1.zip Type: String Conditions: # Used to optionally exclude the subscription of Lambda functions (or other # resources) to the lifecycle SNS topic if its not provided in the template # to avoid deployment errors. This is specifically aimed at making it easier # to deploy your template in Isengard (outside of EE) for test/debug. LifecycleTopicIsNotEmpty: Fn::Not: - !Equals [!Ref EELifecycleTopicArn, ""] Resources: DMSLabCFNS3Bucket: Type: 'AWS::S3::Bucket' LambdaCopySourceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: LambdaCopySourceRolePolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: s3:GetObject Resource: !Sub arn:aws:s3:::${SourceBucket}/* - Effect: Allow Action: - s3:PutObject - s3:DeleteObject Resource: !Sub arn:aws:s3:::${DMSLabCFNS3Bucket}/* - Effect: Allow Action: logs:CreateLogGroup Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:* - Effect: Allow Action: - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:* LambdaCopySource: Type: AWS::Lambda::Function Properties: Handler: index.handler Role: !GetAtt LambdaCopySourceRole.Arn Code: ZipFile: !Sub | import os import json import cfnresponse import boto3 from botocore.exceptions import ClientError s3 = boto3.client('s3') import logging logger = logging.getLogger() logger.setLevel(logging.INFO) def handler(event, context): logger.info("Received event: %s" % json.dumps(event)) source_bucket = '${SourceBucket}' source_prefix = '${SourceKey}' bucket = '${DMSLabCFNS3Bucket}' prefix = '${SourceKey}' result = cfnresponse.SUCCESS try: if event['RequestType'] == 'Create' or event['RequestType'] == 'Update': copy_source = {'Bucket': source_bucket, 'Key': source_prefix} s3.copy(copy_source, bucket, prefix) elif event['RequestType'] == 'Delete': s3.delete_object(Bucket=bucket, Key=prefix) except ClientError as e: logger.error('Error: %s', e) result = cfnresponse.FAILED cfnresponse.send(event, context, result, {}) Runtime: python3.8 Timeout: 60 CopySourceObject: Type: "Custom::CopySourceObject" Properties: ServiceToken: !GetAtt LambdaCopySource.Arn DMSInstructorVPC: Type: AWS::EC2::VPC Properties: CidrBlock: 10.0.0.0/16 EnableDnsSupport: true EnableDnsHostnames: true Tags: - Key: Name Value: DMSLabSourceDB RDSSubNet: Type: AWS::EC2::Subnet Properties: CidrBlock: 10.0.1.0/24 AvailabilityZone: !Select - 0 - Fn::GetAZs: !Ref 'AWS::Region' VpcId: !Ref DMSInstructorVPC Tags: - Key: Name Value: DMSLabRDS1 EC2SubNet: Type: AWS::EC2::Subnet Properties: CidrBlock: 10.0.0.0/24 AvailabilityZone: !Select - 0 - Fn::GetAZs: !Ref 'AWS::Region' VpcId: !Ref DMSInstructorVPC Tags: - Key: Name Value: DMSLabEC2 RDSSubNet2: Type: AWS::EC2::Subnet Properties: CidrBlock: 10.0.2.0/24 AvailabilityZone: !Select - 1 - Fn::GetAZs: !Ref 'AWS::Region' VpcId: !Ref DMSInstructorVPC Tags: - Key: Name Value: DMSLabRDS2 DMSLabIGW: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: DMSLabIGW DMSLabDHCPOptions: Type: AWS::EC2::DHCPOptions Properties: DomainName: ec2.internal DomainNameServers: - AmazonProvidedDNS DMSLabRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref DMSInstructorVPC Tags: - Key: Name Value: DMSLabRT DMSLabInstance: Type: AWS::EC2::Instance Properties: DisableApiTermination: false InstanceInitiatedShutdownBehavior: stop EbsOptimized: true ImageId: !Ref LatestAmiId InstanceType: t3.2xlarge UserData: Fn::Base64: !Sub | Content-Type: multipart/mixed; boundary="//" MIME-Version: 1.0 --// Content-Type: text/cloud-config; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="cloud-config.txt" #cloud-config cloud_final_modules: - [scripts-user, always] --// Content-Type: text/x-shellscript; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="userdata.txt" #!/bin/bash -xe yum install -y postgresql yum install -y git yum update -y cd /home/ec2-user DIR="aws-database-migration-samples" if [ ! -d "$DIR" ]; then git clone https://github.com/aws-samples/aws-database-migration-samples.git fi cd aws-database-migration-samples/PostgreSQL/sampledb/v1/ export PGPASSWORD=admin123 export ENDPOINT=${RDSDMSLabDB.Endpoint.Address} nohup psql --host=${!ENDPOINT} --port=5432 --dbname=sportstickets --username=adminuser -f install-postgresql.sql Monitoring: false Tags: - Key: Name Value: DMSLabEC2 NetworkInterfaces: - DeleteOnTermination: true Description: 'Primary network interface' DeviceIndex: '0' SubnetId: !Ref EC2SubNet PrivateIpAddresses: - PrivateIpAddress: 10.0.0.40 Primary: true GroupSet: - !Ref DMSLabSG AssociatePublicIpAddress: true RDSDMSLabDB: Type: AWS::RDS::DBInstance DependsOn: InternetRoute Properties: AllocatedStorage: '500' AllowMajorVersionUpgrade: false AutoMinorVersionUpgrade: true DBInstanceClass: db.t2.2xlarge DBInstanceIdentifier: dmslabinstance Port: '5432' PubliclyAccessible: true StorageType: gp2 BackupRetentionPeriod: 7 MasterUsername: adminuser MasterUserPassword: admin123 PreferredBackupWindow: 04:00-04:30 PreferredMaintenanceWindow: sun:05:20-sun:05:50 DBName: sportstickets Engine: postgres EngineVersion: '11.20' LicenseModel: postgresql-license DBSubnetGroupName: !Ref DMSLabDefaultDBSubnet DBParameterGroupName: !Ref DMSLabDBPG VPCSecurityGroups: - !Ref DMSLabDBSG Tags: - Key: workload-type Value: other DMSLabDefaultDBSubnet: Type: AWS::RDS::DBSubnetGroup Properties: DBSubnetGroupDescription: Created from the RDS Management Console SubnetIds: - !Ref RDSSubNet - !Ref EC2SubNet - !Ref RDSSubNet2 DMSLabDBPG: Type: AWS::RDS::DBParameterGroup Properties: Description: Parameter for Ticket RedshiftDatabaseName Family: postgres11 Parameters: rds.logical_replication: '1' wal_sender_timeout: '0' max_wal_senders: '20' max_replication_slots: '50' DMSLabSG: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: EC2 Security Group VpcId: !Ref DMSInstructorVPC DMSLabDBSG: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: RDS Security Group VpcId: !Ref DMSInstructorVPC Tags: - Key: Name Value: DMSLabRDS-SG DMSLabGWAttachement: Type: AWS::EC2::VPCGatewayAttachment Properties: VpcId: !Ref DMSInstructorVPC InternetGatewayId: !Ref DMSLabIGW SubnetRoute1: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref DMSLabRouteTable SubnetId: !Ref RDSSubNet2 SubnetRoute2: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref DMSLabRouteTable SubnetId: !Ref RDSSubNet SubnetRoute3: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref DMSLabRouteTable SubnetId: !Ref EC2SubNet InternetRoute: Type: AWS::EC2::Route Properties: DestinationCidrBlock: 0.0.0.0/0 RouteTableId: !Ref DMSLabRouteTable GatewayId: !Ref DMSLabIGW DependsOn: DMSLabGWAttachement DHCPAssociation: Type: AWS::EC2::VPCDHCPOptionsAssociation Properties: VpcId: !Ref DMSInstructorVPC DhcpOptionsId: !Ref DMSLabDHCPOptions SGIngress1: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref DMSLabSG IpProtocol: tcp FromPort: 22 ToPort: 22 CidrIp: 0.0.0.0/0 SGIngress2: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref DMSLabDBSG IpProtocol: tcp FromPort: 5432 ToPort: 5432 CidrIp: 0.0.0.0/0 SGEgress1: Type: AWS::EC2::SecurityGroupEgress Properties: GroupId: !Ref DMSLabSG IpProtocol: '-1' CidrIp: 0.0.0.0/0 SGEgress2: Type: AWS::EC2::SecurityGroupEgress Properties: GroupId: !Ref DMSLabDBSG IpProtocol: '-1' CidrIp: 0.0.0.0/0 GenerateCDCData: DependsOn: CopySourceObject Type: AWS::Lambda::Function Properties: Code: S3Bucket: !Ref DMSLabCFNS3Bucket S3Key: !Ref SourceKey Description: Function to generate CDC data FunctionName: GenerateCDCData Handler: index.handler MemorySize: 128 Runtime: nodejs18.x Timeout: 300 Environment: Variables: HOST: !GetAtt RDSDMSLabDB.Endpoint.Address Role: !GetAtt LambdaExecutionRole.Arn VpcConfig: SecurityGroupIds: - !GetAtt DMSLabSG.GroupId SubnetIds: - !Ref EC2SubNet LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: DMSlabRDSAccess PolicyDocument: Version: 2012-10-17 Statement: - Sid: RDSAccess Effect: Allow Action: - rds:* - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - ec2:CreateNetworkInterface - ec2:DescribeNetworkInterfaces - ec2:DeleteNetworkInterface Resource: "*" LambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: LambdaRolePolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: s3:DeleteObject Resource: !Sub arn:aws:s3:::${DMSLabCFNS3Bucket}/* - Effect: Allow Action: s3:ListBucket Resource: !Sub arn:aws:s3:::${DMSLabCFNS3Bucket} - Effect: Allow Action: logs:CreateLogGroup Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:* - Effect: Allow Action: - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:* S3BucketHandler: Type: AWS::Lambda::Function Properties: Handler: index.handler Role: !GetAtt LambdaRole.Arn Code: ZipFile: | import os import json import cfnresponse import boto3 from botocore.exceptions import ClientError s3 = boto3.resource('s3') def handler(event, context): print("Received event: %s" % json.dumps(event)) s3_bucket = s3.Bucket(event['ResourceProperties']['Bucket']) try: if event['RequestType'] == 'Create' or event['RequestType'] == 'Update': result = cfnresponse.SUCCESS elif event['RequestType'] == 'Delete': s3_bucket.objects.delete() result = cfnresponse.SUCCESS except ClientError as e: print('Error: %s', e) result = cfnresponse.FAILED cfnresponse.send(event, context, result, {}) Runtime: python3.8 Timeout: 300 EmptyDMSLabCFNS3Bucket: Type: "Custom::EmptyDMSLabCFNS3Bucket" Properties: ServiceToken: !GetAtt S3BucketHandler.Arn Bucket: !Ref DMSLabCFNS3Bucket #------------------------------------------------------------------------------# # FUNCTION TO HANDLE EVENT LIFECYCLE EVENTS RECEIVED ON SNS TOPIC #------------------------------------------------------------------------------# HandleMasterLifecycleEventsFunction: Type: AWS::Lambda::Function Properties: Code: ZipFile: | import json import boto3 import os import traceback client = boto3.client('lambda') send_team_output_to_event_engine_function = os.environ['SEND_TEAM_OUTPUT_TO_EVENT_ENGINE_FUNCTION'] print('Loading function') def handler(event, context): try: print(event) message = json.loads(event['Records'][0]['Sns']['Message']) event_type = message['event'] event_id = message['game-id'] module_id = message['module-id'] team_id = message['team-id'] cfn_outputs = {} if 'cfn-outputs' in message: cfn_outputs = message['cfn-outputs'] if module_id == '3fccddd609114925bf8094186f402676': cfn_outputs['DBEndpoint'] = os.environ['DB_ENDPOINT'] cfn_outputs['DBUsername'] = os.environ['DB_USER'] cfn_outputs['DBPassword'] = os.environ['DB_PASSWORD'] cfn_outputs['DBPort'] = os.environ['DB_PORT'] cfn_outputs['DBName'] = os.environ['DB_NAME'] if event_type == 'MODULE_DEPLOYING': print('Module {} for event {} for team {} is deploying...' .format(module_id, event_id, team_id) ) print('Nothing to do!') elif event_type == 'MODULE_DEPLOYED': print('Module {} for event {} for team {} is deployed...' .format(module_id, event_id, team_id) ) if (len(cfn_outputs) == 0): print('Team module has no CloudFormation outputs to send to EE') else: send_initial_team_outputs_to_ee(team_id, cfn_outputs) if event_type == 'MODULE_UNDEPLOYING': print('Module {} for event {} for team {} is undeploying...' .format(module_id, event_id, team_id) ) print('Nothing to do!') if event_type == 'MODULE_UNDEPLOYED': print('Module {} for event {} for team {} is undeployed...' .format(module_id, event_id, team_id) ) print('Nothing to do!') else: print('Module {} for event {} for team {} has unhandled event {}...' .format(module_id, event_id, team_id, event_type) ) return { "status": 200, "message": "Done!" } except Exception as e: msg = "Unexpected error: {}".format(e) print(msg) traceback.print_exc() return { "status": 500, "message": msg } def send_initial_team_outputs_to_ee(team_id, cfn_outputs): print ('Sending team CloudFormation Outputs to Event Engine...') for key, value in cfn_outputs.items(): payload = { "team_id": team_id, "output_key": key, "output_value": value } print('Invoking Lambda {} with payload:\n'.format(send_team_output_to_event_engine_function) + json.dumps(payload, indent=2) ) response = client.invoke( FunctionName=send_team_output_to_event_engine_function, InvocationType='RequestResponse', Payload=json.dumps(payload), ) response_payload = json.loads(response['Payload'].read()) print('Invoke response status {}:\n{}'.format( response['StatusCode'], json.dumps(response_payload,indent=2) )) Description: Function receives lifecycle events from event engine and handles them as needed. Environment: Variables: API_BASE: !Ref EEAPIBase API_TOKEN: !Ref EEAPIToken EVENT_REGION: !Ref AWS::Region EVENT_ID: !Ref EEEventId MODULE_ID: !Ref EEModuleId SEND_TEAM_OUTPUT_TO_EVENT_ENGINE_FUNCTION: !Ref SendTeamOutputToEventEngineFunction DB_ENDPOINT: !GetAtt RDSDMSLabDB.Endpoint.Address DB_PORT: 5432 DB_USER: adminuser DB_PASSWORD: admin123 DB_NAME: sportstickets Handler: index.handler Role: !GetAtt HandleMasterLifecycleEventsRole.Arn Runtime: python3.7 Timeout: 120 HandleMasterLifecycleEventsSnsSubscription: Type: AWS::SNS::Subscription Condition: LifecycleTopicIsNotEmpty Properties: Endpoint: !GetAtt HandleMasterLifecycleEventsFunction.Arn Protocol: lambda TopicArn: !Ref EELifecycleTopicArn HandleMasterLifecycleEventsPermissionForSns: Type: AWS::Lambda::Permission Condition: LifecycleTopicIsNotEmpty Properties: Action: lambda:InvokeFunction Principal: sns.amazonaws.com SourceArn: !Ref EELifecycleTopicArn FunctionName: !Ref HandleMasterLifecycleEventsFunction HandleMasterLifecycleEventsRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:* Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - lambda:invokeFunction Resource: arn:aws:lambda:*:*:* #------------------------------------------------------------------------------# # FUNCTION TO SEND OUTPUTS TO EVENT ENGINE VIA AN API CALL #------------------------------------------------------------------------------# SendTeamOutputToEventEngineFunction: Type: AWS::Lambda::Function Properties: Code: S3Bucket: !Ref EEAssetsBucket S3Key: !Sub "modules/${EEModuleId}/v${EEModuleVersion}/lambda/send-team-output-to-event-engine.zip" Description: Sends team outputs to Event Engine. Environment: Variables: API_BASE: !Ref EEAPIBase API_TOKEN: !Ref EEAPIToken EVENT_REGION: !Ref AWS::Region EVENT_ID: !Ref EEEventId MODULE_ID: !Ref EEModuleId Handler: index.handler Role: !GetAtt SendOutputToEventEngineRole.Arn Runtime: python3.7 Timeout: 120 SendOutputToEventEngineRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:* Resource: arn:aws:logs:*:*:* Outputs: RDSInstanceEndpoint: Description: RDS database server name Value: !GetAtt RDSDMSLabDB.Endpoint.Address