# DMS change-data-capture (CDC) pipeline stack.
#
# Resources declared here:
#   * a DMS source endpoint for the chosen database engine and an S3 target
#     endpoint that writes Parquet files,
#   * a full-load-and-cdc DMS replication task between the two endpoints,
#   * optionally the raw (DMS) and processed (lake) S3 buckets,
#   * a scheduled Glue trigger for the DMSCDC_Controller job and a scheduled
#     Glue crawler over the lake location,
#   * two Lambda-backed custom resources (InitDMS, InitController).
#
# The template references by name, but does not define, the IAM role
# DMSCDC_Execution_Role, the Glue job DMSCDC_Controller and the Lambda
# functions DMSCDC_InitDMS / DMSCDC_InitController; they are expected to exist
# in the target account and region before this stack is created.
AWSTemplateFormatVersion: "2010-09-09"

Parameters:
  # ---- Source database ----------------------------------------------------
  DBEngine:
    Description: Choose the source DB engine to stream changes.
    Type: String
    AllowedValues:
      - mysql
      - oracle
      - postgres
      - mariadb
      - aurora
      - aurora-postgresql
      - db2
      - sybase
      - sqlserver
  DBServer:
    Description: Hostname of the source DB.
    Type: String
  DBPort:
    Description: Port of the source DB.
    Type: Number
  DBUser:
    Description: Username for the source DB.
    Type: String
  DBPassword:
    Description: Password for the source DB.
    Type: String
    NoEcho: true  # keep the password out of console/API output
  DBName:
    Description: Name of the source DB.
    Type: String

  # ---- DMS task -------------------------------------------------------------
  DMSReplicationInstanceArn:
    Description: Enter the ARN for the DMS Replication Instance which has been configured with network connectivity to your Source DB.
    Type: String
  DMSTableFilter:
    Description: Choose the DMS table filter criteria.
    Type: String
    Default: "%"
    # Hyphen is placed last in the class so it is always a literal '-'.
    AllowedPattern: "^[a-zA-Z0-9_%-]*$"
    ConstraintDescription: Must only contain upper/lower-case letters, numbers, -, _, or %.
  DMSSchemaFilter:
    Description: Choose the DMS schema filter criteria.
    Type: String
    Default: "%"
    AllowedPattern: "^[a-zA-Z0-9_%-]*$"
    ConstraintDescription: Must only contain upper/lower-case letters, numbers, -, _, or %.
  CreateDMSBucket:
    Description: Select Yes to create a new bucket and No to use an existing bucket
    Type: String
    Default: 'Yes'
    AllowedValues:
      - 'Yes'
      - 'No'
  DMSBucket:
    Description: Name of the Bucket to land the DMS data.
    Type: String
  DMSFolder:
    Description: "(Optional) Prefix within bucket to store DMS data. Please omit the trailing /"
    Type: String
    Default: ""  # empty means "no prefix"; see the IsDMSFolderEmpty condition

  # ---- Data lake ------------------------------------------------------------
  CreateLakeBucket:
    Description: Select Yes to create a new bucket and No to use an existing bucket
    Type: String
    Default: 'Yes'
    AllowedValues:
      - 'Yes'
      - 'No'
  LakeBucket:
    Description: Name of the Bucket to land the processed data.
    Type: String
  LakeFolder:
    Description: "(Optional) Prefix within bucket to store processed data. Please omit the trailing /"
    Type: String
    Default: ""  # empty means "no prefix"; see the IsLakeFolderEmpty condition
  LakeDBName:
    Description: Name to be used in analytical tools such as Athena and Redshift to reference these tables within the data lake.
    Type: String
  CDCSchedule:
    Description: "CDC Execution Schedule Cron Expression. Default is every hour on the hour. For more info: https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html"
    Type: String
    Default: "cron(0 0-23 * * ? *)"
  CrawlerSchedule:
    Description: "Crawler Execution Schedule Cron Expression. Default is daily at 12:30 AM GMT: https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html"
    Type: String
    Default: "cron(30 0 * * ? *)"

Conditions:
  # True when no folder prefix was supplied; used to avoid emitting a bare "/".
  IsDMSFolderEmpty: !Equals [!Ref DMSFolder, ""]
  IsLakeFolderEmpty: !Equals [!Ref LakeFolder, ""]
  # True when the stack should create the respective bucket itself.
  CreateDMSBucket: !Equals [!Ref CreateDMSBucket, 'Yes']
  CreateLakeBucket: !Equals [!Ref CreateLakeBucket, 'Yes']

Metadata:
  # Groups and orders the parameters in the CloudFormation console.
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "DMS Source Database Configuration"
        Parameters:
          - DBEngine
          - DBServer
          - DBPort
          - DBUser
          - DBPassword
          - DBName
      - Label:
          default: "DMS Task Configuration"
        Parameters:
          - DMSReplicationInstanceArn
          - DMSTableFilter
          - DMSSchemaFilter
          - CreateDMSBucket
          - DMSBucket
          - DMSFolder
      - Label:
          default: "Data Lake Configuration"
        Parameters:
          - CreateLakeBucket
          - LakeBucket
          - LakeFolder
          - LakeDBName
          - CDCSchedule
          - CrawlerSchedule

