# MIT No Attribution
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

AWSTemplateFormatVersion: 2010-09-09
Description: >
  "CloudFormation for Summit 2022 - ENTH6 - Real-time churn prediction in AI powered contact centres -
  Lambda, Step functions, API Gateway, EventBridge, and DynamoDB tables"
Parameters:
  AmazonConnectARN:
    Default: "**** Enter the full Amazon Connect Instance ARN ****"
    Description: The Amazon Connect Instance ARN
    Type: String
  AmazonConnectInstanceId:
    Default: "**** Enter the Amazon Connect Instance ID ****"
    Description: The Amazon Connect Instance ID
    Type: String
  SageMakerFeatureGroupName:
    Default: "**** Enter the SageMaker Feature Group Name ****"
    Description: The Amazon SageMaker Feature Group Name
    Type: String
  SageMakerEndPointName:
    Default: "**** Enter the SageMaker Endpoint Name ****"
    Description: The Amazon SageMaker Endpoint Name
    Type: String
# API Gateway URLs to be used in the Website
Outputs:
  apiGatewayInvokeURLchurn:
    Description: API Endpoint to capture Churn updates
    Value: !Sub
      https://${Summit2022AgentInterfaceRestApi}.execute-api.${AWS::Region}.amazonaws.com/prod
  apiGatewayInvokeURLcontract:
    Description: API Endpoint to update contracted months
    Value: !Sub https://${Summit2022AgentInterfaceRestApi}.execute-api.${AWS::Region}.amazonaws.com/prod/update

