""" Parses the output from an Amazon Transcribe job into turn-by-turn speech segments with sentiment analysis scores from Amazon Comprehend """ from pathlib import Path from datetime import datetime from urllib.parse import urlparse import pcaconfiguration as cf import subprocess import copy import re import json import csv import boto3 import sys import time # Sentiment helpers MIN_SENTIMENT_LENGTH = 16 NLP_THROTTLE_RETRIES = 1 # PII and other Markers PII_PLACEHOLDER = "[PII]" TMP_DIR = "/tmp" class SpeechSegment: """ Class to hold information about a single speech segment """ def __init__(self): self.segmentStartTime = 0.0 self.segmentEndTime = 0.0 self.segmentSpeaker = "" self.segmentText = "" self.segmentConfidence = [] self.segmentSentimentScore = -1.0 # -1.0 => no sentiment calculated self.segmentPositive = 0.0 self.segmentNegative = 0.0 self.segmentIsPositive = False self.segmentIsNegative = False self.segmentAllSentiments = [] self.segmentCustomEntities = [] class TranscribeParser: def __init__(self, minSentimentPos, minSentimentNeg, customEntityEndpoint): self.min_sentiment_positive = minSentimentPos self.min_sentiment_negative = minSentimentNeg self.transcribeJobInfo = "" self.conversationLanguageCode = "" self.comprehendLanguageCode = "" self.conversationTime = "" self.conversationLocation = "" self.speechSegmentList = [] self.headerEntityDict = {} self.numWordsParsed = 0 self.cummulativeWordAccuracy = 0.0 self.maxSpeakerIndex = 0 self.customEntityEndpointName = customEntityEndpoint self.customEntityEndpointARN = "" self.simpleEntityMap = {} self.matchedSimpleEntities = {} self.audioPlaybackUri = "" self.duration = 0.0 cf.loadConfiguration() # Check the model exists - if now we may use simple file entity detection instead if self.customEntityEndpointName != "": # Get the ARN for our classifier endpoint, getting out quickly if there # isn't one defined or if we can't find the one that is defined comprehendClient = boto3.client("comprehend") recognizerList = comprehendClient.list_endpoints() recognizer = list(filter(lambda x: x["EndpointArn"].endswith(self.customEntityEndpointName), recognizerList["EndpointPropertiesList"])) # Only use it if it exists (!) and is IN_SERVICE if (recognizer == []) or (recognizer[0]["Status"] != "IN_SERVICE"): # Doesn't exist, so ignore the config self.customEntityEndpointName = "" else: self.customEntityEndpointARN = recognizer[0]["EndpointArn"] # Set flag to say if we could do simple entities self.simpleEntityMatchingUsed = (self.customEntityEndpointARN == "") and \ (cf.appConfig[cf.CONF_ENTITY_FILE] != "") def generateSpeakerSentimentTrend(self, speaker, spkNum): ''' Generates and returns a sentiment trend block for this speaker { "Speaker": "string", "AverageSentiment": "float", "SentimentChange": "float" } ''' speakerTrend = {} speakerTrend["Speaker"] = speaker speakerTurns = 0 sumSentiment = 0.0 firstSentiment = 0.0 finalSentiment = 0.0 for segment in self.speechSegmentList: if segment.segmentSpeaker == speaker: # Increment our counter for number of speaker turns and update the last turn score speakerTurns += 1 if segment.segmentIsPositive or segment.segmentIsNegative: # Only really interested in Positive/Negative turns for the stats. We need to # average out the calls between +/- 1, so we sum each turn as follows: # ([sentiment] - [sentimentBase]) / (1 - [sentimentBase]) # with the answer positive/negative based on the sentiment. We rebase as we have # thresholds to declare turns as pos/neg, so might be in the range 0.30-1.00. 
                    if segment.segmentIsPositive:
                        sentimentBase = self.min_sentiment_positive
                        signModifier = 1.0
                    else:
                        sentimentBase = self.min_sentiment_negative
                        signModifier = -1.0

                    # Calculate the score and add it to our total
                    turnScore = signModifier * ((segment.segmentSentimentScore - sentimentBase) / (1.0 - sentimentBase))
                    sumSentiment += turnScore

                    # Assign the first-turn score if this is the first turn, and always update the
                    # last-turn score, as we don't know if this is the last turn for this speaker
                    if speakerTurns == 1:
                        firstSentiment = turnScore
                    finalSentiment = turnScore
                else:
                    finalSentiment = 0.0

        # Log our trends for this speaker
        speakerTrend["SentimentChange"] = finalSentiment - firstSentiment
        speakerTrend["AverageSentiment"] = sumSentiment / max(speakerTurns, 1)

        return speakerTrend

    def createOutputConversationAnalytics(self):
        '''
        Generates some conversation-level analytics for this document, which includes
        information about the call, speaker labels, sentiment trends and entities
        '''
        resultsHeaderInfo = {}

        # Basic information. Note that we expect the input stream processing mechanism
        # to set the conversation time - if it is not set then we have no choice
        # but to default this to the current processing time.
        resultsHeaderInfo["ConversationTime"] = self.conversationTime
        resultsHeaderInfo["ConversationLocation"] = self.conversationLocation
        resultsHeaderInfo["ProcessTime"] = str(datetime.now())
        resultsHeaderInfo["LanguageCode"] = self.conversationLanguageCode
        resultsHeaderInfo["Duration"] = str(self.duration)
        if self.conversationTime == "":
            resultsHeaderInfo["ConversationTime"] = resultsHeaderInfo["ProcessTime"]

        # Build up a list of speaker labels from the config; note that if we
        # have more speakers than configured then we still return something
        speakerLabels = []
        for speaker in range(self.maxSpeakerIndex + 1):
            nextLabel = {}
            nextLabel["Speaker"] = "spk_" + str(speaker)
            try:
                nextLabel["DisplayText"] = cf.appConfig[cf.CONF_SPEAKER_NAMES][speaker]
            except:
                nextLabel["DisplayText"] = "Unknown-" + str(speaker)
            speakerLabels.append(nextLabel)
        resultsHeaderInfo["SpeakerLabels"] = speakerLabels

        # Sentiment trends
        sentimentTrends = []
        for speaker in range(self.maxSpeakerIndex + 1):
            sentimentTrends.append(self.generateSpeakerSentimentTrend("spk_" + str(speaker), speaker))
        resultsHeaderInfo["SentimentTrends"] = sentimentTrends

        # Detected custom entity summaries next
        customEntityList = []
        for entity in self.headerEntityDict:
            nextEntity = {}
            nextEntity["Name"] = entity
            nextEntity["Count"] = len(self.headerEntityDict[entity])
            nextEntity["Values"] = self.headerEntityDict[entity]
            customEntityList.append(nextEntity)
        resultsHeaderInfo["CustomEntities"] = customEntityList

        # Decide which source information block to add - only one for now
        transcribeSourceInfo = {}
        transcribeSourceInfo["TranscribeJobInfo"] = self.createOutputTranscribeJobInfo()
        sourceInfo = []
        sourceInfo.append(transcribeSourceInfo)
        resultsHeaderInfo["SourceInformation"] = sourceInfo

        # Add on any file-based entity recognizer that was used
        if self.simpleEntityMatchingUsed:
            resultsHeaderInfo["EntityRecognizerName"] = cf.appConfig[cf.CONF_ENTITY_FILE]
        elif self.customEntityEndpointName != "":
            resultsHeaderInfo["EntityRecognizerName"] = self.customEntityEndpointName

        return resultsHeaderInfo
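
    # For reference, one "SentimentTrends" entry produced above would look
    # something like this (hypothetical values):
    #   {"Speaker": "spk_0", "AverageSentiment": 0.35, "SentimentChange": -0.2}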
"ChannelIdentification": "boolean", "AverageAccuracy": "float" } ''' transcribeJobInfo = {} # Some fields we pick off the basic job info transcribeJobInfo["TranscriptionJobName"] = self.transcribeJobInfo["TranscriptionJobName"] transcribeJobInfo["CompletionTime"] = str(self.transcribeJobInfo["CompletionTime"]) transcribeJobInfo["MediaFormat"] = self.transcribeJobInfo["MediaFormat"] transcribeJobInfo["MediaSampleRateHertz"] = self.transcribeJobInfo["MediaSampleRateHertz"] transcribeJobInfo["MediaOriginalUri"] = self.transcribeJobInfo["Media"]["MediaFileUri"] transcribeJobInfo["ChannelIdentification"] = int(self.transcribeJobInfo["Settings"]["ChannelIdentification"]) transcribeJobInfo["AverageAccuracy"] = self.cummulativeWordAccuracy / max(float(self.numWordsParsed), 1.0) # Did we create an MP3 output file? If so then use it for playback rather than the original if self.audioPlaybackUri != "": transcribeJobInfo["MediaFileUri"] = self.audioPlaybackUri else: transcribeJobInfo["MediaFileUri"] = transcribeJobInfo["MediaOriginalUri"] # Vocabulary name is optional if "VocabularyName" in self.transcribeJobInfo["Settings"]: transcribeJobInfo["VocabularyName"] = self.transcribeJobInfo["Settings"]["VocabularyName"] return transcribeJobInfo def createOutputSpeechSegments(self): ''' Creates a list of speech segments for this conversation, including custom entities "SpeechSegments": [ { "SegmentStartTime": "float", "SegmentEndTime": "float", "SegmentSpeaker": "string", "OriginalText": "string", "DisplayText": "string", "TextEdited": "boolean", "SentimentIsPositive": "boolean", "SentimentIsNegative": "boolean", "SentimentScore": "float", "BaseSentimentScores": { "Positive": "float", "Negative": "float", "Neutral": "float", "Mixed": "float" }, "EntitiesDetected": [ { "Type": "string", "Text": "string", "BeginOffset": "integer", "EndOffset": "integer", "Score": "float" } ], "WordConfidence": [ { "Text": "string", "Confidence": "float", "StartTime": "float", "EndTime": "float" } ] } ] ''' speechSegments = [] # Loop through each of our speech segments # for segment in self.speechSegmentList: for segment in self.speechSegmentList: nextSegment = {} # Pick everything off our structures nextSegment["SegmentStartTime"] = segment.segmentStartTime nextSegment["SegmentEndTime"] = segment.segmentEndTime nextSegment["SegmentSpeaker"] = segment.segmentSpeaker nextSegment["OriginalText"] = segment.segmentText nextSegment["DisplayText"] = segment.segmentText nextSegment["TextEdited"] = 0 nextSegment["SentimentIsPositive"] = int(segment.segmentIsPositive) nextSegment["SentimentIsNegative"] = int(segment.segmentIsNegative) nextSegment["SentimentScore"] = segment.segmentSentimentScore nextSegment["BaseSentimentScores"] = segment.segmentAllSentiments nextSegment["EntitiesDetected"] = segment.segmentCustomEntities nextSegment["WordConfidence"] = segment.segmentConfidence # Add what we have to the full list speechSegments.append(nextSegment) return speechSegments def outputAsJSON(self): ''' { "ConversationAnalytics": { }, "SpeechSegments": [ ] } ''' outputJson = {} outputJson["ConversationAnalytics"] = self.createOutputConversationAnalytics() outputJson["SpeechSegments"] = self.createOutputSpeechSegments() return outputJson def mergeSpeakerSegments(self, inputSegmentList): """ Merges together two adjacent speaker segments if (a) the speaker is the same, and (b) if the gap between them is less than 3 seconds """ outputSegmentList = [] lastSpeaker = "" lastSegment = None # Step through each of our defined speaker segments for 

    def mergeSpeakerSegments(self, inputSegmentList):
        """
        Merges together two adjacent speaker segments if (a) the speaker is
        the same, and (b) the gap between them is less than 3 seconds
        """
        outputSegmentList = []
        lastSpeaker = ""
        lastSegment = None

        # Step through each of our defined speaker segments
        for segment in inputSegmentList:
            if (segment.segmentSpeaker != lastSpeaker) or ((segment.segmentStartTime - lastSegment.segmentEndTime) >= 3.0):
                # Simple case - a speaker change or a >= 3.0 second gap means a new output segment
                outputSegmentList.append(segment)

                # This is now our base segment moving forward
                lastSpeaker = segment.segmentSpeaker
                lastSegment = segment
            else:
                # Same speaker and a short gap - merge this segment's info into the last one
                lastSegment.segmentEndTime = segment.segmentEndTime
                lastSegment.segmentText += " " + segment.segmentText
                segment.segmentConfidence[0]["Text"] = " " + segment.segmentConfidence[0]["Text"]
                for wordConfidence in segment.segmentConfidence:
                    lastSegment.segmentConfidence.append(wordConfidence)

        return outputSegmentList

    def updateHeaderEntityCount(self, entityType, entityValue):
        """
        Updates the header-level entity structure with the given type/value pair;
        duplicate values are not added
        """
        # Ensure we have an entry in our collection for this key
        if entityType not in self.headerEntityDict:
            self.headerEntityDict[entityType] = []

        # If we don't already have this value then add it to the header
        keyDetails = self.headerEntityDict[entityType]
        if entityValue not in keyDetails:
            keyDetails.append(entityValue)
            self.headerEntityDict[entityType] = keyDetails

    def extractEntitiesFromLine(self, entityLine, speechSegment, typeFilter):
        """
        Takes a speech segment and an entity line from Comprehend - from either the
        standard or a custom model - and if the entity type is in our input type
        filter (or the filter is blank) then adds it to the transcript
        """
        if float(entityLine["Score"]) >= cf.appConfig[cf.CONF_ENTITYCONF]:
            entityType = entityLine["Type"]

            # If we have a type filter then ensure we match it before adding the entry
            if (typeFilter == []) or (entityType in typeFilter):

                # Update our header entry
                self.updateHeaderEntityCount(entityType, entityLine["Text"])

                # Now do the same with the SpeechSegment, but append the full details
                speechSegment.segmentCustomEntities.append(entityLine)

    def setComprehendLanguageCode(self, transcribeLangCode):
        '''
        Based upon the language defined by the input stream, sets the best-match
        language code for Comprehend to use for this conversation. It is "best-match"
        because Comprehend has a model for EN but does not differentiate between
        en-US and en-GB. If we cannot determine a language to use then we cannot
        use the Comprehend standard models
        '''
        targetLangModel = ""
        self.conversationLanguageCode = transcribeLangCode

        try:
            for checkLangCode in cf.appConfig[cf.CONF_COMP_LANGS]:
                if transcribeLangCode.startswith(checkLangCode):
                    targetLangModel = checkLangCode
                    break
        except:
            # If anything fails - e.g. no language string - then we have no language for Comprehend
            pass

        self.comprehendLanguageCode = targetLangModel

    def comprehendSingleSentiment(self, text, client):
        """
        Performs sentiment analysis, but tries to avert throttling by retrying once
        if the call raises an exception. This is not a replacement for limit increases,
        but it will help limit failures if usage suddenly grows
        """
        sentimentResponse = {}
        counter = 0
        while sentimentResponse == {}:
            try:
                sentimentResponse = client.detect_sentiment(Text=text, LanguageCode=self.comprehendLanguageCode)
            except Exception as e:
                if counter < NLP_THROTTLE_RETRIES:
                    counter += 1
                    time.sleep(3)
                else:
                    raise e

        return sentimentResponse
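
    # For reference, the detect_sentiment call above returns a response shaped
    # like this (scores here are hypothetical):
    #   {"Sentiment": "POSITIVE",
    #    "SentimentScore": {"Positive": 0.95, "Negative": 0.01, "Neutral": 0.03, "Mixed": 0.01}}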

    def comprehendSingleEntity(self, text, client):
        """
        Performs entity analysis, but tries to avert throttling by retrying once
        if the call raises an exception. This is not a replacement for limit increases,
        but it will help limit failures if usage suddenly grows
        """
        locationEntityResponse = {}
        counter = 0
        while locationEntityResponse == {}:
            try:
                locationEntityResponse = client.detect_entities(Text=text, LanguageCode=self.comprehendLanguageCode)
            except Exception as e:
                if counter < NLP_THROTTLE_RETRIES:
                    counter += 1
                    time.sleep(3)
                else:
                    raise e

        return locationEntityResponse

    def performComprehendNLP(self, segmentList):
        """
        Generates sentiment per speech segment, inserting the results into the input list.
        If we had no valid language for Comprehend to use then we mark everything as
        Neutral. It also extracts standard LOCATION entities, and calls any custom entity
        recognition model that has been configured for that language
        """
        client = boto3.client("comprehend")

        # Work out which Comprehend language model to use
        if self.comprehendLanguageCode == "":
            # If there's no language model then everything is Neutral
            neutralSentimentSet = {"Positive": 0.0, "Negative": 0.0, "Neutral": 1.0, "Mixed": 0.0}

        # Go through each of our segments
        for nextSegment in segmentList:
            if len(nextSegment.segmentText) >= MIN_SENTIMENT_LENGTH:
                nextText = nextSegment.segmentText

                # If we have a language model then extract sentiment via Comprehend
                if self.comprehendLanguageCode != "":
                    # Get sentiment and standard entity detection from Comprehend
                    sentimentResponse = self.comprehendSingleSentiment(nextText, client)
                    locationEntityResponse = self.comprehendSingleEntity(nextText, client)

                    # We're only interested in LOCATION standard entities
                    for detectedEntity in locationEntityResponse["Entities"]:
                        self.extractEntitiesFromLine(detectedEntity, nextSegment, ["LOCATION"])

                    # Now do the same for any entities we can find in a custom model. At the
                    # time of writing, custom entity models in Comprehend are ENGLISH ONLY
                    if (self.customEntityEndpointARN != "") and (self.comprehendLanguageCode == "en"):
                        # Call the custom model and insert anything that it finds
                        customEntityResponse = client.detect_entities(Text=nextText,
                                                                      EndpointArn=self.customEntityEndpointARN)
                        for detectedEntity in customEntityResponse["Entities"]:
                            self.extractEntitiesFromLine(detectedEntity, nextSegment, [])

                    # Now onto the sentiment - begin by storing the raw values
                    positiveBase = sentimentResponse["SentimentScore"]["Positive"]
                    negativeBase = sentimentResponse["SentimentScore"]["Negative"]

                    # If we're over the NEGATIVE threshold then we're negative
                    if negativeBase >= self.min_sentiment_negative:
                        nextSegment.segmentSentiment = "Negative"
                        nextSegment.segmentIsNegative = True
                        nextSegment.segmentSentimentScore = negativeBase
                    # Else if we're over the POSITIVE threshold then we're positive,
                    # otherwise we're either MIXED or NEUTRAL and we don't really care
                    elif positiveBase >= self.min_sentiment_positive:
                        nextSegment.segmentSentiment = "Positive"
                        nextSegment.segmentIsPositive = True
                        nextSegment.segmentSentimentScore = positiveBase

                    # Store all of the original sentiments for future use
                    nextSegment.segmentAllSentiments = sentimentResponse["SentimentScore"]
                    nextSegment.segmentPositive = positiveBase
                    nextSegment.segmentNegative = negativeBase
                else:
                    # We had no language - default sentiment, no new entities
                    nextSegment.segmentAllSentiments = neutralSentimentSet
                    nextSegment.segmentPositive = 0.0
                    nextSegment.segmentNegative = 0.0
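
    # Example mappings for the label normalisation below: "spk_0" -> "spk_0"
    # and "ch_1" -> "spk_1" - every label ends up in the spk_{N} form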

    def generateSpeakerLabel(self, transcribeSpeaker):
        '''
        Takes the Transcribe-generated speaker label, which could be spk_{N} or ch_{N},
        and returns the label spk_{N}. This allows us to have a consistent label in the
        output JSON, which means that a header field in the output is able to dynamically
        swap the display labels. This is needed as we cannot guarantee, especially with
        speaker-separated audio, who speaks first
        '''
        index = transcribeSpeaker.find("_")
        speaker = int(transcribeSpeaker[index + 1:])
        if speaker > self.maxSpeakerIndex:
            self.maxSpeakerIndex = speaker
        newLabel = "spk_" + str(speaker)
        return newLabel

    def createTurnByTurnSegments(self, transcribeJobFilename):
        """
        Creates a list of conversational turns, splitting up by speaker or if there's a
        noticeable pause in the conversation. Note that this works differently for
        speaker-separated and channel-separated files. For speaker-separated files the
        lines are already separated by speaker, so we only worry about splitting up
        speaker pauses of more than 3 seconds, but for channel-separated files we have
        to hunt for gaps of 100ms across an entire channel, then sort the segments from
        both channels, then merge any together to ensure we keep to the 3-second pause
        rule; this way channel-separated files are able to show interleaved speech where
        speakers are talking over one another. Once all of this is done we inject
        sentiment into each segment.
        """
        speechSegmentList = []

        # Load in the JSON file for processing
        json_filepath = Path(transcribeJobFilename)
        data = json.load(open(json_filepath.absolute(), "r", encoding="utf-8"))

        # Decide on our operational mode and set the overall job language
        isChannelMode = self.transcribeJobInfo["Settings"]["ChannelIdentification"]
        isSpeakerMode = not self.transcribeJobInfo["Settings"]["ChannelIdentification"]

        lastSpeaker = ""
        lastEndTime = 0.0
        skipLeadingSpace = False
        confidenceList = []
        nextSpeechSegment = None

        # Process a speaker-separated file
        if isSpeakerMode:
            # A segment is a blob of pronunciation and punctuation by an individual speaker
            for segment in data["results"]["speaker_labels"]["segments"]:

                # If there is content in the segment then pick out the time and speaker
                if len(segment["items"]) > 0:
                    # Pick out our next data
                    nextStartTime = float(segment["start_time"])
                    nextEndTime = float(segment["end_time"])
                    nextSpeaker = self.generateSpeakerLabel(str(segment["speaker_label"]))

                    # If we've changed speaker, or there's a 3-second gap, create a new row
                    if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) >= 3.0):
                        nextSpeechSegment = SpeechSegment()
                        speechSegmentList.append(nextSpeechSegment)
                        nextSpeechSegment.segmentStartTime = nextStartTime
                        nextSpeechSegment.segmentSpeaker = nextSpeaker
                        skipLeadingSpace = True
                        confidenceList = []
                        nextSpeechSegment.segmentConfidence = confidenceList
                    nextSpeechSegment.segmentEndTime = nextEndTime

                    # Note the speaker and end time of this segment for the next iteration
                    lastSpeaker = nextSpeaker
                    lastEndTime = nextEndTime
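
                    # For context, each entry in segment["items"] is expected to look
                    # something like this (timings hypothetical):
                    #   {"start_time": "12.34", "end_time": "12.85", "speaker_label": "spk_0"}
                    # and is matched against the pronunciation items in data["results"]["items"]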

                    # For each word in the segment...
                    for word in segment["items"]:

                        # Get the word with the highest confidence
                        pronunciations = list(filter(lambda x: x["type"] == "pronunciation", data["results"]["items"]))
                        word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and
                                                            x["end_time"] == word["end_time"], pronunciations))
                        try:
                            result = sorted(word_result[-1]["alternatives"], key=lambda x: x["confidence"])[-1]
                            confidence = float(result["confidence"])
                        except:
                            # Redacted (PII) words carry their confidence in a "redactions" entry
                            result = word_result[-1]["alternatives"][0]
                            confidence = float(result["redactions"][0]["confidence"])

                        # If we're doing simple entities then track which entities have been seen so far
                        if self.simpleEntityMap != {}:
                            checkTerm = result["content"].lower()
                            if checkTerm in self.simpleEntityMap:
                                self.matchedSimpleEntities[checkTerm] = self.simpleEntityMap[checkTerm]

                        # Write the word, with a leading space if this isn't the start of the segment
                        if skipLeadingSpace:
                            skipLeadingSpace = False
                            wordToAdd = result["content"]
                        else:
                            wordToAdd = " " + result["content"]

                        # If the next item is punctuation, add it to the current word
                        try:
                            word_result_index = data["results"]["items"].index(word_result[0])
                            next_item = data["results"]["items"][word_result_index + 1]
                            if next_item["type"] == "punctuation":
                                wordToAdd += next_item["alternatives"][0]["content"]
                        except IndexError:
                            pass

                        # Add the word and confidence to the segment and to our overall stats
                        nextSpeechSegment.segmentText += wordToAdd
                        confidenceList.append({"Text": wordToAdd,
                                               "Confidence": confidence,
                                               "StartTime": float(word["start_time"]),
                                               "EndTime": float(word["end_time"])})
                        self.numWordsParsed += 1
                        self.cumulativeWordAccuracy += confidence

        # Process a channel-separated file
        elif isChannelMode:
            # A channel contains all pronunciation and punctuation from a single speaker
            for channel in data["results"]["channel_labels"]["channels"]:

                # If there is content in the channel then start processing it
                if len(channel["items"]) > 0:

                    # We have the same speaker all the way through this channel
                    nextSpeaker = self.generateSpeakerLabel(str(channel["channel_label"]))
                    for word in channel["items"]:
                        # Pick out our next data from a 'pronunciation'
                        if word["type"] == "pronunciation":
                            nextStartTime = float(word["start_time"])
                            nextEndTime = float(word["end_time"])

                            # If we've changed speaker, or we haven't but the pause
                            # is longer than 100ms, then start a new text segment
                            if (nextSpeaker != lastSpeaker) or \
                                    ((nextSpeaker == lastSpeaker) and ((nextStartTime - lastEndTime) > 0.1)):
                                nextSpeechSegment = SpeechSegment()
                                speechSegmentList.append(nextSpeechSegment)
                                nextSpeechSegment.segmentStartTime = nextStartTime
                                nextSpeechSegment.segmentSpeaker = nextSpeaker
                                skipLeadingSpace = True
                                confidenceList = []
                                nextSpeechSegment.segmentConfidence = confidenceList
                            nextSpeechSegment.segmentEndTime = nextEndTime

                            # Note the speaker and end time of this segment for the next iteration
                            lastSpeaker = nextSpeaker
                            lastEndTime = nextEndTime

                            # Get the word with the highest confidence
                            pronunciations = list(filter(lambda x: x["type"] == "pronunciation", channel["items"]))
                            word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and
                                                                x["end_time"] == word["end_time"], pronunciations))
                            try:
                                result = sorted(word_result[-1]["alternatives"], key=lambda x: x["confidence"])[-1]
                                confidence = float(result["confidence"])
                            except:
                                # Redacted (PII) words carry their confidence in a "redactions" entry
                                result = word_result[-1]["alternatives"][0]
                                confidence = float(result["redactions"][0]["confidence"])
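
                            # For reference, a pronunciation item looks something like this
                            # (values hypothetical); redacted items carry a "redactions"
                            # list instead of a top-level confidence:
                            #   {"type": "pronunciation", "start_time": "1.02", "end_time": "1.45",
                            #    "alternatives": [{"content": "hello", "confidence": "0.9987"}]}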
                            # If we're doing simple entities then track which entities have been seen so far
                            if self.simpleEntityMap != {}:
                                checkTerm = result["content"].lower()
                                if checkTerm in self.simpleEntityMap:
                                    self.matchedSimpleEntities[checkTerm] = self.simpleEntityMap[checkTerm]

                            # Write the word, with a leading space if this isn't the start of the segment
                            if skipLeadingSpace:
                                skipLeadingSpace = False
                                wordToAdd = result["content"]
                            else:
                                wordToAdd = " " + result["content"]

                            # If the next item is punctuation, add it to the current word
                            try:
                                word_result_index = channel["items"].index(word_result[0])
                                next_item = channel["items"][word_result_index + 1]
                                if next_item["type"] == "punctuation":
                                    wordToAdd += next_item["alternatives"][0]["content"]
                            except IndexError:
                                pass

                            # Add the word and confidence to the segment and to our overall stats
                            nextSpeechSegment.segmentText += wordToAdd
                            confidenceList.append({"Text": wordToAdd,
                                                   "Confidence": confidence,
                                                   "StartTime": float(word["start_time"]),
                                                   "EndTime": float(word["end_time"])})
                            self.numWordsParsed += 1
                            self.cumulativeWordAccuracy += confidence

            # Sort the segments, as they are in channel-order and not speaker-order, then
            # merge together turns from the same speaker that are very close together
            speechSegmentList = sorted(speechSegmentList, key=lambda segment: segment.segmentStartTime)
            speechSegmentList = self.mergeSpeakerSegments(speechSegmentList)

        # Inject sentiments into the segment list
        self.performComprehendNLP(speechSegmentList)

        # If we ended up with any matched simple entities then insert
        # them, which we can do now that we have the sentence order
        if self.matchedSimpleEntities != {}:
            self.createSimpleEntityEntries(speechSegmentList)

        # Now set the overall call duration if we actually had any speech
        if len(speechSegmentList) > 0:
            self.duration = float(speechSegmentList[-1].segmentConfidence[-1]["EndTime"])

        # Return our full turn-by-turn speaker segment list with sentiment
        return speechSegmentList

    def createSimpleEntityEntries(self, speechSegments):
        """
        Searches through the given speech segments and updates them with any of the
        simple entity mapping entries that we've found. It also updates the line-level
        items. Both methods simulate the response that we'd generate if this was done
        via standard or custom Comprehend entities
        """
        # Loop through each segment looking for matches in our cut-down entity list
        for entity in self.matchedSimpleEntities:

            # Start by recording this in the header
            entityEntry = self.matchedSimpleEntities[entity]
            self.updateHeaderEntityCount(entityEntry["Type"], entityEntry["Original"])

            # Work through each segment
            for segment in speechSegments:

                # Only bother if the entity characters appear somewhere in this segment
                if entity in segment.segmentText.lower():

                    # Now find the right spot in the segment (if any) and insert that entry
                    offsetStart = 0
                    for wordEntry in segment.segmentConfidence:
                        nextWord = wordEntry["Text"].lower().strip(" ,?.")
                        offsetEnd = offsetStart + len(wordEntry["Text"])
                        if entity == nextWord:
                            # Got a match - add this one on
                            newLineEntity = {}
                            newLineEntity["Score"] = 1.0
                            newLineEntity["Type"] = entityEntry["Type"]
                            newLineEntity["Text"] = wordEntry["Text"].strip(" ,?.")
                            newLineEntity["BeginOffset"] = offsetStart
                            newLineEntity["EndOffset"] = offsetEnd
                            segment.segmentCustomEntities.append(newLineEntity)
                        offsetStart = offsetEnd
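
    # As a worked example of the parsing below, the filename
    # "0a.93.a0.3e.00.00-09.25.51.067-09-26-2019.wav" yields the conversation
    # time "2019-09-26 09:25:51.067000"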

    def calculateTranscribeConversationTime(self, filename):
        '''
        Tries to work out the conversation time based upon patterns in the filename.
        Currently, the POC customer uses this format - 0a.93.a0.3e.00.00-09.25.51.067-09-26-2019.wav -
        but there may be others, and hence this may need to become a plug-in per customer
        or something similar later. If we cannot generate a time then the system later
        defaults to the current processing time
        '''
        try:
            # Filename = 0a.93.a0.3e.00.00-09.25.51.067-09-26-2019.wav
            match = re.search(r'\d{2}\.\d{2}\.\d{2}\.\d{3}-\d{2}-\d{2}-\d{4}', filename)
            self.conversationTime = str(datetime.strptime(match.group(), '%H.%M.%S.%f-%m-%d-%Y'))
            self.conversationLocation = cf.appConfig[cf.CONF_CONVO_LOCATION]
        except:
            # If everything fails then the system will use "now" as the datetime in UTC
            self.conversationLocation = "Etc/UTC"

    def loadSimpleEntityStringMap(self):
        """
        Loads in any defined simple entity map for later use - this must be a CSV file
        with "Text" and "Type" columns, and it is configured without a language code.
        We append the Comprehend language code to the filename and use that, as it gives
        us multi-language coverage from a single configuration entry.

        Example: Configured file = entityFile.csv -> processed file for en-US audio = entityFile-en.csv
        """
        if self.simpleEntityMatchingUsed:
            # First we need to build up the real filename to use for this language. If we don't
            # have a language (unlikely) then just try to use the base filename as a last resort
            key = cf.appConfig[cf.CONF_ENTITY_FILE]
            if self.comprehendLanguageCode != "":
                key = key.split('.csv')[0] + "-" + self.comprehendLanguageCode + ".csv"

            # Then check that the language-specific mapping file actually exists
            s3 = boto3.client("s3")
            bucket = cf.appConfig[cf.CONF_SUPPORT_BUCKET]
            try:
                response = s3.get_object(Bucket=bucket, Key=key)
            except Exception as e:
                # Mapping file doesn't exist, so just quietly exit, but log something
                print("ERROR: Configured simple entity file {} in bucket {} does not exist - entity detection not possible".format(key, bucket))
                self.simpleEntityMatchingUsed = False
                return

            # Go and download the mapping file and get it into a structure
            mapFilepath = TMP_DIR + '/' + cf.appConfig[cf.CONF_ENTITY_FILE]
            s3.download_file(bucket, key, mapFilepath)
            reader = csv.DictReader(open(mapFilepath, errors="ignore"))
            try:
                for row in reader:
                    origTerm = row.pop("Text")
                    checkTerm = origTerm.lower()
                    if not (checkTerm in self.simpleEntityMap):
                        self.simpleEntityMap[checkTerm] = {"Type": row.pop("Type"), "Original": origTerm}
            except Exception as e:
                print(e)

    def createPlaybackMP3Audio(self):
        """
        Creates an MP3 version of the audio file used in the Transcribe job, as the HTML5