# Transactional outbox sample.
#
# Flow defined by this template:
#   1. OrderApi (API Gateway) POST /createId writes an item straight into the
#      DynamoDB table "Order" through an AWS service integration (PutItem).
#   2. The table's stream (NEW_IMAGE) is the source of an EventBridge Pipe.
#   3. The pipe publishes each stream record to the custom bus "OrderEventBus".
#   4. A rule on that bus matches records whose new image has a Status
#      starting with "CREATED" and delivers them to two SQS queues.
#   5. Each queue triggers one Lambda function.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Implementing a Transactional Outbox Pattern with DynamoDB Streams, EventBridge Pipes and SQS

# Defaults applied to every AWS::Serverless::Function in this template.
Globals:
  Function:
    MemorySize: 128
    Timeout: 3

Parameters:
  StageName:
    Type: String
    Default: prod
    Description: Name of API stage.

Resources:
  # Role assumed by API Gateway to call DynamoDB PutItem on the order table.
  APIGatewayRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action:
              - 'sts:AssumeRole'
            Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
      Policies:
        - PolicyName: APIGatewayDynamoDBPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'dynamodb:PutItem'
                Resource: !GetAtt DynamoDBOrderTable.Arn

  # REST API with a single POST /createId method integrated directly with
  # DynamoDB (no Lambda in the write path).
  OrderApi:
    Type: AWS::Serverless::Api
    DependsOn: APIGatewayRole
    Properties:
      Name: 'order-api'
      StageName: !Ref StageName
      TracingEnabled: true
      AccessLogSetting:
        DestinationArn: !GetAtt ApiGatewayLogGroup.Arn
        # Folded scalar: the lines below are joined into one line of JSON.
        Format: >
          {"requestId":"$context.requestId",
          "integration-error":"$context.integration.error",
          "integration-status":"$context.integration.status",
          "integration-latency":"$context.integration.latency",
          "integration-requestId":"$context.integration.requestId",
          "integration-integrationStatus":"$context.integration.integrationStatus",
          "response-latency":"$context.responseLatency",
          "status":"$context.status"}
      MethodSettings:
        # NOTE(review): DataTraceEnabled turns on data tracing for every
        # method; confirm this is intended for a stage that defaults to "prod".
        - DataTraceEnabled: true
          HttpMethod: '*'
          LoggingLevel: INFO
          ResourcePath: '/*'
          ThrottlingBurstLimit: 5000
          ThrottlingRateLimit: 10000
      DefinitionBody:
        swagger: '2.0'
        info:
          title: 'Order API'
          version: '1.0'
        paths:
          /createId:
            post:
              responses:
                '200':
                  description: OK
                  headers:
                    Access-Control-Allow-Origin:
                      type: string
                  examples:
                    application/json: {}
                '500':
                  description: Internal Server Error
                  headers:
                    Access-Control-Allow-Origin:
                      type: string
                  examples:
                    application/json: {}
              x-amazon-apigateway-integration:
                httpMethod: POST
                type: 'aws'
                uri: !Sub 'arn:aws:apigateway:${AWS::Region}:dynamodb:action/PutItem'
                credentials: !GetAtt APIGatewayRole.Arn
                passthroughBehavior: 'when_no_match'
                # Builds the PutItem request. "TableName" must stay in sync
                # with DynamoDBOrderTable.TableName; "id" and "createdAt" are
                # the table's hash and range keys.
                requestTemplates:
                  application/json: |
                    {
                      "TableName": "Order",
                      "Item": {
                        "id": {
                          "S": "$context.requestId"
                        },
                        "createdAt": {
                          "S": "$context.requestTime"
                        },
                        "Status": {
                          "S": $input.json('$.Status')
                        }
                      }
                    }
                responses:
                  default:
                    statusCode: '200'
                    responseTemplates:
                      application/json: |
                        #set($inputRoot = $input.path('$'))
                        {
                          "statusCode": 200,
                          "message": "Your order has been processed"
                        }
                  '500':
                    statusCode: '500'
                    responseTemplates:
                      application/json: |
                        #set($inputRoot = $input.path('$'))
                        {
                          "statusCode": 500,
                          "message": "An error occurred while processing your request"
                        }

  # Destination of the API access log configured on OrderApi.
  ApiGatewayLogGroup:
    Type: AWS::Logs::LogGroup
    UpdateReplacePolicy: Delete
    DeletionPolicy: Delete
    Properties:
      RetentionInDays: 30

  # DynamoDB table with a stream; the stream is the source of the pipe below.
  DynamoDBOrderTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: Order
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
        - AttributeName: createdAt
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
        - AttributeName: createdAt
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      StreamSpecification:
        # One of KEYS_ONLY, NEW_IMAGE, OLD_IMAGE or NEW_AND_OLD_IMAGES, see
        # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_StreamSpecification.html
        # EventRule matches on dynamodb.NewImage, so the new image is required.
        StreamViewType: NEW_IMAGE

  # Consumer of SQSQueueCreated.
  AWSLambdaFunctionCreate:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: order_created_publisher_one.lambda_handler
      Runtime: python3.10
      Tracing: Active
      Architectures:
        - arm64
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt SQSQueueCreated.Arn

  # Explicit log group so the function's logs get a retention period.
  AWSLambdaFunctionCreateLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${AWSLambdaFunctionCreate}'
      RetentionInDays: 30

  # Consumer of SQSQueueUpdated.
  AWSLambdaFunctionUpdate:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: order_created_publisher_two.lambda_handler
      Runtime: python3.10
      Tracing: Active
      Architectures:
        - arm64
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt SQSQueueUpdated.Arn

  # Explicit log group so the function's logs get a retention period.
  AWSLambdaFunctionUpdateLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${AWSLambdaFunctionUpdate}'
      RetentionInDays: 30

  # Resource policy that lets EventBridge send messages to the rule's target
  # queues AND to the rule's dead-letter queue. EventBridgeDeadLetterQueue is
  # included because both EventRule targets name it in DeadLetterConfig;
  # without this grant EventBridge cannot deliver failed events to it.
  QueueEventBridgePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: SQS:SendMessage
            Resource:
              - !GetAtt SQSQueueCreated.Arn
              - !GetAtt SQSQueueUpdated.Arn
              - !GetAtt EventBridgeDeadLetterQueue.Arn
      Queues:
        - !Ref SQSQueueCreated
        - !Ref SQSQueueUpdated
        - !Ref EventBridgeDeadLetterQueue

  # Execution role intended for the consumer of SQSQueueCreated.
  # NOTE(review): no function in this template sets `Role`, so this role is
  # currently not attached to anything visible here - confirm whether
  # AWSLambdaFunctionCreate should reference it.
  LambdaRoleCreated:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: SQSPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'
        - PolicyName: CreateLogStream
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                # CloudWatch Logs actions must target log groups, not the SQS
                # queue. A wildcard is used instead of the function name so
                # the role can be attached to the function without creating
                # a circular dependency.
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*:*'
        - PolicyName: SourcePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'sqs:DeleteMessage'
                  - 'sqs:GetQueueAttributes'
                  - 'sqs:ReceiveMessage'
                Resource:
                  - !GetAtt SQSQueueCreated.Arn
        - PolicyName: X-RayPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'xray:PutTraceSegments'
                  - 'xray:PutTelemetryRecords'
                Resource:
                  - '*'

  # Execution role intended for the consumer of SQSQueueUpdated.
  # NOTE(review): no function in this template sets `Role`, so this role is
  # currently not attached to anything visible here - confirm whether
  # AWSLambdaFunctionUpdate should reference it.
  LambdaRoleUpdated:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: SQSPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'
        - PolicyName: CreateLogStream
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                # CloudWatch Logs actions must target log groups, not the SQS
                # queue. A wildcard is used instead of the function name so
                # the role can be attached to the function without creating
                # a circular dependency.
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*:*'
        - PolicyName: SourcePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'sqs:DeleteMessage'
                  - 'sqs:GetQueueAttributes'
                  - 'sqs:ReceiveMessage'
                Resource:
                  - !GetAtt SQSQueueUpdated.Arn
        - PolicyName: X-RayPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'xray:PutTraceSegments'
                  - 'xray:PutTelemetryRecords'
                Resource:
                  - '*'

  # Event bus (pipe target).
  ApplicationEventBus:
    Type: AWS::Events::EventBus
    Properties:
      Name: OrderEventBus

  # Dead-letter queue for the pipe's DynamoDB stream source.
  PipeDLQueue:
    Type: AWS::SQS::Queue

  # Role assumed by the pipe: read the table stream, put events on the bus,
  # and send to the pipe's dead-letter queue.
  PipeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pipes.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: SourcePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'dynamodb:DescribeStream'
                  - 'dynamodb:GetRecords'
                  - 'dynamodb:GetShardIterator'
                  - 'dynamodb:ListStreams'
                Resource: !GetAtt DynamoDBOrderTable.StreamArn
        - PolicyName: TargetPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'events:PutEvents'
                Resource: !GetAtt ApplicationEventBus.Arn
        - PolicyName: PipesSQSDLQPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: SQS:SendMessage
                Resource:
                  - !GetAtt PipeDLQueue.Arn

  # EventBridge rule: routes stream records whose new image has a Status
  # beginning with "CREATED" to both SQS queues. Events that cannot be
  # delivered within the retry policy go to EventBridgeDeadLetterQueue.
  EventRule:
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !Ref ApplicationEventBus
      EventPattern:
        detail:
          dynamodb:
            NewImage:
              Status:
                S:
                  - prefix: CREATED
      Name: !Sub '${AWS::StackName}-sqs'
      State: ENABLED
      Targets:
        - Id: SQSTargetCreated
          Arn: !GetAtt SQSQueueCreated.Arn
          RetryPolicy:
            MaximumRetryAttempts: 4
            MaximumEventAgeInSeconds: 400
          DeadLetterConfig:
            Arn: !GetAtt EventBridgeDeadLetterQueue.Arn
        - Id: SQSTargetUpdated
          Arn: !GetAtt SQSQueueUpdated.Arn
          RetryPolicy:
            MaximumRetryAttempts: 4
            MaximumEventAgeInSeconds: 400
          DeadLetterConfig:
            Arn: !GetAtt EventBridgeDeadLetterQueue.Arn

  # SQS targets of the rule. A message received 5 times without being
  # deleted is moved to the queue's own dead-letter queue.
  SQSQueueCreated:
    Type: AWS::SQS::Queue
    Properties:
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt SQSDeadLetterQueueCreated.Arn
        maxReceiveCount: 5

  SQSQueueUpdated:
    Type: AWS::SQS::Queue
    Properties:
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt SQSDeadLetterQueueUpdated.Arn
        maxReceiveCount: 5

  # SQS dead-letter queues (redrive targets of the two queues above).
  SQSDeadLetterQueueCreated:
    Type: AWS::SQS::Queue

  SQSDeadLetterQueueUpdated:
    Type: AWS::SQS::Queue

  # Dead-letter queue for the EventBridge rule targets.
  EventBridgeDeadLetterQueue:
    Type: AWS::SQS::Queue

  # EventBridge Pipe: DynamoDB stream -> OrderEventBus, one record per batch,
  # starting from the latest stream position.
  Pipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: ddb-to-eventbridge
      Description: 'Pipe to connect DDB stream to EventBridge event bus'
      RoleArn: !GetAtt PipeRole.Arn
      Source: !GetAtt DynamoDBOrderTable.StreamArn
      SourceParameters:
        DynamoDBStreamParameters:
          StartingPosition: LATEST
          BatchSize: 1
          DeadLetterConfig:
            Arn: !GetAtt PipeDLQueue.Arn
      Target: !GetAtt ApplicationEventBus.Arn
      TargetParameters:
        EventBridgeEventBusParameters:
          DetailType: 'OrderDetailsChanged'
          Source: 'order.event'

Outputs:
  ApiRootUrl:
    Description: Root Url of the API
    Value: !Sub
      - 'https://${ApiId}.execute-api.${AWS::Region}.amazonaws.com/${StageName}/createId'
      - ApiId: !Ref OrderApi

  DynamoDBTableName:
    Description: DynamoDB table name
    Value: !Ref DynamoDBOrderTable