AWSTemplateFormatVersion: 2010-09-09 Parameters: SecurityGroup: Description: Sec Group for Replication Instance with rules to allow it to connect to your source DB. Type: AWS::EC2::SecurityGroup::Id Subnet1: Description: Subnet-1 for Replication Instance with network path to source DB. Type: AWS::EC2::Subnet::Id Subnet2: Description: Subnet-2 for Replication Instance with network path to source DB. Type: AWS::EC2::Subnet::Id Resources: LambdaRoleToInitDMS: Type: AWS::IAM::Role Properties : AssumeRolePolicyDocument: Version : 2012-10-17 Statement : - Effect : Allow Principal : Service : - lambda.amazonaws.com Action : - sts:AssumeRole Path : / Policies: - PolicyName: LambdaCloudFormationPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - s3:* Resource: - !Sub "arn:aws:s3:::cloudformation-custom-resource-response-${AWS::Region}" - !Sub "arn:aws:s3:::cloudformation-waitcondition-${AWS::Region}" - !Sub "arn:aws:s3:::cloudformation-custom-resource-response-${AWS::Region}/*" - !Sub "arn:aws:s3:::cloudformation-waitcondition-${AWS::Region}/*" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess - arn:aws:iam::aws:policy/AmazonRDSDataFullAccess - arn:aws:iam::aws:policy/IAMFullAccess - arn:aws:iam::aws:policy/AmazonRedshiftFullAccess LambdaFunctionDMSRoles: Type: "AWS::Lambda::Function" Properties: Timeout: 30 Code: ZipFile: | import json import boto3 import cfnresponse import logging import time client = boto3.client('iam') logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def handler(event, context): logger.info(json.dumps(event)) if event['RequestType'] == 'Delete': cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'}) else: try: response = client.get_role(RoleName='dms-cloudwatch-logs-role') except: try: role_policy_document = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "dms.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] } client.create_role( RoleName='dms-cloudwatch-logs-role', AssumeRolePolicyDocument=json.dumps(role_policy_document) ) client.attach_role_policy( RoleName='dms-cloudwatch-logs-role', PolicyArn='arn:aws:iam::aws:policy/service-role/AmazonDMSCloudWatchLogsRole' ) except Exception as e: logger.error(e) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Create failed'}) try: response = client.get_role(RoleName='dms-vpc-role') except: try: role_policy_document = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "dms.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] } client.create_role( RoleName='dms-vpc-role', AssumeRolePolicyDocument=json.dumps(role_policy_document) ) client.attach_role_policy( RoleName='dms-vpc-role', PolicyArn='arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole' ) time.sleep(30) except Exception as e: logger.error(e) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': 'Create failed'}) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create complete'}) Handler: index.handler Role: Fn::GetAtt: [LambdaRoleToInitDMS, Arn] Runtime: python3.7 InitDMSRoles: Type: Custom::InitDMSRoles DependsOn: - LambdaFunctionDMSRoles Properties: ServiceToken: !GetAtt 'LambdaFunctionDMSRoles.Arn' DMSSubnetGroup: Type: AWS::DMS::ReplicationSubnetGroup DependsOn: - InitDMSRoles Properties: ReplicationSubnetGroupDescription: Subnet group for DMSCDC pipeline. SubnetIds: - !Ref Subnet1 - !Ref Subnet2 DMSReplicationInstance: Type: AWS::DMS::ReplicationInstance DependsOn: - InitDMSRoles Properties: ReplicationInstanceIdentifier: DMSCDC-ReplicationInstance ReplicationInstanceClass: dms.c4.large ReplicationSubnetGroupIdentifier: !Ref DMSSubnetGroup VpcSecurityGroupIds: - !Ref SecurityGroup ExecutionRole: Type: AWS::IAM::Role Properties : RoleName: DMSCDC_Execution_Role AssumeRolePolicyDocument: Version : 2012-10-17 Statement : - Effect : Allow Principal : Service : - lambda.amazonaws.com - glue.amazonaws.com - dms.amazonaws.com Action : - sts:AssumeRole Path : / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole Policies : - PolicyName: DMSCDCExecutionPolicy PolicyDocument : Version: 2012-10-17 Statement: - Effect: Allow Action: - lambda:InvokeFunction - dynamodb:PutItem - dynamodb:CreateTable - dynamodb:UpdateItem - dynamodb:UpdateTable - dynamodb:GetItem - dynamodb:DescribeTable - iam:GetRole - iam:PassRole - dms:StartReplicationTask - dms:TestConnection - dms:StopReplicationTask Resource: - !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/DMSCDC_Controller' - !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:DMSCDC_InitController' - !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:DMSCDC_InitDMS' - !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:DMSCDC_ImportGlueScript' - !Sub 'arn:aws:iam::${AWS::AccountId}:role/DMSCDC_ExecutionRole' - !Sub 'arn:aws:dms:${AWS::Region}:${AWS::AccountId}:endpoint:*' - !Sub 'arn:aws:dms:${AWS::Region}:${AWS::AccountId}:task:*' - Effect: Allow Action: - dms:DescribeConnections - dms:DescribeReplicationTasks Resource: "*" LambdaFunctionImportGlueScript: Type: "AWS::Lambda::Function" Properties: FunctionName: DMSCDC_ImportGlueScript Timeout: 30 Code: ZipFile: | import json import boto3 import cfnresponse import logging logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def handler(event, context): logger.info(json.dumps(event)) s3 = boto3.client('s3') glueBucket = event['ResourceProperties']['glueBucket'] if event['RequestType'] == 'Delete': try: s3.delete_object(Bucket=glueBucket, Key='DMSCDC_LoadIncremental.py') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_LoadIncremental.py.temp') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_LoadInitial.py') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_LoadInitial.py.temp') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_ProcessTable.py') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_ProcessTable.py.temp') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_Controller.py') s3.delete_object(Bucket=glueBucket, Key='DMSCDC_Controller.py.temp') except Exception as e: logger.info(e.message) logger.info('Delete Complete') cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete Complete'}) else: try: s3.delete_object(Bucket=glueBucket, Key='DMSCDC_LoadIncremental.py') except Exception as e: logger.info(e.message) try: s3.delete_object(Bucket=glueBucket, Key='DMSCDC_LoadInitial.py') except Exception as e: logger.info(e.message) try: s3.delete_object(Bucket=glueBucket, Key='DMSCDC_Controller.py') except Exception as e: logger.info(e.message) try: s3.delete_object(Bucket=glueBucket, Key='DMSCDC_ProcessTable.py') except Exception as e: logger.info(e.message) try: s3.copy_object(Bucket=glueBucket, CopySource="dmscdc-files/DMSCDC_LoadIncremental.py", Key="DMSCDC_LoadIncremental.py") s3.copy_object(Bucket=glueBucket, CopySource="dmscdc-files/DMSCDC_LoadInitial.py", Key="DMSCDC_LoadInitial.py") s3.copy_object(Bucket=glueBucket, CopySource="dmscdc-files/DMSCDC_Controller.py", Key="DMSCDC_Controller.py") s3.copy_object(Bucket=glueBucket, CopySource="dmscdc-files/DMSCDC_ProcessTable.py", Key="DMSCDC_ProcessTable.py") logger.info('Copy Complete') cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Copy complete'}) except Exception as e: message = 'S3 Issue: ' + str(e) logger.error(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) Handler: index.handler Role: Fn::GetAtt: [ExecutionRole, Arn] Runtime: python3.7 DependsOn: - ExecutionRole ImportScript: Type: Custom::CopyScript DependsOn: - LambdaFunctionImportGlueScript - GlueBucket Properties: ServiceToken: Fn::GetAtt : [LambdaFunctionImportGlueScript, Arn] glueBucket: Ref: GlueBucket LambdaFunctionInitController: Type: "AWS::Lambda::Function" Properties: FunctionName: DMSCDC_InitController Timeout: 600 Code: ZipFile: | import json import boto3 import cfnresponse import time import logging logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) glue = boto3.client('glue') def testGlueJob(jobId, count, sec): i = 0 while i < count: response = glue.get_job_run(JobName='DMSCDC_Controller', RunId=jobId) status = response['JobRun']['JobRunState'] if (status == 'SUCCEEDED' or status == 'STARTING' or status == 'STOPPING'): return 1 elif status == 'RUNNING' : time.sleep(sec) i+=1 else: return 0 if i == count: return 0 def handler(event, context): logger.info(json.dumps(event)) if event['RequestType'] != 'Create': logger.info('No action necessary') cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'NA'}) else: try: dmsBucket = event['ResourceProperties']['dmsBucket'] dmsPath = event['ResourceProperties']['dmsPath'] lakeBucket = event['ResourceProperties']['lakeBucket'] lakePath = event['ResourceProperties']['lakePath'] lakeDBName = event['ResourceProperties']['lakeDBName'] response = glue.start_job_run( JobName='DMSCDC_Controller', Arguments={ '--bucket':dmsBucket, '--prefix':dmsPath, '--out_bucket':lakeBucket, '--out_prefix':lakePath}) if testGlueJob(response['JobRunId'], 20, 30) != 1: message = 'Error during Controller execution' logger.info(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) else: try: message = 'Load Complete: https://console.aws.amazon.com/dynamodb/home?#tables:selected=DMSCDC_Controller;tab=items' response = glue.start_trigger(Name='DMSCDC-Trigger-'+lakeDBName) logger.info(message) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': message}) except: message = 'Load Complete, but trigger failed to initialize' logger.info(message) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': message}) except Exception as e: message = 'Glue Job Issue: ' + str(e) logger.info(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) Handler: index.handler Role: Fn::GetAtt: [ExecutionRole, Arn] Runtime: python3.7 DependsOn: - ExecutionRole LambdaFunctionInitDMS: Type: "AWS::Lambda::Function" Properties: FunctionName: DMSCDC_InitDMS Timeout: 600 Code: ZipFile: | import json import boto3 import cfnresponse import time import logging logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) dms = boto3.client('dms') s3 = boto3.resource('s3') def testConnection(endpointArn, count): i = 0 while i < count: connections = dms.describe_connections(Filters=[{'Name':'endpoint-arn','Values':[endpointArn]}]) for connection in connections['Connections']: if connection['Status'] == 'successful': return 1 elif connection['Status'] == 'testing': time.sleep(30) i += 1 elif connection['Status'] == 'failed': return 0 if i == count: return 0 def testRepTask(taskArn, count): i = 0 while i < count: replicationTasks = dms.describe_replication_tasks(Filters=[{'Name':'replication-task-arn','Values':[taskArn]}]) for replicationTask in replicationTasks['ReplicationTasks']: if replicationTask['Status'] == 'running': return 1 elif replicationTask['Status'] == 'starting': time.sleep(30) i += 1 else: return 0 if i == count: return 0 def handler(event, context): logger.info(json.dumps(event)) taskArn = event['ResourceProperties']['taskArn'] sourceArn = event['ResourceProperties']['sourceArn'] targetArn = event['ResourceProperties']['targetArn'] instanceArn = event['ResourceProperties']['instanceArn'] dmsBucket = event['ResourceProperties']['dmsBucket'] if event['RequestType'] == 'Delete': try: dms.stop_replication_task(ReplicationTaskArn=taskArn) time.sleep(30) s3.Bucket(dmsBucket).objects.all().delete() logger.info('Task Stopped, Bucket Emptied') except Exception as e: message = 'Issue with Stopping Task or Emptying Bucket:' + str(e) logger.error(message) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'NA'}) else: taskArn = event['ResourceProperties']['taskArn'] sourceArn = event['ResourceProperties']['sourceArn'] targetArn = event['ResourceProperties']['targetArn'] instanceArn = event['ResourceProperties']['instanceArn'] try: if testConnection(sourceArn, 1) != 1: dms.test_connection( ReplicationInstanceArn=instanceArn, EndpointArn=sourceArn ) if testConnection(sourceArn,10) != 1: message = 'Source Endpoint Connection Failure' cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) return {'message': message} if testConnection(targetArn, 1) != 1: dms.test_connection( ReplicationInstanceArn=instanceArn, EndpointArn=targetArn ) if testConnection(targetArn,10) != 1: message = 'Target Endpoint Connection Failure' logger.error(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) except Exception as e: message = 'DMS Endpoint Issue: ' + str(e) logger.error(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) try: if testRepTask(taskArn, 1) != 1: dms.start_replication_task( ReplicationTaskArn=taskArn, StartReplicationTaskType='start-replication') if testRepTask(taskArn, 10) != 1: message = 'Error during DMS execution' logger.error(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) else: message = 'DMS Execution Successful' logger.info(message) cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': message}) except Exception as e: message = 'DMS Task Issue: ' + str(e) logger.error(message) cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': message}) Handler: index.handler Role: Fn::GetAtt: [ExecutionRole, Arn] Runtime: python3.7 DependsOn: - ExecutionRole GlueBucket: Type: AWS::S3::Bucket GlueJobLoadIncremental: Type: AWS::Glue::Job Properties: Name: DMSCDC_LoadIncremental Role: Fn::GetAtt: [ExecutionRole, Arn] ExecutionProperty: MaxConcurrentRuns: 50 AllocatedCapacity: 2 GlueVersion: 3.0 Command: Name: glueetl ScriptLocation: !Sub - s3://${bucket}/DMSCDC_LoadIncremental.py - {bucket: !Ref GlueBucket} DefaultArguments: "--job-bookmark-option" : "job-bookmark-disable" "--TempDir" : !Sub - s3://${bucket} - {bucket: !Ref GlueBucket} "--enable-metrics" : "" DependsOn: - ExecutionRole GlueJobLoadInitial: Type: AWS::Glue::Job Properties: Name: DMSCDC_LoadInitial Role: Fn::GetAtt: [ExecutionRole, Arn] ExecutionProperty: MaxConcurrentRuns: 50 AllocatedCapacity: 2 GlueVersion: 3.0 Command: Name: glueetl ScriptLocation: !Sub - s3://${bucket}/DMSCDC_LoadInitial.py - {bucket: !Ref GlueBucket} DefaultArguments: "--job-bookmark-option" : "job-bookmark-disable" "--TempDir" : !Sub - s3://${bucket} - {bucket: !Ref GlueBucket} "--enable-metrics" : "" DependsOn: - ExecutionRole GlueJobController: Type: AWS::Glue::Job Properties: Name: DMSCDC_Controller Role: Fn::GetAtt: [ExecutionRole, Arn] ExecutionProperty: MaxConcurrentRuns: 50 Command: Name: pythonshell PythonVersion: 3 ScriptLocation: !Sub - s3://${bucket}/DMSCDC_Controller.py - {bucket: !Ref GlueBucket} DefaultArguments: "--job-bookmark-option" : "job-bookmark-disable" "--TempDir" : !Sub - s3://${bucket} - {bucket: !Ref GlueBucket} "--enable-metrics" : "" DependsOn: - ExecutionRole GlueJobProcessTable: Type: AWS::Glue::Job Properties: Name: DMSCDC_ProcessTable Role: Fn::GetAtt: [ExecutionRole, Arn] ExecutionProperty: MaxConcurrentRuns: 50 Command: Name: pythonshell PythonVersion: 3 ScriptLocation: !Sub - s3://${bucket}/DMSCDC_ProcessTable.py - {bucket: !Ref GlueBucket} DefaultArguments: "--job-bookmark-option" : "job-bookmark-disable" "--TempDir" : !Sub - s3://${bucket} - {bucket: !Ref GlueBucket} "--enable-metrics" : "" DependsOn: - ExecutionRole