/* * Copyright 2012-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ package com.amazonaws.services.sqs.buffered; import java.util.ArrayList; import java.util.Collections; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.DeleteMessageResult; public class QueueBufferConfig { public static final int MAX_BATCH_SIZE_DEFAULT = 10; /** the maximum number of entries in a batch command */ private int maxBatchSize; /** Updated as the service now supports messages of size max 256 KiB. */ public static final long SERVICE_MAX_BATCH_SIZE_BYTES = 256 * 1024; /** * The maximum time (milliseconds) a send batch is held open for additional outbound requests. * The longer this timeout, the longer messages wait for other messages to be added to the * batch. Increasing this timeout reduces the number of calls made and increases throughput, but * also increases average message latency. */ private long maxBatchOpenMs; /** 200 milliseconds */ public static final long MAX_BATCH_OPEN_MS_DEFAULT = 200; /** * If true, even synchronous calls to delete messages will be made using background * asynchronous batches. The client will return results indicating that the messages were deleted successfully * even if the background calls eventually fail; the actual errors will be logged instead. * This can be beneficial for decreasing message acknowledgement latency at the cost of potential * duplicate messages (which can be produced by SQS itself anyway). */ private boolean deleteInBackground = false; /** * Should we use long polling or not? */ private boolean longPoll; /** true */ private static final boolean LONG_POLL_DEFAULT = true; /** * The maximum number of concurrent batches for each type of outbound request. The greater the * number, the greater the throughput that can be achieved (at the expense of consuming more * threads). */ private int maxInflightOutboundBatches; /** 5 batches */ public static final int MAX_INFLIGHT_OUTBOUND_BATCHES_DEFAULT = 5; /** * The maximum number of concurrent receive message batches. The greater this number, the faster * the queue will be pulling messages from the SQS servers (at the expense of consuming more * threads). */ private int maxInflightReceiveBatches; /** 10 batches */ public static final int MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT = 10; /** * If more than that number of completed receive batches are waiting in the buffer, the querying * for new messages will stop. The larger this number, the more messages the buffer queue will * pre-fetch and keep in the buffer on the client side, and the faster receive requests will be * satisfied. The visibility timeout of a pre-fetched message starts at the point of pre-fetch, * which means that while the message is in the local buffer it is unavailable for other clients * to process, and when this client retrieves it, part of the visibility timeout may have * already expired. The number of messages prefetched will not exceed maxBatchSize * * maxDoneReceiveBatches. */ private int maxDoneReceiveBatches; /** 10 batches */ public static final int MAX_DONE_RECEIVE_BATCHES_DEFAULT = 10; /** * Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes */ private long maxBatchSizeBytes; /** 256 kilobytes */ public static final long MAX_BATCH_SIZE_BYTES_DEFAULT = SERVICE_MAX_BATCH_SIZE_BYTES; /** * Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater * than zero, this timeout will override the default visibility timeout set on the SQS queue. * Set it to -1 to use the default visiblity timeout of the queue. Visibility timeout of 0 * seconds is not supported. */ private int visibilityTimeoutSeconds; /** -1, which means use the visibility timeout of the queue */ public static final int VISIBILITY_TIMEOUT_SECONDS_DEFAULT = -1; /** * Specifies the amount of time, in seconds, the receive call will block on the server waiting * for messages to arrive if the queue is empty when the receive call is first made. This * setting has no effect if long polling is disabled. */ private int longPollWaitTimeoutSeconds; public static final int LONGPOLL_WAIT_TIMEOUT_SECONDS_DEFAULT = 20; /** * Configures the minimum wait time for incoming receive message requests. Without a non-zero * minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers. * Avoid setting this to 0 unless you are confident threads will do useful work in-between * each call to receive messages! *
* This will be applied to both requests that explicitly set WaitTimeSeconds and * those that inherit the ReceiveMessageWaitTimeSeconds queue attribute. */ private int minReceiveWaitTimeMs = MIN_RECEIVE_WAIT_TIME_MS_DEFAULT; /** 50 ms, which is in the ballpark for typical latency contacting a remote service like SQS */ public static final int MIN_RECEIVE_WAIT_TIME_MS_DEFAULT = 50; /** * Specifies the message attributes receive calls will request. Only receive message requests that * request the same set of attributes will be satisfied from the receive buffers. *
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
private List
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
private List
* The default value is false which indicates flushOnShutdown is disabled.
*
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
public List
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
public void setReceiveAttributeNames(List
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
public QueueBufferConfig withReceiveAttributeNames(List
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
public List
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
public void setReceiveMessageAttributeNames(List
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
public QueueBufferConfig withReceiveMessageAttributeNames(List
* The larger this number, the more messages the queue buffer will pre-fetch and keep in the
* buffer on the client side, and the faster receive requests will be satisfied.
* The visibility timeout of a pre-fetched message starts at the point of pre-fetch, which means
* that while the message is in the local buffer it is unavailable for other clients to process,
* and when this client retrieves it, part of the visibility timeout may have already expired.
* The number of messages prefetched will not exceed 10 * maxDoneReceiveBatches, as there can be
* a maximum of 10 messages per batch.
*/
public int getMaxDoneReceiveBatches() {
return maxDoneReceiveBatches;
}
/**
* If more than that number of completed receive batches are waiting in the buffer, the querying
* for new messages will stop. The larger this number, the more messages the buffer queue will
* pre-fetch and keep in the buffer on the client side, and the faster receive requests will be
* satisfied. The visibility timeout of a pre-fetched message starts at the point of pre-fetch,
* which means that while the message is in the local buffer it is unavailable for other clients
* to process, and when this client retrieves it, part of the visibility timeout may have
* already expired. The number of messages prefetched will not exceed maxBatchSize *
* maxDoneReceiveBatches.
*/
public void setMaxDoneReceiveBatches(int maxDoneReceiveBatches) {
this.maxDoneReceiveBatches = maxDoneReceiveBatches;
}
/**
* If more than that number of completed receive batches are waiting in the buffer, the querying
* for new messages will stop. The larger this number, the more messages the buffer queue will
* pre-fetch and keep in the buffer on the client side, and the faster receive requests will be
* satisfied. The visibility timeout of a pre-fetched message starts at the point of pre-fetch,
* which means that while the message is in the local buffer it is unavailable for other clients
* to process, and when this client retrieves it, part of the visibility timeout may have
* already expired. The number of messages prefetched will not exceed maxBatchSize *
* maxDoneReceiveBatches.
*/
public QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches) {
setMaxDoneReceiveBatches(maxDoneReceiveBatches);
return this;
}
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
* is also enforced on the server, and if this client submits a request of a size larger than
* the server can support, the server will reject the request.
*/
public long getMaxBatchSizeBytes() {
return maxBatchSizeBytes;
}
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
* is also enforced on the server, and if this client submits a request of a size larger than
* the server can support, the server will reject the request.
*
* @throws IllegalArgumentException
* if the size being set is greater than the service allowed size for message body.
*/
public void setMaxBatchSizeBytes(long maxBatchSizeBytes) {
if (maxBatchSizeBytes > SERVICE_MAX_BATCH_SIZE_BYTES) {
throw new IllegalArgumentException(
"Maximum Size of the message cannot be greater than the allowed limit of "
+ SERVICE_MAX_BATCH_SIZE_BYTES + " bytes");
}
this.maxBatchSizeBytes = maxBatchSizeBytes;
}
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
* is also enforced on the server, and if this client submits a request of a size larger than
* the server can support, the server will reject the request.
*
* @throws IllegalArgumentException
* if the size being set is greater than the service allowed size for message body.
*/
public QueueBufferConfig withMaxBatchSizeBytes(long maxBatchSizeBytes) {
setMaxBatchSizeBytes(maxBatchSizeBytes);
return this;
}
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visiblity timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
public int getVisibilityTimeoutSeconds() {
return visibilityTimeoutSeconds;
}
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visiblity timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
public void setVisibilityTimeoutSeconds(int visibilityTimeoutSeconds) {
this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
}
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visiblity timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
public QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds) {
setVisibilityTimeoutSeconds(visibilityTimeoutSeconds);
return this;
}
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
public void setLongPollWaitTimeoutSeconds(int longPollWaitTimeoutSeconds) {
this.longPollWaitTimeoutSeconds = longPollWaitTimeoutSeconds;
}
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
public int getLongPollWaitTimeoutSeconds() {
return longPollWaitTimeoutSeconds;
}
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
public QueueBufferConfig withLongPollWaitTimeoutSeconds(int longPollWaitTimeoutSeconds) {
setLongPollWaitTimeoutSeconds(longPollWaitTimeoutSeconds);
return this;
}
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
*
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
public int getMinReceiveWaitTimeMs() {
return minReceiveWaitTimeMs;
}
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
*
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
public void setMinReceiveWaitTimeMs(int minReceiveWaitTimeMs) {
this.minReceiveWaitTimeMs = minReceiveWaitTimeMs;
}
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
*
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
public QueueBufferConfig withMinReceiveWaitTimeMs(int minReceiveWaitTimeMs) {
setMinReceiveWaitTimeMs(minReceiveWaitTimeMs);
return this;
}
/**
* Specifies the maximum number of entries the buffered client will put in a single batch
* request.
*/
public int getMaxBatchSize() {
return maxBatchSize;
}
/**
* Specifies the maximum number of entries the buffered client will put in a single batch
* request.
*/
public void setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Specifies the maximum number of entries the buffered client will put in a single batch
* request.
*/
public QueueBufferConfig withMaxBatchSize(int maxBatchSize) {
setMaxBatchSize(maxBatchSize);
return this;
}
/**
* Specifies the attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
*