AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Template to connect Kinesis streams together with filtering

# One source Kinesis stream is fanned out to two target streams by two
# EventBridge Pipes. Each pipe applies a filter on the record's `data.type`
# field, so a record is forwarded only to the target matching its type.

Resources:
  # Kinesis Data Stream that both pipes read from.
  Source:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      Name: !Sub ${AWS::StackName}-source
      RetentionPeriodHours: 168  # 7 days

  # Kinesis stream for orders (only for new customers).
  NewCustomerOrdersTarget:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      Name: !Sub ${AWS::StackName}-new-customers-target
      RetentionPeriodHours: 168  # 7 days

  # Kinesis stream for orders (only for existing customers).
  ExistingCustomerOrdersTarget:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      Name: !Sub ${AWS::StackName}-existing-customers-target
      RetentionPeriodHours: 168  # 7 days

  # Execution role shared by both pipes: assumable by pipes.amazonaws.com,
  # with read access to the source stream and put access to both targets.
  PipeRole:
    Type: AWS::IAM::Role
    Properties:
      # NOTE(review): the trust policy has no aws:SourceAccount /
      # aws:SourceArn condition; consider adding one for confused-deputy
      # protection - confirm against the EventBridge Pipes IAM guidance.
      AssumeRolePolicyDocument:
        # Quoted so YAML loaders keep the policy version a string, not a date.
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pipes.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Policies:
        # Read-side permissions, scoped to the source stream only.
        - PolicyName: !Sub ${AWS::StackName}-source-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                # NOTE(review): kinesis:ListStreams may not support
                # resource-level scoping, in which case it is not effectively
                # granted by this statement - confirm whether it is needed.
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:DescribeStreamSummary'
                  - 'kinesis:GetRecords'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:ListStreams'
                  - 'kinesis:ListShards'
                Resource: !GetAtt Source.Arn
        # Write-side permissions, scoped to the two target streams.
        - PolicyName: !Sub ${AWS::StackName}-target-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:PutRecord'
                  - 'kinesis:PutRecords'
                Resource:
                  - !GetAtt NewCustomerOrdersTarget.Arn
                  - !GetAtt ExistingCustomerOrdersTarget.Arn

  # Pipe to connect the general orders Kinesis stream to the
  # new-customers-only Kinesis stream.
  NewCustomersPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: !Sub ${AWS::StackName}-new-customer
      RoleArn: !GetAtt PipeRole.Arn
      Source: !GetAtt Source.Arn
      SourceParameters:
        KinesisStreamParameters:
          BatchSize: 1
          # LATEST: start reading at the newest records in the stream.
          StartingPosition: LATEST
        FilterCriteria:
          Filters:
            # Forward only records whose data.type is NEW_CUSTOMER.
            - Pattern: '{"data":{"type":["NEW_CUSTOMER"]}}'
      Target: !GetAtt NewCustomerOrdersTarget.Arn
      TargetParameters:
        KinesisStreamParameters:
          # Static partition key: every forwarded record uses the same key.
          PartitionKey: '1'

  # Pipe to connect the general orders Kinesis stream to the
  # existing-customers-only Kinesis stream.
  ExistingCustomersPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: !Sub ${AWS::StackName}-existing-customer
      RoleArn: !GetAtt PipeRole.Arn
      Source: !GetAtt Source.Arn
      SourceParameters:
        KinesisStreamParameters:
          BatchSize: 1
          # LATEST: start reading at the newest records in the stream.
          StartingPosition: LATEST
        FilterCriteria:
          Filters:
            # Forward only records whose data.type is EXISTING_CUSTOMER.
            - Pattern: '{"data":{"type":["EXISTING_CUSTOMER"]}}'
      Target: !GetAtt ExistingCustomerOrdersTarget.Arn
      TargetParameters:
        KinesisStreamParameters:
          # Static partition key: every forwarded record uses the same key.
          PartitionKey: '1'

Outputs:
  KinesisSource:
    Description: 'Source Kinesis Stream'
    Value: !Ref Source
  NewCustomerOrdersTarget:
    Description: 'NewCustomerOrdersTarget Kinesis Stream'
    Value: !Ref NewCustomerOrdersTarget
  ExistingCustomerOrdersTarget:
    Description: 'ExistingCustomerOrdersTarget Kinesis Stream'
    Value: !Ref ExistingCustomerOrdersTarget