AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Backend for Analytics

# -----------------------------------------------------------------------------
# Analytics backend.
#
# Data flow defined by this template:
#   PurchaseSourceStream / QuizSourceStream (Kinesis)
#     -> Firehose delivery streams -> StreamingAnalyticsBucket (rawdata/...)
#     -> Glue crawler over <bucket>/rawdata
#   PurchaseSourceStream
#     -> Kinesis Analytics SQL application (sum of "amount" per 60 s step)
#     -> OutputKinesisStream
#     -> KinesisToDynamoDbLambda (receives AggregationDynamoTable as TABLE_NAME)
# -----------------------------------------------------------------------------

Globals:
  Function:
    Layers:
      - !Sub arn:aws:lambda:${AWS::Region}:094274105915:layer:AWSLambdaPowertoolsTypeScript:5
      - !Sub arn:aws:lambda:${AWS::Region}:580247275435:layer:LambdaInsightsExtension:14
    Runtime: nodejs18.x
    Handler: app.handler
    MemorySize: 512
    Timeout: 10
    Tracing: Active

Parameters:
  ResourceGroupPrefix:
    Type: String
    Description: Name of the ResourceGroup for resources in this template
  ServicePrefix:
    Type: String
    Description: Prefix used for resources
  S3BufferInterval:
    Description: Number of seconds to buffer data before delivering to S3 (60 to 900).
    Type: Number
    Default: 60
    MinValue: 60
    MaxValue: 900
  # Applied to both Firehose delivery streams below.
  # NOTE(review): the delivery streams used a literal 10 MB before this
  # parameter was wired in; pass S3BufferSize=10 to keep that behaviour.
  S3BufferSize:
    Description: Number of MB of data to buffer before delivering to S3 (1 to 128).
    Type: Number
    Default: 5
    MinValue: 1
    MaxValue: 128
  SourceStreamSize:
    Description: Kinesis Stream Size
    Type: String
    # Shard count must be a positive integer; reject '' and '0' at
    # parameter-validation time instead of at stream creation.
    AllowedPattern: '[1-9][0-9]*'
    ConstraintDescription: must be a positive integer (number of shards)
    Default: '1'

