/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * software and associated documentation files (the "Software"), to deal in the Software * without restriction, including without limitation the rights to use, copy, modify, * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ package com.amazonaws.transcribestreaming; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.reactivestreams.Publisher; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.signer.EventStreamAws4Signer; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.transcribestreaming.TranscribeStreamingAsyncClient; import software.amazon.awssdk.services.transcribestreaming.model.*; /** * This class wraps the AWS SDK implementation of the AWS Transcribe API with some retry logic to handle common * error cases, such as flaky network connections. */ public class TranscribeStreamingRetryClient { private static final int DEFAULT_MAX_RETRIES = 10; private static final int DEFAULT_MAX_SLEEP_TIME_MILLS = 100; private int maxRetries = DEFAULT_MAX_RETRIES; private int sleepTime = DEFAULT_MAX_SLEEP_TIME_MILLS; private final TranscribeStreamingAsyncClient client; List> nonRetriableExceptions = Arrays.asList(BadRequestException.class); /** * Create a TranscribeStreamingRetryClient with given credential and configuration * @param creds Creds to used for transcription * @param endpoint Endpoint to use for transcription * @param region Region to use for transcriptions * @throws URISyntaxException if the endpoint is not a URI */ public TranscribeStreamingRetryClient(AwsCredentialsProvider creds, String endpoint, Region region) throws URISyntaxException { this(TranscribeStreamingAsyncClient.builder() .overrideConfiguration( c -> c.putAdvancedOption( SdkAdvancedClientOption.SIGNER, EventStreamAws4Signer.create())) .credentialsProvider(creds) .endpointOverride(new URI(endpoint)) .region(region) .build()); } /** * Initiate TranscribeStreamingRetryClient with TranscribeStreamingAsyncClient * @param client TranscribeStreamingAsyncClient */ public TranscribeStreamingRetryClient(TranscribeStreamingAsyncClient client) { this.client = client; } /** * Get Max retries * @return Max retries */ public int getMaxRetries() { return maxRetries; } /** * Set Max retries * @param maxRetries Max retries */ public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } /** * Get sleep time * @return sleep time between retries */ public int getSleepTime() { return sleepTime; } /** * Set sleep time between retries * @param sleepTime sleep time */ public void setSleepTime(int sleepTime) { this.sleepTime = sleepTime; } /** * Initiate a Stream Transcription with retry. * @param request StartStreamTranscriptionRequest to use to start transcription * @param publisher The source audio stream as Publisher * @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled. * @return Completable future to handle stream response. */ public CompletableFuture startStreamTranscription(final StartStreamTranscriptionRequest request, final Publisher publisher, final StreamTranscriptionBehavior responseHandler) { CompletableFuture finalFuture = new CompletableFuture<>(); recursiveStartStream(rebuildRequestWithSession(request), publisher, responseHandler, finalFuture, 0); return finalFuture; } /** * Recursively call startStreamTranscription() to be called till the request is completed or till we run out of retries. * @param request StartStreamTranscriptionRequest * @param publisher The source audio stream as Publisher * @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled. * @param finalFuture final future to finish on completing the chained futures. * @param retryAttempt Current attempt number */ private void recursiveStartStream(final StartStreamTranscriptionRequest request, final Publisher publisher, final StreamTranscriptionBehavior responseHandler, final CompletableFuture finalFuture, final int retryAttempt) { CompletableFuture result = client.startStreamTranscription(request, publisher, getResponseHandler(responseHandler)); result.whenComplete((r, e) -> { if (e != null) { if (retryAttempt <= maxRetries && isExceptionRetriable(e)) { System.out.println("Retry attempt:" + (retryAttempt+1) ); try { Thread.sleep(sleepTime); } catch (InterruptedException e1) { e1.printStackTrace(); } recursiveStartStream(request, publisher, responseHandler, finalFuture, retryAttempt + 1); } else { responseHandler.onError(e); finalFuture.completeExceptionally(e); } } else { responseHandler.onComplete(); finalFuture.complete(null); } }); } private StartStreamTranscriptionRequest rebuildRequestWithSession(StartStreamTranscriptionRequest request) { return StartStreamTranscriptionRequest.builder() .languageCode(request.languageCode()) .mediaEncoding(request.mediaEncoding()) .mediaSampleRateHertz(request.mediaSampleRateHertz()) .showSpeakerLabel(true) .sessionId(UUID.randomUUID().toString()) .build(); } /** * StartStreamTranscriptionResponseHandler implements subscriber of transcript stream * Output is printed to standard output */ private StartStreamTranscriptionResponseHandler getResponseHandler( StreamTranscriptionBehavior transcriptionBehavior) { final StartStreamTranscriptionResponseHandler build = StartStreamTranscriptionResponseHandler.builder() .onResponse(r -> { transcriptionBehavior.onResponse(r); }) .onError(e -> { //Do nothing here. Make sure you don't close any streams that should not be cleaned up yet. }) .onComplete(() -> { //Do nothing here. Make sure you don't close any streams that should not be cleaned up yet. }) .subscriber(event -> transcriptionBehavior.onStream(event)) .build(); return build; } /** * Check if the exception is retriable or not. * @param e Exception that occurred * @return True if the exception is retriable */ private boolean isExceptionRetriable(Throwable e) { e.printStackTrace(); if (nonRetriableExceptions.contains(e.getClass())) { return false; } return true; } public void close() { this.client.close(); } }