# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. AWSTemplateFormatVersion: '2010-09-09' Parameters: pFirehoseDeliveryBufferSeconds: Description: Kinesis Firehose buffer time in seconds Type: Number Default: 300 pFirehoseDeliveryBufferSize: Description: Kinesis Firehose buffer size in MB (due to usage of Apache Parquet, the minimum is 64 MB) Type: Number Default: 64 pS3Path: Type: String Description: S3 Prefix to use when storing objects delivered by the Kinesis Firehose Default: "data" pIotTopic: Description: Query to use for the IoT Rule Type: String Default: "performance/mqtt" pAthenaWorkGroup: Type: String Description: DataBase Name Default: "MQTT_Load_Test_Workgroup" pAthenaQueryLatency: Type: String Description: Athena Query name (Latency Check) Default: "End2End_IoT_Latency" pAthenaQueryMessageLoss: Type: String Description: Athena Query name (Message Loss Check) Default: "End2End_IoT_MessageLoss" Resources: GlueDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Description: "Database for IoT Crawler" IamRoleKinesisDeliveryStream: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - 'sts:AssumeRole' Description: "Role allowing required access to Kinesis Firehose" Path: / Policies: - PolicyName: "KinesisAccess-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - 's3:PutObject' - 's3:GetObject' - 's3:AbortMultipartUpload' - 's3:GetBucketLocation' - 's3:ListBucketMultipartUploads' - 's3:ListObject' Resource: - !GetAtt S3Bucket.Arn - !Join ['', [!GetAtt S3Bucket.Arn, "/*"]] - Effect: Allow Action: 'glue:GetTableVersions' Resource: !Sub 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:*' - Effect: Allow Action: 'logs:PutLogEvents' Resource: - !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*:log-stream:*" KinesisDeliveryStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamEncryptionConfigurationInput: KeyType: AWS_OWNED_CMK DeliveryStreamType: 'DirectPut' ExtendedS3DestinationConfiguration: BucketARN: !GetAtt S3Bucket.Arn CompressionFormat: 'UNCOMPRESSED' RoleARN: !GetAtt IamRoleKinesisDeliveryStream.Arn Prefix: !Sub '${pS3Path}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/minute=!{timestamp:mm}/' ErrorOutputPrefix: 'error/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/minute=!{timestamp:mm}/' BufferingHints: IntervalInSeconds: !Ref pFirehoseDeliveryBufferSeconds SizeInMBs: !Ref pFirehoseDeliveryBufferSize CloudWatchLoggingOptions: Enabled: true LogGroupName: !Sub "/mqtt-load-test-${AWS::AccountId}-${AWS::Region}" LogStreamName: IoTS3Delivery S3BackupMode: Disabled S3BucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref S3Bucket PolicyDocument: Version: "2012-10-17" Statement: - Effect: Deny Action: - 's3:*' Resource: - !Sub 'arn:${AWS::Partition}:s3:::${S3Bucket}' - !Sub 'arn:${AWS::Partition}:s3:::${S3Bucket}/*' Principal: AWS: "*" Condition: Bool: 'aws:SecureTransport': 'false' S3Bucket: Type: "AWS::S3::Bucket" DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True LoggingConfiguration: DestinationBucketName: !Ref S3LoggingBucket LogFilePrefix: "IoT-logs/" VersioningConfiguration: Status: Enabled S3AthenaBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref S3AthenaBucket PolicyDocument: Version: "2012-10-17" Statement: - Effect: Deny Action: - 's3:*' Resource: - !Sub 'arn:${AWS::Partition}:s3:::${S3AthenaBucket}' - !Sub 'arn:${AWS::Partition}:s3:::${S3AthenaBucket}/*' Principal: AWS: "*" Condition: Bool: 'aws:SecureTransport': 'false' S3AthenaBucket: Type: "AWS::S3::Bucket" DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True LoggingConfiguration: DestinationBucketName: !Ref S3LoggingBucket LogFilePrefix: "Athena-logs/" VersioningConfiguration: Status: Enabled S3LoggingBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref S3LoggingBucket PolicyDocument: Version: "2012-10-17" Statement: - Effect: Deny Action: - 's3:*' Resource: - !Sub 'arn:${AWS::Partition}:s3:::${S3LoggingBucket}' - !Sub 'arn:${AWS::Partition}:s3:::${S3LoggingBucket}/*' Principal: AWS: "*" Condition: Bool: 'aws:SecureTransport': 'false' S3LoggingBucket: Type: 'AWS::S3::Bucket' DeletionPolicy: Retain UpdateReplacePolicy: Retain Metadata: cfn_nag: rules_to_suppress: - id: W35 reason: "This is the logging Bucket, no further logging is required." Properties: AccessControl: LogDeliveryWrite BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 VersioningConfiguration: Status: Enabled IamRoleIotRule: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - iot.amazonaws.com Action: - 'sts:AssumeRole' Description: "Role allowing required access for IoT Rule" Path: / Policies: - PolicyName: "IotRuleAccess-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "firehose:PutRecord" - "firehose:PutRecordBatch" Resource: - !GetAtt KinesisDeliveryStream.Arn IotRule: Type: AWS::IoT::TopicRule Properties: TopicRulePayload: Actions: - Firehose: DeliveryStreamName: !Ref KinesisDeliveryStream RoleArn: !GetAtt IamRoleIotRule.Arn Separator: "\n" Sql: !Sub "SELECT *, timestamp() as DestinationTimestampUNIX FROM '${pIotTopic}'" IamRoleIoTCrawler: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - s3:GetObject - s3:PutObject - s3:ReplicateObject - s3:ListAllMyBuckets - s3:RestoreObject - s3:CreateBucket - s3:ListBucket - s3:DeleteObject - s3:DeleteBucket Resource: - !GetAtt S3Bucket.Arn - !Join [ '', [ !GetAtt S3Bucket.Arn, "/*" ] ] - Effect: "Allow" Action: - glue:UpdateDatabase - glue:CreateTable - glue:CreateDatabase - glue:UpdateTable - glue:CreatePartition - glue:UpdatePartition - glue:GetTables - glue:GetTable - glue:GetDatabase - glue:GetDatabases Resource: !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabase}" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" GlueCrawler: Type: AWS::Glue::Crawler Properties: Role: !GetAtt IamRoleIoTCrawler.Arn DatabaseName: !Ref GlueDatabase Targets: S3Targets: - Path: !Sub "s3://${S3Bucket}/${pS3Path}/" SchemaChangePolicy: UpdateBehavior: "LOG" DeleteBehavior: "LOG" Configuration: "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\"}}}" AthenaWorkGroup: Type: AWS::Athena::WorkGroup DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: Name: !Ref pAthenaWorkGroup Description: "MQTT Performance Test Workgroup" State: ENABLED WorkGroupConfiguration: BytesScannedCutoffPerQuery: 200000000 EnforceWorkGroupConfiguration: false PublishCloudWatchMetricsEnabled: false RequesterPaysEnabled: true ResultConfiguration: OutputLocation: !Sub "s3://${S3AthenaBucket}" AthenaQueryLatency: Type: AWS::Athena::NamedQuery Properties: Database: !Ref GlueDatabase Description: "Query to check the latency of received messages" Name: !Ref pAthenaQueryLatency WorkGroup: !Ref AthenaWorkGroup QueryString: !Sub | CREATE OR REPLACE VIEW "${pS3Path}_latency" AS SELECT id , row_number() OVER (ORDER by id ASC) AS nr , SourceTimestampUNIX , DestinationTimestampUNIX , SourceTimestamp , from_unixtime(DestinationTimestampUNIX / 1000) AS DestinationTimestamp , ((DestinationTimestampUNIX - SourceTimestampUNIX) / 1000) AS e2e_latency_sec , (DestinationTimestampUNIX - SourceTimestampUNIX) AS e2e_latency_millisec FROM "${pS3Path}" ORDER BY id ASC AthenaQueryMessageLoss: Type: AWS::Athena::NamedQuery Properties: Database: !Ref GlueDatabase Description: "Query to check message loss based on difference between original message id and received number" Name: !Ref pAthenaQueryMessageLoss WorkGroup: !Ref AthenaWorkGroup QueryString: !Sub | CREATE OR REPLACE VIEW "${pS3Path}_msg_lost" AS SELECT DISTINCT serialised_diff FROM ( SELECT ( id - nr ) AS serialised_diff FROM "${pS3Path}_latency" ) Outputs: oS3Bucket: Description: S3 Bucket for Kinesis Firehose message delivery Value: !Ref S3Bucket oS3AthenaBucket: Description: S3 Bucket for Athena Query Results Value: !Ref S3AthenaBucket oGlueCrawler: Description: Glue Crawler for indexing data on the S3 Bucket Value: !Ref GlueCrawler oAthenaWorkGroup: Description: Athena Workgroup Value: !Ref AthenaWorkGroup oAthenaQueryLatency: Description: Athena Query (Latency) Value: !Ref AthenaQueryLatency oAthenaQueryMessageLoss: Description: Athena Query (Message Loss) Value: !Ref AthenaQueryMessageLoss