Resources:
  # DMS endpoint describing the source database.
  DMSSourceEndpoint:
    Type: AWS::DMS::Endpoint
    Properties:
      EndpointType: source
      EngineName: !Ref DBEngine
      ServerName: !Ref DBServer
      Port: !Ref DBPort
      Username: !Ref DBUser
      Password: !Ref DBPassword
      DatabaseName: !Ref DBName

  # Raw landing bucket; only created when CreateDMSBucket is 'Yes'.
  DMSRawBucket:
    Condition: CreateDMSBucket
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref DMSBucket

  # DMS endpoint that writes Parquet files to DMSBucket/DMSFolder.
  DMSTargetEndpoint:
    Type: AWS::DMS::Endpoint
    Properties:
      EndpointType: target
      EngineName: s3
      ExtraConnectionAttributes: "dataFormat=parquet;includeOpForFullLoad=true;parquetTimestampInMillisecond=true;"
      S3Settings:
        BucketFolder: !Ref DMSFolder
        BucketName: !Ref DMSBucket
        ServiceAccessRoleArn: !Sub 'arn:aws:iam::${AWS::AccountId}:role/DMSCDC_Execution_Role'

  # Full load followed by ongoing replication, limited to the schema/table
  # filters supplied as parameters.
  DMSReplicationTask:
    Type: AWS::DMS::ReplicationTask
    Properties:
      MigrationType: full-load-and-cdc
      ReplicationInstanceArn: !Ref DMSReplicationInstanceArn
      SourceEndpointArn: !Ref DMSSourceEndpoint
      TargetEndpointArn: !Ref DMSTargetEndpoint
      # Single selection rule including every table matching the filters.
      TableMappings: !Sub '{"rules": [{"rule-type": "selection", "rule-id": "1", "rule-action": "include", "object-locator": {"schema-name": "${DMSSchemaFilter}", "table-name": "${DMSTableFilter}"}, "rule-name": "1"}]}'
      ReplicationTaskSettings: '{"Logging": {"EnableLogging": true }}'

  # Processed-data bucket; only created when CreateLakeBucket is 'Yes'.
  DataLakeBucket:
    Condition: CreateLakeBucket
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref LakeBucket

  # Scheduled Glue trigger that runs the DMSCDC_Controller job with the raw
  # and lake locations as job arguments.
  # NOTE(review): StartOnCreation is not set here; confirm the trigger is
  # activated elsewhere (possibly by the InitController function).
  Trigger:
    Type: AWS::Glue::Trigger
    Properties:
      Type: SCHEDULED
      Name: !Sub "DMSCDC-Trigger-${LakeDBName}"
      Schedule: !Ref CDCSchedule
      Actions:
        - JobName: DMSCDC_Controller
          Arguments:
            '--bucket': !Ref DMSBucket
            # A non-empty folder gets a trailing "/"; an empty one stays empty.
            '--prefix': !If
              - IsDMSFolderEmpty
              - ""
              - !Sub '${DMSFolder}/'
            '--out_bucket': !Ref LakeBucket
            '--out_prefix': !If
              - IsLakeFolderEmpty
              - ""
              - !Sub '${LakeFolder}/'

  # Scheduled crawler over the lake location, cataloguing into LakeDBName.
  GlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Role: !Sub 'arn:aws:iam::${AWS::AccountId}:role/DMSCDC_Execution_Role'
      Schedule:
        ScheduleExpression: !Ref CrawlerSchedule
      DatabaseName: !Ref LakeDBName
      Targets:
        S3Targets:
          # Resolves to "<bucket>/" or "<bucket>/<folder>/".
          - Path: !If
              - IsLakeFolderEmpty
              - !Sub '${LakeBucket}/'
              - !Sub '${LakeBucket}/${LakeFolder}/'

  # Custom resource backed by the DMSCDC_InitDMS Lambda function; receives the
  # task, endpoint and instance ARNs plus the raw bucket name.
  InitDMS:
    Type: Custom::InitDMS
    DependsOn:
      - DMSReplicationTask
    Properties:
      ServiceToken: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:DMSCDC_InitDMS'
      taskArn: !Ref DMSReplicationTask
      sourceArn: !Ref DMSSourceEndpoint
      targetArn: !Ref DMSTargetEndpoint
      instanceArn: !Ref DMSReplicationInstanceArn
      dmsBucket: !Ref DMSBucket

  # Custom resource backed by the DMSCDC_InitController Lambda function; runs
  # after InitDMS and the Glue trigger exist.
  InitController:
    Type: Custom::InitController
    DependsOn:
      - InitDMS
      - Trigger
    Properties:
      ServiceToken: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:DMSCDC_InitController'
      dmsBucket: !Ref DMSBucket
      # "<schema>/" without a folder, "<folder>/<schema>/" with one, so the
      # folder and schema segments are always separated by "/".
      dmsPath: !If
        - IsDMSFolderEmpty
        - !Sub '${DMSSchemaFilter}/'
        - !Sub '${DMSFolder}/${DMSSchemaFilter}/'
      lakeBucket: !Ref LakeBucket
      lakePath: !If
        - IsLakeFolderEmpty
        - ""
        - !Sub '${LakeFolder}/'
      lakeDBName: !Ref LakeDBName