AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: EventBridge Pipe that connects DynamoDB Streams to Lambda with Enrichment Resources: # DynamoDB Table Creation with Stream Enabled DynamoDBSampleTable: Type: AWS::DynamoDB::Table Properties: TableName: SampleTable AttributeDefinitions: - AttributeName: PK AttributeType: S - AttributeName: SK AttributeType: S KeySchema: - AttributeName: PK KeyType: HASH - AttributeName: SK KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 StreamSpecification: StreamViewType: NEW_IMAGE # Pipes target (Lambda function) Target: Type: AWS::Serverless::Function Properties: FunctionName: !Sub ${AWS::StackName}-target-lambda CodeUri: src/ Handler: lambda_function.lambda_handler Runtime: python3.9 MemorySize: 128 Timeout: 15 Architectures: - arm64 # Enrichment log group EnrichmentLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Join ['/', ['stepfunctions', StateMachine]] # Enrichment with Step Functions EnrichmentStateMachine: Type: AWS::Serverless::StateMachine Properties: Type: EXPRESS DefinitionUri: workflow/stepfunction.asl.json Role: !GetAtt EnrichmentStateMachineRole.Arn Logging: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: !GetAtt EnrichmentLogGroup.Arn IncludeExecutionData: true Level: 'ALL' #Execution Role for StepFunctions EnrichmentStateMachineRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: - sts:AssumeRole Policies: - PolicyName: CloudWatchLogs PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'logs:CreateLogDelivery' - 'logs:GetLogDelivery' - 'logs:UpdateLogDelivery' - 'logs:DeleteLogDelivery' - 'logs:ListLogDeliveries' - 'logs:PutResourcePolicy' - 'logs:DescribeResourcePolicies' - 'logs:DescribeLogGroups' Resource: '*' # Role + Permissions for Pipes PipeRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: - sts:AssumeRole Policies: - PolicyName: SourcePolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - "dynamodb:DescribeStream" - "dynamodb:GetRecords" - "dynamodb:GetShardIterator" - "dynamodb:ListStreams" Resource: !GetAtt DynamoDBSampleTable.StreamArn - PolicyName: TargetPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'lambda:InvokeFunction' Resource: !GetAtt Target.Arn - PolicyName: EnrichmentPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'states:StartExecution' - 'states:StartSyncExecution' Resource: !GetAtt EnrichmentStateMachine.Arn Pipe: Type: AWS::Pipes::Pipe Properties: Name: ddb-to-lambda-enrichment Description: "Pipe to connect DynamoDB Stream to AWS Lambda with filtering and enrichment" RoleArn: !GetAtt PipeRole.Arn Source: !GetAtt DynamoDBSampleTable.StreamArn SourceParameters: FilterCriteria: Filters: - Pattern: '{ "eventName": ["INSERT"], "dynamodb": { "NewImage": { "messageId": { "S": [{ "exists": true }] }, "PK": { "S": [{ "prefix": "Message#" }] }, "SK": { "S": [{ "prefix": "Channel#" }] } } } }' DynamoDBStreamParameters: StartingPosition: LATEST BatchSize: 1 Enrichment: !GetAtt EnrichmentStateMachine.Arn EnrichmentParameters: InputTemplate: '{ "PK": <$.dynamodb.NewImage.PK.S>, "SK": <$.dynamodb.NewImage.SK.S> }' Target: !GetAtt Target.Arn