AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: An Amazon API Gateway REST API integration with a Kinesis Data Stream, which is consumed by Lambda function. Resources: KinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 APIGatewayRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Action: - 'sts:AssumeRole' Effect: Allow Principal: Service: - apigateway.amazonaws.com Policies: - PolicyName: APIGWKinesisPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'kinesis:PutRecord' - 'kinesis:PutRecords' - 'kinesis:GetShardIterator' - 'kinesis:GetRecords' Resource: !Sub - '${varStreamArn}*' - varStreamArn: !GetAtt KinesisStream.Arn Api: Type: 'AWS::ApiGateway::RestApi' Properties: Name: apigw-kinesis-api Description: REST API to push data to Kinesis Stream streams: Type: 'AWS::ApiGateway::Resource' Properties: RestApiId: !Ref Api ParentId: !GetAtt Api.RootResourceId PathPart: 'streams' streamName: Type: 'AWS::ApiGateway::Resource' Properties: RestApiId: !Ref Api ParentId: !Ref streams PathPart: '{stream-name}' record: Type: 'AWS::ApiGateway::Resource' Properties: RestApiId: !Ref Api ParentId: !Ref streamName PathPart: 'record' records: Type: 'AWS::ApiGateway::Resource' Properties: RestApiId: !Ref Api ParentId: !Ref streamName PathPart: 'records' recordMethodPost: Type: 'AWS::ApiGateway::Method' Properties: RestApiId: !Ref Api ResourceId: !Ref record HttpMethod: PUT ApiKeyRequired: false AuthorizationType: NONE Integration: Type: AWS Credentials: !GetAtt APIGatewayRole.Arn IntegrationHttpMethod: POST Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:kinesis:action/PutRecord' PassthroughBehavior: WHEN_NO_TEMPLATES RequestTemplates: application/json: !Sub | { "StreamName": "$input.params('stream-name')", "Data": "$util.base64Encode($input.json('$.Data'))", "PartitionKey": "$input.path('$.PartitionKey')" } IntegrationResponses: - StatusCode: '200' MethodResponses: - StatusCode: '200' recordsMethodPost: Type: 'AWS::ApiGateway::Method' Properties: RestApiId: !Ref Api ResourceId: !Ref records HttpMethod: PUT ApiKeyRequired: false AuthorizationType: NONE Integration: Type: AWS Credentials: !GetAtt APIGatewayRole.Arn IntegrationHttpMethod: POST Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:kinesis:action/PutRecords' PassthroughBehavior: WHEN_NO_TEMPLATES RequestTemplates: application/json: !Sub | { "StreamName": "$input.params('stream-name')", "Records": [ #foreach($elem in $input.path('$.records')) { "Data": "$util.base64Encode($elem.data)", "PartitionKey": "$elem.partition-key" }#if($foreach.hasNext),#end #end ] } IntegrationResponses: - StatusCode: '200' MethodResponses: - StatusCode: '200' ApiDeployment: Type: 'AWS::ApiGateway::Deployment' DependsOn: - recordMethodPost Properties: RestApiId: !Ref Api StageName: 'dev' LambdaConsumer: Type: AWS::Serverless::Function Properties: FunctionName: LambdaConsumer Handler: lambda_function.lambda_handler Runtime: python3.9 CodeUri: src/ Policies: - KinesisStreamReadPolicy: StreamName: !Ref KinesisStream Events: Stream: Type: Kinesis Properties: Stream: !GetAtt KinesisStream.Arn StartingPosition: LATEST BatchSize: 100 Outputs: ApiRootUrl: Description: Root Url of the API Value: !Sub - 'https://${ApiId}.execute-api.${AWS::Region}.amazonaws.com/dev' - ApiId: !Ref Api KinesisStream: Description: Kinesis Data Stream name Value: !Ref KinesisStream LambdaFunction: Description: Lambda Function ARN Value: !GetAtt LambdaConsumer.Arn