package comprehend

import java.io.{File, FileOutputStream, PrintWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.UUID

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent

import software.amazon.awssdk.services.comprehend.ComprehendClient
import software.amazon.awssdk.services.comprehend.model.{InputDataConfig, InputFormat, OutputDataConfig, StartEntitiesDetectionJobRequest, StartKeyPhrasesDetectionJobRequest, StartSentimentDetectionJobRequest}
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeAction, AttributeValue, AttributeValueUpdate, UpdateItemRequest}
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest}
import software.amazon.awssdk.services.transcribe.TranscribeClient
import software.amazon.awssdk.services.transcribe.model.GetTranscriptionJobRequest

import scala.io.Source
import scala.jdk.CollectionConverters._

/**
 * Lambda handler for the event emitted when an Amazon Transcribe job changes state.
 *
 * For every event the job status is recorded in DynamoDB. When the job is `COMPLETED`
 * the transcript JSON is read from S3, its plain text is stored under
 * [[bucketPlainTranscriptPrefix]], and sentiment, key-phrase and entity detection jobs
 * are started in Comprehend; their job ids are stored in the same DynamoDB item.
 */
class App extends RequestHandler[ScheduledEvent, String] {

  /** Prefix used to store the entities in the document. */
  val bucketEntitiesPrefix: String = "entities/"

  /** Prefix used to store the key phrases. */
  val bucketKeyPhrasesPrefix: String = "keyPhrases/"

  /** Prefix used to store the transcription text. */
  val bucketPlainTranscriptPrefix: String = "transcripts/"

  /** Prefix used to store the sentiment analysis. */
  val bucketSentimentPrefix: String = "sentiment/"

  /** Prefix used to store the topics found. */
  val bucketTopicsPrefix: String = "topics/"

  /** Name of the bucket containing the data. */
  val bucketName: String = sys.env.getOrElse("BUCKET_NAME", "transcribe-sentiment-poc")

  /** Comprehend client. */
  val comprehendClient: ComprehendClient = ComprehendClient.builder().build()

  /** Role ARN passed to Comprehend to be able to read the transcripts. */
  val comprehendRoleArn: String = sys.env("ROLE_ARN")

  /** Client for DynamoDb. */
  val dynamoDbClient: DynamoDbClient = DynamoDbClient.builder().build()

  /** Code for the language to use in Comprehend. */
  val languageCode: String = sys.env.getOrElse("LANGUAGE_CODE", "es")

  /** The S3 client. */
  val s3Client: S3Client = S3Client.builder().build()

  /** DynamoDB process status table. */
  val tableName: String = sys.env.getOrElse("TABLE_NAME", "transcribe-sentiment-poc-table")

  /** Client for Transcribe service. */
  val transcribeClient: TranscribeClient = TranscribeClient.builder().build()

  /** Formats used for JSON Serialization. */
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  /**
   * Largest transcript, in UTF-8 bytes, handed to the sentiment job as-is; longer
   * transcripts are truncated to this size first.
   * NOTE(review): presumably matches Comprehend's per-document limit for sentiment
   * analysis - confirm against the current service quotas.
   */
  private[this] val maxSentimentBytes: Int = 5000

  /**
   * Process the EventBridge event generated by Amazon Transcribe when the process finishes or fails.
   *
   * @param input with the event.
   * @param context of the execution.
   * @return The literal string "Ok" once the DynamoDB item has been updated.
   * @throws IllegalArgumentException if the event detail lacks the job name or status.
   */
  override def handleRequest(input: ScheduledEvent, context: Context): String = {
    val logger = context.getLogger
    logger.log(s"RoleArn: $comprehendRoleArn")

    val jobId = requiredDetail(input, "TranscriptionJobName")
    val status = requiredDetail(input, "TranscriptionJobStatus")
    val eventTime = input.getTime.toInstant.getMillis
    logger.log(s"JobId: $jobId, Status: $status")

    val baseUpdates = Map(
      "transcribe" -> putNumber(eventTime),
      "status" -> putString(status)
    )

    val updates =
      if ("COMPLETED" == status) {
        val response = transcribeClient.getTranscriptionJob(
          GetTranscriptionJobRequest.builder()
            .transcriptionJobName(jobId)
            .build()
        )
        logger.log(response.toString)
        // For completed jobs the job's own completion time replaces the event time.
        val completedAt = response.transcriptionJob().completionTime().toEpochMilli
        baseUpdates ++ Map("transcribe" -> putNumber(completedAt)) ++ startComprehendJobs(jobId)
      } else {
        baseUpdates
      }

    dynamoDbClient.updateItem(
      UpdateItemRequest.builder()
        .attributeUpdates(updates.asJava)
        .key(Map("id" -> AttributeValue.builder().s(jobId).build()).asJava)
        .tableName(tableName)
        .build()
    )
    "Ok"
  }

