AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Serverless Streaming Ingest Transform Load (ITL). **WARNING** This template creates a Kinesis Firehose Delivery Stream, an S3 Bucket, a DynamoDB Table and two Lambda Functions. You will be billed for the AWS resources used if you create a stack from this template. Copyright 2015-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Parameters: FirehoseBufferSize: Description: The size of the buffer, in MBs, that Kinesis Firehose uses for incoming data before delivering it to the destination. Type: Number Default: '100' MinValue: '1' MaxValue: '128' ConstraintDescription: must be between 1 and 128 FirehoseBufferInterval: Description: The length of time, in seconds, that Kinesis Firehose buffers incoming data before delivering it to the destination. Type: Number Default: '300' MinValue: '60' MaxValue: '900' ConstraintDescription: must be between 60 and 900 Resources: OutputBucket: Type: AWS::S3::Bucket DeletionPolicy: Delete Properties: AccessControl: Private IngestStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamName: 'IngestStream' DeliveryStreamType: 'DirectPut' ExtendedS3DestinationConfiguration: RoleARN: !GetAtt FirehoseDeliveryRole.Arn BucketARN: !GetAtt OutputBucket.Arn Prefix: 'transformed/' BufferingHints: SizeInMBs: !Ref FirehoseBufferSize IntervalInSeconds: !Ref FirehoseBufferInterval CompressionFormat: 'GZIP' ProcessingConfiguration: Enabled: true Processors: - Type: 'Lambda' Parameters: - ParameterName: 'LambdaArn' ParameterValue: !GetAtt FirehoseTransform.Arn S3BackupMode: 'Enabled' S3BackupConfiguration: RoleARN: !GetAtt FirehoseDeliveryRole.Arn BucketARN: !GetAtt OutputBucket.Arn Prefix: 'source_records/' BufferingHints: SizeInMBs: !Ref FirehoseBufferSize IntervalInSeconds: !Ref FirehoseBufferInterval CompressionFormat: 'GZIP' CloudWatchLoggingOptions: Enabled: true LogGroupName: '/aws/kinesisfirehose/IngestStream' LogStreamName: S3Delivery # This needs to be a separate policy as it references IngestStream. # This avoids a circular dependency between IngestStream and FirehoseDeliveryRole CWLPutLogs: Type: AWS::IAM::Policy Properties: PolicyName: CWL_PutLogs PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:PutLogEvents Resource: - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${IngestStream}:log-stream:*' Roles: - !Ref FirehoseDeliveryRole FirehoseDeliveryRole: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - sts:AssumeRole Condition: StringEquals: sts:ExternalId: !Ref AWS::AccountId Policies: - PolicyName: S3_ReadWrite_OutputBucket PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject Resource: - !Join ['', ['arn:aws:s3:::', !Ref 'OutputBucket', '/*']] - !Join ['', ['arn:aws:s3:::', !Ref 'OutputBucket']] - PolicyName: Lambda_Invoke_Transform PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - lambda:InvokeFunction - lambda:GetFunctionConfiguration Resource: - !GetAtt FirehoseTransform.Arn FirehoseTransform: Type: AWS::Serverless::Function Properties: Handler: firehoseTransform.lambda_handler Runtime: python3.6 CodeUri: lambda/ FunctionName: FirehoseTransform Description: Transforms and enriches messages ingested by a Firehose delivery stream MemorySize: 1536 Timeout: 300 Tracing: Active Policies: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess - Version: '2012-10-17' Statement: - Effect: Allow Action: - dynamodb:GetItem - dynamodb:BatchGetItem Resource: !GetAtt DeviceDetailsTable.Arn Environment: Variables: TABLE_NAME: !Ref DeviceDetailsTable DeviceDetailsTable: Type: AWS::Serverless::SimpleTable Properties: PrimaryKey: Name: device_id Type: String ProvisionedThroughput: ReadCapacityUnits: 50 WriteCapacityUnits: 50 # Lambda function resource used to populate the DeviceDetails table # with sample data PopulateTable: Type: AWS::Serverless::Function Properties: Handler: populateTable.lambda_handler Runtime: python3.6 CodeUri: lambda/ FunctionName: PopulateTable Description: Populates DynamoDB table with sample device information MemorySize: 1536 Timeout: 300 Policies: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - Version: '2012-10-17' Statement: - Effect: Allow Action: - dynamodb:PutItem Resource: !GetAtt DeviceDetailsTable.Arn Environment: Variables: TABLE_NAME: !Ref DeviceDetailsTable # Custom resource to invoke PopulateTable Lambda function InvokePopulateTable: Type: Custom::InvokePopulateTable Properties: ServiceToken: !GetAtt PopulateTable.Arn # CloudWatch Dashboard for easy monitoring CWDashBoard: Type: AWS::CloudWatch::Dashboard Properties: DashboardName: StreamingITL DashboardBody: !Sub | { "widgets": [ { "type": "metric", "x": 0, "y": 0, "width": 24, "height": 6, "properties": { "view": "timeSeries", "stacked": false, "metrics": [ [ "AWS/Lambda", "Duration", "FunctionName", "${FirehoseTransform}", { "period": 60, "stat": "Average" } ], [ ".", "Invocations", ".", ".", { "period": 60, "yAxis": "right", "stat": "Sum", "color": "#2ca02c" } ], [ ".", "Errors", ".", ".", { "period": 60, "stat": "Sum", "yAxis": "right", "color": "#d62728" } ], [ ".", "Throttles", ".", ".", { "period": 60, "stat": "Sum", "yAxis": "right", "color": "#ff7f0e" } ] ], "region": "${AWS::Region}", "title": "Lambda", "period": 60 } }, { "type": "metric", "x": 0, "y": 6, "width": 24, "height": 6, "properties": { "view": "timeSeries", "stacked": false, "metrics": [ [ "AWS/Firehose", "ExecuteProcessing.Duration", "DeliveryStreamName", "${IngestStream}", { "yAxis": "left", "period": 300 } ], [ ".", "SucceedProcessing.Records", ".", ".", { "stat": "Sum", "yAxis": "right", "period": 300, "color": "#2ca02c" } ], [ ".", "IncomingRecords", ".", ".", { "stat": "Sum", "yAxis": "right", "period": 300, "color": "#ff7f0e" } ] ], "region": "${AWS::Region}", "title": "Kinesis Firehose", "period": 300 } }, { "type": "metric", "x": 0, "y": 12, "width": 24, "height": 6, "properties": { "view": "timeSeries", "stacked": true, "metrics": [ [ "AWS/DynamoDB", "ConsumedReadCapacityUnits", "TableName", "${DeviceDetailsTable}", { "period": 60, "stat": "Sum" } ], [ ".", "ReadThrottleEvents", ".", ".", { "period": 60, "yAxis": "right", "stat": "Sum" } ] ], "region": "${AWS::Region}", "title": "DynamoDB", "period": 300, "annotations": { "horizontal": [ { "label": "ProvisionedReadCapacityUnits", "value": 3000 } ] } } } ] } Outputs: OutputBucketName: Value: !Ref 'OutputBucket' Description: S3 bucket for transformed records and source record backups FirehoseTransformFunctionName: Value: !Ref 'FirehoseTransform' Description: Name of FirehoseTransform Lambda function