package com.aws.ssa.keyspaces.throttler; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.RequestThrottlingException; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler; import com.datastax.oss.driver.api.core.session.throttling.Throttled; import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler; import com.datastax.oss.driver.internal.core.cql.CqlRequestHandler; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.BurstyRateLimiterFactory; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter; import edu.umd.cs.findbugs.annotations.NonNull; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** * A request throttler that limits the rate of requests per second. The limit is configurable through client driver * configuration. The rate of request is controlled by Guava SmoothBursty Ratelimiter that allows two minutes of capacity to * aggregate if not used. The second limiter is dynamically configured based on the number of connections defined in the pool setting * that limits overall throughput during burst behavior. The limiter will control the number of cql request per second but expects the table * to have proper capacity for the table. * * This is a blocking implementation but it will timeout based on the configured request timeout * * The most well known usecase for this rate limiter is bulk loading data at consistent rates or batch processing. * *

To activate this throttler, modify the {@code advanced.throttler} section in the driver * configuration, for example: * *

     * datastax-java-driver {
     *    advanced.throttler = {
     *           class = com.aws.ssa.keyspaces.throttler.AmazonKeyspacesFixedRateThrottler
     *           max-requests-per-second = 1000
     *           endpoint-type = VPC
     *           register-timeout = 3 seconds
     *     }
     * }
     * 