  /**
   * Extracts the transcript to prepare it for comprehend.
   *
   * Downloads `<jobId>.json` from the bucket, stores the text of its first transcript
   * as UTF-8 under [[bucketPlainTranscriptPrefix]] and, when that text is larger than
   * [[maxSentimentBytes]], also stores a truncated copy with a `-trunc` suffix.
   *
   * @param jobId of the Transcribe process.
   * @return The S3 key of the full text, the key of the truncated copy (if one was
   *         needed) and the size in bytes of the full text.
   */
  private[this] def processTranscript(jobId: String): (String, Option[String], Long) = {
    val textBytes = transcriptText(downloadTranscript(jobId), jobId).getBytes(StandardCharsets.UTF_8)

    val key = s"$bucketPlainTranscriptPrefix$jobId.txt"
    uploadText(key, textBytes)

    val truncKey =
      if (textBytes.length > maxSentimentBytes) {
        val newKey = s"$bucketPlainTranscriptPrefix$jobId-trunc.txt"
        uploadText(newKey, truncateUtf8(textBytes, maxSentimentBytes))
        Some(newKey)
      } else {
        None
      }

    (key, truncKey, textBytes.length.toLong)
  }

  /** Reads a required field of the event detail, failing with a message that names the field. */
  private[this] def requiredDetail(input: ScheduledEvent, name: String): String =
    Option(input.getDetail)
      .flatMap(detail => Option(detail.get(name)))
      .map(_.toString)
      .getOrElse(throw new IllegalArgumentException(s"Event detail is missing '$name'"))

  /** Builds a DynamoDB PUT update holding a string value. */
  private[this] def putString(value: String): AttributeValueUpdate =
    AttributeValueUpdate.builder()
      .action(AttributeAction.PUT)
      .value(AttributeValue.builder().s(value).build())
      .build()

  /** Builds a DynamoDB PUT update holding a numeric value. */
  private[this] def putNumber(value: Long): AttributeValueUpdate =
    AttributeValueUpdate.builder()
      .action(AttributeAction.PUT)
      .value(AttributeValue.builder().n(value.toString).build())
      .build()

  /** Comprehend input configuration for a single-document file stored under `key` in the bucket. */
  private[this] def inputConfig(key: String): InputDataConfig =
    InputDataConfig.builder()
      .inputFormat(InputFormat.ONE_DOC_PER_FILE)
      .s3Uri(s"s3://$bucketName/$key")
      .build()

  /** Comprehend output configuration writing under `prefix` + `jobId` + "/" in the bucket. */
  private[this] def outputConfig(prefix: String, jobId: String): OutputDataConfig =
    OutputDataConfig.builder()
      .s3Uri(s"s3://$bucketName/$prefix$jobId/")
      .build()

