package com.aws.ssa.keyspaces.retry; import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.retry.RetryDecision; import com.datastax.oss.driver.api.core.retry.RetryPolicy; import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException; import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException; import com.datastax.oss.driver.api.core.servererrors.WriteType; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles; import edu.umd.cs.findbugs.annotations.NonNull; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * This is a conservative retry policy adapted for the Amazon Keyspaces Service. * It allows for a configurable number of attempts, but by default the number of attempts is {@value KeyspacesRetryOption#DEFAULT_KEYSPACES_RETRY_MAX_ATTEMPTS} *
* This policy will either reattempt request on the same host or rethrow the exception to the calling thread. The main difference between * this policy from the original {@link com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy} is that the {@link AmazonKeyspacesExponentialRetryPolicy} will call {@link RetryDecision#RETRY_SAME} instead of {@link RetryDecision#RETRY_NEXT} *
* In Amazon Keyspaces, it's likely that {@link WriteTimeoutException} or {@link ReadTimeoutException} is the result of exceeding current table * capacity. Learn more about Amazon Keyspaces capacity here: @see Amazon Keyspaces CapacityModes. * In most cases you should allow for small number of retries, and handle the exception in your application threads. * *
To activate this policy, modify the {@code advanced.retry-policy} section in the driver * configuration, for example: * *
* datastax-java-driver { * advanced.retry-policy { * class = com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy * max-attempts = 2 * } * } **/ @ThreadSafe public class AmazonKeyspacesExponentialRetryPolicy implements RetryPolicy { private static final Logger LOG = LoggerFactory.getLogger(AmazonKeyspacesExponentialRetryPolicy.class); @VisibleForTesting public static final String RETRYING_ON_READ_TIMEOUT = "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, received responses: {}, data retrieved: {}, retries: {})"; @VisibleForTesting public static final String RETRYING_ON_WRITE_TIMEOUT = "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, required acknowledgments: {}, received acknowledgments: {}, retries: {})"; @VisibleForTesting public static final String RETRYING_ON_UNAVAILABLE = "[{}] Retrying on unavailable exception on next host (consistency: {}, required replica: {}, alive replica: {}, retries: {})"; @VisibleForTesting public static final String RETRYING_ON_ABORTED = "[{}] Retrying on aborted request on next host (retries: {})"; @VisibleForTesting public static final String RETRYING_ON_ERROR = "[{}] Retrying on node error on next host (retries: {})"; private final String logPrefix; private final Integer maxRetryCount; //private final Integer maxTimeToWait; public AmazonKeyspacesExponentialRetryPolicy(DriverContext context) { this(context, context.getConfig().getDefaultProfile().getName()); } public AmazonKeyspacesExponentialRetryPolicy(DriverContext context, Integer maxRetryCount) { String profileName = context.getConfig().getDefaultProfile().getName(); this.maxRetryCount = maxRetryCount; this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; } public AmazonKeyspacesExponentialRetryPolicy(DriverContext context, String profileName) { DriverExecutionProfile retryExecutionProfile = context.getConfig().getProfile(profileName); maxRetryCount = retryExecutionProfile.getInt(KeyspacesRetryOption.KEYSPACES_RETRY_MAX_ATTEMPTS, KeyspacesRetryOption.DEFAULT_KEYSPACES_RETRY_MAX_ATTEMPTS); this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; } protected RetryDecision determineRetryDecision(int retryCount) { if (retryCount < maxRetryCount) { timeToWait(retryCount); return RetryDecision.RETRY_SAME; } else { return RetryDecision.RETHROW; } } protected void timeToWait(int retryCount){ int timeToWaitCalculation = (retryCount + 1) * ThreadLocalRandom.current().nextInt(1, 20); int timeToWaitFinal = Math.min(500, timeToWaitCalculation); Uninterruptibles.sleepUninterruptibly(timeToWaitFinal, TimeUnit.MILLISECONDS); } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) * *
Otherwise, the exception is rethrown. */ @Override public RetryDecision onReadTimeout( @NonNull Request request, @NonNull ConsistencyLevel cl, int blockFor, int received, boolean dataPresent, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_READ_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) * *
Otherwise, the exception is rethrown. */ @Override public RetryDecision onWriteTimeout( @NonNull Request request, @NonNull ConsistencyLevel cl, @NonNull WriteType writeType, int blockFor, int received, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, blockFor, received, false, retryCount); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) * *
Otherwise, the exception is rethrown. */ @Override public RetryDecision onUnavailable( @NonNull Request request, @NonNull ConsistencyLevel cl, int required, int alive, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) */ @Override public RetryDecision onRequestAborted( @NonNull Request request, @NonNull Throwable error, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_ABORTED, logPrefix, retryCount, error); return decision; } /** * {@inheritDoc} * *
This implementation triggers a maximum of configured retry (to the same connection) */ @Override public RetryDecision onErrorResponse( @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { RetryDecision decision = determineRetryDecision(retryCount); LOG.trace(RETRYING_ON_ERROR, logPrefix, retryCount, error); return decision; } @Override public void close() { // nothing to do } }