AWSTemplateFormatVersion: 2010-09-09 Transform: AWS::Serverless-2016-10-31 Description: AlleyCat - KDA example # This template builds the Kinesis stream ARN from the base # template using the following expression: # !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/alleycat" Parameters: IoTdataEndpoint: Type: String Default: 'abc123abc123-ats.iot.us-east-2.amazonaws.com' Resources: ########################################## # Kinesis Data Analytics configuration # ########################################## KinesisAnalyticsApplication: Type: "AWS::KinesisAnalytics::Application" Properties: ApplicationName: "alleycat-stats" ApplicationCode: | -- -- 1. Calculate output field CREATE OR REPLACE STREAM "CALC_STREAM" ( "event" VARCHAR(8), "deviceTimestamp" BIGINT, "COL_second" INTEGER, "name" VARCHAR(8), "raceId" BIGINT NOT NULL, "classId" INTEGER, "cadence" DOUBLE, "resistance" DOUBLE, "racerId" INTEGER NOT NULL, "output" REAL); CREATE OR REPLACE PUMP "CALC_STREAM_PUMP" AS INSERT INTO "CALC_STREAM" SELECT STREAM "event", "deviceTimestamp", "COL_second", "name", "raceId", "classId", "cadence", "resistance", "racerId", ("cadence" + 35) * ("resistance" + 65) / 100 as "output" FROM "SOURCE_SQL_STREAM_001"; -- 2. Get best scores CREATE OR REPLACE STREAM "SCORES_STREAM" ("raceId" BIGINT, "racerId" INTEGER, "maxOutput" REAL); CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "SCORES_STREAM" ("raceId", "racerId","maxOutput") SELECT STREAM "raceId", "racerId", MAX("output") OVER W1 AS "maxOutput" FROM "CALC_STREAM" WINDOW W1 AS (PARTITION BY "raceId", "racerId" RANGE INTERVAL '10' MINUTE PRECEDING); Inputs: - NamePrefix: "SOURCE_SQL_STREAM" InputParallelism: Count: 1 InputSchema: RecordFormat: RecordFormatType: "JSON" MappingParameters: JSONMappingParameters: RecordRowPath: "$" RecordEncoding: "UTF-8" RecordColumns: - Name: "uuid" Mapping: "$.uuid" SqlType: "VARCHAR(64)" - Name: "event" Mapping: "$.event" SqlType: "VARCHAR(8)" - Name: "deviceTimestamp" Mapping: "$.deviceTimestamp" SqlType: "BIGINT" - Name: "COL_second" Mapping: "$.second" SqlType: "INTEGER" - Name: "raceId" Mapping: "$.raceId" SqlType: "BIGINT" - Name: "name" Mapping: "$.name" SqlType: "VARCHAR(8)" - Name: "racerId" Mapping: "$.racerId" SqlType: "INTEGER" - Name: "classId" Mapping: "$.classId" SqlType: "INTEGER" - Name: "cadence" Mapping: "$.cadence" SqlType: "DOUBLE" - Name: "resistance" Mapping: "$.resistance" SqlType: "DOUBLE" KinesisStreamsInput: ResourceARN: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/alleycat" RoleARN: !GetAtt KinesisAnalyticsRole.Arn KinesisAnalyticsApplicationOutput: Type: "AWS::KinesisAnalytics::ApplicationOutput" Properties: ApplicationName: "alleycat-stats" Output: Name: "CALC_STREAM" LambdaOutput: ResourceARN: !GetAtt DestinationFunction.Arn RoleARN: !GetAtt KinesisAnalyticsRole.Arn DestinationSchema: RecordFormatType: "JSON" ########################################## # Lambda function for KDA destination # ########################################## DestinationFunction: Type: AWS::Serverless::Function Properties: CodeUri: DestinationFunction/ Handler: apps.handler Runtime: nodejs14.x MemorySize: 128 Environment: Variables: IOT_DATA_ENDPOINT: !Ref IoTdataEndpoint TOPIC: 'alleycat-subscribe' Policies: - Statement: - Sid: PublishToIotPolicy Effect: Allow Action: - 'iot:Publish' Resource: '*' ################################################################### # Kinesis Analytics Role # ################################################################### KinesisAnalyticsRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - kinesisanalytics.amazonaws.com Action: - 'sts:AssumeRole' Path: / Policies: - PolicyName: ReadInputKinesis PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "kinesis:DescribeStream" - "kinesis:GetShardIterator" - "kinesis:GetRecords" Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/alleycat" - PolicyName: InvokeDestinationLambdaFunction PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "lambda:InvokeFunction" - "lambda:GetFunctionConfiguration" Resource: !GetAtt DestinationFunction.Arn