) new ChangeMessageVisibilityBatchTask();
else
// this should never happen
throw new IllegalArgumentException("Unsupported request type " + request.getClass().getName());
}
/**
* Flushes all outstanding outbound requests ({@code SendMessage}, {@code DeleteMessage},
* {@code ChangeMessageVisibility}) in this buffer.
*
* The call returns successfully when all outstanding outbound requests submitted before the
* call are completed (i.e. processed by SQS).
*/
public void flush() {
try {
synchronized (sendMessageLock) {
inflightSendMessageBatches.acquire(config.getMaxInflightOutboundBatches());
inflightSendMessageBatches.release(config.getMaxInflightOutboundBatches());
}
synchronized (deleteMessageLock) {
inflightDeleteMessageBatches.acquire(config.getMaxInflightOutboundBatches());
inflightDeleteMessageBatches.release(config.getMaxInflightOutboundBatches());
}
synchronized (changeMessageVisibilityLock) {
inflightChangeMessageVisibilityBatches.acquire(config.getMaxInflightOutboundBatches());
inflightChangeMessageVisibilityBatches.release(config.getMaxInflightOutboundBatches());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Submits an outbound request for delivery to the queue associated with this buffer.
*
*
* @param operationLock
* the lock synchronizing calls for the call type ( {@code sendMessage},
* {@code deleteMessage}, {@code changeMessageVisibility} )
* @param openOutboundBatchTask
* the open batch task for this call type
* @param request
* the request to submit
* @param inflightOperationBatches
* the permits controlling the batches for this type of request
* @return never null
* @throws AmazonClientException
* (see the various outbound calls for details)
*/
@SuppressWarnings("unchecked")
, R extends AmazonWebServiceRequest, Result> QueueBufferFuture submitOutboundRequest(Object operationLock,
OBT[] openOutboundBatchTask,
R request,
final Semaphore inflightOperationBatches,
QueueBufferCallback callback) {
/*
* Callers add requests to a single batch task (openOutboundBatchTask) until it is full or
* maxBatchOpenMs elapses. The total number of batch task in flight is controlled by the
* inflightOperationBatch semaphore capped at maxInflightOutboundBatches.
*/
QueueBufferFuture theFuture = null;
try {
synchronized (operationLock) {
if (openOutboundBatchTask[0] == null
|| ((theFuture = openOutboundBatchTask[0].addRequest(request, callback))) == null) {
OBT obt = (OBT) newOutboundBatchTask(request);
inflightOperationBatches.acquire();
openOutboundBatchTask[0] = obt;
// Register a listener for the event signaling that the
// batch task has completed (successfully or not).
openOutboundBatchTask[0].setOnCompleted(new Listener>() {
@Override
public void invoke(OutboundBatchTask task) {
inflightOperationBatches.release();
}
});
if (log.isTraceEnabled()) {
log.trace("Queue " + qUrl + " created new batch for " + request.getClass().toString() + " "
+ inflightOperationBatches.availablePermits() + " free slots remain");
}
theFuture = openOutboundBatchTask[0].addRequest(request, callback);
executor.execute(openOutboundBatchTask[0]);
if (null == theFuture) {
// this can happen only if the request itself is flawed,
// so that it can't be added to any batch, even a brand
// new one
throw new AmazonClientException("Failed to schedule request " + request + " for execution");
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
AmazonClientException toThrow = new AmazonClientException("Interrupted while waiting for lock.");
toThrow.initCause(e);
throw toThrow;
}
return theFuture;
}
/**
* Task to send a batch of outbound requests to SQS.
*
* The batch task is constructed open and accepts requests until full, or until
* {@code maxBatchOpenMs} elapses. At that point, the batch closes and the collected requests
* are assembled into a single batch request to SQS. Specialized for each type of outbound
* request.
*
* Instances of this class (and subclasses) are thread-safe.
*
* @param
* the type of the SQS request to batch
* @param
* the type of result he futures issued by this task will return
*/
private abstract class OutboundBatchTask implements Runnable {
protected final List requests;
protected final ArrayList> futures;
private boolean closed;
private volatile Listener> onCompleted;
public OutboundBatchTask() {
this.requests = new ArrayList(config.getMaxBatchSize());
this.futures = new ArrayList>(config.getMaxBatchSize());
}
public void setOnCompleted(Listener> value) {
onCompleted = value;
}
/**
* Adds a request to the batch if it is still open and has capacity.
*
* @return the future that can be used to get the results of the execution, or null if the
* addition failed.
*/
public synchronized QueueBufferFuture addRequest(R request, QueueBufferCallback callback) {
if (closed) {
return null;
}
QueueBufferFuture theFuture = addIfAllowed(request, callback);
// if the addition did not work, or this addition made us full,
// we can close the request.
if ((null == theFuture) || isFull()) {
closed = true;
notify();
}
return theFuture;
}
/**
* Adds the request to the batch if capacity allows it. Called by {@code addRequest} with a
* lock on {@code this} held.
*
* @param request
* @return the future that will be signaled when the request is completed and can be used to
* retrieve the result. Can be null if the addition could not be done
*/
private QueueBufferFuture addIfAllowed(R request, QueueBufferCallback callback) {
if (isOkToAdd(request)) {
requests.add(request);
QueueBufferFuture theFuture = new QueueBufferFuture(callback);
futures.add(theFuture);
onRequestAdded(request);
return theFuture;
} else {
return null;
}
}
/**
* Checks whether it's okay to add the request to this buffer. Called by
* {@code addIfAllowed} with a lock on {@code this} held.
*
* @param request
* the request to add
* @return true if the request is okay to add, false otherwise
*/
protected boolean isOkToAdd(R request) {
return requests.size() < config.getMaxBatchSize();
}
/**
* A hook to be run when a request is successfully added to this buffer. Called by
* {@code addIfAllowed} with a lock on {@code this} held.
*
* @param request
* the request that was added
*/
protected void onRequestAdded(R request) {
// to be overridden by subclasses
}
/**
* Checks whether the buffer is now full. Called by {@code addIfAllowed} with a lock on
* {@code this} held.
*
* @return whether the buffer is filled to capacity
*/
protected boolean isFull() {
return requests.size() >= config.getMaxBatchSize();
}
/**
* Processes the batch once closed. Is NOT called with a lock on {@code this}.
* However, it's passed a local copy of both the {@code requests} and {@code futures} lists
* made while holding the lock.
*/
protected abstract void process(List requests, List> futures);
@Override
public final void run() {
try {
long deadlineMs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS)
+ config.getMaxBatchOpenMs() + 1;
long t = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
List requests;
List> futures;
synchronized (this) {
while (!closed && (t < deadlineMs)) {
t = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
// zero means "wait forever", can't have that.
long toWait = Math.max(1, deadlineMs - t);
wait(toWait);
}
closed = true;
requests = new ArrayList(this.requests);
futures = new ArrayList>(this.futures);
}
process(requests, futures);
} catch (InterruptedException e) {
failAll(e);
} catch (AmazonClientException e) {
failAll(e);
} catch (RuntimeException e) {
failAll(e);
throw e;
} catch (Error e) {
failAll(new AmazonClientException("Error encountered", e));
throw e;
} finally {
// make a copy of the listener since it (theoretically) can be
// modified from the outside.
Listener> listener = onCompleted;
if (listener != null) {
listener.invoke(this);
}
}
}
private void failAll(Exception e) {
for (QueueBufferFuture f : futures) {
f.setFailure(e);
}
}
}
private class SendMessageBatchTask extends OutboundBatchTask {
int batchSizeBytes = 0;
@Override
protected boolean isOkToAdd(SendMessageRequest request) {
return (requests.size() < config.getMaxBatchSize())
&& ((request.getMessageBody().getBytes().length + batchSizeBytes) <= config.getMaxBatchSizeBytes());
}
@Override
protected void onRequestAdded(SendMessageRequest request) {
batchSizeBytes += request.getMessageBody().getBytes().length;
}
@Override
protected boolean isFull() {
return (requests.size() >= config.getMaxBatchSize()) || (batchSizeBytes >= config.getMaxBatchSizeBytes());
}
@Override
protected void process(List requests,
List> futures) {
if (requests.isEmpty()) {
return;
}
SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(qUrl);
ResultConverter.appendUserAgent(batchRequest, AmazonSQSBufferedAsyncClient.USER_AGENT);
List entries = new ArrayList(requests.size());
for (int i = 0, n = requests.size(); i < n; i++) {
entries.add(RequestCopyUtils.createSendMessageBatchRequestEntryFrom(Integer.toString(i),
requests.get(i)));
}
batchRequest.setEntries(entries);
SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest);
for (SendMessageBatchResultEntry entry : batchResult.getSuccessful()) {
int index = Integer.parseInt(entry.getId());
futures.get(index).setSuccess(ResultConverter.convert(entry));
}
for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
int index = Integer.parseInt(errorEntry.getId());
if (errorEntry.isSenderFault()) {
futures.get(index).setFailure(ResultConverter.convert(errorEntry));
} else {
// retry.
try {
// this will retry internally up to 3 times.
futures.get(index).setSuccess(sqsClient.sendMessage(requests.get(index)));
} catch (AmazonClientException ace) {
futures.get(index).setFailure(ace);
}
}
}
}
}
private class DeleteMessageBatchTask extends OutboundBatchTask {
@Override
protected void process(List requests,
List> futures) {
if (requests.isEmpty()) {
return;
}
DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest().withQueueUrl(qUrl);
ResultConverter.appendUserAgent(batchRequest, AmazonSQSBufferedAsyncClient.USER_AGENT);
List entries = new ArrayList(
requests.size());
for (int i = 0, n = requests.size(); i < n; i++) {
entries.add(new DeleteMessageBatchRequestEntry().withId(Integer.toString(i)).withReceiptHandle(
requests.get(i).getReceiptHandle()));
}
batchRequest.setEntries(entries);
DeleteMessageBatchResult batchResult = sqsClient.deleteMessageBatch(batchRequest);
for (DeleteMessageBatchResultEntry entry : batchResult.getSuccessful()) {
int index = Integer.parseInt(entry.getId());
futures.get(index).setSuccess(new DeleteMessageResult());
}
for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
int index = Integer.parseInt(errorEntry.getId());
if (errorEntry.isSenderFault()) {
futures.get(index).setFailure(ResultConverter.convert(errorEntry));
} else {
try {
// retry.
futures.get(index).setSuccess(sqsClient.deleteMessage(requests.get(index)));
} catch (AmazonClientException ace) {
futures.get(index).setFailure(ace);
}
}
}
}
}
private class ChangeMessageVisibilityBatchTask extends OutboundBatchTask {
@Override
protected void process(List requests,
List> futures) {
if (requests.isEmpty()) {
return;
}
ChangeMessageVisibilityBatchRequest batchRequest = new ChangeMessageVisibilityBatchRequest()
.withQueueUrl(qUrl);
ResultConverter.appendUserAgent(batchRequest, AmazonSQSBufferedAsyncClient.USER_AGENT);
List entries = new ArrayList(
requests.size());
for (int i = 0, n = requests.size(); i < n; i++) {
entries.add(new ChangeMessageVisibilityBatchRequestEntry().withId(Integer.toString(i))
.withReceiptHandle(requests.get(i).getReceiptHandle())
.withVisibilityTimeout(requests.get(i).getVisibilityTimeout()));
}
batchRequest.setEntries(entries);
ChangeMessageVisibilityBatchResult batchResult = sqsClient.changeMessageVisibilityBatch(batchRequest);
for (ChangeMessageVisibilityBatchResultEntry entry : batchResult.getSuccessful()) {
int index = Integer.parseInt(entry.getId());
futures.get(index).setSuccess(new ChangeMessageVisibilityResult());
}
for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
int index = Integer.parseInt(errorEntry.getId());
if (errorEntry.isSenderFault()) {
futures.get(index).setFailure(ResultConverter.convert(errorEntry));
} else {
try {
// retry.
futures.get(index).setSuccess(sqsClient.changeMessageVisibility(requests.get(index)));
} catch (AmazonClientException ace) {
futures.get(index).setFailure(ace);
}
}
}
}
}
}