# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 AWSTemplateFormatVersion: "2010-09-09" Metadata: Generator: "lucas.rettenmeier" Description: "CloudFormation template for a stateful, serverless aggregation pipeline in the AWS cloud." Resources: # Kinesis Datastream KinesisStream: Type: "AWS::Kinesis::Stream" Properties: Name: "IncomingDataStream" RetentionPeriodHours: 24 ShardCount: 20 StreamEncryption: EncryptionType: "KMS" KeyId: "alias/aws/kinesis" # DynamoDB Tables StateTable: Type: "AWS::DynamoDB::Table" Properties: AttributeDefinitions: - AttributeName: "id" AttributeType: "S" BillingMode: "PAY_PER_REQUEST" TableName: "StateTable" KeySchema: - AttributeName: "id" KeyType: "HASH" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ReduceTable: Type: "AWS::DynamoDB::Table" Properties: AttributeDefinitions: - AttributeName: "MessageId" AttributeType: "S" BillingMode: "PAY_PER_REQUEST" TableName: "ReduceTable" KeySchema: - AttributeName: "MessageId" KeyType: "HASH" AggregateTable: Type: "AWS::DynamoDB::Table" Properties: AttributeDefinitions: - AttributeName: "Identifier" AttributeType: "S" BillingMode: "PAY_PER_REQUEST" TableName: "AggregateTable" KeySchema: - AttributeName: "Identifier" KeyType: "HASH" ParameterTable: Type: "AWS::DynamoDB::Table" Properties: AttributeDefinitions: - AttributeName: "parameter" AttributeType: "S" BillingMode: "PAY_PER_REQUEST" TableName: "ParameterTable" KeySchema: - AttributeName: "parameter" KeyType: "HASH" # Lambda Functions StateLambda: Type: "AWS::Lambda::Function" DependsOn: StateLambdaPolicy Properties: Code: S3Bucket: "amazon-dynamodb-labs.com" S3Key: "assets/StateLambdaPackage.zip" Description: "" FunctionName: "StateLambda" Handler: "lambda_function.lambda_handler" MemorySize: 256 Role: !Sub "arn:aws:iam::${AWS::AccountId}:role/StateLambdaRole" Runtime: "python3.8" Timeout: 180 TracingConfig: Mode: "PassThrough" MapLambda: Type: "AWS::Lambda::Function" DependsOn: MapLambdaPolicy Properties: Code: S3Bucket: "amazon-dynamodb-labs.com" S3Key: "assets/MapLambdaPackage.zip" Description: "" FunctionName: "MapLambda" Handler: "lambda_function.lambda_handler" MemorySize: 256 Role: !Sub "arn:aws:iam::${AWS::AccountId}:role/MapLambdaRole" Runtime: "python3.8" Timeout: 60 TracingConfig: Mode: "PassThrough" ReduceLambda: Type: "AWS::Lambda::Function" DependsOn: ReduceLambdaPolicy Properties: Code: S3Bucket: "amazon-dynamodb-labs.com" S3Key: "assets/ReduceLambdaPackage.zip" Description: "" FunctionName: "ReduceLambda" Handler: "lambda_function.lambda_handler" MemorySize: 256 Role: !Sub "arn:aws:iam::${AWS::AccountId}:role/ReduceLambdaRole" Runtime: "python3.8" Timeout: 30 TracingConfig: Mode: "PassThrough" GeneratorLambda: Type: "AWS::Lambda::Function" DependsOn: GeneratorLambdaPolicy Properties: Code: S3Bucket: "amazon-dynamodb-labs.com" S3Key: "assets/GeneratorLambdaPackage.zip" Description: "" FunctionName: "GeneratorLambda" Handler: "lambda_function.lambda_handler" MemorySize: 256 Role: !Sub "arn:aws:iam::${AWS::AccountId}:role/GeneratorLambdaRole" Runtime: "python3.8" Timeout: 55 TracingConfig: Mode: "PassThrough" # IAM Roles and Policies StateLambdaRole: Type: "AWS::IAM::Role" Properties: Path: "/" RoleName: "StateLambdaRole" AssumeRolePolicyDocument: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}" MaxSessionDuration: 3600 Description: "Role with specifically needed permissions for StateLambda." StateLambdaPolicy: Type: "AWS::IAM::ManagedPolicy" Properties: PolicyDocument: !Sub | { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadFromKinesisStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListStreams", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStream}" }, { "Sid": "CreateCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogGroup" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" }, { "Sid": "WriteToCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/StateLambda:*" }, { "Sid": "WriteToDynamoDB", "Effect": "Allow", "Action": [ "dynamodb:UpdateItem" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${StateTable}" }, { "Sid": "ReadFromParameterTable", "Effect": "Allow", "Action": [ "dynamodb:GetItem" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ParameterTable}" } ] } Roles: - !Ref StateLambdaRole ManagedPolicyName: "StateLambdaPolicy" MapLambdaRole: Type: "AWS::IAM::Role" Properties: Path: "/" RoleName: "MapLambdaRole" AssumeRolePolicyDocument: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}" MaxSessionDuration: 3600 Description: "Role with the specific permissions of the MapLambda Function." MapLambdaPolicy: Type: "AWS::IAM::ManagedPolicy" Properties: PolicyDocument: !Sub | { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadFromDynamoDBStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${StateTable}/stream/*" }, { "Sid": "CreateCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogGroup" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" }, { "Sid": "WriteToCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/MapLambda:*" }, { "Sid": "WriteToDynamoDBTable", "Effect": "Allow", "Action": "dynamodb:PutItem", "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ReduceTable}" }, { "Sid": "ReadFromParameterTable", "Effect": "Allow", "Action": [ "dynamodb:GetItem" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ParameterTable}" } ] } Roles: - !Ref MapLambdaRole ManagedPolicyName: "MapLambdaPolicy" ReduceLambdaRole: Type: "AWS::IAM::Role" Properties: Path: "/" RoleName: "ReduceLambdaRole" AssumeRolePolicyDocument: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}" MaxSessionDuration: 3600 Description: "Role with specific permissions for ReduceLambda Function." ReduceLambdaPolicy: Type: "AWS::IAM::ManagedPolicy" Properties: PolicyDocument: !Sub | { "Version": "2012-10-17", "Statement": [ { "Sid": "CreateCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogGroup" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" }, { "Sid": "WriteToCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/ReduceLambda:*" }, { "Sid": "ReadFromParameterTable", "Effect": "Allow", "Action": [ "dynamodb:GetItem" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ParameterTable}" } ] } Roles: - !Ref ReduceLambdaRole ManagedPolicyName: "ReduceLambdaPolicy" GeneratorLambdaRole: Type: "AWS::IAM::Role" Properties: Path: "/" RoleName: "GeneratorLambdaRole" AssumeRolePolicyDocument: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}" MaxSessionDuration: 3600 Description: "Role with specific permissions for GeneratorLambda Function." GeneratorLambdaPolicy: Type: "AWS::IAM::ManagedPolicy" Properties: PolicyDocument: !Sub | { "Version": "2012-10-17", "Statement": [ { "Sid": "PutToKinesisStream", "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStream}" }, { "Sid": "CreateCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogGroup" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" }, { "Sid": "WriteToCloudwatchLogGroup", "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/GeneratorLambda:*" }, { "Sid": "ReadFromAggregateTable", "Effect": "Allow", "Action": [ "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${AggregateTable}" }, { "Sid": "ReadAndWriteParameterTable", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ParameterTable}" } ] } Roles: - !Ref GeneratorLambdaRole ManagedPolicyName: "GeneratorLambdaPolicy" # Trigger for MapLambda MapLambdaEventSourceMapping: Type: "AWS::Lambda::EventSourceMapping" Properties: BatchSize: 10000 EventSourceArn: !GetAtt StateTable.StreamArn FunctionName: !GetAtt MapLambda.Arn Enabled: true MaximumBatchingWindowInSeconds: 3 ParallelizationFactor: 1 MaximumRecordAgeInSeconds: -1 BisectBatchOnFunctionError: false MaximumRetryAttempts: -1 TumblingWindowInSeconds: 0 StartingPosition: 'LATEST' # Rule for Invocation of Generator Lambda ScheduledRule: Type: AWS::Events::Rule Properties: Description: "ScheduledRule" ScheduleExpression: "rate(1 minute)" State: "ENABLED" Targets: - Arn: Fn::GetAtt: - "GeneratorLambda" - "Arn" Id: "TargetFunctionV1" PermissionForEventsToInvokeLambda: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref "GeneratorLambda" Action: "lambda:InvokeFunction" Principal: "events.amazonaws.com" SourceArn: Fn::GetAtt: - "ScheduledRule" - "Arn"