AWSTemplateFormatVersion: '2010-09-09' Parameters: storageDestinationS3: Type: String AllowedPattern: '(?!^(\d+\.)+\d+$)^(^([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])([a-z0-9\-]+((\/))?)*([a-z0-9])+$' ConstraintDescription: "Invalid storage destination" Description: Storage bucket for saving agent events, contact trace records, and logs. Required. Metadata: 'AWS::CloudFormation::Interface': ParameterGroups: - Label: default: Storage Configuration Parameters: - storageDestinationS3 ParameterLabels: storageDestinationS3: default: S3 Bucket Outputs: glueCatelog: Description: this is the glue catelog create Value: !Ref ConnectGlueDatabase CTRKinesisFirehose: Description: The ARN of the Kinesis Firehose that will receive CTR events. Manual configuration required in the Amazon Connect admin -> Data Streaming section.Topic. Value: !Ref FirehoseCTRStream ContactTraceRecordsAthenaNamedQuery: Description: Creates Athena table for CTRs for analysis. Value: !Join - '' - - !Sub 'https://console.aws.amazon.com/athena/home?force®ion=${AWS::Region}' - '#query/saved/' - !Ref ContactTraceRecordsAthenaNamedQuery CtrOnlyUsageReportNamedQuery: Description: Creates Athena query for analyzing call usage based on only the CTR data. Value: !Join - '' - - !Sub 'https://console.aws.amazon.com/athena/home?force®ion=${AWS::Region}' - '#query/saved/' - !Ref CtrOnlyUsageReportNamedQuery Resources: FirehoseRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - sts:AssumeRole Condition: StringEquals: sts:ExternalID: - !Sub ${AWS::AccountId} Path: / FirehoseLambdaTransformPolicy: Type: AWS::IAM::Policy Properties: PolicyName: lambdatransform PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - lambda:InvokeFunction - lambda:GetFunctionConfiguration Resource: - !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${lineBreakTransformLambda}* Roles: - Ref: FirehoseRole FirehoseLogsPolicy: Type: AWS::IAM::Policy Properties: PolicyName: FirehoseLogsPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:PutLogEvents - logs:CreateLogStream Resource: - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${FirehoseCTRStream}:log-stream:* - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${FireHoseAgentEvents}:log-stream:* Roles: - Ref: FirehoseRole FirehosePolicy: Type: AWS::IAM::Policy Properties: PolicyName: FirehosePolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - firehose:PutRecord - firehose:PutRecordBatch - firehose:UpdateDestination Resource: - !Sub arn:aws:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/${FirehoseCTRStream} - !Sub arn:aws:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/${FireHoseAgentEvents} - !Sub arn:aws:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/* Roles: - Ref: FirehoseRole FirehoseS3Policy: Type: AWS::IAM::Policy Properties: PolicyName: FirehoseS3Policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject - s3:PutObjectAcl Resource: - !Sub 'arn:aws:s3:::${storageDestinationS3}/*' - !Sub 'arn:aws:s3:::${storageDestinationS3}' - Effect: Allow Action: - kinesis:GetRecords - kinesis:DescribeStream - kinesis:ListShards - kinesis:GetShardIterator Resource: - !Join - '' - - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/ - !Ref AgentEventKinesisStream Roles: - Ref: FirehoseRole FirehoseStreamsSubscribePolicy: Type: AWS::IAM::Policy DependsOn: AgentEventKinesisStream Properties: PolicyName: kinesis-streams PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - kinesis:GetRecords - kinesis:DescribeStream - kinesis:ListShards - kinesis:GetShardIterator Resource: - !Join - '' - - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/ - !Ref AgentEventKinesisStream Roles: - Ref: FirehoseRole AgentEventKinesisStream: Type: AWS::Kinesis::Stream Properties: Name: !Sub ${AWS::StackName}-AgentEvents-Stream ShardCount: 1 StreamEncryption: EncryptionType: KMS KeyId: alias/aws/kinesis FireHoseAgentEventsLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/kinesisfirehose/${AWS::StackName}-AE-to-S3 FireHoseAgentEventsLogS3: Type: AWS::Logs::LogStream Properties: LogGroupName: Ref: FireHoseAgentEventsLogGroup transformLambdaPolicy: Type: 'AWS::IAM::ManagedPolicy' Properties : PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - 'logs:CreateLogGroup' - 'logs:CreateLogStream' - 'logs:PutLogEvents' Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*:log-stream:* transformLambdaRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - !Ref transformLambdaPolicy lineBreakTransformLambda: Type: "AWS::Lambda::Function" Properties: Description: > AWS Lambda Function to write to SQS queue for processing derived user details to S3 for diagnostic purposes. Handler: "index.handler" Role: !Sub ${transformLambdaRole.Arn} Runtime: "nodejs12.x" MemorySize: 128 Timeout: 360 Code: ZipFile: | 'use strict'; console.log('Loading function'); const AWS = require('aws-sdk'); //Record processing status enumerations const STATUS_OK = 'Ok'; const STATUS_DROPPED = 'Dropped'; const STATUS_FAIL = 'ProcessingFailed'; function addLineBreak (data) { data = Buffer.from(data, 'base64').toString('utf-8'); console.log('Processing data: ' + data); var i = data.search('}{'); while (i>=0) { data = data.substring(0,i) + '\n' + data.substring(i+2); i = data.search('}{'); } data = data + '\n'; data = Buffer.from(data).toString('base64'); return data; }; function processRecord(record) { try { record.data = addLineBreak(record.data); return { recordId: record.recordId, result: STATUS_OK, data: record.data }; } catch (err) { console.log('Error occurred during transform: ' + err); return { recordId: record.recordId, result: STATUS_FAIL, data: record.data }; } }; exports.handler = (event, context, callback) => { /* Process the list of records and transform them */ const output = event.records.map((record) => ( processRecord(record) )); console.log(`Processing completed. Successful records ${output.length}.`); callback(null, { records: output }); }; FireHoseAgentEvents: Type: AWS::KinesisFirehose::DeliveryStream DependsOn: - FirehoseRole - FirehoseStreamsSubscribePolicy - AgentEventKinesisStream - FireHoseAgentEventsLogGroup - FireHoseAgentEventsLogS3 - lineBreakTransformLambda Properties: DeliveryStreamName: !Sub ${AWS::StackName}-AE-to-S3 DeliveryStreamType: KinesisStreamAsSource KinesisStreamSourceConfiguration: KinesisStreamARN: !GetAtt AgentEventKinesisStream.Arn RoleARN: !GetAtt FirehoseRole.Arn ExtendedS3DestinationConfiguration: ProcessingConfiguration: Enabled: 'true' Processors: - Parameters: - ParameterName: LambdaArn ParameterValue: !GetAtt lineBreakTransformLambda.Arn Type: Lambda BucketARN: !Sub 'arn:aws:s3:::${storageDestinationS3}' Prefix: 'AgentEvents/' ErrorOutputPrefix: 'AgentEventsError/' BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 CloudWatchLoggingOptions: Enabled: true LogGroupName: Ref: FireHoseAgentEventsLogGroup LogStreamName: Ref: FireHoseAgentEventsLogS3 CompressionFormat: UNCOMPRESSED RoleARN: !GetAtt FirehoseRole.Arn FireHoseCTRStreamLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/kinesisfirehose/${AWS::StackName}-ctr FirehoseCTRStreamLogS3: Type: AWS::Logs::LogStream Properties: LogGroupName: Ref: FireHoseCTRStreamLogGroup FirehoseCTRStream: Type: AWS::KinesisFirehose::DeliveryStream DependsOn: - FireHoseCTRStreamLogGroup - FirehoseCTRStreamLogS3 Properties: ExtendedS3DestinationConfiguration: BucketARN: !Sub 'arn:aws:s3:::${storageDestinationS3}' Prefix: 'CTR/' ErrorOutputPrefix: 'CTRError/' BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 CloudWatchLoggingOptions: Enabled: true LogGroupName: Ref: FireHoseCTRStreamLogGroup LogStreamName: Ref: FirehoseCTRStreamLogS3 CompressionFormat: UNCOMPRESSED RoleARN: !GetAtt FirehoseRole.Arn ConnectGlueDatabase: Type: AWS::Glue::Database Properties: DatabaseInput: Name: connect-analytics Description: "Glue database for Athena queries for Amazon Connect Diagnostics" CatalogId: !Ref AWS::AccountId ContactTraceRecordsAthenaNamedQuery: Type: AWS::Athena::NamedQuery Properties: Database: !Ref ConnectGlueDatabase Description: "Generate table for CTR data" Name: !Sub ${AWS::StackName}-ctr QueryString: !Sub > CREATE EXTERNAL TABLE amazonconnect_CTR ( AWSAccountId string, AWSContactTraceRecordFormatVersion string, Agent struct< ARN: string, AfterContactWorkDuration: int, AfterContactWorkEndTimestamp: string, AfterContactWorkStartTimestamp: string, AgentInteractionDuration: int, ConnectedToAgentTimestamp: string, CustomerHoldDuration: int, HierarchyGroups: string, LongestHoldDuration: int, NumberOfHolds: int, RoutingProfile: struct< ARN: string, Name: string >, Username: string >, AgentConnectionAttempts int, Attributes string, Channel string, ConnectedToSystemTimestamp string, ContactId string, CustomerEndpoint struct< Address: string, Type: string >, DisconnectTimestamp string, InitialContactId string, InitiationMethod string, InitiationTimestamp string, InstanceARN string, LastUpstringTimestamp string, MediaStreams array< struct< Type: string > >, NextContactId string, PreviousContactId string, Queue struct< ARN: string, DequeueTimestamp: string, Duration: int, EnqueueTimestamp: string, Name: string >, Recording string, Recordings string, SystemEndpoint struct< Address: string, Type: string >, TransferCompletedTimestamp string, TransferredToEndpoint string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION 's3://${storageDestinationS3}/CTR' CtrOnlyUsageReportNamedQuery: Type: AWS::Athena::NamedQuery Properties: Database: !Ref ConnectGlueDatabase Description: "Generate usage data based on only on CTR data" Name: !Sub ${AWS::StackName}-ctr-usage QueryString: !Sub > SELECT ctr.awsaccountid, ctr.instancearn, ctr.channel, ctr.queue.name as queue_name, ctr.systemendpoint.address as system_endpoint, ctr.contactid, ctr.initialContactId, ctr.customerendpoint.address as customer_endpoint, ctr.Agent.ARN as AgentARN, ctr.transferredtoendpoint as TransferEndpoint, ctr.initiationmethod as Call_Initiation_Method, ctr.InitiationTimestamp as CTR_Initiation_Timestamp, ctr.Agent.ConnectedToAgentTimestamp as Connected_To_Agent_Timestamp, ctr.DisconnectTimestamp as Disconnect_Timestamp, ctr.ConnectedToSystemTimestamp as Connected_To_System_Timestamp, ctr.TransferCompletedTimestamp as Transfer_Completed_Timestamp, CASE WHEN (ctr.initiationmethod <> 'CALLBACK') THEN CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.InitiationTimestamp)) as INTEGER) WHEN (ctr.initiationmethod = 'CALLBACK') THEN CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.ConnectedToSystemTimestamp)) as INTEGER) END AS Connect_Service_Seconds, CAST( CAST( to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS DECIMAL(20))- CAST( to_unixtime( from_iso8601_timestamp(ctr.TransferCompletedTimestamp) ) AS DECIMAL(20)) AS INTEGER ) AS Thirdparty_Telco_inSeconds, CAST( CAST( to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS DECIMAL(20))- CAST( to_unixtime( from_iso8601_timestamp(ctr.ConnectedToSystemTimestamp) ) AS DECIMAL(20)) AS INTEGER ) AS CustomerCallLength_Telco_inSeconds, CASE WHEN (ctr.initiationmethod <> 'CALLBACK') THEN CASE (CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.InitiationTimestamp)) as INTEGER) < 10) WHEN true THEN 10 ELSE CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.InitiationTimestamp)) as INTEGER) END WHEN (ctr.initiationmethod = 'CALLBACK') THEN CASE (CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.ConnectedToSystemTimestamp)) as INTEGER) < 10) WHEN true THEN 10 ELSE CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.ConnectedToSystemTimestamp)) as INTEGER) END END AS Connect_Service_Seconds_Rounded, CASE (CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.TransferCompletedTimestamp)) as INTEGER) < (60)) WHEN true THEN CAST(60 AS INTEGER) ELSE CAST( CAST( to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS DECIMAL(20))- CAST( to_unixtime( from_iso8601_timestamp(ctr.TransferCompletedTimestamp) ) AS DECIMAL(20)) AS INTEGER ) END AS Thirdparty_Telco_inSeconds_Rounded, CASE (CAST(to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS INTEGER)-CAST(to_unixtime(from_iso8601_timestamp(ctr.ConnectedToSystemTimestamp)) as INTEGER) < (60)) WHEN true THEN CAST(60 AS INTEGER) ELSE CAST( CAST( to_unixtime(from_iso8601_timestamp(ctr.DisconnectTimestamp)) AS DECIMAL(20))- CAST( to_unixtime( from_iso8601_timestamp(ctr.ConnectedToSystemTimestamp) ) AS DECIMAL(20)) AS INTEGER ) END AS CustomerCallLength_Telco_inSeconds_Rounded FROM "amazonconnect_ctr" ctr ORDER BY ctr.InitiationTimestamp DESC