AWSTemplateFormatVersion: "2010-09-09" Description: "Apache Log Analytics implemtation using KinesisAnalytics via CloudFormation" Resources: BasicApplication: Type: "AWS::KinesisAnalytics::Application" Properties: ApplicationName: "LogAnalyticsApp" ApplicationDescription: "Analyze Streaming Logs" ApplicationCode: "-- Create the destination stream that stores the response count as per the source application. \n-- This helps you determine request count per source. \n-- It also helps you determine if data is coming from unknown sources. \nCREATE STREAM \"DESTINATION_SQL_STREAM\" (\napplicationName VARCHAR(64),\ncontact VARCHAR(64),\nresponse SMALLINT,\nresponseCount SMALLINT);\n \n-- Aggregrate response over joined data with host application mapping stored on S3. \n-- It will always used latest S3 file \nCREATE OR REPLACE PUMP \"DESTINATION_SQL_STREAM\" AS \nINSERT INTO \"DESTINATION_SQL_STREAM\"\n SELECT STREAM metadata.\"ApplicationName\" , metadata.\"Contact\", logstream.\"response\", COUNT(*) as responseCount \n FROM \"SOURCE_SQL_STREAM_001\" logstream LEFT JOIN \"ApplicationHostMapping\" metadata\n ON logstream.\"host\" = metadata.\"Host\"\n GROUP BY metadata.\"ApplicationName\", metadata.\"Contact\", logstream.\"response\",FLOOR((logstream.ROWTIME - TIMESTAMP '\"'\"'1970-01-01 00:00:00'\"'\"') MINUTE / 5 TO MINUTE);\n\"}'" Inputs: - NamePrefix: "SOURCE_SQL_STREAM" InputSchema: RecordColumns: - Name: "host" SqlType: "VARCHAR(16)" Mapping: "$.host" - Name: "datetime" SqlType: "VARCHAR(32)" Mapping: "$.datetime" - Name: "request" SqlType: "VARCHAR(64)" Mapping: "$.request" - Name: "response" SqlType: "SMALLINT" Mapping: "$.response" - Name: "bytes" SqlType: "SMALLINT" Mapping: "$.bytes" - Name: "agent" SqlType: "VARCHAR(128)" Mapping: "$.agent" - Name: "referrer" SqlType: "VARCHAR(32)" Mapping: "$.referrer" RecordFormat: RecordFormatType: "JSON" MappingParameters: JSONMappingParameters: RecordRowPath: "$" RecordEncoding: "UTF-8" KinesisStreamsInput: ResourceARN: !GetAtt InputKinesisStream.Arn RoleARN: !GetAtt KinesisAnalyticsRole.Arn InputKinesisStream: Type: "AWS::Kinesis::Stream" Properties: Name: "AppLogInputStream" ShardCount: 1 KinesisAnalyticsRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: kinesisanalytics.amazonaws.com Action: "sts:AssumeRole" Path: "/" Policies: - PolicyName: Open PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "kinesis:DescribeStream" - "kinesis:GetShardIterator" - "kinesis:GetRecords" Resource: "*" - Effect: Allow Action: - "kinesis:DescribeStream" - "kinesis:PutRecord" - "kinesis:PutRecords" Resource: "*" - Effect: Allow Action: - "s3:GetObject" Resource: "*" BasicApplicationOutputs: Type: "AWS::KinesisAnalytics::ApplicationOutput" DependsOn: BasicApplication Properties: ApplicationName: !Ref BasicApplication Output: DestinationSchema: RecordFormatType: "CSV" KinesisStreamsOutput: ResourceARN: !GetAtt OutputKinesisStream.Arn RoleARN: !GetAtt KinesisAnalyticsRole.Arn Name : "DESTINATION_SQL_STREAM" OutputKinesisStream: Type: "AWS::Kinesis::Stream" Properties: Name: "AggLogDataStream" ShardCount: 10 ApplicationReferenceDataSource: Type: "AWS::KinesisAnalytics::ApplicationReferenceDataSource" DependsOn: BasicApplicationOutputs Properties: ApplicationName: !Ref BasicApplication ReferenceDataSource: TableName: "ApplicationHostMapping" ReferenceSchema: RecordColumns: - Name: "Host" SqlType: "VARCHAR(64)" - Name: "ApplicationName" SqlType: "VARCHAR(64)" - Name: "Contact" SqlType: "VARCHAR(64)" RecordFormat: RecordFormatType: "CSV" MappingParameters: CSVMappingParameters: RecordRowDelimiter: "\n" RecordColumnDelimiter: "," S3ReferenceDataSource: BucketARN: arn:aws:s3:::referenced-data FileKey: 'HostApplicationMap.csv' ReferenceRoleARN: !GetAtt KinesisAnalyticsRole.Arn LambdaExecutionRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: "sts:AssumeRole" Path: "/" Policies: - PolicyName: Open PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "cloudwatch:*" - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" - "lambda:InvokeFunction" - "kinesis:ListStreams" - "kinesis:DescribeStream" - "kinesis:GetRecords" - "kinesis:GetShardIterator" - "xray:*" Resource: "*" PublishLogMetrics: Type: "AWS::Lambda::Function" Properties: FunctionName: "PublishAggregrationMetrics" Handler: "com.amazonaws.lambda.ProcessKinesisEvents::recordHandler" Role: Fn::GetAtt: - "LambdaExecutionRole" - "Arn" Code: S3Bucket: "publish-log-metrics" S3Key: "log-metrics-cloudwatch-0.0.1-SNAPSHOT.jar" Runtime: "java8" Timeout: "120" TracingConfig: Mode: "Active" MapAggregration: Type: "AWS::Lambda::EventSourceMapping" Properties: BatchSize: 200 Enabled: true EventSourceArn: !GetAtt OutputKinesisStream.Arn FunctionName: !GetAtt PublishLogMetrics.Arn StartingPosition: "TRIM_HORIZON" Outputs: ApplicationPhysicalResourceId: Value: !Ref BasicApplication