* max-requests-per-second : the number of CQL request per second max. Average over 2 minutes * endpoint-type : connected through private endpoint or public endpoint * register-timeout : time to wait for permits from limiter. Should be less than request timeout * */ @ThreadSafe public class AmazonKeyspacesFixedRateThrottler implements RequestThrottler { private static final Logger LOG = LoggerFactory.getLogger(AmazonKeyspacesFixedRateThrottler.class); private final String logPrefix; public static int REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND_DEFAULT = 1000; /*** Amazon Keyspaces supports up to 3000 CQL queries per TCP connection per second, but there is no limit on * the number of connections a driver can establish. The Cassandra drivers establish a * connection pool to Cassandra and load balance queries over that pool of connections. The following setting is * set to 2000 request per second to allow for some overhead ***/ public static int REQUEST_PER_CONNECTION_DEFAULT = 2000; /*** Amazon Keyspaces will store some unused capacity which allow for spikes in traffic to bust above the provisioned rate. * The rate limiter will generate configured maxRequestRate every second, and will be usable for two minutes ***/ public static int REQUEST_BURST_CAPACITY_IN_SECONDS = 120; /*** * Amazon Keyspaces exposes 9 peer IP addresses to drivers, and the default behavior of most drivers is to establish a single connection to each peer IP address. */ public static int PUBLIC_ENDPOINT_DEFAULT_HOST = 9; /*** * Amazon Keyspaces Virtual Private Cloud Endpoint (VPCE) exposes host per availability zone. The default behavior will establish one connection to each peer IP address. * the driver. */ public static int VPC_ENDPOINT_DEFAULT_HOST = 2; /*** * Rate limiter used to meter the CQL Request Per Second up to maxRequestsPerSecond */ private final RateLimiter limiter; /*** * Rate limiter used to meter the CQL Request Per Second up to the total number of numberOfConnections */ private final RateLimiter maxConnectionsLimiter; /*** * Number of hosts available when creating to a new session */ private final Integer numberOfHosts; /*** * Configured Rate of desired throughput */ private long maxRequestsPerSecond; /*** * Configured timeout per operation or time to wait for permits from ratelimiter */ private long registerTimeoutInMs; /*** * Configured number of connections for each host IP available */ private int numberOfConnectionsPerHost; /*** Default constructor that takes in values from the configuration ***/ public AmazonKeyspacesFixedRateThrottler(DriverContext context) { this(context, context.getConfig() .getDefaultProfile() .getLong(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND, REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND_DEFAULT), context.getConfig() .getDefaultProfile() .getDuration(KeyspacesThrottleOption.KEYSPACES_THROTTLE_TIMEOUT,context.getConfig() .getDefaultProfile() .getDuration(DefaultDriverOption.REQUEST_TIMEOUT)).toMillis(), context.getConfig() .getDefaultProfile() .getInt(KeyspacesThrottleOption.KEYSPACES_THROTTLE_NUMBER_OF_HOSTS, KeyspacesThrottleOption.DEFAULT_NUMBER_OF_HOSTS), context.getConfig() .getDefaultProfile() .getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE)); } /*** Initialization of the Throttler ***/ public AmazonKeyspacesFixedRateThrottler(DriverContext context, long maxRequestsPerSecond, long registerTimeoutInMs , int numberOfHosts, int numberOfConnectionsPerHost) { this.logPrefix = context.getSessionName(); this.maxRequestsPerSecond = maxRequestsPerSecond; this.registerTimeoutInMs = registerTimeoutInMs; this.numberOfHosts = numberOfHosts; this.numberOfConnectionsPerHost = numberOfConnectionsPerHost; //must be greater than 0 if(this.maxRequestsPerSecond <= 0){ LOG.error( "[{}] Throttler max request per second (advanced.throttler.max-requests-per-second) must be set greater than zero, currently {}", logPrefix, maxRequestsPerSecond); throw new IllegalArgumentException("Throttler maxRequestsPerSecond (advanced.throttler.max-requests-per-second) must be set greater than zero, currently " + maxRequestsPerSecond ); } //Requires some timeout if(this.registerTimeoutInMs <= 0){ LOG.error( "[{}] Throttler register timeout (advanced.throttler.register-timeout) must be set greater than zero, currently {}", logPrefix, registerTimeoutInMs); throw new IllegalArgumentException("Invalid timeout for registerTimeoutInMs (advanced.throttler.register-timeout) must be set greater or equal to zero, currently " + registerTimeoutInMs ); } long requestTimeout = context.getConfig() .getDefaultProfile() .getDuration(DefaultDriverOption.REQUEST_TIMEOUT).toMillis(); if(this.registerTimeoutInMs > requestTimeout){ LOG.error( "[{}] Throttler register timeout (advanced.throttler.register-timeout) must be less than or equal to request-timeout (basic.request.timeout), currently {}", logPrefix, registerTimeoutInMs); throw new IllegalArgumentException("Invalid timeout set for registerTimeoutInMs (advanced.throttler.register-timeout) must be set greater or equal to request timeout (basic.request.timeout), register timeout:" + registerTimeoutInMs + "ms , request timeout:"+ requestTimeout +" ms"); } LOG.info( "[{}] Initializing with maxRequestsPerSecond = {} and registerTimeoutInMs = {}", logPrefix, maxRequestsPerSecond, registerTimeoutInMs); LOG.info( "[{}] Based on Throttler max of {} request per second, the recommended number of connections for number of hosts: {}, currently {}", logPrefix, maxRequestsPerSecond, simpleConnectionRecommendation(maxRequestsPerSecond, numberOfHosts), numberOfConnectionsPerHost); LOG.info( "[{}] Based on Throttler max of {} request per second, the recommended number of connections for the Public Endpoint is: {}, currently {}", logPrefix, maxRequestsPerSecond, simpleConnectionRecommendation(maxRequestsPerSecond, numberOfHosts), numberOfConnectionsPerHost); //The rate for the number of request per second based on the number of connections. Should be greater than maxRequestsPerSecond int maxRequestPerSecondByForConnections = calculateConnectionMaxRequestPerSecond(numberOfHosts, numberOfConnectionsPerHost); if(maxRequestPerSecondByForConnections < maxRequestsPerSecond){ LOG.warn( "[{}] Cannot reach Max Request Per Second of {}. Specified number of hosts {}, and number of connections {} will provide at most {} request per second. Try increasing advanced.connection.pool.local.size or check system.peers table fo the number of hosts ", logPrefix, maxRequestsPerSecond, numberOfHosts, numberOfConnectionsPerHost, maxRequestPerSecondByForConnections); throw new IllegalArgumentException("maxRequestsPerSecond (advanced.throttler.max-requests-per-second) greater than throughput for numberOfConnectionsPerHost (advanced.connection.pool.local.size): " + numberOfConnectionsPerHost ); } //Aggregate permits over two minutes to allow for burst of unused capacity this.limiter = BurstyRateLimiterFactory.create(maxRequestsPerSecond, REQUEST_BURST_CAPACITY_IN_SECONDS); //Fixed number of permits that expire every second. Ceiling with no bursting this.maxConnectionsLimiter = RateLimiter.create(maxRequestPerSecondByForConnections); try(CqlSession internalSession = CqlSession.builder().withConfigLoader(context.getConfigLoader()).build()){ int number = internalSession.execute("select * from system.peers").all().size(); if(number < numberOfHosts){ LOG.warn( "[{}] Number of host in system.peers is less than the number of host configured {}. If using a private VPC endpoint make sure privilages allow for ", logPrefix, maxRequestsPerSecond, numberOfHosts, numberOfConnectionsPerHost, maxRequestPerSecondByForConnections); }else if (number > numberOfHosts){ LOG.warn( "[{}] Number of host in system.peers is more than the number of host configured {}. Increase the number of host configured host to improve throughput", logPrefix, maxRequestsPerSecond, numberOfHosts, numberOfConnectionsPerHost, maxRequestPerSecondByForConnections); } }; } /*** * Calculate the number of request per second based on the number of connections, number of hosts, and 2000 request per second. * @param numberOfHosts Number of hosts in the peers table. Depends on region and end point * @param numberOfConnectionsPerHost Number of connections for each host ip * @return max rate per second based on the number of connections */ public static int calculateConnectionMaxRequestPerSecond(Integer numberOfHosts, int numberOfConnectionsPerHost){ return numberOfConnectionsPerHost * numberOfHosts * REQUEST_PER_CONNECTION_DEFAULT; } /*** * Calculate recommended connections based on the current maxRequestRate specified. Max Connections rate should be greater than configured max request rate * @param maxRequestsPerSecond number of configured request per second * @param numberOfHosts type of endpoint that will be used to identify the number of hosts * @return */ public static double simpleConnectionRecommendation(long maxRequestsPerSecond, Integer numberOfHosts){ return Math.max(1.0, Math.ceil(maxRequestsPerSecond/(double)(numberOfHosts * REQUEST_PER_CONNECTION_DEFAULT))); } /*** * Blocking Rate limiter on register. Will timeout based on the configured timeout. * @param request */ @Override public void register(@NonNull Throttled request) { //Allow admin request if(request instanceof ThrottledAdminRequestHandler) { request.onThrottleReady(false); return; } long startTime = System.currentTimeMillis(); //fail if connections not available which should be higher limit than maxRequestRate //can occur if bursting for a single second where the rate limiter is over 2 minutes. if(maxConnectionsLimiter.tryAcquire(1, registerTimeoutInMs, TimeUnit.MILLISECONDS) == false){ fail(request, String.format("Timeout waiting for connection permits. Increase number of connections. request timeout: %d seconds)", this.maxRequestsPerSecond, this.registerTimeoutInMs)); } long elapsedTime = System.currentTimeMillis() - startTime; //registerTimeoutInMs should account for acquiring from both limiters. Ensure that this value is greater than or equal to 0 long timeoutForRequestPermits = (elapsedTime>=registerTimeoutInMs)?0:registerTimeoutInMs - elapsedTime; if(limiter.tryAcquire(1, timeoutForRequestPermits, TimeUnit.MILLISECONDS)){ request.onThrottleReady(false); }else{ fail(request, String.format("Timeout waiting for rate permits. Increase maxRequestsPerSecond (current maxrequests/s: %d, request timeout: %d seconds)", this.maxRequestsPerSecond, this.registerTimeoutInMs)); } } private static void fail(Throttled request, String message) { request.onThrottleFailure(new RequestThrottlingException(message)); } @Override public void signalSuccess(@NonNull Throttled request) { //nothing to do } @Override public void signalError(@NonNull Throttled request, @NonNull Throwable error) { LOG.warn(logPrefix + " signalError Throttled Request", error); } @Override public void signalTimeout(@NonNull Throttled request) { LOG.warn( "[{}] Timeout Throttled Request signalTimeout", logPrefix); } @Override public void close() { } public long getMaxRequestsPerSecond(){ return this.maxRequestsPerSecond; } public void setMaxRequestsPerSecond(long maxRequestsPerSecond){ this.maxRequestsPerSecond = maxRequestsPerSecond; limiter.setRate(maxRequestsPerSecond); } }