# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. --- AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Description: Amazon Transcribe Live Call Analytics with Agent Assist - Connect Integration Stack Parameters: # Required ConnectInstanceArn: Type: String Description: Amazon Connect instance ARN CallEventProcessorFunctionRoleName: Type: String Description: >- Name of the AISTACK CallEventProcessorFunction IAM Role CallDataStreamName: Type: String Description: >- Name of Kinesis Data Stream to publish events to CallDataStreamArn: Type: String Description: >- Arn of Kinesis Data Stream to publish events to EventSourcingTableName: Type: String Description: >- DynamoDB Event Table name EventSourcingTableArn: Type: String Description: >- DynamoDB Event Table Arn Resources: LambdaRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Statement: - Action: "sts:AssumeRole" Effect: Allow Principal: Service: lambda.amazonaws.com Version: 2012-10-17 ManagedPolicyArns: - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" Policies: - PolicyName: InlinePolicy PolicyDocument: Version: "2012-10-17" Statement: - Action: - connect:AssociateInstanceStorageConfig - connect:ListInstanceStorageConfigs - connect:DisassociateInstanceStorageConfig Effect: Allow Resource: - !Ref ConnectInstanceArn - Action: - kinesis:DescribeStream Effect: Allow Resource: - !Ref CallDataStreamArn - Action: - iam:PutRolePolicy Effect: Allow Resource: - !Sub "arn:aws:iam::${AWS::AccountId}:role/aws-service-role/connect.amazonaws.com/AWSServiceRoleForAmazonConnect_*" AssociateInstanceFunction: Type: "AWS::Lambda::Function" Properties: Role: !GetAtt LambdaRole.Arn Handler: index.handler Runtime: python3.8 Timeout: 900 Code: ZipFile: | import boto3 import cfnresponse import json connect = boto3.client('connect') def associateConnectInstance(props): print("Associating connect instance to LCA Kinesis stream") response = connect.associate_instance_storage_config( InstanceId=props["ConnectInstanceArn"], ResourceType="REAL_TIME_CONTACT_ANALYSIS_SEGMENTS", StorageConfig={ "StorageType": "KINESIS_STREAM", "KinesisStreamConfig": { "StreamArn": props["CallDataStreamArn"] } } ) print(f"associate_instance_storage_config: {response}") print("Configured Connect instance to integrate with LCA Kinesis Data Stream") def disassociateConnectInstance(props): response = connect.list_instance_storage_configs( InstanceId=props["ConnectInstanceArn"], ResourceType="REAL_TIME_CONTACT_ANALYSIS_SEGMENTS" ) print(f"list_instance_storage_configs: {response}") associationId=None for storageConfig in response["StorageConfigs"]: if storageConfig["StorageType"] == "KINESIS_STREAM": associationId = storageConfig["AssociationId"] if associationId: print(f"Existing storage config found for KINESIS_STREAM. Disassociating associationId: {associationId}" ) response = connect.disassociate_instance_storage_config( InstanceId=props["ConnectInstanceArn"], AssociationId=associationId, ResourceType="REAL_TIME_CONTACT_ANALYSIS_SEGMENTS" ) print(f"Done deleting storage associations") def handler(event, context): print(json.dumps(event)) status = cfnresponse.SUCCESS responseData = {} reason = "Success" props = event["ResourceProperties"] if event['RequestType'] != 'Delete': try: disassociateConnectInstance(props) associateConnectInstance(props) except Exception as e: print(e) reason = f"Exception thrown: {e}" status = cfnresponse.FAILED cfnresponse.send(event, context, status, responseData, reason=reason) # Trigger Lambda function AssociateInstanceFunctionResource: Type: AWS::CloudFormation::CustomResource Properties: ServiceToken: !GetAtt AssociateInstanceFunction.Arn ConnectInstanceArn: !Ref ConnectInstanceArn CallDataStreamArn: !Ref CallDataStreamArn UpdateReplacePolicy: Delete DeletionPolicy: Delete # Add policy allowing AISTACK CallEventProcessorFunction Lambda to invoke Connect API TranscriptProcessorConnectPolicy: Type: 'AWS::IAM::Policy' Properties: PolicyName: ConnectPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - connect:GetContactAttributes Resource: !Sub "${ConnectInstanceArn}/contact/*" Roles: - !Ref CallEventProcessorFunctionRoleName ContactEventProcessorFunction: Type: "AWS::Lambda::Function" Properties: Role: !GetAtt ContactEventProcessorFunctionRole.Arn Handler: index.handler Runtime: python3.8 Timeout: 20 Description: >- AWS Lambda Function triggered when 'CONNECTED_TO_AGENT' event is received from Connect. Environment: Variables: KINESIS_STREAM_NAME: !Ref CallDataStreamName EVENT_SOURCING_TABLE_NAME: !Ref EventSourcingTableName WAIT_FOR_CALL_SECS: 15 Code: ZipFile: | import boto3 import json import os import time connect = boto3.client('connect') dynamodb = boto3.client('dynamodb') kinesis = boto3.client('kinesis') # Input event is 'CONNECTED_TO_AGENT' event from EventBridge / Connect. # Function sends UPDATE_AGENT message to KDS only if/when the call has started transcribing - i.e. # the CallId is found in the DynamoDB event souring table within WAIT_FOR_CALL_SECS. def is_known_call(callId, tablename, timeout): timeout_time = time.time() + int(timeout) while time.time() < timeout_time: response = dynamodb.get_item( TableName=tablename, Key={ "PK": {"S": f"c#{callId}"}, "SK": {"S": f"c#{callId}"} }, ProjectionExpression="PK" ) if "Item" in response: return True else: print(f"Call not found in DynamoDB - wait 1 sec and try again: {callId}") time.sleep(1) print(f"Call not found in DynamoDB after timeout period {timeout} seconds: {callId}") return False def get_agentId(instanceArn, agentArn): if agentArn: response = connect.describe_user( UserId=agentArn, InstanceId=instanceArn ) username = response["User"].get("Username") firstname = response["User"].get("IdentityInfo",{}).get("FirstName") lastname = response["User"].get("IdentityInfo",{}).get("LastName") agentId = f"{firstname} {lastname} ({username})" return agentId def send_update_agent_message(streamName, callId, agentId): msg = { "CallId": callId, "AgentId": agentId, "EventType": "UPDATE_AGENT", } print(f"Writing UPDATE_AGENT message to KDS: {msg}") kinesis.put_record( StreamName=streamName, Data=json.dumps(msg).encode('utf-8'), PartitionKey=callId ) def handler(event, context): print(json.dumps(event)) callId = event["detail"].get("contactId") if is_known_call( callId, os.environ["EVENT_SOURCING_TABLE_NAME"], os.environ["WAIT_FOR_CALL_SECS"] ): instanceArn = event["detail"].get("instanceArn") agentArn = event["detail"].get("agentInfo",{}).get("agentArn") agentId = get_agentId(instanceArn, agentArn) send_update_agent_message(os.environ["KINESIS_STREAM_NAME"], callId, agentId) else: print("Call not transcribing. Skipping.") return "Done" ContactEventProcessorFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess Policies: - PolicyName: lambda-policy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: - !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:\ ${AWS::AccountId}:log-group:/aws/lambda/*" - Effect: Allow Action: - dynamodb:Query - dynamodb:Scan - dynamodb:GetItem Resource: - !Ref EventSourcingTableArn - Action: - connect:DescribeUser Effect: Allow Resource: - !Sub "${ConnectInstanceArn}/agent/*" - Action: - kinesis:PutRecord Effect: Allow Resource: - !Ref CallDataStreamArn AllowEventBridgeToCallContactEventFunction: Type: "AWS::Lambda::Permission" Properties: FunctionName: !Ref ContactEventProcessorFunction Action: "lambda:InvokeFunction" Principal: events.amazonaws.com SourceArn: !GetAtt EventBridgeRuleToTriggerContactEventFunction.Arn SourceAccount: !Ref AWS::AccountId EventBridgeRuleToTriggerContactEventFunction: Type: AWS::Events::Rule Properties: Description: "This rule is triggered by CONNECTED_TO_AGENT Connect Contact events" EventPattern: detail-type: - "Amazon Connect Contact Event" source: - aws.connect detail: instanceArn: - !Ref ConnectInstanceArn channel: - VOICE eventType: - CONNECTED_TO_AGENT Targets: - Id: ContactEventProcessorFunction Arn: !GetAtt ContactEventProcessorFunction.Arn State: "ENABLED"