Resources:
  ResourceGroup:
    Type: 'AWS::ResourceGroups::Group'
    Properties:
      Name: !Sub '${ResourceGroupPrefix}-Analytics'

  # ---------------------------------------------------------------------------
  # Kinesis data streams
  # ---------------------------------------------------------------------------
  PurchaseSourceStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: !Sub '${ServicePrefix}-Purchase'
      ShardCount: !Ref SourceStreamSize

  QuizSourceStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: !Sub '${ServicePrefix}-Quiz'
      ShardCount: !Ref SourceStreamSize

  # Receives the output of the Kinesis Analytics application.
  OutputKinesisStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: !Sub '${ServicePrefix}-Aggregated'
      ShardCount: 1

  # ---------------------------------------------------------------------------
  # Raw data archival: Kinesis -> Firehose -> S3
  # ---------------------------------------------------------------------------
  StreamingAnalyticsBucket:
    Type: 'AWS::S3::Bucket'

  PurchaseRawDatatoS3:
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      DeliveryStreamName: !Sub '${ServicePrefix}-PurchaseRawDatatoS3'
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt PurchaseSourceStream.Arn
        RoleARN: !GetAtt RawDatatoS3Role.Arn
      S3DestinationConfiguration:
        BucketARN: !Sub 'arn:aws:s3:::${StreamingAnalyticsBucket}'
        BufferingHints:
          IntervalInSeconds: !Ref S3BufferInterval
          SizeInMBs: !Ref S3BufferSize
        CompressionFormat: UNCOMPRESSED
        Prefix: rawdata/purchase/
        RoleARN: !GetAtt RawDatatoS3Role.Arn

  QuizRawDatatoS3:
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      DeliveryStreamName: !Sub '${ServicePrefix}-QuizRawDatatoS3'
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt QuizSourceStream.Arn
        RoleARN: !GetAtt RawDatatoS3Role.Arn
      S3DestinationConfiguration:
        BucketARN: !Sub 'arn:aws:s3:::${StreamingAnalyticsBucket}'
        BufferingHints:
          IntervalInSeconds: !Ref S3BufferInterval
          SizeInMBs: !Ref S3BufferSize
        CompressionFormat: UNCOMPRESSED
        Prefix: rawdata/quiz/
        RoleARN: !GetAtt RawDatatoS3Role.Arn

  # Permissions for the role shared by both Firehose delivery streams.
  RawDatatoS3Policy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      ManagedPolicyName: !Sub 'RawDatatoS3Policy-${AWS::StackName}'
      PolicyDocument:
        # Quoted so YAML parsers cannot interpret the version as a date.
        Version: '2012-10-17'
        Statement:
          # Write access to the destination bucket and its objects.
          - Effect: Allow
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              - !Sub 'arn:aws:s3:::${StreamingAnalyticsBucket}'
              - !Sub 'arn:aws:s3:::${StreamingAnalyticsBucket}/*'
          # Read access to the two source streams. ListShards is part of the
          # permission set Firehose documents for a Kinesis stream source.
          - Effect: Allow
            Action:
              - 'kinesis:DescribeStream'
              - 'kinesis:GetShardIterator'
              - 'kinesis:GetRecords'
              - 'kinesis:ListShards'
            Resource:
              - !GetAtt PurchaseSourceStream.Arn
              - !GetAtt QuizSourceStream.Arn
          - Effect: Allow
            Action:
              - 'logs:PutLogEvents'
            Resource:
              - '*'

  RawDatatoS3Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action:
              - 'sts:AssumeRole'
            # Only allow Firehose to assume the role on behalf of this account.
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'
      ManagedPolicyArns:
        - !Ref RawDatatoS3Policy

  # ---------------------------------------------------------------------------
  # Glue catalog over the raw data
  # ---------------------------------------------------------------------------
  GlueAnalyticalDatabase:
    Type: 'AWS::Glue::Database'
    Properties:
      DatabaseInput:
        Name: !Sub '${ServicePrefix}-glue-db'
        Description: Glue Database to hold tables for SDP data
      CatalogId: !Ref 'AWS::AccountId'

  GlueAnalyticalRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
        - !Ref GlueAnalyticalPolicy

  # NOTE(review): grants every S3 action on the analytics bucket; consider
  # narrowing once the set of actions the crawler needs is confirmed.
  GlueAnalyticalPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      ManagedPolicyName: !Sub '${ServicePrefix}-crawler-policy'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Action:
              - 's3:*'
            Resource:
              - !Sub 'arn:aws:s3:::${StreamingAnalyticsBucket}'
              - !Sub 'arn:aws:s3:::${StreamingAnalyticsBucket}/*'

  GlueAnalyticalCrawler:
    Type: 'AWS::Glue::Crawler'
    Properties:
      Role: !GetAtt GlueAnalyticalRole.Arn
      DatabaseName: !Ref GlueAnalyticalDatabase
      Targets:
        S3Targets:
          # Covers both Firehose prefixes (rawdata/purchase/, rawdata/quiz/).
          - Path: !Sub '${StreamingAnalyticsBucket}/rawdata'
      TablePrefix: raw_

  # ---------------------------------------------------------------------------
  # Streaming aggregation: Kinesis Analytics (SQL)
  # ---------------------------------------------------------------------------
  BasicApplication:
    Type: 'AWS::KinesisAnalytics::Application'
    Properties:
      ApplicationName: !Sub '${ServicePrefix}-first-app'
      ApplicationDescription: SampleApp
      # Sums "amount" over 60-second steps of the source stream's ROWTIME and
      # pumps the result into DESTINATION_SQL_STREAM.
      ApplicationCode: >
        CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (metric_time TIMESTAMP, total_amount INT);
        CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
        SELECT STREAM ROWTIME, SUM("amount") AS sum_amount
        FROM "SOURCE_SQL_STREAM_001"
        GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      Inputs:
        - NamePrefix: SOURCE_SQL_STREAM
          InputSchema:
            RecordColumns:
              - Name: playerName
                SqlType: VARCHAR(16)
                Mapping: $.playerName
              - Name: dateOfPurchase
                SqlType: VARCHAR(32)
                Mapping: $.dateOfPurchase
              - Name: amount
                SqlType: INTEGER
                Mapping: $.amount
            RecordFormat:
              RecordFormatType: JSON
              MappingParameters:
                JSONMappingParameters:
                  RecordRowPath: $
          KinesisStreamsInput:
            ResourceARN: !GetAtt PurchaseSourceStream.Arn
            RoleARN: !GetAtt KinesisAnalyticsRole.Arn

  KinesisAnalyticsRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - !Ref KinesisAnalyticsPolicy

  KinesisAnalyticsPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      ManagedPolicyName: !Sub '${ServicePrefix}-kinesis-analytics-policy'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # Read from the purchase stream (application input).
          - Effect: Allow
            Action:
              - 'kinesis:DescribeStream'
              - 'kinesis:GetShardIterator'
              - 'kinesis:GetRecords'
              - 'kinesis:ListShards'
            Resource:
              - !GetAtt PurchaseSourceStream.Arn
          # Write to the aggregated stream (application output).
          - Effect: Allow
            Action:
              - 'kinesis:DescribeStream'
              - 'kinesis:PutRecord'
              - 'kinesis:PutRecords'
            Resource:
              - !GetAtt OutputKinesisStream.Arn

  BasicApplicationOutputs:
    Type: 'AWS::KinesisAnalytics::ApplicationOutput'
    DependsOn: BasicApplication
    Properties:
      ApplicationName: !Ref BasicApplication
      Output:
        Name: DESTINATION_SQL_STREAM
        DestinationSchema:
          RecordFormatType: JSON
        KinesisStreamsOutput:
          ResourceARN: !GetAtt OutputKinesisStream.Arn
          RoleARN: !GetAtt KinesisAnalyticsRole.Arn

  # ---------------------------------------------------------------------------
  # Aggregated stream -> Lambda, with the report table exposed as TABLE_NAME
  # ---------------------------------------------------------------------------
  AggregationDynamoTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      TableName: !Sub '${ServicePrefix}-report'
      KeySchema:
        - KeyType: HASH
          AttributeName: METRIC_TIME
      AttributeDefinitions:
        - AttributeName: METRIC_TIME
          AttributeType: S
      ProvisionedThroughput:
        WriteCapacityUnits: 5
        ReadCapacityUnits: 5

  KinesisToDynamoDbIAMRole:
    Type: 'AWS::IAM::Role'
    Properties:
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        # The function supplies an explicit Role, so SAM does not attach
        # anything on its behalf. The policies required by the Lambda Insights
        # layer and by "Tracing: Active" (both set in Globals) must therefore
        # be attached here.
        - 'arn:aws:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy'
        - 'arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess'
        - !Ref KinesisToDynamoPolicy
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action:
              - 'sts:AssumeRole'
            Principal:
              Service:
                - lambda.amazonaws.com
            Effect: Allow
            Sid: AllowLambdaServiceToAssumeRole

  KinesisToDynamoPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      ManagedPolicyName: !Sub '${ServicePrefix}-KinesisPeristerAccessPolicy'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # Stream access for the function and its Kinesis event source mapping.
          # ListShards and DescribeStreamSummary are part of the permission set
          # Lambda documents for polling a Kinesis stream.
          - Effect: Allow
            Action:
              - 'kinesis:DescribeStream'
              - 'kinesis:DescribeStreamSummary'
              - 'kinesis:ListShards'
              - 'kinesis:ListStreams'
              - 'kinesis:ListTagsForStream'
              - 'kinesis:GetShardIterator'
              - 'kinesis:GetRecords'
              - 'kinesis:PutRecord'
              - 'kinesis:PutRecords'
            Resource:
              - !GetAtt OutputKinesisStream.Arn
          # Table access for the function.
          - Effect: Allow
            Action:
              - 'dynamodb:PutItem'
              - 'dynamodb:UpdateItem'
              - 'dynamodb:BatchGetItem'
              - 'dynamodb:Query'
              - 'dynamodb:Scan'
              - 'dynamodb:BatchWriteItem'
            Resource:
              - !GetAtt AggregationDynamoTable.Arn

  KinesisToDynamoDbLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub '${ServicePrefix}KinesisToDynamoDbPersister'
      AutoPublishAlias: Live
      CodeUri: functions/KinesisToDynamoDbPersister
      MemorySize: 256
      Timeout: 180
      Environment:
        Variables:
          TABLE_NAME: !Ref AggregationDynamoTable
          REGION: !Ref 'AWS::Region'
      # No "Policies" here: SAM ignores that property whenever "Role" is set,
      # so every permission is attached to KinesisToDynamoDbIAMRole instead.
      Role: !GetAtt KinesisToDynamoDbIAMRole.Arn
      Events:
        KinesisEvent:
          Type: Kinesis
          Properties:
            Stream: !GetAtt OutputKinesisStream.Arn
            StartingPosition: TRIM_HORIZON
            Enabled: true
      Tags:
        Dashboard: HTTP

Outputs:
  StreamingAnalyticsBucketName:
    Value: !Ref StreamingAnalyticsBucket
  PurchaseSourceStreamName:
    Value: !Ref PurchaseSourceStream
    Export:
      Name: 'STS-PurchaseSourceStreamName'
  QuizSourceStreamName:
    Value: !Ref QuizSourceStream
    Export:
      Name: 'STS-QuizSourceStreamName'
  AggregationDynamoTableName:
    Value: !Ref AggregationDynamoTable