package com.aws.ssa.keyspaces.retry; import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.retry.RetryDecision; import com.datastax.oss.driver.api.core.retry.RetryPolicy; import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException; import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException; import com.datastax.oss.driver.api.core.servererrors.WriteType; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles; import edu.umd.cs.findbugs.annotations.NonNull; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * This is a conservative retry policy adapted for the Amazon Keyspaces Service. * It allows for a configurable number of attempts, but by default the number of attempts is {@value KeyspacesRetryOption#DEFAULT_KEYSPACES_RETRY_MAX_ATTEMPTS} *
* This policy will either reattempt request on the same host or rethrow the exception to the calling thread. The main difference between * this policy from the original {@link com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy} is that the {@link AmazonKeyspacesExponentialRetryPolicy} will call {@link RetryDecision#RETRY_SAME} instead of {@link RetryDecision#RETRY_NEXT} *
* In Amazon Keyspaces, it's likely that {@link WriteTimeoutException} or {@link ReadTimeoutException} is the result of exceeding current table * capacity. Learn more about Amazon Keyspaces capacity here: @see Amazon Keyspaces CapacityModes. * In most cases you should allow for small number of retries, and handle the exception in your application threads. * *
To activate this policy, modify the {@code advanced.retry-policy} section in the driver * configuration, for example: * *
* datastax-java-driver {
* advanced.retry-policy {
* class = com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy
* max-attempts = 2
* }
* }
*
*/
@ThreadSafe
public class AmazonKeyspacesExponentialRetryPolicy implements RetryPolicy {
private static final Logger LOG = LoggerFactory.getLogger(AmazonKeyspacesExponentialRetryPolicy.class);
@VisibleForTesting
public static final String RETRYING_ON_READ_TIMEOUT = "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, received responses: {}, data retrieved: {}, retries: {})";
@VisibleForTesting
public static final String RETRYING_ON_WRITE_TIMEOUT = "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, required acknowledgments: {}, received acknowledgments: {}, retries: {})";
@VisibleForTesting
public static final String RETRYING_ON_UNAVAILABLE = "[{}] Retrying on unavailable exception on next host (consistency: {}, required replica: {}, alive replica: {}, retries: {})";
@VisibleForTesting
public static final String RETRYING_ON_ABORTED = "[{}] Retrying on aborted request on next host (retries: {})";
@VisibleForTesting
public static final String RETRYING_ON_ERROR = "[{}] Retrying on node error on next host (retries: {})";
private final String logPrefix;
private final Integer maxRetryCount;
//private final Integer maxTimeToWait;
public AmazonKeyspacesExponentialRetryPolicy(DriverContext context) {
this(context, context.getConfig().getDefaultProfile().getName());
}
public AmazonKeyspacesExponentialRetryPolicy(DriverContext context, Integer maxRetryCount) {
String profileName = context.getConfig().getDefaultProfile().getName();
this.maxRetryCount = maxRetryCount;
this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
}
public AmazonKeyspacesExponentialRetryPolicy(DriverContext context, String profileName) {
DriverExecutionProfile retryExecutionProfile = context.getConfig().getProfile(profileName);
maxRetryCount = retryExecutionProfile.getInt(KeyspacesRetryOption.KEYSPACES_RETRY_MAX_ATTEMPTS, KeyspacesRetryOption.DEFAULT_KEYSPACES_RETRY_MAX_ATTEMPTS);
this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
}
protected RetryDecision determineRetryDecision(int retryCount) {
if (retryCount < maxRetryCount) {
timeToWait(retryCount);
return RetryDecision.RETRY_SAME;
} else {
return RetryDecision.RETHROW;
}
}
protected void timeToWait(int retryCount){
int timeToWaitCalculation = (retryCount + 1) * ThreadLocalRandom.current().nextInt(1, 20);
int timeToWaitFinal = Math.min(500, timeToWaitCalculation);
Uninterruptibles.sleepUninterruptibly(timeToWaitFinal, TimeUnit.MILLISECONDS);
}
/**
* {@inheritDoc}
*
* This implementation triggers a maximum of configured retry (to the same connection) * *
Otherwise, the exception is rethrown. */ @Override public RetryDecision onReadTimeout( @NonNull Request request, @NonNull ConsistencyLevel cl, int blockFor, int received, boolean dataPresent, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_READ_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) * *
Otherwise, the exception is rethrown. */ @Override public RetryDecision onWriteTimeout( @NonNull Request request, @NonNull ConsistencyLevel cl, @NonNull WriteType writeType, int blockFor, int received, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) * *
Otherwise, the exception is rethrown. */ @Override public RetryDecision onUnavailable( @NonNull Request request, @NonNull ConsistencyLevel cl, int required, int alive, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) */ @Override public RetryDecision onRequestAborted( @NonNull Request request, @NonNull Throwable error, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_ABORTED, logPrefix, retryCount, error); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) */ @Override public RetryDecision onErrorResponse( @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_ERROR, logPrefix, retryCount, error); return decision; } @Override public void close() { // nothing to do } }