AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Kinesis Resources: ## Raw data bucket RawDataBucket: Type: AWS::S3::Bucket ## Processed Data ProcessedDataBucket: Type: AWS::S3::Bucket ## Link count table CountTable: Type: AWS::Serverless::SimpleTable # Raw data tale ProcessedDataTable: Type: AWS::DynamoDB::Table Properties: BillingMode: PAY_PER_REQUEST KeySchema: - AttributeName: id KeyType: HASH AttributeDefinitions: - AttributeName: id AttributeType: S TimeToLiveSpecification: AttributeName: ttl Enabled: true ## Ingest Firehose Firehose: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: BucketARN: !GetAtt ProcessedDataBucket.Arn CompressionFormat: GZIP RoleARN: !GetAtt FirehoseAccessRole.Arn ProcessingConfiguration: Enabled: true Processors: - Type: Lambda Parameters: - ParameterName: LambdaArn ParameterValue: !GetAtt ProcessFunction.Arn BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 S3BackupMode: Enabled S3BackupConfiguration: BucketARN: !GetAtt RawDataBucket.Arn CompressionFormat: GZIP RoleARN: !GetAtt FirehoseAccessRole.Arn BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 # Initial process function ProcessFunction: Type: AWS::Serverless::Function Properties: Timeout: 180 CodeUri: src/ Handler: process.handler Runtime: nodejs12.x Policies: - DynamoDBCrudPolicy: {TableName: !Ref ProcessedDataTable} Environment: Variables: TABLE_NAME: !Ref ProcessedDataTable # Link count function CountFunction: Type: AWS::Serverless::Function Properties: Timeout: 180 CodeUri: src/ Handler: count.handler Runtime: nodejs12.x Policies: - DynamoDBCrudPolicy: {TableName: !Ref CountTable} Environment: Variables: TABLE_NAME: !Ref CountTable # Kinesis Data Analytics Application KinesisAnalyticsApp: Type: AWS::KinesisAnalytics::Application Properties: ApplicationCode: > CREATE OR REPLACE STREAM "LINK_STREAM" ("resourcePath" varchar(16), link_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "LINK_STREAM" SELECT STREAM "resourcePath", COUNT(*) AS link_count FROM "SESSIONS_STREAM_001" GROUP BY "resourcePath", STEP("SESSIONS_STREAM_001".ROWTIME BY INTERVAL '10' SECOND); Inputs: - InputSchema: RecordColumns: - Name: requestId Mapping: $.requestId SqlType: bigint - Name: ip Mapping: $.ip SqlType: varchar(16) - Name: status Mapping: $.status SqlType: varchar(8) - Name: resourcePath Mapping: $.resourcePath SqlType: varchar(16) RecordFormat: RecordFormatType: JSON KinesisFirehoseInput: ResourceARN: !GetAtt Firehose.Arn RoleARN: !GetAtt KinesisAnalyticsAccessRole.Arn NamePrefix: SESSIONS_STREAM # Output for Kinesis Data Analytics application KinesisAnalyticsOutput: Type: AWS::KinesisAnalytics::ApplicationOutput Properties: ApplicationName: !Ref KinesisAnalyticsApp Output: DestinationSchema: RecordFormatType: JSON LambdaOutput: ResourceARN: !GetAtt CountFunction.Arn RoleARN: !GetAtt KinesisAnalyticsAccessRole.Arn Name: LINK_STREAM # Access role for Firehose FirehoseAccessRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: "firehose.amazonaws.com" Action: - "sts:AssumeRole" Policies: - PolicyName: S3WritePolicy PolicyDocument: Version: '2012-10-17' Statement: Action: - s3:PutObject Effect: Allow Resource: - !GetAtt RawDataBucket.Arn - !Sub - ${Arn}/* - { Arn: !GetAtt RawDataBucket.Arn } - !GetAtt ProcessedDataBucket.Arn - !Sub - ${Arn}/* - { Arn: !GetAtt ProcessedDataBucket.Arn } - PolicyName: LambdaInvokePolicy PolicyDocument: Version: '2012-10-17' Statement: Action: - lambda:InvokeFunction Effect: Allow Resource: - !GetAtt ProcessFunction.Arn # Access role for Kinesis Data Analytics KinesisAnalyticsAccessRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: "kinesisanalytics.amazonaws.com" Action: - "sts:AssumeRole" Policies: - PolicyName: KinesisAccessPolicy PolicyDocument: Version: '2012-10-17' Statement: Action: - firehose:DescribeDeliveryStream - firehose:Get* - kinesis:Describe* - kinesis:Get* - kinesis:List* - kinesis:Put* Effect: Allow Resource: - !GetAtt Firehose.Arn - PolicyName: LambdaAccessPolicy PolicyDocument: Version: '2012-10-17' Statement: Action: - lambda:InvokeFunction - lambda:Get* Effect: Allow Resource: - !GetAtt CountFunction.Arn - !Sub - ${Func}:$LATEST - { Func: !GetAtt CountFunction.Arn }