/* * Copyright 2011-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://aws.amazon.com/apache2.0 * * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES * OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and * limitations under the License. */ package com.amazonaws.internal; import com.amazonaws.annotation.SdkInternalApi; import com.amazonaws.annotation.SdkTestInternalApi; @SdkInternalApi public class TokenBucket { private static final double MIN_FILL_RATE = 0.5; private static final double MIN_CAPACITY = 1.0; private static final double SMOOTH = 0.8; private static final double BETA = 0.7; private static final double SCALE_CONSTANT = 0.4; private final Clock clock; private Double fillRate; private Double maxCapacity; private double currentCapacity; private Double lastTimestamp; private boolean enabled; private double measuredTxRate; private double lastTxRateBucket; private long requestCount; private double lastMaxRate; private double lastThrottleTime; private double timeWindow; public interface Clock { double time(); } public TokenBucket() { clock = new DefaultClock(); initialize(); } @SdkTestInternalApi TokenBucket(Clock clock) { this.clock = clock; initialize(); } /** * * Acquire tokens from the bucket. If the bucket contains enough capacity * to satisfy the request, this method will return immediately, otherwise * the method will block the calling thread until enough tokens are refilled. *
*
     * _TokenBucketAcquire(amount)
     *   # Client side throttling is not enabled until we see a throttling error.
     *   if not enabled
     *     return
     *
     *   _TokenBucketRefill()
     *   # Next see if we have enough capacity for the requested amount.
     *   if amount <= current_capacity
     *     current_capacity = current_capacity - amount
     *   else
     *     sleep((amount - current_capacity) / fill_rate)
     *     current_capacity = current_capacity - amount
     *   return
     * 
     * * This is equivalent to {@code acquire(amount, false)}. * * @param amount The amount of tokens to acquire. * * @return Whether the amount was successfully acquired. */ public boolean acquire(double amount) { return acquire(amount, false); } /** * * Acquire tokens from the bucket. If the bucket contains enough capacity * to satisfy the request, this method will return immediately. Otherwise, * the behavior depends on the value of {@code fastFail}. If it is {@code * true}, then it will return {@code false} immediately, signaling that * enough capacity could not be acquired. Otherwise if {@code fastFail} is * {@code false}, then it will wait the required amount of time to fill the * bucket with enough tokens to satisfy {@code amount}. *
     * _TokenBucketAcquire(amount)
     *   # Client side throttling is not enabled until we see a throttling error.
     *   if not enabled
     *     return
     *
     *   _TokenBucketRefill()
     *   # Next see if we have enough capacity for the requested amount.
     *   if amount <= current_capacity
     *     current_capacity = current_capacity - amount
     *   else
     *     sleep((amount - current_capacity) / fill_rate)
     *     current_capacity = current_capacity - amount
     *   return
     * 
     *
     * @param amount The amount of tokens to acquire.
     * @param fastFail Whether this method should return immediately instead
     *                 of waiting if {@code amount} exceeds the current
     *                 capacity.
     *
     * @return Whether the amount was successfully acquired.
     */
    public boolean acquire(double amount, boolean fastFail) {
        double waitTime = 0.0;
        synchronized (this) {
            // If rate limiting is not enabled, we technically have an uncapped limit
            if (!enabled) {
                return true;
            }
            refill();
            double originalCapacity = currentCapacity;
            double unfulfilled = tryAcquireCapacity(amount);
            if (unfulfilled > 0.0 && fastFail) {
                currentCapacity = originalCapacity;
                return false;
            }
            // If all the tokens couldn't be acquired immediately, wait enough
            // time to fill the remainder.
            if (unfulfilled > 0) {
                waitTime = unfulfilled / fillRate;
            }
        }
        if (waitTime > 0) {
            sleep(waitTime);
        }
        return true;
    }
    /**
     *
     * @param amount The amount of capacity to acquire from the bucket.
     * @return The unfulfilled amount.
     */
    double tryAcquireCapacity(double amount) {
        double result;
        if (amount <= currentCapacity) {
            result = 0;
        } else {
            result = amount - currentCapacity;
        }
        currentCapacity = currentCapacity - amount;
        return result;
    }
    private void initialize() {
        fillRate = null;
        maxCapacity = null;
        currentCapacity = 0.0;
        lastTimestamp = null;
        enabled = false;
        measuredTxRate = 0.0;
        lastTxRateBucket = Math.floor(clock.time());
        requestCount = 0;
        lastMaxRate = 0.0;
        lastThrottleTime = clock.time();
    }
    /**
     * 
     * _TokenBucketRefill()
     *   timestamp = time()
     *   if last_timestamp is unset
     *     last_timestamp = timestamp
     *     return
     *   fill_amount = (timestamp - last_timestamp) * fill_rate
     *   current_capacity = min(max_capacity, current_capacity + fill_amount)
     *   last_timestamp = timestamp
     * 
     */
    // Package private for testing
    synchronized void refill() {
        double timestamp = clock.time();
        if (lastTimestamp == null) {
            lastTimestamp = timestamp;
            return;
        }
        double fillAmount = (timestamp - lastTimestamp) * fillRate;
        currentCapacity = Math.min(maxCapacity, currentCapacity + fillAmount);
        lastTimestamp = timestamp;
    }
    /**
     * 
     * _TokenBucketUpdateRate(new_rps)
     *   # Refill based on our current rate before we update to the new fill rate.
     *   _TokenBucketRefill()
     *   fill_rate = max(new_rps, MIN_FILL_RATE)
     *   max_capacity = max(new_rps, MIN_CAPACITY)
     *   # When we scale down we can't have a current capacity that exceeds our
     *   # max_capacity.
     *   current_capacity = min(current_capacity, max_capacity)
     * 
     */
    private synchronized void updateRate(double newRps) {
        refill();
        fillRate = Math.max(newRps, MIN_FILL_RATE);
        maxCapacity = Math.max(newRps, MIN_CAPACITY);
        currentCapacity = Math.min(currentCapacity, maxCapacity);
    }
    /**
     * 
     * t = time()
     * time_bucket = floor(t * 2) / 2
     * request_count = request_count + 1
     * if time_bucket > last_tx_rate_bucket
     *   current_rate = request_count / (time_bucket - last_tx_rate_bucket)
     *   measured_tx_rate = (current_rate * SMOOTH) + (measured_tx_rate * (1 - SMOOTH))
     *   request_count = 0
     *   last_tx_rate_bucket = time_bucket
     * 
     */
    private synchronized void updateMeasuredRate() {
        double t = clock.time();
        double timeBucket = Math.floor(t * 2) / 2;
        requestCount = requestCount + 1;
        if (timeBucket > lastTxRateBucket) {
            double currentRate = requestCount / (timeBucket - lastTxRateBucket);
            measuredTxRate = (currentRate * SMOOTH) + (measuredTxRate * (1 - SMOOTH));
            requestCount = 0;
            lastTxRateBucket = timeBucket;
        }
    }
    synchronized void enable() {
        enabled = true;
    }
    /**
     * 
     * _UpdateClientSendingRate(response)
     *   _UpdateMeasuredRate()
     *
     *   if IsThrottlingError(response)
     *     if not enabled
     *       rate_to_use = measured_tx_rate
     *     else
     *       rate_to_use = min(measured_tx_rate, fill_rate)
     *
     *     # The fill_rate is from the token bucket.
     *     last_max_rate = rate_to_use
     *     _CalculateTimeWindow()
     *     last_throttle_time = time()
     *     calculated_rate = _CUBICThrottle(rate_to_use)
     *     TokenBucketEnable()
     *   else
     *     _CalculateTimeWindow()
     *     calculated_rate = _CUBICSuccess(time())
     *
     *   new_rate = min(calculated_rate, 2 * measured_tx_rate)
     *   _TokenBucketUpdateRate(new_rate)
     * 
     */
    public synchronized void updateClientSendingRate(boolean throttlingResponse) {
        updateMeasuredRate();
        double calculatedRate;
        if (throttlingResponse) {
            double rateToUse;
            if (!enabled) {
                rateToUse = measuredTxRate;
            } else {
                rateToUse = Math.min(measuredTxRate, fillRate);
            }
            lastMaxRate = rateToUse;
            calculateTimeWindow();
            lastThrottleTime = clock.time();
            calculatedRate = cubicThrottle(rateToUse);
            enable();
        } else {
            calculateTimeWindow();
            calculatedRate = cubicSuccess(clock.time());
        }
        double newRate = Math.min(calculatedRate, 2 * measuredTxRate);
        updateRate(newRate);
    }
    /**
     * 
     * _CalculateTimeWindow()
     *   # This is broken out into a separate calculation because it only
     *   # gets updated when last_max_rate change so it can be cached.
     *   _time_window = ((last_max_rate * (1 - BETA)) / SCALE_CONSTANT) ^ (1 / 3)
     * 
     */
    // Package private for testing
    synchronized void calculateTimeWindow() {
        timeWindow = Math.pow((lastMaxRate * (1 - BETA)) / SCALE_CONSTANT, 1.0 / 3);
    }
    /**
     * Sleep for a given amount of seconds.
     * @param seconds The amount of time to sleep in seconds.
     */
    // Package private for testing
    void sleep(double seconds) {
        long millisToSleep = (long) (seconds * 1000);
        try {
            Thread.sleep(millisToSleep);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
    }
    /**
     * 
     * _CUBICThrottle(rate_to_use)
     *   calculated_rate = rate_to_use * BETA
     *   return calculated_rate
     * 
     */
    // Package private for testing
    double cubicThrottle(double rateToUse) {
        double calculatedRate = rateToUse * BETA;
        return calculatedRate;
    }
    /**
     * 
     * _CUBICSuccess(timestamp)
     *   dt = timestamp - last_throttle_time
     *   calculated_rate = (SCALE_CONSTANT * ((dt - _time_window) ^ 3)) + last_max_rate
     *   return calculated_rate
     * 
     */
    // Package private for testing
    synchronized double cubicSuccess(double timestamp) {
        double dt = timestamp - lastThrottleTime;
        double calculatedRate = SCALE_CONSTANT * Math.pow(dt - timeWindow, 3) + lastMaxRate;
        return calculatedRate;
    }
    static class DefaultClock implements Clock {
        @Override
        public double time() {
            long timeMillis = System.nanoTime();
            return timeMillis / 1000000000.;
        }
    }
    @SdkTestInternalApi
    synchronized void setLastMaxRate(double lastMaxRate) {
        this.lastMaxRate = lastMaxRate;
    }
    @SdkTestInternalApi
    synchronized void setLastThrottleTime(double lastThrottleTime) {
        this.lastThrottleTime = lastThrottleTime;
    }
    @SdkTestInternalApi
    synchronized double getMeasuredTxRate() {
        return measuredTxRate;
    }
    @SdkTestInternalApi
    synchronized double getFillRate() {
        return fillRate;
    }
    @SdkTestInternalApi
    synchronized void setCurrentCapacity(double currentCapacity) {
        this.currentCapacity = currentCapacity;
    }
    @SdkTestInternalApi
    synchronized double getCurrentCapacity() {
        return currentCapacity;
    }
    @SdkTestInternalApi
    synchronized void setFillRate(double fillRate) {
        this.fillRate = fillRate;
    }
}