  /**
   * Prepares the transcript text and starts the sentiment, key-phrase and entity
   * detection jobs, in that order.
   *
   * @return The DynamoDB updates recording the id of each started job.
   */
  private[this] def startComprehendJobs(jobId: String): Map[String, AttributeValueUpdate] = {
    val (transcriptKey, truncKey, _) = processTranscript(jobId)
    val fullInput = inputConfig(transcriptKey)
    // Sentiment runs on the truncated copy when one had to be produced.
    val sentimentInput = inputConfig(truncKey.getOrElse(transcriptKey))

    val sentimentJobId = comprehendClient.startSentimentDetectionJob(
      StartSentimentDetectionJobRequest.builder()
        .dataAccessRoleArn(comprehendRoleArn)
        .inputDataConfig(sentimentInput)
        .jobName(s"$jobId-Sentiment")
        .languageCode(languageCode)
        .outputDataConfig(outputConfig(bucketSentimentPrefix, jobId))
        .build()
    ).jobId()

    val keyPhrasesJobId = comprehendClient.startKeyPhrasesDetectionJob(
      StartKeyPhrasesDetectionJobRequest.builder()
        .dataAccessRoleArn(comprehendRoleArn)
        .inputDataConfig(fullInput)
        .jobName(s"$jobId-KeyPhrases")
        .languageCode(languageCode)
        .outputDataConfig(outputConfig(bucketKeyPhrasesPrefix, jobId))
        .build()
    ).jobId()

    val entitiesJobId = comprehendClient.startEntitiesDetectionJob(
      StartEntitiesDetectionJobRequest.builder()
        .dataAccessRoleArn(comprehendRoleArn)
        .inputDataConfig(fullInput)
        .jobName(s"$jobId-Entities")
        .languageCode(languageCode)
        .outputDataConfig(outputConfig(bucketEntitiesPrefix, jobId))
        .build()
    ).jobId()

    Map(
      "sentimentJob" -> putString(sentimentJobId),
      "keyPhrasesJob" -> putString(keyPhrasesJobId),
      "entitiesJob" -> putString(entitiesJobId)
    )
  }

  /** Downloads `<jobId>.json` from the bucket and parses it; the temporary copy is always removed. */
  private[this] def downloadTranscript(jobId: String): JValue = {
    val request = GetObjectRequest.builder()
      .bucket(bucketName)
      .key(s"$jobId.json")
      .build()
    // java.io.tmpdir is a JVM system property, not an environment variable.
    val tempFile = Paths.get(sys.props.getOrElse("java.io.tmpdir", "/tmp"), s"${UUID.randomUUID().toString}.json")
    try {
      s3Client.getObject(request, tempFile)
      // Decode explicitly as UTF-8 so the result does not depend on the JVM default charset.
      parse(new String(Files.readAllBytes(tempFile), StandardCharsets.UTF_8))
    } finally {
      // deleteOnExit would only run at JVM shutdown; delete right away instead.
      Files.deleteIfExists(tempFile)
      ()
    }
  }

  /**
   * Returns the text at `results.transcripts[0].transcript`.
   *
   * The raw string value is returned; rendering the node as JSON would wrap the text
   * in quotes and escape characters inside it.
   */
  private[this] def transcriptText(transcript: JValue, jobId: String): String = {
    val firstTranscript = transcript \ "results" \ "transcripts" match {
      case JArray(first :: _) => first
      case _ => JNothing
    }
    firstTranscript \ "transcript" match {
      case JString(text) => text
      case _ =>
        throw new IllegalStateException(s"No transcript text found in $jobId.json")
    }
  }

  /** Uploads `content` to `key` in the bucket as `text/plain`, via a temporary file that is always removed. */
  private[this] def uploadText(key: String, content: Array[Byte]): Unit = {
    val tempFile = Files.createTempFile("transcript-", ".txt")
    try {
      Files.write(tempFile, content)
      s3Client.putObject(
        PutObjectRequest.builder()
          .bucket(bucketName)
          .contentType("text/plain")
          .key(key)
          .build(),
        tempFile
      )
      ()
    } finally {
      Files.deleteIfExists(tempFile)
      ()
    }
  }

  /**
   * Returns at most `maxBytes` leading bytes of UTF-8 encoded `bytes` without cutting
   * a multi-byte character in half.
   */
  private[this] def truncateUtf8(bytes: Array[Byte], maxBytes: Int): Array[Byte] =
    if (bytes.length <= maxBytes) {
      bytes
    } else {
      // A byte of the form 10xxxxxx continues a character; step back until the first
      // excluded byte starts a character, so the kept prefix ends on a boundary.
      val cut = (maxBytes to 0 by -1)
        .find(index => (bytes(index) & 0xC0) != 0x80)
        .getOrElse(0)
      bytes.take(cut)
    }
}