AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Kinesis Data Streams - Cross Region Replicator Function

Metadata:
  AWS::ServerlessRepo::Application:
    Name: Kinesis-Data-Streams-Cross-Region-Replicator
    Description: "Creates Enhanced-Fan-Out Lambda Consumer to replicate KDS data across regions"
    Labels:
      - Kinesis-Data-Streams
      - Enhanced-Fan-Out
      - Lambda
      - Serverless

Parameters:
  InputStreamName:
    Type: String
    Description: Input Kinesis Stream
  SourceKMSKeyId:
    Type: String
    Description: Source Region KMS KeyId. Refer https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesis-stream-streamencryption.html
    Default: alias/aws/kinesis
  CheckpointTableName:
    Type: String
    Description: Enter the DDB Checkpoint Table Name
    Default: kdsReplicationCheckpoint
  ActiveRegionConfigTableName:
    Type: String
    Description: Enter the DDB Active Region Config Table Name
    Default: kdsActiveRegionConfig
  TargetStreamReplicationRegion:
    Type: String
    Description: Enter the KDS Stream Target replication region

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 20

Resources:
  LambdaRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
      Policies:
        - PolicyName: DDBAccess
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:BatchGetItem
                  - dynamodb:BatchWriteItem
                  - dynamodb:PutItem
                  - dynamodb:GetItem
                  - dynamodb:Scan
                  - dynamodb:Query
                  - dynamodb:UpdateItem
                Resource:
                  - !Sub 'arn:aws:dynamodb:*:${AWS::AccountId}:table/${CheckpointTableName}'
                  - !Sub 'arn:aws:dynamodb:*:${AWS::AccountId}:table/${ActiveRegionConfigTableName}'
        - PolicyName: KDSWrites
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                Resource:
                  - !Sub 'arn:aws:kinesis:*:${AWS::AccountId}:stream/${InputStreamName}'
        - PolicyName: CloudWatchMetrics
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - cloudwatch:PutMetricData
                Resource: "*"

  KinesisReplicatorFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: KinesisReplicatorFunction
      Handler: consumer.Handler::handleRequest
      PackageType: Zip
      Role: !GetAtt LambdaRole.Arn
      Runtime: java11
      MemorySize: 512
      Timeout: 900
      Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object
        Variables:
          DDB_CHECKPOINT_TABLE_NAME: !Ref CheckpointTableName
          DDB_ACTIVE_REGION_CONFIG_TABLE_NAME: !Ref ActiveRegionConfigTableName
          TARGET_STREAM_REPLICATION_REGION: !Ref TargetStreamReplicationRegion

  KinesislambdaEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 8000
      BisectBatchOnFunctionError: false
      Enabled: true
      EventSourceArn: !GetAtt KinesisStreamConsumer.ConsumerARN
      FunctionName: !GetAtt KinesisReplicatorFunction.Arn
      FunctionResponseTypes:
        - ReportBatchItemFailures
      StartingPosition: LATEST

  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Ref InputStreamName
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: !Ref SourceKMSKeyId

  KinesisStreamConsumer:
    Type: AWS::Kinesis::StreamConsumer
    Properties:
      StreamARN: !GetAtt KinesisStream.Arn
      ConsumerName: !Sub "${AWS::StackName}"

Outputs:
  FunctionName:
    Description: "Function name"
    Value: !Ref KinesisReplicatorFunction
  StreamARN:
    Description: "Stream ARN"
    Value: !GetAtt KinesisStream.Arn
  ConsumerARN:
    Description: "Stream consumer ARN"
    Value: !GetAtt KinesisStreamConsumer.ConsumerARN