package com.amazonaws.transcribestreaming;
import com.amazonaws.kvstranscribestreaming.MetricsUtil;
import com.amazonaws.regions.Regions;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.transcribestreaming.TranscribeStreamingAsyncClient;
import software.amazon.awssdk.services.transcribestreaming.model.AudioStream;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* Build a client wrapper around the Amazon Transcribe client to retry
* on an exception that can be retried.
*
*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this
* software and associated documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
public class TranscribeStreamingRetryClient implements AutoCloseable {
private static final int DEFAULT_MAX_RETRIES = 3;
private static final int DEFAULT_MAX_SLEEP_TIME_MILLS = 100;
private int maxRetries = DEFAULT_MAX_RETRIES;
private int sleepTime = DEFAULT_MAX_SLEEP_TIME_MILLS;
private final TranscribeStreamingAsyncClient client;
private final MetricsUtil metricsUtil;
List> nonRetriableExceptions = Arrays.asList(SdkClientException.class);
private static final Logger logger = LoggerFactory.getLogger(TranscribeStreamingRetryClient.class);
/**
* Create a TranscribeStreamingRetryClient with given credential and configuration
*
* @param creds Creds to use for transcription
* @param endpoint Endpoint to use for transcription
* @param region Region to use for transcriptions
* @param metricsUtil
* @throws URISyntaxException if the endpoint is not a URI
*/
public TranscribeStreamingRetryClient(AwsCredentialsProvider creds,
String endpoint, Regions region, MetricsUtil metricsUtil) throws URISyntaxException {
this(TranscribeStreamingAsyncClient.builder()
.credentialsProvider(creds)
.endpointOverride(new URI(endpoint))
.region(Region.of(region.getName()))
.build(), metricsUtil);
}
/**
* Initiate TranscribeStreamingRetryClient with TranscribeStreamingAsyncClient
*
* @param client TranscribeStreamingAsyncClient
* @param metricsUtil
*/
public TranscribeStreamingRetryClient(TranscribeStreamingAsyncClient client, MetricsUtil metricsUtil) {
this.client = client;
this.metricsUtil = metricsUtil;
}
/**
* Get Max retries
*
* @return Max retries
*/
public int getMaxRetries() {
return maxRetries;
}
/**
* Set Max retries
*
* @param maxRetries Max retries
*/
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
/**
* Get sleep time
*
* @return sleep time between retries
*/
public int getSleepTime() {
return sleepTime;
}
/**
* Set sleep time between retries
*
* @param sleepTime sleep time
*/
public void setSleepTime(int sleepTime) {
this.sleepTime = sleepTime;
}
/**
* Initiate a Stream Transcription with retry.
*
* @param request StartStreamTranscriptionRequest to use to start transcription
* @param publisher The source audio stream as Publisher
* @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled.
* @return Completable future to handle stream response.
*/
public CompletableFuture startStreamTranscription(final StartStreamTranscriptionRequest request,
final Publisher publisher,
final StreamTranscriptionBehavior responseHandler,
final String channel) {
Validate.notNull(request);
Validate.notNull(publisher);
Validate.notNull(responseHandler);
CompletableFuture finalFuture = new CompletableFuture<>();
recursiveStartStream(rebuildRequestWithSession(request), publisher, responseHandler, finalFuture, 0, channel);
return finalFuture;
}
/**
* Recursively call startStreamTranscription() to be called till the request is completed or till we run out of retries.
*
* @param request StartStreamTranscriptionRequest
* @param publisher The source audio stream as Publisher
* @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled.
* @param finalFuture final future to finish on completing the chained futures.
* @param retryAttempt Current attempt number
*/
private void recursiveStartStream(final StartStreamTranscriptionRequest request,
final Publisher publisher,
final StreamTranscriptionBehavior responseHandler,
final CompletableFuture finalFuture,
final int retryAttempt, final String channel) {
CompletableFuture result = client.startStreamTranscription(request, publisher,
getResponseHandler(responseHandler));
result.whenComplete((r, e) -> {
if (e != null) {
logger.debug("Error occurred on channel " + channel +" : " + e.getMessage());
e.printStackTrace();
if (retryAttempt <= maxRetries && isExceptionRetriable(e)) {
logger.debug("Retriable error occurred and will be retried.");
logger.debug("Sleeping for sometime before retrying...");
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e1) {
logger.error("Sleep between retries interrupted. Failed with exception: ", e);
finalFuture.completeExceptionally(e);
}
logger.debug("Making retry attempt: " + (retryAttempt + 1));
recursiveStartStream(request, publisher, responseHandler, finalFuture, retryAttempt + 1, channel);
} else {
metricsUtil.recordMetric("TranscribeStreamError", 1);
logger.error("Encountered unretriable exception or ran out of retries.", e);
responseHandler.onError(e);
finalFuture.completeExceptionally(e);
}
} else {
logger.info("Completable future is complete.");
//metricsUtil.recordMetric("TranscribeStreamError", 0);
responseHandler.onComplete();
finalFuture.complete(null);
}
});
}
private StartStreamTranscriptionRequest rebuildRequestWithSession(StartStreamTranscriptionRequest request) {
return StartStreamTranscriptionRequest.builder()
.languageCode(request.languageCode())
.mediaEncoding(request.mediaEncoding())
.mediaSampleRateHertz(request.mediaSampleRateHertz())
.sessionId(UUID.randomUUID().toString())
.build();
}
/**
* StartStreamTranscriptionResponseHandler implements subscriber of transcript stream
* Output is printed to standard output
*/
private StartStreamTranscriptionResponseHandler getResponseHandler(
StreamTranscriptionBehavior transcriptionBehavior) {
final StartStreamTranscriptionResponseHandler build = StartStreamTranscriptionResponseHandler.builder()
.onResponse(r -> {
transcriptionBehavior.onResponse(r);
})
.onError(e -> {
//Do nothing here. Don't close any streams that shouldn't be cleaned up yet.
logger.info("Reached on error but doing nothing" + e);
})
.onComplete(() -> {
//Do nothing here. Don't close any streams that shouldn't be cleaned up yet.
logger.info("Reached on complete.");
})
.subscriber(event -> {
try {
transcriptionBehavior.onStream(event);
}
// We swallow any exception occurred while processing the TranscriptEvent and continue transcribing
// Transcribe errors will however cause the future to complete exceptionally and we'll retry (if applicable)
catch (Exception e) {
}
})
.build();
return build;
}
/**
* Check if the exception can be retried.
*
* @param e Exception that occurred
* @return True if the exception is retriable
*/
private boolean isExceptionRetriable(Throwable e) {
if (nonRetriableExceptions.contains(e.getCause().getClass())) {
return false;
}
return true;
}
@Override
public void close() throws Exception {
logger.debug("TranscribeStreamingRetryClient closed");
this.client.close();
}
}