---
AWSTemplateFormatVersion: 2010-09-09
Description: Supporting elements for the Kinesis Analytics click stream lab
Parameters:
  email:
    Description: Email address that receives anomaly detection events.
    Type: String
  SMS:
    Description: Mobile phone number that receives SMS anomaly detection events, in the format +1XXXXXXXXXX.
    Type: String
  Username:
    Description: The username of the user you want to create in Amazon Cognito.
    Type: String
    AllowedPattern: "^(?=\\s*\\S).*$"
    ConstraintDescription: Cannot be empty
  Password:
    Description: The password of the user you want to create in Amazon Cognito. Must be at least 6 alphanumeric characters and contain at least one letter and one number.
    Type: String
    NoEcho: true
    AllowedPattern: "^(?=.*[A-Za-z])(?=.*\\d)[A-Za-z\\d]{6,}$"
    ConstraintDescription: Must be at least 6 alphanumeric characters and contain at least one letter and one number
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Kinesis Pre-Lab Setup"
        Parameters:
          - "Username"
          - "Password"
          - "email"
          - "SMS"
Resources:
  RawS3Bucket:
    Type: AWS::S3::Bucket
  ProcessedS3Bucket:
    Type: AWS::S3::Bucket
  FirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      S3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${RawS3Bucket}
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 50
        CompressionFormat: GZIP
        Prefix: weblogs/raw/
        RoleARN: !GetAtt S3Role.Arn
  S3Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyName: S3Access
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "s3:*"
                Resource:
                  - !Sub arn:aws:s3:::${RawS3Bucket}
                  - !Sub arn:aws:s3:::${RawS3Bucket}/*
        - PolicyName: CloudWatch
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "cloudwatch:*"
                  # The IAM service prefix for CloudWatch Logs is "logs".
                  - "logs:*"
                Resource:
                  - "*"
  DataGenCognitoSetupLambdaFunc:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        S3Bucket: !Sub aws-kdg-tools-${AWS::Region}
        S3Key: datagen-cognito-setup.zip
      Description: Creates a Cognito User Pool, Identity Pool, and a User. Returns IDs to be used in the Kinesis Data Generator.
      FunctionName: KinesisDataGeneratorCognitoSetup
      Handler: createCognitoPool.createPoolAndUser
      Role: !GetAtt CognitoLambdaExecutionRole.Arn
      Runtime: nodejs18.x
      Timeout: 60
  CognitoLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - logs:*
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*
              - Effect: Allow
                Action:
                  - cognito-idp:AdminConfirmSignUp
                  - cognito-idp:CreateUserPoolClient
                  - cognito-idp:AdminCreateUser
                Resource:
                  - !Sub arn:aws:cognito-idp:${AWS::Region}:${AWS::AccountId}:userpool/*
              - Effect: Allow
                Action:
                  - cognito-idp:CreateUserPool
                  - cognito-identity:*
                Resource: "*"
              - Effect: Allow
                Action:
                  - iam:UpdateAssumeRolePolicy
                Resource:
                  - !GetAtt AuthenticatedUserRole.Arn
                  - !GetAtt UnauthenticatedUserRole.Arn
              - Effect: Allow
                Action:
                  - iam:PassRole
                Resource:
                  - !GetAtt AuthenticatedUserRole.Arn
                  - !GetAtt UnauthenticatedUserRole.Arn
  SetupCognitoCustom:
    Type: Custom::DataGenCognitoSetupLambdaFunc
    Properties:
      ServiceToken: !GetAtt DataGenCognitoSetupLambdaFunc.Arn
      Region: !Ref AWS::Region
      Username: !Ref Username
      Password: !Ref Password
      AuthRoleName: !Ref AuthenticatedUserRole
      AuthRoleArn: !GetAtt AuthenticatedUserRole.Arn
      UnauthRoleName: !Ref UnauthenticatedUserRole
      UnauthRoleArn: !GetAtt UnauthenticatedUserRole.Arn
  AuthenticatedUserRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Federated:
                - cognito-identity.amazonaws.com
            Action:
              - sts:AssumeRoleWithWebIdentity
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Action:
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                  - kinesis:PutRecords
                Resource:
                  - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/*
                Effect: Allow
              - Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource:
                  - !Sub arn:aws:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/*
                Effect: Allow
              - Action:
                  - mobileanalytics:PutEvents
                  - cognito-sync:*
                  - cognito-identity:*
                  - ec2:DescribeRegions
                  - firehose:ListDeliveryStreams
                  - kinesis:ListStreams
                Resource:
                  - "*"
                Effect: Allow
  UnauthenticatedUserRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Federated:
                - cognito-identity.amazonaws.com
            Action:
              - sts:AssumeRoleWithWebIdentity
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - mobileanalytics:PutEvents
                  - cognito-sync:*
                Resource:
                  - "*"
  CSELambdaSNSPublishRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: lambda_sns
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: !Ref CSEClickStreamEvent
  CSEClickStreamEvent:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: ClkStrEv2
      Subscription:
        - Endpoint: !Ref email
          Protocol: email
        - Endpoint: !Ref SMS
          Protocol: sms
      TopicName: ClickStreamEvent2
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: CSELambdaExecutionRole
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Sid: publish2sns
                Effect: Allow
                Action:
                  - sns:Publish
                Resource:
                  - !Ref CSEClickStreamEvent
              - Sid: writelogs
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*
              - Sid: readkinesis
                Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetRecords
                  - kinesis:GetShardIterator
                  - kinesis:ListStreams
                Resource: "*"
              - Action:
                  - s3:PutObject
                Resource: !Sub arn:aws:s3:::${ProcessedS3Bucket}/*
                Effect: Allow
                Sid: writeToS3
  CSEBeconAnomalyResponse:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: !Sub |
          // The nodejs18.x runtime bundles AWS SDK for JavaScript v3 only,
          // so the v2 'aws-sdk' package is not available here.
          const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');
          const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');
          const sns = new SNSClient({ region: '${AWS::Region}' });
          const s3 = new S3Client({});

          exports.handler = async function(event) {
            console.log(JSON.stringify(event, null, 3));
            const results = [];
            // Handle records one at a time so each gets its own delivery result.
            for (const record of event.records) {
              try {
                // Each record is a base64-encoded CSV row: ctr,anomaly_score
                const payload = Buffer.from(record.data, 'base64').toString('ascii');
                const rec = payload.split(',');
                const ctr = rec[0];
                const anomaly_score = rec[1];
                const detail = 'Anomaly detected with a click through rate of ' + ctr + '% and an anomaly score of ' + anomaly_score;
                await sns.send(new PublishCommand({
                  Message: detail,
                  Subject: 'Anomaly Detected',
                  TopicArn: '${CSEClickStreamEvent}'
                }));
                const csv = convertArrayOfObjectsToCSV({
                  data: [{ 'date': Date.now(), 'ctr': ctr, 'anomaly_score': anomaly_score }]
                });
                const today = new Date();
                // getMonth() is zero-based, so add 1 to get the calendar month.
                const month = today.getMonth() + 1;
                await s3.send(new PutObjectCommand({
                  Bucket: '${ProcessedS3Bucket}',
                  Key: `weblogs/processed/${!today.getFullYear()}/${!month}/${!today.getDate()}/${!record.recordId}.csv`,
                  Body: csv,
                  ContentType: 'text/csv'
                }));
                results.push({ recordId: record.recordId, result: 'Ok' });
              } catch (err) {
                console.error(err.stack);
                results.push({ recordId: record.recordId, result: 'DeliveryFailed' });
              }
            }
            // Kinesis Data Analytics expects a delivery result for every record.
            return { records: results };
          };

          // Serialize an array of flat objects to CSV with a header row.
          function convertArrayOfObjectsToCSV(args) {
            const data = args.data || null;
            if (data == null || !data.length) {
              throw new Error('data is null');
            }
            const columnDelimiter = args.columnDelimiter || ',';
            const lineDelimiter = args.lineDelimiter || '\n';
            const keys = Object.keys(data[0]);
            let result = keys.join(columnDelimiter) + lineDelimiter;
            data.forEach(function(item) {
              result += keys.map(function(key) { return item[key]; }).join(columnDelimiter);
              result += lineDelimiter;
            });
            return result;
          }
      Description: Click Stream Example Lambda Function
      FunctionName: CSEBeconAnomalyResponse
      Handler: index.handler
      MemorySize: 128
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: nodejs18.x
      Timeout: 5
  CSEKinesisAnalyticsRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: firehose
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Sid: ReadInputFirehose
                Effect: Allow
                Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:Get*
                Resource:
                  - !GetAtt FirehoseDeliveryStream.Arn
              - Sid: UseLambdaFunction
                Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource:
                  - !Sub ${CSEBeconAnomalyResponse.Arn}:$LATEST
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: LambdaRolePolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: s3:DeleteObject
                Resource:
                  - !Sub arn:aws:s3:::${RawS3Bucket}/*
                  - !Sub arn:aws:s3:::${ProcessedS3Bucket}/*
              - Effect: Allow
                Action: s3:ListBucket
                Resource:
                  - !Sub arn:aws:s3:::${RawS3Bucket}
                  - !Sub arn:aws:s3:::${ProcessedS3Bucket}
              - Effect: Allow
                Action: logs:CreateLogGroup
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*
  S3BucketHandler:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaRole.Arn
      Code:
        ZipFile: |
          import json
          import cfnresponse
          import boto3
          from botocore.exceptions import ClientError

          s3 = boto3.resource('s3')

          def handler(event, context):
              print("Received event: %s" % json.dumps(event))
              s3_bucket = s3.Bucket(event['ResourceProperties']['Bucket'])
              # Default to FAILED so an unexpected request type still gets a response.
              result = cfnresponse.FAILED
              try:
                  if event['RequestType'] == 'Create' or event['RequestType'] == 'Update':
                      result = cfnresponse.SUCCESS
                  elif event['RequestType'] == 'Delete':
                      # Empty the bucket so CloudFormation can delete it.
                      s3_bucket.objects.delete()
                      result = cfnresponse.SUCCESS
              except ClientError as e:
                  print('Error: %s' % e)
                  result = cfnresponse.FAILED
              cfnresponse.send(event, context, result, {})
      # python3.8 is past end of support; use a currently supported runtime.
      Runtime: python3.12
      Timeout: 300
  EmptyRawS3Bucket:
    Type: "Custom::EmptyS3Bucket"
    Properties:
      ServiceToken: !GetAtt S3BucketHandler.Arn
      Bucket: !Ref RawS3Bucket
  EmptyProcessedS3Bucket:
    Type: "Custom::EmptyS3Bucket"
    Properties:
      ServiceToken: !GetAtt S3BucketHandler.Arn
      Bucket: !Ref ProcessedS3Bucket
Outputs:
  KinesisDataGeneratorUrl:
    Description: The URL for your Kinesis Data Generator.
    Value: !Sub https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html?${SetupCognitoCustom.Querystring}
  RawBucketName:
    Description: Name of the bucket where your raw data is stored.
    Value: !Ref RawS3Bucket
  ProcessedBucketName:
    Description: Name of the bucket where your processed data is stored.
    Value: !Ref ProcessedS3Bucket