AWSTemplateFormatVersion: 2010-09-09 Description: >- This CloudFormation sample template migrates Market data from S3 to Kinesis using DMS. This Template requires an existing source s3 bucket with full load of market data. Parameters: VpcCIDR: Description: Please enter the IP range (CIDR notation) for this VPC Type: String Default: PublicSubnet1CIDR: Description: Please enter the IP range (CIDR notation) for the public subnet in the first Availability Zone Type: String Default: PublicSubnet2CIDR: Description: Please enter the IP range (CIDR notation) for the public subnet in the second Availability Zone Type: String Default: MarketDataS3Bucket: Type: String Description: S3 Bucket where the market data will reside. Metadata: 'AWS::CloudFormation::Interface': ParameterGroups: - Label: default: Configuration Parameters: - VpcCIDR - PublicSubnet1CIDR - PublicSubnet2CIDR - MarketDataS3Bucket ParameterLabels: VpcCIDR: default: Provide VPC CIDR Range PublicSubnet1CIDR: default: Provide Public Subnet 1 CIDR Range PublicSubnet2CIDR: default: Provide Public Subnet 2 CIDR Range MarketDataS3Bucket: default: Name of the S3 Bucket where market Data resides Resources: VPC: Type: AWS::EC2::VPC Properties: CidrBlock: !Ref VpcCIDR EnableDnsHostnames: true Tags: - Key: Name Value: Kinesis Algo Trading InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: Kinesis Algo Trading InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [0, !GetAZs ""] CidrBlock: !Ref PublicSubnet1CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: Kinesis Algo Trading Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [1, !GetAZs ""] CidrBlock: !Ref PublicSubnet2CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: Kinesis Algo Trading Public Subnet (AZ2) PublicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: Kinesis Algo Trading Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: GatewayId: !Ref InternetGateway PublicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet1 PublicSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet2 KinesisDMSCloudwatchRole: Type: 'AWS::IAM::Role' Properties: RoleName: kinesis-dms-cloudwatch-logs-role AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: - 'sts:AssumeRole' ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AmazonDMSCloudWatchLogsRole' Path: / KinesisDMSVpcRole: Type: 'AWS::IAM::Role' Properties: RoleName: kinesis-dms-vpc-role AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: - 'sts:AssumeRole' ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole' Path: / DMSReplicationSubnetGroup: Type: 'AWS::DMS::ReplicationSubnetGroup' Properties: ReplicationSubnetGroupDescription: Subnets available for DMS SubnetIds: - !Ref PublicSubnet1 - !Ref PublicSubnet2 DependsOn: - KinesisDMSVpcRole - KinesisDMSCloudwatchRole KinesisStream: Type: 'AWS::Kinesis::Stream' Properties: Name: 'kinesis-algo-blog' RetentionPeriodHours: 8760 ShardCount: 1 StreamEncryption: EncryptionType: KMS KeyId: alias/aws/kinesis S3SourceDMSRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: - 'sts:AssumeRole' Path: / Policies: - PolicyName: S3AccessForDMSPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:GetObject' Resource: - !Join - '' - - 'arn:' - !Ref AWS::Partition - ':s3:::' - !Ref MarketDataS3Bucket - !Join - '' - - 'arn:' - !Ref AWS::Partition - ':s3:::' - !Ref MarketDataS3Bucket - '/*' - Effect: Allow Action: 's3:ListBucket' Resource: - !Join - '' - - 'arn:' - !Ref AWS::Partition - ':s3:::' - !Ref MarketDataS3Bucket DMSSecurityGroup: Type: 'AWS::EC2::SecurityGroup' Properties: GroupDescription: Security group for DMS Instance VpcId: !Ref VPC TargetKinesisRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: - 'sts:AssumeRole' Path: / Policies: - PolicyName: KinesisAccessForDMSPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'kinesis:DescribeStream' - 'kinesis:PutRecord' - 'kinesis:PutRecords' Resource: - !GetAtt - KinesisStream - Arn DMSReplicationInstance: Type: 'AWS::DMS::ReplicationInstance' Properties: PubliclyAccessible: true ReplicationInstanceClass: dms.r4.xlarge ReplicationSubnetGroupIdentifier: !Ref DMSReplicationSubnetGroup VpcSecurityGroupIds: - !Ref DMSSecurityGroup S3SourceEndpoint: Type: 'AWS::DMS::Endpoint' Properties: EndpointType: source EngineName: "s3" S3Settings: BucketName: !Ref MarketDataS3Bucket ExternalTableDefinition: >- {"TableCount": "1", "Tables": [{"TableName":"intc","TablePath":"marketData/intc/","TableOwner":"marketData","TableColumns":[{"ColumnName": "dt","ColumnType":"TIMESTAMP","ColumnNullable": "false","ColumnIsPk":"true"},{"ColumnName": "sym","ColumnType": "STRING","ColumnLength": "10"},{"ColumnName": "open","ColumnType": "NUMERIC","ColumnPrecision": "5","ColumnScale": "2"},{"ColumnName": "high","ColumnType": "NUMERIC","ColumnPrecision": "5","ColumnScale": "2"},{"ColumnName": "low","ColumnType": "NUMERIC","ColumnPrecision": "5","ColumnScale": "2"}, {"ColumnName": "close","ColumnType": "NUMERIC","ColumnPrecision": "5","ColumnScale": "2"},{"ColumnName": "vol","ColumnType": "NUMERIC","ColumnPrecision": "12","ColumnScale": "2"}],"TableColumnsTotal": "7"}]} ServiceAccessRoleArn: !GetAtt - S3SourceDMSRole - Arn DependsOn: - DMSReplicationInstance KinesisTargetEndpoint: Type: 'AWS::DMS::Endpoint' Properties: EndpointType: target EngineName: "kinesis" KinesisSettings: MessageFormat: json StreamArn: !GetAtt - KinesisStream - Arn ServiceAccessRoleArn: !GetAtt - TargetKinesisRole - Arn DependsOn: - DMSReplicationInstance DMSReplicationTask: Type: 'AWS::DMS::ReplicationTask' Properties: MigrationType: full-load ReplicationInstanceArn: !Ref DMSReplicationInstance ReplicationTaskSettings: >- { "Logging" : { "EnableLogging" : true, "LogComponents": [ { "Id" : "SOURCE_UNLOAD", "Severity" : "LOGGER_SEVERITY_DEFAULT" }, { "Id" : "SOURCE_CAPTURE", "Severity" : "LOGGER_SEVERITY_DEFAULT" }, { "Id" : "TARGET_LOAD", "Severity" : "LOGGER_SEVERITY_DEFAULT" }, { "Id" : "TARGET_APPLY", "Severity" : "LOGGER_SEVERITY_DEFAULT" } ] } } SourceEndpointArn: !Ref S3SourceEndpoint TableMappings: >- { "rules": [ { "rule-type" : "selection", "rule-id" : "1", "rule-name" : "1", "object-locator" : { "schema-name" : "%", "table-name" : "%" }, "rule-action" : "include" } ] } TargetEndpointArn: !Ref KinesisTargetEndpoint AlgorithmicTradingInstance: Type: AWS::SageMaker::NotebookInstance Properties: InstanceType: ml.t3.medium DefaultCodeRepository: RoleArn: !GetAtt 'SageMakerExecutionRole.Arn' SageMakerExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess - arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess - arn:aws:iam::aws:policy/AmazonECS_FullAccess - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess - !Ref 'S3Policy' S3Policy: Type: AWS::IAM::ManagedPolicy Properties: Description: S3 Permission Path: / PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:GetObject - s3:PutObject - s3:DeleteObject - s3:ListBucket Resource: - !Sub - arn:aws:s3:::${S3Bucket}/* - S3Bucket: !Ref 'MarketDataS3Bucket' Outputs: 01StackName: Value: !Ref 'AWS::StackName' 02RegionName: Value: !Ref 'AWS::Region' 03TargetKinesisStream: Value: !Ref KinesisStream 04DMSReplicationInstance: Value: !Ref DMSReplicationInstance 05SourceEndpoint: Value: !Ref S3SourceEndpoint 06TargetEndpoint: Value: !Ref KinesisTargetEndpoint 07DMSReplicationTask: Value: !Ref DMSReplicationTask 08KinesisStreamName: Value: !Ref KinesisStream