AWSTemplateFormatVersion: 2010-09-09 Description: AWS DMS Blog Metadata: 'AWS::CloudFormation::Interface': ParameterGroups: - Label: default: VPC Network Configuration Parameters: - VpcCidrBlock - Subnet1CidrBlock - Subnet2CidrBlock - Label: default: DB Instance Parameters Parameters: - DBInstanceClass - MasterUsername - MasterUserPassword - Label: default: DMS Parameters Parameters: - ReplicationInstanceClass - Label: default: EC2 Instance Parameters Parameters: - EC2InstanceType - KeyName - IPCidrToAllowTraffic ParameterLabels: DBInstanceClass: default: DB Instance Class EC2InstanceType: default: EC2 Instance Type KeyName: default: Key Name ReplicationInstanceClass: default: Replication Instance Class MasterUsername: default: Master Username MasterUserPassword: default: Master Password Subnet1CidrBlock: default: Subnet 1 CIDR Block Subnet2CidrBlock: default: Subnet 2 CIDR Block VpcCidrBlock: default: VPC CIDR Block Parameters: VpcCidrBlock: AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,3})' Default: 10.0.0.0/16 Description: Input a IPv4 network range for a VPC in CIDR notation. Type: String Subnet1CidrBlock: AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,3})' Default: 10.0.50.0/24 Description: Input a IPv4 network range for a subnet 1 in CIDR notation. Type: String Subnet2CidrBlock: AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,3})' Default: 10.0.200.0/24 Description: Input a IPv4 network range for a subnet 2 in CIDR notation. Type: String #changed DBInstanceClass: Description: DB instance class for source and target. Type: String Default: db.t3.medium AllowedValues: - db.r5.large - db.r5.xlarge - db.r5.2xlarge - db.r5.4xlarge - db.r5.8xlarge - db.t3.small - db.t3.medium ReplicationInstanceClass: Description: Replication instance class. Type: String Default: dms.t2.medium AllowedValues: - dms.t2.micro - dms.t2.small - dms.t2.medium - dms.t2.large - dms.c4.large - dms.c4.xlarge - dms.c4.2xlarge - dms.c4.4xlarge MasterUsername: Description: A name for the master user for both source and target database. Type: String Default: master AllowedPattern: '^[0-9a-zA-Z]+$' MasterUserPassword: Default: Password1 Description: >- A password of the master user for both source and target database. A default password is 'Password1'. NoEcho: 'true' Type: String #changed EC2InstanceType: Description: An instance type for client EC2 instance Type: String Default: t3.medium AllowedValues: - t3.micro - t3.small - t3.medium - t3.large IPCidrToAllowTraffic: Description: An IP address from where to allow ssh to client instance. Type: String Default: 0.0.0.0/0 #changed KeyName: Description: Input your key pair name for client EC2 instance. Type: AWS::EC2::KeyPair::KeyName #mightfail ImageId: Description: Image ID for client EC2 instance. Do not change this parameter. Type: 'AWS::SSM::Parameter::Value' Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2' AllowedValues: - '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2' Resources: VPC: Type: 'AWS::EC2::VPC' Properties: CidrBlock: !Ref VpcCidrBlock EnableDnsSupport: 'true' EnableDnsHostnames: 'true' InstanceTenancy: default Tags: - Key: Name Value: !Ref 'AWS::StackName' Subnet1: Type: 'AWS::EC2::Subnet' Properties: AvailabilityZone: !Select - '1' - !GetAZs Ref: 'AWS::Region' CidrBlock: !Ref Subnet1CidrBlock MapPublicIpOnLaunch: 'true' VpcId: !Ref VPC Tags: - Key: Name Value: !Ref 'AWS::StackName' Subnet2: Type: 'AWS::EC2::Subnet' Properties: AvailabilityZone: !Select - '2' - !GetAZs Ref: 'AWS::Region' CidrBlock: !Ref Subnet2CidrBlock MapPublicIpOnLaunch: 'true' VpcId: !Ref VPC Tags: - Key: Name Value: !Ref 'AWS::StackName' InternetGateway: Type: 'AWS::EC2::InternetGateway' Properties: Tags: - Key: Name Value: !Ref 'AWS::StackName' InstanceSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: Allow http to client host VpcId: !Ref VPC SecurityGroupIngress: - IpProtocol: tcp FromPort: 22 ToPort: 22 CidrIp: !Ref IPCidrToAllowTraffic SGBaseIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref InstanceSecurityGroup IpProtocol: tcp FromPort: 0 ToPort: 65535 SourceSecurityGroupId: !GetAtt InstanceSecurityGroup.GroupId VPCGatewayAttachment: Type: 'AWS::EC2::VPCGatewayAttachment' Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicRouteTable: Type: 'AWS::EC2::RouteTable' Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Ref 'AWS::StackName' PublicRoute: Type: 'AWS::EC2::Route' DependsOn: VPCGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway SubnetRouteTableAssociation1: Type: 'AWS::EC2::SubnetRouteTableAssociation' Properties: SubnetId: !Ref Subnet1 RouteTableId: !Ref PublicRouteTable SubnetRouteTableAssociation2: Type: 'AWS::EC2::SubnetRouteTableAssociation' Properties: SubnetId: !Ref Subnet2 RouteTableId: !Ref PublicRouteTable PublicNetworkAcl: Type: 'AWS::EC2::NetworkAcl' Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Ref 'AWS::StackName' SubnetNetworkAclAssociation1: Type: 'AWS::EC2::SubnetNetworkAclAssociation' Properties: SubnetId: !Ref Subnet1 NetworkAclId: !Ref PublicNetworkAcl SubnetNetworkAclAssociation2: Type: 'AWS::EC2::SubnetNetworkAclAssociation' Properties: SubnetId: !Ref Subnet2 NetworkAclId: !Ref PublicNetworkAcl InboundPublicNetworkAclEntry: Type: 'AWS::EC2::NetworkAclEntry' Properties: NetworkAclId: !Ref PublicNetworkAcl RuleNumber: 100 Protocol: '-1' RuleAction: allow Egress: 'false' CidrBlock: 0.0.0.0/0 OutboundPublicNetworkAclEntry: Type: 'AWS::EC2::NetworkAclEntry' Properties: NetworkAclId: !Ref PublicNetworkAcl RuleNumber: 100 Protocol: '-1' RuleAction: allow Egress: 'true' CidrBlock: 0.0.0.0/0 DBSubnetGroup: Type: 'AWS::RDS::DBSubnetGroup' Properties: DBSubnetGroupDescription: !Join - '' - - 'Created from the CloudFormation stack: ' - !Ref 'AWS::StackName' - . SubnetIds: - !Ref Subnet1 - !Ref Subnet2 Tags: - Key: Name Value: !Ref 'AWS::StackName' AuroraClusterParameterGroup: Type: 'AWS::RDS::DBClusterParameterGroup' Properties: Description: Aurora Cluster Parameter Group for source database. Family: aurora5.6 Parameters: binlog_format: ROW binlog_checksum: NONE AuroraParameterGroup: Type: 'AWS::RDS::DBParameterGroup' Properties: Description: Aurora Parameter Group for source database. Family: aurora5.6 Parameters: {} AuroraCluster: Type: 'AWS::RDS::DBCluster' DeletionPolicy: Delete Properties: DatabaseName: testdb MasterUsername: !Ref MasterUsername MasterUserPassword: !Ref MasterUserPassword Engine: aurora DBSubnetGroupName: !Ref DBSubnetGroup DBClusterParameterGroupName: !Ref AuroraClusterParameterGroup VpcSecurityGroupIds: - !Ref InstanceSecurityGroup AuroraDBInstance1: Type: 'AWS::RDS::DBInstance' Properties: AllowMajorVersionUpgrade: 'false' AutoMinorVersionUpgrade: 'false' AvailabilityZone: !GetAtt Subnet1.AvailabilityZone DBInstanceClass: !Ref DBInstanceClass DBInstanceIdentifier: !Sub '${AWS::StackName}-source-1' DBSubnetGroupName: !Ref DBSubnetGroup DBParameterGroupName: !Ref AuroraParameterGroup DBClusterIdentifier: !Ref AuroraCluster Engine: aurora PubliclyAccessible: 'false' AuroraDBInstance2: Type: 'AWS::RDS::DBInstance' Properties: AllowMajorVersionUpgrade: 'false' AutoMinorVersionUpgrade: 'false' AvailabilityZone: !GetAtt Subnet2.AvailabilityZone DBInstanceClass: !Ref DBInstanceClass DBInstanceIdentifier: !Sub '${AWS::StackName}-source-2' DBSubnetGroupName: !Ref DBSubnetGroup DBParameterGroupName: !Ref AuroraParameterGroup DBClusterIdentifier: !Ref AuroraCluster Engine: aurora PubliclyAccessible: 'false' CustomClusterConfigRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole Path: / ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyName: KafkaCreateConf PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'kafka:CreateConfiguration' - 'kafka:GetBootstrapBrokers' Resource: ['*'] - PolicyName: DmsTestConnection PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'dms:TestConnection' - 'dms:CreateEndpoint' - 'dms:DescribeConnections' Resource: ['*'] CustomClusterConfigLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Runtime: python3.7 Role: !GetAtt CustomClusterConfigRole.Arn Timeout: 900 Code: ZipFile: | import boto3 import cfnresponse import json import random import string kafka = boto3.client('kafka') conf = bytes('auto.create.topics.enable = true', 'utf-8') def randomString(stringLength=10): letters = string.ascii_lowercase return ''.join(random.choice(letters) for i in range(stringLength)) def lambda_handler(event, context): print('Event is ' + json.dumps(event)) try: if event['RequestType'] == 'Create': print('Create stack call, MSK cluster config will be created') r = kafka.create_configuration( Description='MSKMMCluster1 cluster configuration', KafkaVersions=[ '2.2.1'], Name=randomString(), ServerProperties=conf ) conf_arn = r['Arn'] cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, conf_arn) elif (event['RequestType'] == 'Delete') or (event['RequestType'] == 'Update'): cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, '') except Exception as e: print(e) cfnresponse.send(event, context, cfnresponse.FAILED, {}, '') CustomClusterConfig: Type: Custom::ClusterConfig Properties: ServiceToken: !GetAtt CustomClusterConfigLambda.Arn MSKMMCluster1: Type: 'AWS::MSK::Cluster' Properties: BrokerNodeGroupInfo: ClientSubnets: - !Ref Subnet1 - !Ref Subnet2 InstanceType: kafka.m5.large SecurityGroups: - !Ref InstanceSecurityGroup StorageInfo: EBSStorageInfo: VolumeSize: 100 ConfigurationInfo: Arn: !Ref CustomClusterConfig Revision: 1 ClusterName: MSKMMCluster1 EncryptionInfo: EncryptionInTransit: ClientBroker: TLS_PLAINTEXT InCluster: true EnhancedMonitoring: PER_TOPIC_PER_BROKER KafkaVersion: 2.2.1 NumberOfBrokerNodes: 2 ReplicationSubnetGroup: Type: 'AWS::DMS::ReplicationSubnetGroup' Properties: #changed to sub # ReplicationSubnetGroupDescription: !Join # - '' # - - 'Created from the CloudFormation stask: ' # - !Ref 'AWS::StackName' # - . ReplicationSubnetGroupDescription: !Sub 'Created from the CloudFormation stack: ${AWS::StackName}' SubnetIds: - !Ref Subnet1 - !Ref Subnet2 ReplicationInstance: Type: 'AWS::DMS::ReplicationInstance' Properties: AllocatedStorage: 200 AutoMinorVersionUpgrade: 'false' AvailabilityZone: !GetAtt - Subnet1 - AvailabilityZone EngineVersion: 3.3.2 PubliclyAccessible: 'false' ReplicationInstanceClass: !Ref ReplicationInstanceClass #mightfail https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dms-replicationinstance.html#cfn-dms-replicationinstance-replicationinstanceidentifier ReplicationInstanceIdentifier: !Ref 'AWS::StackName' ReplicationSubnetGroupIdentifier: !Ref ReplicationSubnetGroup VpcSecurityGroupIds: - !Ref InstanceSecurityGroup SourceEndpoint: Type: 'AWS::DMS::Endpoint' Properties: DatabaseName: testdb EndpointIdentifier: !Join - '-' - - !Ref 'AWS::StackName' - source EndpointType: source EngineName: aurora Password: !Ref MasterUserPassword Port: !GetAtt - AuroraCluster - Endpoint.Port ServerName: !GetAtt - AuroraCluster - Endpoint.Address Username: !Ref MasterUsername ClientEC2Instance: Type: 'AWS::EC2::Instance' Properties: ImageId: !Ref ImageId InstanceType: !Ref EC2InstanceType KeyName: !Ref KeyName SubnetId: !Ref Subnet1 SecurityGroupIds: - !Ref InstanceSecurityGroup Tags: - Key: Name Value: !Ref 'AWS::StackName' UserData: Fn::Base64: !Sub | #!/bin/bash sudo yum update -y sudo yum install python3.7 -y sudo yum install java-1.8.0-openjdk-devel -y sudo yum erase awscli -y sudo yum install mysql -y sudo yum install git maven -y cd /home/ec2-user echo "export PATH=.local/bin:$PATH" >> .bash_profile mkdir kafka mkdir mm cd kafka wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz tar -xzf kafka_2.12-2.2.1.tgz cd /home/ec2-user wget https://bootstrap.pypa.io/get-pip.py su -c "python3.7 get-pip.py --user" -s /bin/sh ec2-user su -c "/home/ec2-user/.local/bin/pip3 install boto3 --user" -s /bin/sh ec2-user su -c "/home/ec2-user/.local/bin/pip3 install awscli --user" -s /bin/sh ec2-user chown -R ec2-user ./kafka chgrp -R ec2-user ./kafka chown -R ec2-user ./mm chgrp -R ec2-user ./mm CustomKakfaEndpoint: Type: Custom::KafkfaEndpoint Properties: ServiceToken: !GetAtt CustomKafkaEndpointLambda.Arn KafkaCluster: !Ref MSKMMCluster1 CustomKafkaEndpointLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Runtime: python3.7 Role: !GetAtt CustomClusterConfigRole.Arn Timeout: 900 Code: ZipFile: | import boto3 import cfnresponse import json kafka = boto3.client('kafka') dms = boto3.client('dms') def lambda_handler(event, context): print('Event is ' + json.dumps(event)) try: if event['RequestType'] == 'Create': c_arn = event['ResourceProperties']['KafkaCluster'] b = kafka.get_bootstrap_brokers(ClusterArn=c_arn) broker = b['BootstrapBrokerString'].split(',')[0] print('Kafka broker is: ' + broker) r = dms.create_endpoint( EndpointIdentifier='dms-blog-kafka-target', EndpointType='target', EngineName='kafka', KafkaSettings={ 'Broker': broker, 'Topic': 'dms-blog' } ) k_ep = r['Endpoint']['EndpointArn'] print('Target endpoint is: ' + k_ep) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, k_ep) elif (event['RequestType'] == 'Delete') or (event['RequestType'] == 'Update'): cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, '') except Exception as e: print(e) cfnresponse.send(event, context, cfnresponse.FAILED, {}, '') TestConnAllEndpoints: Type: Custom::TestConn Properties: ServiceToken: !GetAtt TestConnLambda.Arn ReplEndpoint: !Ref ReplicationInstance KafkaEndpoint: !Ref CustomKakfaEndpoint AuroraEndpoint: !Ref SourceEndpoint #Modified script from TestConnLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Runtime: python3.7 Role: !GetAtt CustomClusterConfigRole.Arn Timeout: 900 Code: ZipFile: | import cfnresponse import json import boto3 def lambda_handler(event, context): source_endpoint = event['ResourceProperties']['AuroraEndpoint'] kafka_endpoint = event['ResourceProperties']['KafkaEndpoint'] replication_inst = event['ResourceProperties']['ReplEndpoint'] if 'Create' in event['RequestType']: print ('This is a %s event' %(event['RequestType'])) print('Checking connection for Source .....') source_result = check_connection(source_endpoint,replication_inst) print('Source result was %s' %(source_result)) if 'success' in source_result: print('Proceeding to check connection for Kafka Target ....') kafka_result = check_connection(kafka_endpoint,replication_inst) print('Kafka Target result was %s' %(kafka_result)) if 'success' in kafka_result: cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, '') else: print('Kafka target connection failed') cfnresponse.send(event, context, cfnresponse.FAILED, {}, '') else: print('Source connection failed') cfnresponse.send(event, context, cfnresponse.FAILED, {}, '') else: print('Delete event nothing will be done') cfnresponse.send(event, context, cfnresponse.FAILED, {}, '') def check_connection(endpoint,rep): dms = boto3.client('dms') dms.test_connection(ReplicationInstanceArn=rep,EndpointArn=endpoint) waiter = dms.get_waiter('test_connection_succeeds') waiter.wait( Filters=[ { 'Name': 'endpoint-arn', 'Values': [endpoint] }, { 'Name': 'replication-instance-arn', 'Values':[rep] } ] ) status_conn_api = dms.describe_connections( Filters=[ { 'Name': 'endpoint-arn', 'Values': [endpoint] }, { 'Name': 'replication-instance-arn', 'Values': [rep] } ] ) stat_task = status_conn_api['Connections'][0]['Status'] print('The connection test was %s' %(stat_task)) return (stat_task) ReplicationTaskAuroraKafka: Type: 'AWS::DMS::ReplicationTask' Properties: MigrationType: full-load-and-cdc ReplicationInstanceArn: !Ref ReplicationInstance #hard coded value as the regex with stack name needs to be converted to lower case for task name ReplicationTaskIdentifier: dms-blog-aurora-kafka-task ReplicationTaskSettings: >- {"TargetMetadata":{"TargetSchema":"MASTER","SupportLobs":true,"LimitedSizeLobMode":true,"LobMaxSize":32},"sFullLoadSettings":{"TargetTablePrepMode":"DROP_AND_CREATE"},"Logging":{"EnableLogging":true,"LogComponents":[{"Id":"SOURCE_CAPTURE","Severity":"LOGGER_SEVERITY_DETAILED_DEBUG"},{"Id":"TASK_MANAGER","Severity":"LOGGER_SEVERITY_DETAILED_DEBUG"}]}} SourceEndpointArn: !Ref SourceEndpoint TableMappings: >- { "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "testdb", "table-name": "orders" }, "rule-action": "include" } ]} TargetEndpointArn: !Ref CustomKakfaEndpoint Outputs: VPCId: Description: VPCId of the newly created VPC Value: !Ref VPC Subnet1: Description: SubnetId of the public subnet Value: !Ref Subnet1 Subnet2: Description: SubnetId of the public subnet Value: !Ref Subnet2 AuroraCluster: Description: Source Aurora Cluster Value: !Ref AuroraCluster AuroraDBInstance1: Description: Target DB Instance Value: !Ref AuroraDBInstance1 AuroraDBInstance2: Description: Target DB Instance Value: !Ref AuroraDBInstance2 ReplicationInstance: Description: Replication Instance Value: !Ref ReplicationInstance MSKMMCluster1: Description: The Arn for the MSKMMCluster1 MSK cluster Value: !Ref MSKMMCluster1 ReplicationTaskAuroraKafka: Description: Replication Task Value: !Ref ReplicationTaskAuroraKafka InstanceSecurityGroup: Description: Security Group Value: !Ref InstanceSecurityGroup