Resources:
  # IAM Role for Lambda functions
  IAMRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Description: A role to allow all the functions in the Summit2022 Project
      Policies:
        - PolicyName: Summit2022role
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:UpdateItem
                  - dynamodb:PutItem
                  # Needed by the PartiQL SELECT statements in Summit2022APIGatewayResponse
                  - dynamodb:PartiQLSelect
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                  - s3:PutObject
                  - states:StartExecution
                  # Needed by Summit2022StopSFN, which calls stop_execution
                  - states:StopExecution
                  - sagemaker:*
                  # IAM action names follow the API operation names, not the boto3 method names
                  - connect:UpdateContactAttributes
                  - connect:ListRealtimeContactAnalysisSegments
                Resource: "*"
      RoleName: !Sub ${AWS::Region}-Summit2022role
  # IAM Role for Step Functions
  StepsIAMRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: !Sub 'states.${AWS::Region}.amazonaws.com'
            Action: 'sts:AssumeRole'
      Description: A role to allow Step Functions to access Lambda functions
      Policies:
        - PolicyName: stepFunctionsLambda
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action: 'lambda:InvokeFunction'
                Resource:
                  - !GetAtt LambdaSummit2022FirstSageMakerInvoke.Arn
                  - !GetAtt LambdaSummit2022EndSFN.Arn
                  - !GetAtt LambdaSummit2022CLRealTime.Arn
  # EventBridge Rule to update Feature Store based on Contact Lens Events
  FSUpdateEventBridgeRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Rule to update Feature Store with Contact Lens Events
      Name: Summit2022-FSUpdate
      EventBusName: default
      EventPattern:
        source:
          - aws.connect
        detail-type:
          - "Contact Lens Realtime Rules Matched"
      State: ENABLED
      Targets:
        - Arn: !GetAtt
            LambdaSummit2022FSUpdate.Arn
          Id: Id1234
  UpdateFSpermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Ref: "LambdaSummit2022FSUpdate"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn:
        Fn::GetAtt:
          - "FSUpdateEventBridgeRule"
          - "Arn"
  # EventBridge Rule to Stop the Step Functions machine
  StopSFNEventBridgeRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Rule to Stop Step Functions when the call ends
      Name: Summit2022-StopSFN
      EventBusName: default
      EventPattern:
        source:
          - aws.connect
        detail-type:
          - "Amazon Connect Contact Event"
        resources:
          - !Ref AmazonConnectARN
        detail:
          eventType:
            - "DISCONNECTED"
      State: ENABLED
      Targets:
        - Arn: !GetAtt LambdaSummit2022StopSFN.Arn
          Id: Id1235
  StopSFNpermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Ref: "LambdaSummit2022StopSFN"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn:
        Fn::GetAtt:
          - "StopSFNEventBridgeRule"
          - "Arn"
  # Lambda Functions
  LambdaSummit2022CustomerLookup:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to search Customers in the Database
      Environment:
        Variables:
          custTableName: Summit2022CustomerData
          contactTableName: Summit2022ContactIds
          stateMachineARN: !GetAtt Summit2022StateMachine.Arn
          InstanceId: !Ref AmazonConnectInstanceId
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os
          import datetime

          # Environment variables: DynamoDB table with customer data, table to store contact IDs,
          # Step Functions state machine ARN, and Amazon Connect Instance ID
          custTableName = os.environ['custTableName']
          contactTableName = os.environ['contactTableName']
          stateMachine = os.environ['stateMachineARN']
          InstanceId = os.environ['InstanceId']
          dynamodb = boto3.resource('dynamodb')
          dynamodbclient = boto3.client('dynamodb')
          sfn = boto3.client('stepfunctions')

          # This Lambda function gathers the customer information stored in DynamoDB based on their phone number and
          # creates an item in the Contact ID DynamoDB table with the customer ID and the time when the call was received.
          def lambda_handler(event, context):
              try:
                  # Customer phone number and contact ID passed from Amazon Connect via JSON
                  PhoneNumber = event['Details']['ContactData']['CustomerEndpoint']['Address']
                  ContactId = event['Details']['ContactData']['ContactId']
                  # Customer lookup to DynamoDB using Customer phone number
                  custTable = dynamodb.Table(custTableName)
                  response = custTable.get_item(Key={'PhoneNumber': PhoneNumber})
                  # If a record exists in the table write values to variables
                  if 'Item' in response:
                      customerid = response['Item']['customerid']
                      FirstName = response['Item']['FirstName']
                      LastName = response['Item']['LastName']
                      email = response['Item']['email']
                      contractedMonths = response['Item']['contractedMonths']
                      # This will be the input to the Step Functions state machine
                      Input = json.dumps({'ContactId': ContactId, 'customerid': customerid, 'InstanceId': InstanceId})
                      print(Input)
                      time = str(datetime.datetime.utcnow())
                      # Add an item with Contact ID, customer ID, and time attributes to the DynamoDB table
                      contactResp = dynamodbclient.put_item(
                          TableName=contactTableName,
                          Item={
                              'ContactId': {'S': ContactId},
                              'customerid': {'S': customerid},
                              'When': {'S': time}
                          },
                          ReturnConsumedCapacity='TOTAL'
                      )
                      print(contactResp)
                      # Start the Step Functions state machine via API, using the contact ID as the execution name
                      sfnResponse = sfn.start_execution(
                          stateMachineArn=stateMachine,
                          name=ContactId,
                          input=Input,
                          traceHeader='NewContactArrived'
                      )
                      print(sfnResponse)
                      # Return variables to Amazon Connect
                      return {
                          'message': 'Success',
                          'customerid': customerid,
                          'FirstName': FirstName,
                          'LastName': LastName,
                          'email': email,
                          'contractedMonths': contractedMonths
                      }
                  # If no match return a default message
                  else:
                      return {'message': 'Fail'}
              except Exception as e:
                  print(e)
                  return {'message': 'Fail'}
      FunctionName: Summit2022CustomerLookup
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
  LambdaSummit2022FirstSageMakerInvoke:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to make the first call to
        SageMaker
      Environment:
        Variables:
          SageMakerFGName: !Ref SageMakerFeatureGroupName
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os

          sagemakerFGname = os.environ['SageMakerFGName']
          boto_session = boto3.Session()
          connect = boto3.client('connect')
          region = boto_session.region_name

          # This Lambda is called by Step Functions. It does the first read of the customer features stored in
          # SageMaker Feature Store based on the customer ID and updates the Contact Attributes with the churn value from the store
          def lambda_handler(event, context):
              try:
                  # Map event values from Step Functions to variables
                  customerid = event['customerid']
                  ContactId = event['ContactId']
                  InstanceId = event['InstanceId']
                  # API call to SageMaker Feature Store based on Customer ID to retrieve features
                  featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
                  resp = featurestore_runtime.get_record(FeatureGroupName=sagemakerFGname, RecordIdentifierValueAsString=customerid)
                  churn = resp['Record'][0]['ValueAsString']
                  # Update Amazon Connect Contact Attributes with the churn value stored in Feature Store for the customer
                  responseConnect = connect.update_contact_attributes(
                      InitialContactId=ContactId,
                      InstanceId=InstanceId,
                      Attributes={
                          'Churn_true': churn
                      }
                  )
                  # This return is consumed by Step Functions; Success is required to continue the state machine
                  return {
                      'message': 'Success',
                      'Churn': churn,
                      'InstanceId': InstanceId,
                      'ContactId': ContactId,
                      'customerid': customerid,
                      'count': 0,
                      'nextToken': '1'
                  }
              except Exception as e:
                  print(e)
                  return {'message': 'Fail'}
      FunctionName: Summit2022FirstSageMakerInvoke
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
  LambdaSummit2022CLRealTime:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to capture contact lens realtime sentiments (it needs Pandas Layer)
      Environment:
        Variables:
          ChurnPredictionTable: Summit2022ChurnPrediction
          SentimentsTable: Summit2022Sentiments
          SageMakerFGName: !Ref SageMakerFeatureGroupName
          EndPointName: !Ref SageMakerEndPointName
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os
          # This function requires a pandas layer
          import pandas as pd
          import io
          import csv
          import datetime

          # Environment variables: DynamoDB churn prediction and sentiment tables, SageMaker
          # Feature Group and EndPoint
          ChurnPredictionTable = os.environ['ChurnPredictionTable']
          SentimentsTable = os.environ['SentimentsTable']
          sagemakerFGname = os.environ['SageMakerFGName']
          endPointName = os.environ['EndPointName']
          contactLens = boto3.client('connect-contact-lens')
          boto_session = boto3.Session()
          ddbclient = boto3.client('dynamodb')
          region = boto_session.region_name
          runtime_client = boto3.client("runtime.sagemaker")

          # This Lambda reads the event details from Step Functions, gets the customer features from SageMaker Feature Store,
          # calls the Contact Lens Real Time API to get the current customer sentiments, runs inference against the
          # SageMaker endpoint, and updates the sentiments and churn prediction tables.
          def lambda_handler(event, context):
              try:
                  churn_pred = event['Churn']
                  customerid = event['customerid']
                  ContactId = event['ContactId']
                  InstanceId = event['InstanceId']
                  count = event['count']
                  nextToken = event['nextToken']
                  count = count + 1
                  print(event)

                  # Update the churn associated to the Contact ID in the churn prediction table
                  def updateTransaction(ContactId, churn_pred):
                      Time = str(datetime.datetime.utcnow())
                      print(Time)
                      responseContact = ddbclient.put_item(
                          TableName=ChurnPredictionTable,
                          Item={
                              'ContactId': {'S': ContactId},
                              'When': {'S': Time},
                              'ChurnPrediction': {'N': churn_pred}
                          },
                          ReturnConsumedCapacity='TOTAL'
                      )
                      print(responseContact)

                  # Update historical customer sentiments in the dynamodb table
                  def updateSentiments(ContactId, df):
                      print(f'negative value:{df["pastSenti_neg"]["ValueAsString"]}')
                      print(f'neutral value:{df["pastSenti_nut"]["ValueAsString"]}')
                      print(f'positive value:{df["pastSenti_pos"]["ValueAsString"]}')
                      responseSentiment = ddbclient.put_item(
                          TableName=SentimentsTable,
                          Item={
                              'ContactId': {'S': ContactId},
                              'Neutrals': {'N': df["pastSenti_nut"]["ValueAsString"]},
                              'Negatives': {'N': df["pastSenti_neg"]["ValueAsString"]},
                              'Positives': {'N': df["pastSenti_pos"]["ValueAsString"]}
                          },
                          ReturnConsumedCapacity='TOTAL'
                      )
                      print(responseSentiment)

                  # Calling API to get customer features from SageMaker Feature Store based on Customer ID
                  featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
                  fsresponse = featurestore_runtime.get_record(FeatureGroupName=sagemakerFGname, RecordIdentifierValueAsString=customerid)
                  Record = fsresponse['Record']
                  # Use pandas to read the Record
                  df = pd.DataFrame(Record).set_index('FeatureName').transpose()
                  print(df)
                  # Call function to update sentiments table
                  updateSentiments(ContactId, df)
                  # First call to the API does not include NextToken
                  # Incoming event provides the NextToken from the previous request to avoid re-processing CL events
                  try:
                      if count == 1:
                          responseCL = contactLens.list_realtime_contact_analysis_segments(
                              InstanceId=InstanceId,
                              ContactId=ContactId,
                              MaxResults=100
                          )
                      else:
                          responseCL = contactLens.list_realtime_contact_analysis_segments(
                              InstanceId=InstanceId,
                              ContactId=ContactId,
                              MaxResults=100,
                              NextToken=nextToken
                          )
                  except Exception as a:
                      print(a)
                      # In an error situation the same nextToken is used for the retry; Success is returned so the Step Functions state machine continues
                      return {
                          'message': 'Success',
                          'InstanceId': InstanceId,
                          'ContactId': ContactId,
                          'customerid': customerid,
                          'count': count,
                          'nextToken': nextToken,
                          'Churn': churn_pred
                      }
                  print(responseCL)
                  if 'NextToken' in responseCL:
                      nextToken = responseCL['NextToken']
                  if 'Segments' in responseCL:
                      Segments = responseCL['Segments']
                      print(Segments)
                      length = len(Segments)
                      print(length)
                      print(df)
                      df.drop(['event_time', 'customerID'], inplace=True, axis=1)
                      # Get the sentiment from each of the customer segments returned by the Contact Lens Real Time API
                      if length > 0:
                          i = 0
                          while i < length:
                              if 'Transcript' in Segments[i]:
                                  if Segments[i]['Transcript']['ParticipantId'] == "CUSTOMER":
                                      Sentiment = Segments[i]['Transcript']['Sentiment']
                                      if Sentiment == "POSITIVE":
                                          df['sentiment'] = 1
                                      elif Sentiment == "NEUTRAL":
                                          df['sentiment'] = 0
                                      elif Sentiment == "MIXED":
                                          df['sentiment'] = 0
                                      elif Sentiment == "NEGATIVE":
                                          df['sentiment'] = -1
                                      # Call API Endpoint with the new sentiment value
                                      csv_buffer = io.StringIO()
                                      csv_writer = csv.writer(csv_buffer, delimiter=",")
                                      csv_writer.writerow(df.values[0][1:])
                                      response = runtime_client.invoke_endpoint(
                                          EndpointName=endPointName,
                                          ContentType="text/csv",
                                          Body=csv_buffer.getvalue().rstrip("\r\n")
                                      )
                                      churn_pred = response["Body"].read().decode("ascii")
                                      updateTransaction(ContactId, str(churn_pred))
                                      print(churn_pred)
                                  else:
                                      print("No Customer Sentiment")
                              else:
                                  print("No Transcript")
                              i = i + 1
                      else:
                          print("Segment length 0")
                  else:
                      print("No Segments")
                  # This return is consumed by Step Functions; Success is required to continue the state machine
                  return {
                      'message': 'Success',
                      'InstanceId': InstanceId,
                      'ContactId': ContactId,
                      'customerid': customerid,
                      'count': count,
                      'nextToken': nextToken,
                      'Churn': churn_pred
                  }
              except Exception as e:
                  print(e)
                  return {'message': 'Fail'}
      FunctionName: Summit2022CLRealTime
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.7
  LambdaSummit2022FSUpdate:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to Update FeatureStore with Sentiment based on CLRT rules (it needs Pandas Layer)
      Environment:
        Variables:
          contactTableName: Summit2022ContactIds
          SageMakerFGName: !Ref SageMakerFeatureGroupName
          InstanceARN: !Ref AmazonConnectARN
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os
          # This function requires a pandas layer
          import pandas as pd

          contactTableName = os.environ['contactTableName']
          sagemakerFGname = os.environ['SageMakerFGName']
          instanceArn = os.environ['InstanceARN']
          dynamodb = boto3.resource('dynamodb')
          boto_session = boto3.Session()
          region = boto_session.region_name

          # This Lambda updates Feature Store with the sentiments captured by the Contact Lens rules
          def lambda_handler(event, context):
              try:
                  # Format ContactId from ContactIdARN
                  ContactIdARN = event['detail']['contactArn']
                  preContactId = ContactIdARN.replace(instanceArn, "")
                  ContactId = preContactId.replace("/contact/", "")
                  print(ContactId)
                  # Get customerId from contact Id DynamoDB Table
                  contactTable = dynamodb.Table(contactTableName)
                  response = contactTable.get_item(Key={'ContactId': ContactId})
                  if 'Item' in response:
                      customerid = response['Item']['customerid']
                      # Update sentiments in SageMaker Feature Store based on Customer ID
                      featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
                      resp = featurestore_runtime.get_record(FeatureGroupName=sagemakerFGname, RecordIdentifierValueAsString=customerid)
                      Record = resp['Record']
                      # Call pandas to read Record
                      df = pd.DataFrame(Record).set_index('FeatureName').transpose()
                      # Update df pastSenti_pos, pastSenti_nut, or pastSenti_neg based on the event information from Contact Lens Real Time
                      Sentiment = event['detail']['ruleName']
                      if Sentiment == "PositiveSentiment":
                          df['pastSenti_pos'] = df['pastSenti_pos'].astype(int) + 1
                          df['sentiment'] = 1
                      elif Sentiment == "NeutralSentiment":
                          df['pastSenti_nut'] = df['pastSenti_nut'].astype(int) + 1
                          df['sentiment'] = 0
                      elif Sentiment == "NegativeSentiment":
                          df['pastSenti_neg'] = df['pastSenti_neg'].astype(int) + 1
                          df['sentiment'] = -1

                      # Build a Feature Store value entry, dropping any None fields
                      def FeatureValue(**kwargs):
                          result = dict()
                          for key, value in kwargs.items():
                              if value is not None:
                                  result[key] = value
                          return result

                      # Call put_record with the updated row
                      row = df.iloc[0]
                      record = [
                          FeatureValue(
                              FeatureName=df.columns[index],
                              ValueAsString=str(row[index]),
                          )
                          for index in range(0, len(row))
                          if pd.notna(row[index])
                      ]
                      print(record)
                      featurestore_runtime.put_record(
                          FeatureGroupName=sagemakerFGname,
                          Record=record,
                      )
                      return {
                          'message': 'Success',
                          'ContactId': ContactId,
                          'customerid': customerid,
                          'Resp': json.dumps(Record)
                      }
                  # If no match return a default message
                  else:
                      return {'message': 'No Customer ID for provided ContactId'}
              except Exception as e:
                  print(e)
                  return {'message': 'Fail'}
      FunctionName: Summit2022FSUpdate
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.7
  LambdaSummit2022APIGatewayResponse:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to Return churn values to API Gateway Tables with Time
      Environment:
        Variables:
          # The function reads contactTableName and expects ChurnPrediction items,
          # so this variable points at the churn prediction table
          contactTableName: Summit2022ChurnPrediction
          SentimentsTable: Summit2022Sentiments
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os

          contactTableName = os.environ['contactTableName']
          sentimentsTableName = os.environ['SentimentsTable']
          dynamodb = boto3.resource('dynamodb')
          dynamodbclient = boto3.client('dynamodb')

          # This function reads the churn predictions and sentiments stored in DynamoDB for the Contact ID
          # provided by the web interface via API Gateway and returns arrays with sentiments and churn predictions
          def lambda_handler(event, context):
              try:
                  # ContactId passed from Agent CCP via API Gateway
                  ContactId = event['ContactId']
                  myArray = []
                  mySentiment = []
                  # Churn prediction lookup by contact ID using PartiQL;
                  # the contact ID is bound as a parameter instead of being concatenated into the statement
                  response = dynamodbclient.execute_statement(
                      Statement='SELECT * FROM "' + contactTableName + '" WHERE "ContactId" = ?',
                      Parameters=[{'S': ContactId}]
                  )
                  for x in response['Items']:
                      myArray.append(float(x['ChurnPrediction']['N']))
                  print(myArray)
                  # Sentiments lookup on sentiments DynamoDB table using PartiQL
                  response1 = dynamodbclient.execute_statement(
                      Statement='SELECT * FROM "' + sentimentsTableName + '" WHERE "ContactId" = ?',
                      Parameters=[{'S': ContactId}]
                  )
                  print(response1)
                  for x in response1['Items']:
                      mySentiment.append(int(x['Negatives']['N']))
                      mySentiment.append(int(x['Positives']['N']))
                      mySentiment.append(int(x['Neutrals']['N']))
                  print(mySentiment)
                  # Return to the Agent CCP including Churn predictions and sentiments via API Gateway
                  return {
                      'myArray': myArray,
                      'mySentiment': mySentiment
                  }
              except Exception as e:
                  print(e)
                  return {'message': 'Fail'}
      FunctionName: Summit2022APIGatewayResponse
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
  LambdaSummit2022UpdateContract:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to update months in the contract (it needs Pandas Layer)
      Environment:
        Variables:
          custTableName: Summit2022CustomerData
          SageMakerFGName: !Ref SageMakerFeatureGroupName
          InstanceId: !Ref AmazonConnectInstanceId
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os
          # This function requires a pandas layer
          import pandas as pd

          custTableName = os.environ['custTableName']
          sagemakerFGname = os.environ['SageMakerFGName']
          InstanceId = os.environ['InstanceId']
          boto_session = boto3.Session()
          region = boto_session.region_name
          connect = boto3.client('connect')
          client = boto3.client('dynamodb')

          # This Lambda updates the number of months contracted by the customer based on an update from
          # the agent in the agent CCP interface via API Gateway
          def lambda_handler(event, context):
              try:
                  print(event)
                  # Values provided by API Gateway assigned to variables
                  CustomerId = event['CustomerId']
                  ContactId = event['ContactId']
                  PhoneNumber = event['PhoneNumber']
                  Months = str(event['Months'])
                  # Update Amazon Connect Contact Attributes with new contracted months
                  responseConnect = connect.update_contact_attributes(
                      InitialContactId=ContactId,
                      InstanceId=InstanceId,
                      Attributes={
                          'contractedMonths': Months
                      }
                  )
                  # Update Contracted months in the customer dynamodb table
                  response = client.update_item(
                      TableName=custTableName,
                      Key={
                          'PhoneNumber': {'S': PhoneNumber}
                      },
                      AttributeUpdates={
                          'contractedMonths': {
                              'Value': {'S': Months}
                          }
                      }
                  )
                  # Update SageMaker Feature Store with new contracted months using Customer ID
                  featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
                  resp = featurestore_runtime.get_record(FeatureGroupName=sagemakerFGname, RecordIdentifierValueAsString=CustomerId)
                  Record = resp['Record']
                  # Use pandas to read Record
                  df = pd.DataFrame(Record).set_index('FeatureName').transpose()
                  df['mth_remain'] = Months

                  # Update record with the new Table
                  def FeatureValue(**kwargs):
                      result = dict()
                      for key, value in kwargs.items():
                          if value is not None:
                              result[key] = value
                      return result

                  row = df.iloc[0]
                  record = [
                      FeatureValue(
                          FeatureName=df.columns[index],
                          ValueAsString=str(row[index]),
                      )
                      for index in range(0, len(row))
                      if pd.notna(row[index])
                  ]
                  print(record)
                  featurestore_runtime.put_record(
                      FeatureGroupName=sagemakerFGname,
                      Record=record,
                  )
                  # Return consumed by API Gateway
                  return {
                      'statusCode': 200,
                      'body': json.dumps(event)
                  }
              except Exception as e:
                  print(e)
                  return {'message': 'Fail'}
      FunctionName: Summit2022UpdateContract
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.7
  LambdaSummit2022StopSFN:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function to stop Step Functions when call is Disconnected
      Environment:
        Variables:
          stateMachineARN: !GetAtt Summit2022StateMachine.Arn
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json
          import boto3
          import os

          stateMachine = os.environ['stateMachineARN']
          sfn = boto3.client('stepfunctions')

          # This function stops the Step Functions execution when the call has been disconnected
          def lambda_handler(event, context):
              ContactId = event['detail']['contactId']
              # Executions are started with the contact ID as their name, so the execution ARN is the
              # state machine ARN with ":stateMachine:" swapped for ":execution:" plus ":<ContactId>"
              sfnARN = stateMachine.replace(":stateMachine:", ":execution:", 1) + ":" + ContactId
              try:
                  response = sfn.stop_execution(
                      executionArn=sfnARN,
                      error='No Error',
                      cause='Contact Disconnected'
                  )
                  print(response)
                  return {'message': 'Success'}
              except Exception as e:
                  print(e)
                  print("Step Functions execution cannot be stopped, it may not exist")
                  return {'message': 'Fail'}
      FunctionName: Summit2022StopSFN
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
  LambdaSummit2022EndSFN:
    Type: AWS::Lambda::Function
    Properties:
      Description: Function required to end Step Functions after timeout
      Code:
        ZipFile: |
          # MIT No Attribution
          # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
          # Permission is hereby granted, free of charge, to any person obtaining a copy of this
          # software and associated documentation files (the "Software"), to deal in the Software
          # without restriction, including without limitation the rights to use, copy, modify,
          # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
          # permit persons to whom the Software is furnished to do so.
          # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
          import json

          # This Lambda function is called once the Step Functions loop exceeds its cycle limit
          # (count greater than 60 in the Choice state) to end the state machine
          def lambda_handler(event, context):
              customerid = event['customerid']
              ContactId = event['ContactId']
              InstanceId = event['InstanceId']
              return {
                  'message': 'Success',
                  'InstanceId': InstanceId,
                  'ContactId': ContactId,
                  'customerid': customerid
              }
      FunctionName: Summit2022EndSFN
      Handler: index.lambda_handler
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
  # Customer DynamoDB Table with customer details, partition key PhoneNumber
  CustomerDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: PhoneNumber
          AttributeType: S
      KeySchema:
        - AttributeName: PhoneNumber
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      TableName: Summit2022CustomerData
  # Sentiments DynamoDB Table, partition key contact Id
  SentimentsDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: ContactId
          AttributeType: S
      KeySchema:
        - AttributeName: ContactId
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      TableName: Summit2022Sentiments
  # Contact IDs DynamoDB Table, stores the customer ID per contact ID
  ContactIdsDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: ContactId
          AttributeType: S
      KeySchema:
        - AttributeName: ContactId
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      TableName: Summit2022ContactIds
  # Churn prediction table storing all the predictions per contact ID,
  # partition key ContactId and sort key When (time)
  ChurnPredictionDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: ContactId
          AttributeType: S
        - AttributeName: When
          AttributeType: S
      KeySchema:
        - AttributeName: ContactId
          KeyType: HASH
        - AttributeName: When
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      TableName: Summit2022ChurnPrediction
  # Step Functions
  Summit2022StateMachine:
    Type: AWS::StepFunctions::StateMachine
    DependsOn:
      - LambdaSummit2022FirstSageMakerInvoke
      - LambdaSummit2022EndSFN
      - LambdaSummit2022CLRealTime
    Properties:
      StateMachineName: Summit2022StateMachine
      DefinitionString: !Sub |
        {
          "Comment": "Summit2022StateMachine",
          "StartAt": "SageMakerInitialChurn",
          "States": {
            "SageMakerInitialChurn": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "OutputPath": "$.Payload",
              "Parameters": {
                "Payload.$": "$",
                "FunctionName": "${LambdaSummit2022FirstSageMakerInvoke.Arn}"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 6,
                  "BackoffRate": 2
                }
              ],
              "Comment": "Call SageMaker to get Current Status",
              "Next": "First Wait 30sec"
            },
            "First Wait 30sec": {
              "Type": "Wait",
              "Seconds": 30,
              "Comment": "Wait before retrieving Real-Time Data",
              "Next": "Choice"
            },
            "Choice": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.count",
                  "NumericGreaterThan": 60,
                  "Next": "EndSFN"
                }
              ],
              "Default": "GetCLRealTime"
            },
            "EndSFN": {
              "Type": "Task",
              "Resource":
                "arn:aws:states:::lambda:invoke",
              "OutputPath": "$.Payload",
              "Parameters": {
                "Payload.$": "$",
                "FunctionName": "${LambdaSummit2022EndSFN.Arn}"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 6,
                  "BackoffRate": 2
                }
              ],
              "Comment": "End the process",
              "End": true
            },
            "GetCLRealTime": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "OutputPath": "$.Payload",
              "Parameters": {
                "Payload.$": "$",
                "FunctionName": "${LambdaSummit2022CLRealTime.Arn}"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 6,
                  "BackoffRate": 2
                }
              ],
              "Comment": "Call Contact Lens RealTime API",
              "Next": "Wait 10sec"
            },
            "Wait 10sec": {
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Choice"
            }
          }
        }
      RoleArn: !GetAtt StepsIAMRole.Arn
  # API Gateway
  Summit2022AgentInterfaceRestApi:
    Type: AWS::ApiGateway::RestApi
    DependsOn:
      - LambdaSummit2022APIGatewayResponse
      - LambdaSummit2022UpdateContract
    Properties:
      Description: Return churn predictions for a contactID
      Name: Summit2022-AgentInterface
      Parameters:
        endpointConfigurationTypes: REGIONAL
  Sumit2022ApiMethod:
    Type: AWS::ApiGateway::Method
    DependsOn:
      - Summit2022AgentInterfaceRestApi
    Properties:
      AuthorizationType: NONE
      HttpMethod: POST
      Integration:
        IntegrationHttpMethod: POST
        Type: AWS
        Uri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaSummit2022APIGatewayResponse.Arn}/invocations
        IntegrationResponses:
          - StatusCode: 200
            ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
            ResponseTemplates:
              application/json: ''
        PassthroughBehavior: WHEN_NO_MATCH
      ResourceId: !GetAtt
        Summit2022AgentInterfaceRestApi.RootResourceId
      RestApiId: !Ref Summit2022AgentInterfaceRestApi
      MethodResponses:
        - StatusCode: 200
          ResponseModels:
            application/json: 'Empty'
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: false
            method.response.header.Access-Control-Allow-Methods: false
            method.response.header.Access-Control-Allow-Origin: false
  Sumit2022ApiOptionsMethod:
    Type: AWS::ApiGateway::Method
    DependsOn:
      - Summit2022AgentInterfaceRestApi
    Properties:
      AuthorizationType: NONE
      RestApiId: !Ref Summit2022AgentInterfaceRestApi
      ResourceId: !GetAtt Summit2022AgentInterfaceRestApi.RootResourceId
      HttpMethod: OPTIONS
      Integration:
        IntegrationResponses:
          - StatusCode: 200
            ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
            ResponseTemplates:
              application/json: ''
        PassthroughBehavior: WHEN_NO_MATCH
        Type: MOCK
      MethodResponses:
        - StatusCode: 200
          ResponseModels:
            application/json: 'Empty'
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: false
            method.response.header.Access-Control-Allow-Methods: false
            method.response.header.Access-Control-Allow-Origin: false
  APIupdatePath:
    Type: 'AWS::ApiGateway::Resource'
    Properties:
      RestApiId: !Ref Summit2022AgentInterfaceRestApi
      ParentId: !GetAtt
        - Summit2022AgentInterfaceRestApi
        - RootResourceId
      PathPart: update
  Sumit2022ApiUpdateMethod:
    Type: AWS::ApiGateway::Method
    DependsOn:
      - Summit2022AgentInterfaceRestApi
      - APIupdatePath
    Properties:
      AuthorizationType: NONE
      HttpMethod: POST
      Integration:
        IntegrationHttpMethod: POST
        Type: AWS
        Uri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaSummit2022UpdateContract.Arn}/invocations
        IntegrationResponses:
          - StatusCode: 200
            ResponseParameters:
              method.response.header.Access-Control-Allow-Headers:
                "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
            ResponseTemplates:
              application/json: ''
        PassthroughBehavior: WHEN_NO_MATCH
      ResourceId: !Ref APIupdatePath
      RestApiId: !Ref Summit2022AgentInterfaceRestApi
      MethodResponses:
        - StatusCode: 200
          ResponseModels:
            application/json: 'Empty'
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: false
            method.response.header.Access-Control-Allow-Methods: false
            method.response.header.Access-Control-Allow-Origin: false
  Sumit2022ApiUpdateOptionsMethod:
    Type: AWS::ApiGateway::Method
    DependsOn:
      - Summit2022AgentInterfaceRestApi
      - APIupdatePath
    Properties:
      AuthorizationType: NONE
      RestApiId: !Ref Summit2022AgentInterfaceRestApi
      ResourceId: !Ref APIupdatePath
      HttpMethod: OPTIONS
      Integration:
        IntegrationResponses:
          - StatusCode: 200
            ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
            ResponseTemplates:
              application/json: ''
        PassthroughBehavior: WHEN_NO_MATCH
        Type: MOCK
      MethodResponses:
        - StatusCode: 200
          ResponseModels:
            application/json: 'Empty'
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: false
            method.response.header.Access-Control-Allow-Methods: false
            method.response.header.Access-Control-Allow-Origin: false
  Summit2022ApiDeployment:
    Type: AWS::ApiGateway::Deployment
    DependsOn:
      - Sumit2022ApiMethod
      - Sumit2022ApiOptionsMethod
      - Sumit2022ApiUpdateMethod
      - Sumit2022ApiUpdateOptionsMethod
    Properties:
      RestApiId: !Ref Summit2022AgentInterfaceRestApi
      StageName: prod
  lambdaApiGatewayInvoke1:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt LambdaSummit2022APIGatewayResponse.Arn
      Action: lambda:InvokeFunction
      Principal:
        apigateway.amazonaws.com
      SourceArn: !Sub arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${Summit2022AgentInterfaceRestApi}/*/POST/
  lambdaApiGatewayInvoke2:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt LambdaSummit2022UpdateContract.Arn
      Action: lambda:InvokeFunction
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${Summit2022AgentInterfaceRestApi}/*/POST/update