AWSTemplateFormatVersion: "2010-09-09"

Description: "(SO0039) - Real-Time IoT Device Monitoring with Kinesis Analytics: Analyze IoT Device Connectivity using Kinesis Analytics, Version %%VERSION%%"

Parameters:
  UserName:
    Description: The username of the user you want to create in Amazon Cognito.
    Type: String
    AllowedPattern: "^(?=\\s*\\S).*$"
    ConstraintDescription: "cannot be empty"
    MinLength: 1
  UserEmail:
    Type: String
    Description: Email address for dashboard user. After successfully launching this solution, you will receive an email with instructions to log in.
    AllowedPattern: ^[_A-Za-z0-9-\+]+(\.[_A-Za-z0-9-]+)*@[A-Za-z0-9-]+(\.[A-Za-z0-9]+)*(\.[A-Za-z]{2,})$
    MinLength: 1
  IoTTopicName:
    Type: String
    MinLength: 1
    Default: "iot_device_analytics"
    Description: "IoT Topic Name that your devices will send messages to."

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Cognito User for Access to the Dashboard
        Parameters:
          - UserName
          - UserEmail
      - Label:
          default: IoT Settings
        Parameters:
          - IoTTopicName
    ParameterLabels:
      UserName:
        default: "User Name"
      UserEmail:
        default: "User Email Address"
      IoTTopicName:
        default: "IoT Topic to monitor"

Mappings:
  SourceCode:
    General:
      S3Bucket: '%%BUCKET_NAME%%'
      KeyPrefix: '%%SOLUTION_NAME%%/%%VERSION%%'
      LogPrefix: '%%SOLUTION_NAME%%/'
  KinesisAnalyticsApp:
    Outputs:
      FireHoseStreamName: PROCESSED_METRICS_S3_STREAM
      LambdaStreamName: UPDATE_DDB_LAMBDA_STREAM
  DDB:
    Scaling:
      ReadTargetUtilization: 70
      ReadCapacityMin: 1
      ReadCapacityMax: 100
      WriteTargetUtilization: 50
      WriteCapacityMin: 5
      WriteCapacityMax: 1000
  Solution:
    Data:
      ID: SO0039
      Version: '%%VERSION%%'
      SendAnonymousUsageData: 'True'

Resources:
  IoTMetricsLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7

  IotMetricsLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref IoTMetricsLogGroup

  IoTTopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      TopicRulePayload:
        Description: 'Send IoT Device data in raw format to Kinesis Analytics'
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: 'false'
        Sql: !Sub 'SELECT *, parse_time("yyyy-MM-dd HH:mm:ss", timestamp()) as ts FROM "${IoTTopicName}"'
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref RawMetricsDeliveryStream
              RoleArn: !Sub '${IoTTopicRuleRole.Arn}'
              Separator: "\n"

  IoTTopicRuleRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - 'iot.amazonaws.com'
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        # Posts to RawMetricsDeliveryStream
        - PolicyName: 'IoTTopicRulePolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:ListDeliveryStreams
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource: !Sub '${RawMetricsDeliveryStream.Arn}'

  RawMetricsDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      S3DestinationConfiguration:
        BucketARN: !GetAtt RawMetricsBucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 10
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref IoTMetricsLogGroup
          LogStreamName: 'RawMetricsS3Delivery'
        CompressionFormat: 'UNCOMPRESSED'
        EncryptionConfiguration:
          NoEncryptionConfig: 'NoEncryption'
        Prefix: !FindInMap
          - SourceCode
          - General
          - LogPrefix
        RoleARN: !GetAtt RawMetricsDeliveryStreamRole.Arn
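  # The IoTTopicRule above forwards device messages to this delivery stream,
  # adding a parsed `ts` field. A minimal publishing sketch in Python
  # (illustrative only; the device name and values are hypothetical, but the
  # JSON keys match the Kinesis Analytics input schema defined below):
  #
  #   import json
  #   import boto3
  #
  #   iot = boto3.client("iot-data")
  #   iot.publish(
  #       topic="iot_device_analytics",  # the IoTTopicName parameter default
  #       qos=0,
  #       payload=json.dumps({"device": "D001", "flow": 10, "temp": 72, "humidity": 45}),
  #   )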
  RawMetricsBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      LoggingConfiguration:
        DestinationBucketName: !Ref LogsBucket
        LogFilePrefix: raw-metrics-bucket/
      LifecycleConfiguration:
        Rules:
          - Id: ExpirationRule
            Status: Enabled
            ExpirationInDays: '7'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W51
            reason: "This is a private bucket. Does not require bucket policy"

  RawMetricsDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - 'firehose.amazonaws.com'
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        # Puts objects in RawMetricsBucket
        - PolicyName: 'RawMetricsS3UploadPolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:PutObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                Resource:
                  - !Sub '${RawMetricsBucket.Arn}'
                  - !Sub '${RawMetricsBucket.Arn}/'
                  - !Sub '${RawMetricsBucket.Arn}/*'
        # Write to CloudWatch
        - PolicyName: RawMetricsDeliveryStreamLogging
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutDestination
                  - logs:PutLogEvents
                Resource: !Join
                  - ''
                  - - 'arn:aws:logs:'
                    - !Ref AWS::Region
                    - ':'
                    - !Ref AWS::AccountId
                    - ':log-group:*'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "The wildcard action in the logs policy is required"

  KinesisAnalyticsApp:
    Type: AWS::KinesisAnalytics::Application
    Properties:
      ApplicationDescription: 'IoT Device Monitoring Analysis'
      ApplicationCode: !Sub
        - |
          -- Create a common format to be used for all the different metrics for IoT device monitoring
          CREATE OR REPLACE STREAM FAN_OUT_STREAM (
              eventTimeStamp TIMESTAMP,
              computationType VARCHAR(128),
              category VARCHAR(128),
              subcategory INTEGER,
              unit VARCHAR(128),
              unitValue DOUBLE);
          -- 1. Create an output stream, which is used to send the number of unique connected IoT devices to the destination
          CREATE OR REPLACE PUMP connected_device_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT current_timestamp as eventTimeStamp, 'ConnectedDevicesCount', 'None', 0, 'Count', *
          FROM (
              SELECT STREAM *
              FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM source_sql_stream_001),
                  'device',
                  60)));
          -- 2. Max of the data point (temp) per connected device
          CREATE OR REPLACE PUMP per_device_max_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE) AS eventTimeStamp,
              'PerDeviceMaxTemp', "device", 0, 'Maximum', MAX("temp") AS max_value
          FROM source_sql_stream_001
          GROUP BY "device",
              STEP(source_sql_stream_001.rowtime BY INTERVAL '1' MINUTE),
              STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE);
          -- 3. Min of the data point (temp) per connected device
          CREATE OR REPLACE PUMP per_device_min_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE) AS eventTimeStamp,
              'PerDeviceMinTemp', "device", 0, 'Minimum', MIN("temp") AS min_value
          FROM source_sql_stream_001
          GROUP BY "device",
              STEP(source_sql_stream_001.rowtime BY INTERVAL '1' MINUTE),
              STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE);
          -- 4. Avg of the data point (temp) per connected device
          CREATE OR REPLACE PUMP per_device_avg_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE) AS eventTimeStamp,
              'PerDeviceAvgTemp', "device", 0, 'Average', AVG("temp") AS avg_value
          FROM source_sql_stream_001
          GROUP BY "device",
              STEP(source_sql_stream_001.rowtime BY INTERVAL '1' MINUTE),
              STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE);
          -- Setup for Anomaly detection
          CREATE OR REPLACE STREAM temp_stream (temp INTEGER, device varchar(4), anomaly_score DOUBLE);
          CREATE OR REPLACE PUMP temp_pump AS
          INSERT INTO temp_stream
          SELECT STREAM "temp", "device", anomaly_score
          FROM TABLE(RANDOM_CUT_FOREST(
              CURSOR(SELECT STREAM * FROM source_sql_stream_001)));
          -- 5. Anomaly detection on the value sent (temp)
          CREATE OR REPLACE PUMP anomaly_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(temp_stream.rowtime BY INTERVAL '1' MINUTE) as eventTimeStamp,
              'DeviceTempAnomalyScore', device, temp, 'AnomalyScore', anomaly_score
          FROM temp_stream
          ORDER BY STEP(temp_stream.rowtime BY INTERVAL '1' MINUTE), anomaly_score DESC;
          -- 6. Average of the data point (temp) across all devices
          CREATE OR REPLACE PUMP avg_aggregate_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE) AS event_timestamp,
              'AvgTempValue', 'All', 0, 'Average', AVG("temp") AS avg_value
          FROM source_sql_stream_001
          GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '1' MINUTE),
              STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE);
          -- 7. Min of the data point (temp) across all devices
          CREATE OR REPLACE PUMP min_aggregate_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE) AS event_timestamp,
              'MinTempValue', 'All', 0, 'Minimum', MIN("temp") AS min_value
          FROM source_sql_stream_001
          GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '1' MINUTE),
              STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE);
          -- 8. Max of the data point (temp) across all devices
          CREATE OR REPLACE PUMP max_aggregate_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE) AS event_timestamp,
              'MaxTempValue', 'All', 0, 'Maximum', MAX("temp") AS max_value
          FROM source_sql_stream_001
          GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '1' MINUTE),
              STEP(source_sql_stream_001."COL_time" BY INTERVAL '1' MINUTE);
          -- Setup for 9-14
          -- Sort stream and apply sessions
          CREATE OR REPLACE STREAM sorted_stream (event_timestamp TIMESTAMP, device VARCHAR(4), flow INTEGER, temp INTEGER, humidity INTEGER);
          CREATE OR REPLACE PUMP sort_pump AS
          INSERT INTO sorted_stream
          SELECT STREAM "COL_time" AS event_timestamp, "device", "flow", "temp", "humidity"
          FROM source_sql_stream_001
          ORDER BY STEP(source_sql_stream_001.rowtime BY INTERVAL '10' SECOND), "COL_time";
          CREATE OR REPLACE STREAM time_between_events_stream (event_timestamp TIMESTAMP, seconds_between_events INTEGER, device VARCHAR(4));
          CREATE OR REPLACE PUMP time_between_events_pump AS
          INSERT INTO time_between_events_stream
          SELECT STREAM event_timestamp,
              -- Calculates the time between session events.
              -- TSDIFF takes the difference between two timestamps in ms,
              -- comparing the current timestamp in the row to the last timestamp
              TSDIFF(event_timestamp,
                  -- LAG pulls a column from a previous event relative to the current event
                  -- In this case, we use 1 because we want the time between the two events
                  LAG(event_timestamp, 1) OVER W1) / 1000 AS seconds_between_events,
              device
          FROM sorted_stream
          WINDOW W1 as (
              -- If no unique session_id exists or no client event for ending a session, you must define the start and end of a session.
              -- If users are expected to have multiple sessions online at a given time, another unique identifier must be added to the partition.
              PARTITION BY device RANGE INTERVAL '1' HOUR PRECEDING);
          CREATE OR REPLACE STREAM connected_flag_stream (new_session_indicator BIGINT, event_timestamp TIMESTAMP, seconds_between_events INTEGER, device VARCHAR(4));
          CREATE OR REPLACE PUMP connected_flag_pump AS
          INSERT INTO connected_flag_stream
          SELECT STREAM
              -- Flag new connected sessions, which makes other analytics easier
              -- Assumes no device has more than one active session
              (CASE
                  -- time interval between 0 and 60 seconds: part of the same session
                  WHEN seconds_between_events >= 0 AND seconds_between_events <= 60 THEN 0
                  -- time interval null (first event) or > 60 seconds: start of a new session
                  WHEN seconds_between_events IS NULL OR seconds_between_events > 60 THEN UNIX_TIMESTAMP(event_timestamp)
                  ELSE NULL
              END) AS new_session_indicator,
              event_timestamp,
              seconds_between_events,
              device
          FROM time_between_events_stream;
          -- Group sessions together
          CREATE OR REPLACE STREAM device_session_stream (session_id VARCHAR(128), seconds_between_events INTEGER, device VARCHAR(4));
          CREATE OR REPLACE PUMP device_session_pump AS
          INSERT INTO device_session_stream
          SELECT STREAM
              (device || '_' ||
                  -- If users are expected to have multiple sessions online at a given time, another unique identifier must be added here.
                  CAST(MAX(new_session_indicator) OVER W1 AS VARCHAR(128))) as session_id,
              seconds_between_events,
              device
          FROM connected_flag_stream
          WINDOW W1 AS (
              PARTITION BY device RANGE INTERVAL '1' HOUR PRECEDING);
          -- Calculate connected time events for devices
          CREATE OR REPLACE STREAM session_connected_time_stream (session_id VARCHAR(128), connected_time_seconds INTEGER);
          CREATE OR REPLACE PUMP session_connected_time_pump AS
          INSERT INTO session_connected_time_stream
          SELECT STREAM session_id,
              SUM(seconds_between_events) OVER W1 AS connected_time_seconds
          FROM device_session_stream
          WINDOW W1 AS (
              PARTITION BY session_id RANGE INTERVAL '1' HOUR PRECEDING);
          -- Per-session time stream for disconnected devices that came back online within an hour
          CREATE OR REPLACE STREAM per_session_disconnected_time_stream (session_id VARCHAR(128), max_disconnected_time_seconds INTEGER, avg_disconnected_time_seconds INTEGER, min_disconnected_time_seconds INTEGER);
          CREATE OR REPLACE PUMP per_session_disconnected_time_pump AS
          INSERT INTO per_session_disconnected_time_stream
          SELECT STREAM session_id,
              MAX(connected_time_seconds) AS max_disconnected_time_seconds,
              AVG(connected_time_seconds) AS avg_disconnected_time_seconds,
              MIN(connected_time_seconds) AS min_disconnected_time_seconds
          FROM session_connected_time_stream
          WHERE connected_time_seconds > 60
          GROUP BY STEP(session_connected_time_stream.rowtime BY INTERVAL '10' SECOND), session_id;
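          -- Worked example of the session logic above (illustrative): a device
          -- reporting at t = 0s, 30s, and 95s yields seconds_between_events of
          -- NULL, 30, and 65. The NULL (first event) and the 65-second gap (> 60)
          -- each mark a new session, so the 30-second interval is summed as
          -- connected time (metrics 12-14) while the 65-second gap is reported
          -- as disconnected time (metrics 9-11).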
          --9. Max for disconnected devices that came back online within an hour
          CREATE OR REPLACE PUMP maximum_disconnected_time_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(per_session_disconnected_time_stream.rowtime BY INTERVAL '10' SECOND) AS event_timestamp,
              'MaxDisconnTime', 'None', 0, 'Maximum', MAX(max_disconnected_time_seconds) AS max_value
          FROM per_session_disconnected_time_stream
          GROUP BY STEP(per_session_disconnected_time_stream.rowtime BY INTERVAL '10' SECOND);
          --10. Min for disconnected devices that came back online within an hour
          CREATE OR REPLACE PUMP minimum_disconnected_time_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(per_session_disconnected_time_stream.rowtime BY INTERVAL '10' SECOND) AS event_timestamp,
              'MinDisconnTime', 'None', 0, 'Minimum', MIN(min_disconnected_time_seconds) AS min_value
          FROM per_session_disconnected_time_stream
          GROUP BY STEP(per_session_disconnected_time_stream.rowtime BY INTERVAL '10' SECOND);
          --11. Avg for disconnected devices that came back online within an hour
          CREATE OR REPLACE PUMP average_disconnected_time_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(per_session_disconnected_time_stream.rowtime BY INTERVAL '10' SECOND) AS event_timestamp,
              'AvgDisconnTime', 'None', 0, 'Average', AVG(avg_disconnected_time_seconds) AS avg_value
          FROM per_session_disconnected_time_stream
          GROUP BY STEP(per_session_disconnected_time_stream.rowtime BY INTERVAL '10' SECOND);
          -- Per-session time stream for connected devices
          CREATE OR REPLACE STREAM per_session_connected_time_stream (session_id VARCHAR(128), max_connected_time_seconds INTEGER, avg_connected_time_seconds INTEGER, min_connected_time_seconds INTEGER);
          CREATE OR REPLACE PUMP per_session_connected_time_pump AS
          INSERT INTO per_session_connected_time_stream
          SELECT STREAM session_id,
              MAX(connected_time_seconds) AS max_connected_time_seconds,
              AVG(connected_time_seconds) AS avg_connected_time_seconds,
              MIN(connected_time_seconds) AS min_connected_time_seconds
          FROM session_connected_time_stream
          WHERE connected_time_seconds <= 60
          GROUP BY STEP(session_connected_time_stream.rowtime BY INTERVAL '10' SECOND), session_id;
          --12. Max for connected devices
          CREATE OR REPLACE PUMP maximum_connected_time_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(per_session_connected_time_stream.rowtime BY INTERVAL '10' SECOND) AS event_timestamp,
              'MaxConnTime', 'None', 0, 'Maximum', MAX(max_connected_time_seconds) AS max_value
          FROM per_session_connected_time_stream
          GROUP BY STEP(per_session_connected_time_stream.rowtime BY INTERVAL '10' SECOND);
          --13. Min for connected devices
          CREATE OR REPLACE PUMP minimum_connected_time_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(per_session_connected_time_stream.rowtime BY INTERVAL '10' SECOND) AS event_timestamp,
              'MinConnTime', 'None', 0, 'Minimum', MIN(min_connected_time_seconds) AS min_value
          FROM per_session_connected_time_stream
          GROUP BY STEP(per_session_connected_time_stream.rowtime BY INTERVAL '10' SECOND);
          --14. Avg for connected devices
          CREATE OR REPLACE PUMP average_connected_time_pump AS
          INSERT INTO FAN_OUT_STREAM
          SELECT STREAM STEP(per_session_connected_time_stream.rowtime BY INTERVAL '10' SECOND) AS event_timestamp,
              'AvgConnTime', 'None', 0, 'Average', AVG(avg_connected_time_seconds) AS avg_value
          FROM per_session_connected_time_stream
          GROUP BY STEP(per_session_connected_time_stream.rowtime BY INTERVAL '10' SECOND);
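          -- Every pump above emits rows in the common FAN_OUT_STREAM shape,
          -- for example (values are illustrative):
          --   eventTimeStamp: 2018-01-01 00:01:00.0, computationType: 'PerDeviceAvgTemp',
          --   category: 'D001', subcategory: 0, unit: 'Average', unitValue: 71.5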
          --15. Fan out to multiple Kinesis Analytics Outputs
          CREATE STREAM ${LambdaStreamName} (
              eventTimeStamp TIMESTAMP,
              computationType VARCHAR(128),
              category VARCHAR(128),
              subcategory INTEGER,
              unit VARCHAR(128),
              unitValue DOUBLE);
          CREATE OR REPLACE PUMP fan_out_lambda_pump AS
          INSERT INTO ${LambdaStreamName}
          SELECT * FROM FAN_OUT_STREAM;
          CREATE STREAM ${FireHoseStreamName} (
              eventTimeStamp TIMESTAMP,
              computationType VARCHAR(128),
              category VARCHAR(128),
              subcategory INTEGER,
              unit VARCHAR(128),
              unitValue DOUBLE);
          CREATE OR REPLACE PUMP fan_out_firehose_pump AS
          INSERT INTO ${FireHoseStreamName}
          SELECT * FROM FAN_OUT_STREAM;
        - LambdaStreamName: !FindInMap
            - KinesisAnalyticsApp
            - Outputs
            - LambdaStreamName
          FireHoseStreamName: !FindInMap
            - KinesisAnalyticsApp
            - Outputs
            - FireHoseStreamName
      Inputs:
        - NamePrefix: 'SOURCE_SQL_STREAM'
          InputSchema:
            RecordColumns:
              - Name: 'COL_time'
                SqlType: 'TIMESTAMP'
                Mapping: '$.ts'
              - Name: 'device'
                SqlType: 'VARCHAR(4)'
                Mapping: '$.device'
              - Name: 'flow'
                SqlType: 'INTEGER'
                Mapping: '$.flow'
              - Name: 'temp'
                SqlType: 'INTEGER'
                Mapping: '$.temp'
              - Name: 'humidity'
                SqlType: 'INTEGER'
                Mapping: '$.humidity'
            RecordFormat:
              RecordFormatType: 'JSON'
              MappingParameters:
                JSONMappingParameters:
                  RecordRowPath: '$'
            RecordEncoding: 'UTF-8'
          KinesisFirehoseInput:
            ResourceARN: !GetAtt RawMetricsDeliveryStream.Arn
            RoleARN: !GetAtt KinesisAnalyticsAppRole.Arn

  KinesisAnalyticsAppRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: '/'
      Policies:
        # Read from RawMetricsDeliveryStream
        - PolicyName: 'KinesisAnalyticsReadRawMetrics'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:Get*
                Resource: !Sub '${RawMetricsDeliveryStream.Arn}'
        # Post to ProcessedMetricsDeliveryStream
        - PolicyName: 'KinesisAnalyticsPutProcessedMetrics'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:ListDeliveryStreams
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource: !Sub '${ProcessedMetricsDeliveryStream.Arn}'
        # Invoke UpdateDDBLambda
        - PolicyName: UpdateDDBLambdaInvocation
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource: !Sub '${UpdateDDBLambda.Arn}'
        # Write to CloudWatch
        - PolicyName: KinesisAnalyticsAppLogging
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutDestination
                  - logs:PutLogEvents
                Resource: !Join
                  - ''
                  - - 'arn:aws:logs:'
                    - !Ref AWS::Region
                    - ':'
                    - !Ref AWS::AccountId
                    - ':log-group:*'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "The wildcard action in the logs policy is required"
          - id: F3
            reason: "The wildcard action in the KinesisAnalyticsReadRawMetrics policy permits the KinesisAnalyticsApp to read from the RawMetricsDeliveryStream. The wildcard resource in the KinesisAnalyticsAppLogging policy permits the KinesisAnalyticsApp to log events to CloudWatch."
  KinesisAnalyticsAppFirehoseOutput:
    Type: AWS::KinesisAnalytics::ApplicationOutput
    Properties:
      ApplicationName: !Ref KinesisAnalyticsApp
      Output:
        DestinationSchema:
          RecordFormatType: 'CSV'
        KinesisFirehoseOutput:
          ResourceARN: !Sub '${ProcessedMetricsDeliveryStream.Arn}'
          RoleARN: !Sub '${KinesisAnalyticsAppRole.Arn}'
        Name: !FindInMap
          - KinesisAnalyticsApp
          - Outputs
          - FireHoseStreamName

  KinesisAnalyticsAppLambdaOutput:
    Type: AWS::KinesisAnalytics::ApplicationOutput
    # Use DependsOn to serialize adding Application Outputs to reduce likelihood of errors.
    DependsOn: KinesisAnalyticsAppFirehoseOutput
    Properties:
      ApplicationName: !Ref KinesisAnalyticsApp
      Output:
        DestinationSchema:
          RecordFormatType: 'CSV'
        LambdaOutput:
          ResourceARN: !Sub '${UpdateDDBLambda.Arn}'
          RoleARN: !Sub '${KinesisAnalyticsAppRole.Arn}'
        Name: !FindInMap
          - KinesisAnalyticsApp
          - Outputs
          - LambdaStreamName

  ProcessedMetricsDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: 'DirectPut'
      S3DestinationConfiguration:
        BucketARN: !Sub '${ProcessedMetricsBucket.Arn}'
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 10
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref IoTMetricsLogGroup
          LogStreamName: 'ProcessedMetricsS3Delivery'
        CompressionFormat: 'UNCOMPRESSED'
        EncryptionConfiguration:
          NoEncryptionConfig: 'NoEncryption'
        Prefix: !FindInMap
          - SourceCode
          - General
          - LogPrefix
        RoleARN: !Sub '${ProcessedMetricsDeliveryStreamRole.Arn}'

  ProcessedMetricsBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      LoggingConfiguration:
        DestinationBucketName: !Ref LogsBucket
        LogFilePrefix: processed-metrics-bucket/
      LifecycleConfiguration:
        Rules:
          - Id: ExpirationRule
            Status: Enabled
            ExpirationInDays: '7'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W51
            reason: "This is a private bucket. Does not require bucket policy"
  ProcessedMetricsDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        # Put objects in ProcessedMetricsBucket
        - PolicyName: 'ProcessedMetricsS3Delivery'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                Resource:
                  - !Sub '${ProcessedMetricsBucket.Arn}'
                  - !Sub '${ProcessedMetricsBucket.Arn}/'
                  - !Sub '${ProcessedMetricsBucket.Arn}/*'
        # Write to CloudWatch
        - PolicyName: ProcessedMetricsLogging
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutDestination
                  - logs:PutLogEvents
                Resource: !Join
                  - ''
                  - - 'arn:aws:logs:'
                    - !Ref AWS::Region
                    - ':'
                    - !Ref AWS::AccountId
                    - ':log-group:*'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "The wildcard action in the logs policy is required"

  # UpdateDDBLambda
  UpdateDDBLambda:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        S3Bucket: !Sub
          - ${Param1}-${AWS::Region}
          - Param1: !FindInMap
              - SourceCode
              - General
              - S3Bucket
        S3Key: !Sub
          - ${Param1}/update_ddb_from_stream.zip
          - Param1: !FindInMap
              - SourceCode
              - General
              - KeyPrefix
      Environment:
        Variables:
          ANALYTICS_TABLE: !Ref AnalyticsTable
          SOLUTION_UUID: !GetAtt GenerateUUID.UUID
          SOLUTION_ID: !FindInMap
            - Solution
            - Data
            - ID
          SOLUTION_VERSION: !FindInMap
            - Solution
            - Data
            - Version
          SEND_ANONYMOUS_DATA: !FindInMap
            - Solution
            - Data
            - SendAnonymousUsageData
      Description: Puts ProcessedMetrics data into AnalyticsTable.
      Handler: update_ddb_from_stream.lambda_handler
      MemorySize: 256
      Role: !GetAtt UpdateDDBLambdaRole.Arn
      Runtime: python3.8
      Timeout: 300

  UpdateDDBLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Read from ProcessedMetricsDeliveryStream
              - Effect: Allow
                Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:Get*
                Resource:
                  - !Sub '${ProcessedMetricsDeliveryStream.Arn}'
              # Update AnalyticsTable
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:PutItem
                Resource:
                  - !Sub '${AnalyticsTable.Arn}'
        # Write to CloudWatch
        - PolicyName: UpdateDDBLambdaLogging
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutDestination
                  - logs:PutLogEvents
                Resource: !Join
                  - ''
                  - - 'arn:aws:logs:'
                    - !Ref AWS::Region
                    - ':'
                    - !Ref AWS::AccountId
                    - ':log-group:*'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: F3
            reason: "The wildcard action in the root policy permits the UpdateDDBLambda function to read from the ProcessedMetricsDeliveryStream. The wildcard resource in the UpdateDDBLambdaLogging policy permits the UpdateDDBLambda function to log events to CloudWatch."
          - id: W11
            reason: "The wildcard resource is required to log events to CloudWatch."
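  # The UpdateDDBLambda function above is invoked as a Kinesis Analytics Lambda
  # output: each event carries base64-encoded records, and the function must
  # return a delivery status per recordId. A minimal handler sketch
  # (illustrative only; the actual update_ddb_from_stream.py ships in the
  # solution's S3 bucket):
  #
  #   import base64
  #
  #   def lambda_handler(event, context):
  #       results = []
  #       for record in event["records"]:
  #           row = base64.b64decode(record["data"]).decode("utf-8")
  #           # parse the CSV row and put_item into ANALYTICS_TABLE here
  #           results.append({"recordId": record["recordId"], "result": "Ok"})
  #       return {"records": results}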
  # Database
  AnalyticsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: MetricType
          AttributeType: S
        - AttributeName: EventTime
          AttributeType: S
      KeySchema:
        - KeyType: HASH
          AttributeName: MetricType
        - KeyType: RANGE
          AttributeName: EventTime
      ProvisionedThroughput:
        ReadCapacityUnits: 20
        WriteCapacityUnits: 20

  AnalyticsTableScalingRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - application-autoscaling.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: '/'
      Policies:
        - PolicyName: AnalyticsTableScalingPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Allows updating AnalyticsTable capacity.
              - Effect: Allow
                Action:
                  - dynamodb:DescribeTable
                  - dynamodb:UpdateTable
                Resource:
                  - !Sub '${AnalyticsTable.Arn}'
              # Allows managing the CloudWatch alarms that drive table scaling.
              - Effect: Allow
                Action:
                  - cloudwatch:PutMetricAlarm
                  - cloudwatch:DescribeAlarms
                  - cloudwatch:GetMetricStatistics
                  - cloudwatch:SetAlarmState
                  - cloudwatch:DeleteAlarms
                Resource:
                  - '*'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "The wildcard resource in the AnalyticsTableScalingPolicy permits this role to manage the CloudWatch alarms that Application Auto Scaling creates for the AnalyticsTable."

  AnalyticsTableWriteCapacityTarget:
    Type: AWS::ApplicationAutoScaling::ScalableTarget
    Properties:
      MaxCapacity: !FindInMap [DDB, Scaling, WriteCapacityMax]
      MinCapacity: !FindInMap [DDB, Scaling, WriteCapacityMin]
      ResourceId: !Sub 'table/${AnalyticsTable}'
      RoleARN: !Sub '${AnalyticsTableScalingRole.Arn}'
      ScalableDimension: dynamodb:table:WriteCapacityUnits
      ServiceNamespace: dynamodb

  AnalyticsTableWriteScalingPolicy:
    Type: AWS::ApplicationAutoScaling::ScalingPolicy
    Properties:
      PolicyName: WriteAutoScalingPolicy
      PolicyType: TargetTrackingScaling
      ScalingTargetId: !Ref AnalyticsTableWriteCapacityTarget
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: !FindInMap [DDB, Scaling, WriteTargetUtilization]
        ScaleInCooldown: 300
        ScaleOutCooldown: 60
        PredefinedMetricSpecification:
          PredefinedMetricType: DynamoDBWriteCapacityUtilization

  AnalyticsTableReadCapacityTarget:
    Type: AWS::ApplicationAutoScaling::ScalableTarget
    Properties:
      MaxCapacity: !FindInMap [DDB, Scaling, ReadCapacityMax]
      MinCapacity: !FindInMap [DDB, Scaling, ReadCapacityMin]
      ResourceId: !Sub 'table/${AnalyticsTable}'
      RoleARN: !Sub '${AnalyticsTableScalingRole.Arn}'
      ScalableDimension: dynamodb:table:ReadCapacityUnits
      ServiceNamespace: dynamodb

  AnalyticsTableReadScalingPolicy:
    Type: AWS::ApplicationAutoScaling::ScalingPolicy
    Properties:
      PolicyName: ReadAutoScalingPolicy
      PolicyType: TargetTrackingScaling
      ScalingTargetId: !Ref AnalyticsTableReadCapacityTarget
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: !FindInMap [DDB, Scaling, ReadTargetUtilization]
        ScaleInCooldown: 300
        ScaleOutCooldown: 60
        PredefinedMetricSpecification:
          PredefinedMetricType: DynamoDBReadCapacityUtilization

  # Dashboard Website
  WebsiteBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      LoggingConfiguration:
        DestinationBucketName: !Ref LogsBucket
        LogFilePrefix: website-bucket/
      WebsiteConfiguration:
        IndexDocument: "index.html"
        ErrorDocument: "index.html"
"AWS::S3::BucketPolicy" Properties: Bucket: Ref: "WebsiteBucket" PolicyDocument: Statement: - Action: - "s3:GetObject" Effect: "Allow" Resource: Fn::Join: - "" - - "arn:aws:s3:::" - Ref: "WebsiteBucket" - "/*" Principal: CanonicalUser: !GetAtt WebsiteOriginAccessIdentity.S3CanonicalUserId WebsiteOriginAccessIdentity: Type: AWS::CloudFront::CloudFrontOriginAccessIdentity Properties: CloudFrontOriginAccessIdentityConfig: Comment: !Sub "access-identity-${WebsiteBucket}" WebsiteDistribution: Type: AWS::CloudFront::Distribution Properties: DistributionConfig: Comment: "Website distribution for solution" Origins: - Id: S3-solution-website DomainName: !Sub "${WebsiteBucket}.s3.${AWS::Region}.amazonaws.com" S3OriginConfig: OriginAccessIdentity: !Sub "origin-access-identity/cloudfront/${WebsiteOriginAccessIdentity}" DefaultCacheBehavior: TargetOriginId: S3-solution-website AllowedMethods: - GET - HEAD - OPTIONS - PUT - POST - PATCH - DELETE CachedMethods: - GET - HEAD - OPTIONS ForwardedValues: QueryString: 'false' ViewerProtocolPolicy: redirect-to-https IPV6Enabled: 'true' ViewerCertificate: CloudFrontDefaultCertificate: 'true' Enabled: 'true' HttpVersion: 'http2' Logging: IncludeCookies: 'false' Bucket: !GetAtt LogsBucket.DomainName Prefix: cloudfront-logs/ ##Logging bucket for cloudFront and other solution buckets LogsBucket: DeletionPolicy: Retain Type: AWS::S3::Bucket Properties: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true AccessControl: LogDeliveryWrite Metadata: cfn_nag: rules_to_suppress: - id: W35 reason: "This is the logs bucket for all the other S3 Buckets and CloudFront" - id: W51 reason: "This is a private bucket. Does not require bucket policy" # Cognito for Dashboard Users CognitoUserPool: Type: AWS::Cognito::UserPool Properties: AliasAttributes: - email AutoVerifiedAttributes: - email AdminCreateUserConfig: AllowAdminCreateUserOnly: True InviteMessageTemplate: EmailMessage: !Sub |

You are invited to join the Real-Time IoT Device Monitoring dashboard. Your dashboard credentials are as follows:

Username: {username}
Password: {####}

Please sign in to the dashboard with the user name and your temporary password provided above at:
https://${WebsiteDistribution.DomainName}/index.html

          EmailSubject: 'Your Real-Time IoT Device Monitoring dashboard Login'
        UnusedAccountValidityDays: 7
      EmailVerificationMessage: !Sub |

        You are invited to join the Real-Time IoT Device Monitoring dashboard. Your dashboard credentials are as follows:

        Username: {username}
        Password: {####}

        Please sign in to the dashboard with the username and temporary password provided above at:
        https://${WebsiteDistribution.DomainName}/index.html
      EmailVerificationSubject: 'Your Real-Time IoT Device Monitoring dashboard Login'
      Policies:
        PasswordPolicy:
          MinimumLength: 8
          RequireLowercase: True
          RequireNumbers: True
          RequireSymbols: False
          RequireUppercase: True
      Schema:
        - AttributeDataType: String
          Name: email
          Required: True

  CognitoUserPoolClient:
    Type: AWS::Cognito::UserPoolClient
    Properties:
      GenerateSecret: False
      WriteAttributes:
        - address
        - email
        - phone_number
      ReadAttributes:
        - name
        - email
        - email_verified
        - address
        - phone_number
        - phone_number_verified
      RefreshTokenValidity: 1
      UserPoolId: !Ref CognitoUserPool

  CognitoIdentityPool:
    Type: AWS::Cognito::IdentityPool
    Properties:
      CognitoIdentityProviders:
        - ClientId: !Ref CognitoUserPoolClient
          ProviderName: !GetAtt CognitoUserPool.ProviderName
      AllowUnauthenticatedIdentities: false

  CognitoIdentityPoolRoleAttachment:
    Type: AWS::Cognito::IdentityPoolRoleAttachment
    Properties:
      IdentityPoolId: !Sub '${CognitoIdentityPool}'
      Roles:
        unauthenticated: !GetAtt UnauthenticatedUserRole.Arn
        authenticated: !GetAtt AuthenticatedUserRole.Arn

  AuthenticatedUserRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Federated:
                - cognito-identity.amazonaws.com
            Action:
              - sts:AssumeRoleWithWebIdentity
            Condition:
              StringEquals:
                cognito-identity.amazonaws.com:aud: !Sub '${CognitoIdentityPool}'
              ForAnyValue:StringLike:
                cognito-identity.amazonaws.com:amr: authenticated
      Path: /
      Policies:
        # Cognito Sync
        - PolicyName: 'cognito-sync'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - mobileanalytics:PutEvents
                  - cognito-identity:*
                Resource: !Sub 'arn:aws:cognito-identity:${AWS::Region}:${AWS::AccountId}:identitypool/${CognitoIdentityPool}'
        # Get metrics from AnalyticsTable
        - PolicyName: 'ReadAnalyticsTable'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:BatchGetItem
                  - dynamodb:GetItem
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:Query
                  - dynamodb:Scan
                Resource:
                  - !GetAtt AnalyticsTable.Arn
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: F3
            reason: "The wildcard action in the cognito-sync policy permits the AuthenticatedUser role to interface with Amazon Cognito."
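  # The authenticated role above grants the dashboard read-only access to the
  # AnalyticsTable. A query the dashboard might issue (a sketch; the key values
  # are hypothetical, the key names match the AnalyticsTable schema):
  #
  #   import boto3
  #   from boto3.dynamodb.conditions import Key
  #
  #   table = boto3.resource("dynamodb").Table("<AnalyticsTable name>")
  #   response = table.query(
  #       KeyConditionExpression=Key("MetricType").eq("AvgTempValue")
  #       & Key("EventTime").gt("2018-01-01 00:00:00"),
  #   )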
  UnauthenticatedUserRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Federated:
                - cognito-identity.amazonaws.com
            Action:
              - sts:AssumeRoleWithWebIdentity
            Condition:
              StringEquals:
                "cognito-identity.amazonaws.com:aud": !Ref CognitoIdentityPool
              ForAnyValue:StringLike:
                "cognito-identity.amazonaws.com:amr": unauthenticated
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - mobileanalytics:PutEvents
                Resource: !Sub 'arn:aws:cognito-identity:${AWS::Region}:${AWS::AccountId}:identitypool/${CognitoIdentityPool}'

  CognitoUser:
    DependsOn:
      # Wait for the website to come up before emailing a registration link
      - ConfigureWebsite
    Type: AWS::Cognito::UserPoolUser
    Properties:
      DesiredDeliveryMediums:
        - EMAIL
      ForceAliasCreation: True
      UserAttributes:
        - Name: email
          Value: !Ref UserEmail
        - Name: email_verified
          Value: True
      Username: !Ref UserName
      UserPoolId: !Ref CognitoUserPool

  # Custom Resource
  CustomResourceHelper:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        S3Bucket: !Sub
          - ${Param1}-${AWS::Region}
          - Param1: !FindInMap
              - SourceCode
              - General
              - S3Bucket
        S3Key: !Sub
          - ${Param1}/custom-resource-helper.zip
          - Param1: !FindInMap
              - SourceCode
              - General
              - KeyPrefix
      Environment:
        Variables:
          SOLUTION_ID: !FindInMap
            - Solution
            - Data
            - ID
          SOLUTION_VERSION: !FindInMap
            - Solution
            - Data
            - Version
          SEND_ANONYMOUS_DATA: !FindInMap
            - Solution
            - Data
            - SendAnonymousUsageData
      Description: Helps set up the Real-Time IoT Device Monitoring with Kinesis solution.
      Handler: index.handler
      MemorySize: 256
      Role: !GetAtt CustomResourceHelperRole.Arn
      Runtime: nodejs12.x
      Timeout: 300

  CustomResourceHelperRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: ConfigureWebsitePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Get website objects from the AWS Solutions bucket.
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource:
                  - !Sub
                    - arn:aws:s3:::${Param1}-${AWS::Region}/${Param2}/*
                    - Param1: !FindInMap
                        - SourceCode
                        - General
                        - S3Bucket
                      Param2: !FindInMap
                        - SourceCode
                        - General
                        - KeyPrefix
              # Put website objects into WebsiteBucket
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:PutObjectAcl
                  - s3:DeleteObject
                  - s3:ListObjects
                  - s3:ListBucket
                Resource:
                  - !Sub '${WebsiteBucket.Arn}'
                  - !Sub '${WebsiteBucket.Arn}/'
                  - !Sub '${WebsiteBucket.Arn}/*'
        # Enable Bucket Encryption
        - PolicyName: EnableBucketEncryption
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutEncryptionConfiguration
                Resource:
                  - !Sub '${RawMetricsBucket.Arn}'
                  - !Sub '${ProcessedMetricsBucket.Arn}'
        # Write to CloudWatch
        - PolicyName: CloudWatchLoggingPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutDestination
                  - logs:PutLogEvents
                Resource: !Join
                  - ''
                  - - 'arn:aws:logs:'
                    - !Ref AWS::Region
                    - ':'
                    - !Ref AWS::AccountId
                    - ':log-group:*'
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "The wildcard action in the logs policy is required"

  CustomResourceKinesisAnalyticsPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: StartKinesisApplicationPolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - 'kinesisanalytics:DescribeApplication'
              - 'kinesisanalytics:StartApplication'
              - 'kinesisanalytics:StopApplication'
            Resource:
              # KinesisAnalytics Application ARN isn't available via GetAtt
              - !Sub 'arn:aws:kinesisanalytics:${AWS::Region}:${AWS::AccountId}:application/${KinesisAnalyticsApp}'
      Roles:
        - !Ref CustomResourceHelperRole

  # Custom Resource Invocations
  GenerateUUID:
    Type: Custom::LoadLambda
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      CustomResourceAction: GenerateUUID

  DeployWebsite:
    Type: Custom::LoadLambda
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      Region: !Ref AWS::Region
      CustomResourceAction: DeployWebsite
      SourceS3Bucket: !Sub
        - ${Param1}-${AWS::Region}
        - Param1: !FindInMap
            - SourceCode
            - General
            - S3Bucket
      SourceS3Key: !Sub
        - ${Param1}/web_site
        - Param1: !FindInMap
            - SourceCode
            - General
            - KeyPrefix
      WebsiteBucket: !Ref WebsiteBucket
      sourceManifest: !Join ["/", [!FindInMap ["SourceCode", "General", "KeyPrefix"], "web-site-manifest.json"]]
      UUID: !GetAtt GenerateUUID.UUID

  ConfigureWebsite:
    Type: Custom::LoadLambda
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      Region: !Ref AWS::Region
      CustomResourceAction: ConfigureWebsite
      WebsiteBucket: !Ref WebsiteBucket
      UUID: !GetAtt GenerateUUID.UUID
      Configuration:
        IdentityPoolId: !Ref CognitoIdentityPool
        UserPoolId: !Ref CognitoUserPool
        UserPoolClientId: !Ref CognitoUserPoolClient
        AnalyticsTable: !Ref AnalyticsTable
        Region: !Ref AWS::Region

  EnableRawMetricsBucketEncryption:
    Type: Custom::LoadLambda
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      Region: !Ref AWS::Region
      CustomResourceAction: EnableBucketEncryption
      Bucket: !Ref RawMetricsBucket
      SSEAlgorithm: "AES256"

  EnableProcessedMetricsBucketEncryption:
    Type: Custom::LoadLambda
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      Region: !Ref AWS::Region
      CustomResourceAction: EnableBucketEncryption
      Bucket: !Ref ProcessedMetricsBucket
      SSEAlgorithm: "AES256"
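  # The StartKinesisApplication action invoked by the resource below presumably
  # calls the Kinesis Analytics API along these lines (a Python sketch for
  # illustration; the shipped Node.js custom-resource-helper is the
  # authoritative implementation):
  #
  #   import boto3
  #
  #   client = boto3.client("kinesisanalytics")
  #   app = client.describe_application(ApplicationName=application_name)
  #   input_id = app["ApplicationDetail"]["InputDescriptions"][0]["InputId"]
  #   client.start_application(
  #       ApplicationName=application_name,
  #       InputConfigurations=[{
  #           "Id": input_id,
  #           "InputStartingPositionConfiguration": {"InputStartingPosition": "NOW"},
  #       }],
  #   )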
  StartKinesisAnalyticsApp:
    Type: Custom::LoadLambda
    DependsOn:
      - KinesisAnalyticsAppFirehoseOutput
      - KinesisAnalyticsAppLambdaOutput
      - CustomResourceKinesisAnalyticsPolicy
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref KinesisAnalyticsApp
      CustomResourceAction: StartKinesisApplication
      UUID: !GetAtt GenerateUUID.UUID

Outputs:
  DashboardUrl:
    Description: The URL to the Dashboard.
    Value: !Sub 'https://${WebsiteDistribution.DomainName}/index.html'
  IoTTopicName:
    Description: The IoT Topic to monitor.
    Value: !Ref IoTTopicName
  DemoScriptLocation:
    Description: The location of the zipped demo script to send messages to your IoT topic.
    Value: !Sub
      - https://s3.${AWS::Region}.amazonaws.com/${Bucket}-${AWS::Region}/${Key}/demo.zip
      - Bucket: !FindInMap
          - SourceCode
          - General
          - S3Bucket
        Key: !FindInMap
          - SourceCode
          - General
          - KeyPrefix
  DemoCommand:
    Description: Command to run the demo script.
    Value: !Sub
      - './send-messages.sh --topic ${Topic} --region ${Region}'
      - Topic: !Ref IoTTopicName
        Region: !Ref AWS::Region
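# To generate test traffic after deployment, download the archive from the
# DemoScriptLocation output, unzip it, and run the DemoCommand output, e.g.
# (topic and region shown are illustrative):
#   ./send-messages.sh --topic iot_device_analytics --region us-east-1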