/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using Amazon.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace Amazon.Runtime.Internal
{
///
/// This class is responsible for managing adaptive retry mode client rate limiting.
///
public class TokenBucket
{
private const int MaxAttempts = 15;
private readonly object _bucketLock = new object();
private readonly double _minFillRate;
private readonly double _minCapacity;
private readonly double _beta;
private readonly double _scaleConstant;
private readonly double _smooth;
private static readonly DateTime _epoch = new DateTime(1970, 1, 1);
///
/// The rate at which token are replenished.
///
protected double? FillRate { get; set; }
///
/// The maximum capacity allowed in the token
///
protected double? MaxCapacity { get; set; }
///
/// The current capacity of the token
///
protected double CurrentCapacity { get; set; }
///
/// The last time the token bucket was refilled.
///
protected double? LastTimestamp { get; set; }
///
/// The smoothed rate which tokens are being retrieved.
///
protected double MeasuredTxRate { get; set; }
///
/// The last half second time bucket used.
///
protected double LastTxRateBucket { get; set; }
///
/// The number of requests seen within the current time
///
protected long RequestCount { get; set; }
///
/// The maximum rate when the client was last throttled.
///
protected double LastMaxRate { get; set; }
///
/// The last time when the client was throttled.
///
protected double LastThrottleTime { get; set; }
///
/// The cached time window calculation
///
protected double TimeWindow { get; set; }
///
/// Boolean indicating if the token bucket is enabled.
///
protected bool Enabled { get; set; }
public TokenBucket()
: this(minFillRate: 0.5, minCapacity: 1.0, beta: 0.7, scaleConstant: 0.4, smooth: 0.8)
{
}
public TokenBucket(double minFillRate, double minCapacity, double beta, double scaleConstant, double smooth)
{
_minFillRate = minFillRate;
_minCapacity = minCapacity;
_beta = beta;
_scaleConstant = scaleConstant;
_smooth = smooth;
LastTxRateBucket = Math.Floor(GetTimestamp());
LastThrottleTime = GetTimestamp();
}
///
/// This method attempts to acquire capacity from the client's token
///
/// The amount of capacity to obtain from the token bucket
/// Indicates that the client should or shouldn't sleep and
/// try again if capacity cannot be obtained.
///
public bool TryAcquireToken(double amount, bool failFast)
{
var result = SetupAcquireToken(amount);
if (result != null)
{
return result.Value;
}
//We should never get to 15 attempts as the operation is only
//waiting for any good capacity. If we still cannot obtain
//capacity return false indicating that capacity couldn't be
//obtained.
for (int attempt = 0; attempt < MaxAttempts; attempt++)
{
var delay = ObtainCapacity(amount);
if (delay == 0)
{
break;
}
//If the client has asked us to fail quickly if we cannot get a send token
//return indicating a token couldn't be obtained.
if (failFast || attempt + 1 == MaxAttempts)
{
return false;
}
WaitForToken(delay);
}
return true;
}
#if AWS_ASYNC_API
///
/// This method attempts to acquire capacity from the client's token
///
/// The amount of capacity to obtain from the token bucket
/// Indicates that the client should or shouldn't sleep and
/// try again if capacity cannot be obtained.
///
/// Token which can be used to cancel the task.
public async System.Threading.Tasks.Task TryAcquireTokenAsync(double amount, bool failFast, CancellationToken cancellationToken)
{
var result = SetupAcquireToken(amount);
if(result != null)
{
return result.Value;
}
//We should never get to 15 attempts as the operation is only
//waiting for any good capacity. If we still cannot obtain
//capacity return false indicating that capacity couldn't be
//obtained.
for (int attempt = 0; attempt < MaxAttempts; attempt++)
{
var delay = ObtainCapacity(amount);
if(delay == 0)
{
break;
}
//If the client has asked us to fail quickly if we cannot get a send token
//return indicating a token couldn't be obtained.
if (failFast || attempt + 1 == MaxAttempts)
{
return false;
}
await WaitForTokenAsync(delay, cancellationToken).ConfigureAwait(false);
}
return true;
}
#endif
private bool? SetupAcquireToken(double amount)
{
if (amount <= 0)
{
return false;
}
lock (_bucketLock)
{
if (!Enabled)
{
return true;
}
TokenBucketRefill();
}
return null;
}
private int ObtainCapacity(double amount)
{
//Next see if we have capacity for the requested amount.
double currentCapacity;
double fillRate;
lock (_bucketLock)
{
if (amount <= CurrentCapacity)
{
CurrentCapacity -= amount;
return 0;
}
currentCapacity = CurrentCapacity;
fillRate = FillRate.Value;
}
return CalculateWait(amount, currentCapacity, fillRate);
}
///
/// Updates the sending rate within the client's token bucket
///
/// Indicates if the request resulted in a throttling error.
public void UpdateClientSendingRate(bool isThrottlingError)
{
lock (_bucketLock)
{
UpdateMeasuredRate();
double calculatedRate;
if (isThrottlingError)
{
double rateToUse;
if (!Enabled)
{
rateToUse = MeasuredTxRate;
}
else
{
rateToUse = Math.Min(MeasuredTxRate, FillRate.Value);
}
//The fill_rate is from the token
LastMaxRate = rateToUse;
CalculateTimeWindow();
LastThrottleTime = GetTimestamp();
calculatedRate = CUBICThrottle(rateToUse);
Enabled = true;
}
else
{
CalculateTimeWindow();
calculatedRate = CUBICSuccess(GetTimestamp());
}
var newRate = Math.Min(calculatedRate, 2.0 * MeasuredTxRate);
TokenBucketUpdateRate(newRate);
}
}
protected virtual void TokenBucketRefill()
{
double timestamp = GetTimestamp();
if (LastTimestamp == null)
{
LastTimestamp = timestamp;
return;
}
double fillAmount = (timestamp - LastTimestamp.Value) * FillRate.Value;
CurrentCapacity = Math.Min(MaxCapacity.Value, CurrentCapacity + fillAmount);
LastTimestamp = timestamp;
}
protected virtual void TokenBucketUpdateRate(double newRps)
{
//Refill based on our current rate before we update to the new fill rate.
TokenBucketRefill();
FillRate = Math.Max(newRps, _minFillRate);
MaxCapacity = Math.Max(newRps, _minCapacity);
//When we scale down we can't have a current capacity that exceeds our max_capacity.
CurrentCapacity = Math.Min(CurrentCapacity, MaxCapacity.Value);
}
protected virtual void UpdateMeasuredRate()
{
var timestamp = GetTimestamp();
var time_bucket = Math.Floor(timestamp * 2) / 2;
RequestCount++;
if (time_bucket > LastTxRateBucket)
{
var current_rate = RequestCount / (time_bucket - LastTxRateBucket);
MeasuredTxRate = (current_rate * _smooth) + (MeasuredTxRate * (1 - _smooth));
RequestCount = 0;
LastTxRateBucket = time_bucket;
}
}
protected virtual void CalculateTimeWindow()
{
//This is broken out into a separate calculation because it only
//gets updated when LastMaxRate changes so it can be cached.
TimeWindow = Math.Pow(((LastMaxRate * (1.0 - _beta)) / _scaleConstant), (1.0 / 3.0));
}
///
/// Calculates the rate
///
///
/// Returns the calculated rate for a successful call
protected virtual double CUBICSuccess(double timestamp)
{
timestamp -= LastThrottleTime;
return (_scaleConstant * Math.Pow(timestamp - TimeWindow, 3)) + LastMaxRate;
}
///
/// Calculates the rate.
///
/// The rate to use in the calculation
/// Returns the calculated rate for a throttled call
protected virtual double CUBICThrottle(double rateToUse)
{
return rateToUse * _beta;
}
protected virtual int CalculateWait(double amount, double currentCapacity, double fillRate)
{
return (int)((amount - currentCapacity) / fillRate * 1000.0);
}
protected virtual void WaitForToken(int delayMs)
{
AWSSDKUtils.Sleep(delayMs);
}
#if AWS_ASYNC_API
protected virtual async System.Threading.Tasks.Task WaitForTokenAsync(int delayMs, CancellationToken cancellationToken)
{
await System.Threading.Tasks.Task.Delay(delayMs, cancellationToken).ConfigureAwait(false);
}
#endif
protected virtual double GetTimestamp()
{
return GetTimeInSeconds();
}
private static double GetTimeInSeconds()
{
return (DateTime.UtcNow - _epoch).TotalSeconds;
}
}
}