/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using Amazon.Glacier.Model;
using Amazon.Glacier.Transfer.Internal;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;
using Amazon.SQS.Util;
using Amazon.Util;
using ThirdParty.Json.LitJson;
using System.Threading.Tasks;
namespace Amazon.Glacier.Transfer.Internal
{
internal partial class DownloadFileCommand : IDisposable
{
internal async Task ExecuteAsync()
{
await this.setupTopicAndQueueAsync().ConfigureAwait(false);
try
{
var jobId = await initiateJobAsync().ConfigureAwait(false);
await processQueueAsync(jobId).ConfigureAwait(false);
}
finally
{
this.tearDownTopicAndQueueAsync().Wait();
}
}
async Task processQueueAsync(string jobId)
{
Message message = await readNextMessageAsync().ConfigureAwait(false);
await processMessageAsync(message, jobId).ConfigureAwait(false);
await this.sqsClient.DeleteMessageAsync(new DeleteMessageRequest() { QueueUrl = this.queueUrl, ReceiptHandle = message.ReceiptHandle }).ConfigureAwait(false);
}
async Task processMessageAsync(Message message, string jobId)
{
var messageJobId = getJobIdFromMessage(message);
if (messageJobId == null)
return;
var command = new DownloadJobCommand(this.manager, this.vaultName, jobId, this.filePath, this.options);
await command.ExecuteAsync().ConfigureAwait(false);
}
///
/// Poll messages from the queue. Given the download process takes many hours there is extra
/// long retry logic.
///
/// The next message in the queue;
async Task readNextMessageAsync()
{
int retryAttempts = 0;
var receiveRequest = new ReceiveMessageRequest() { QueueUrl = this.queueUrl, MaxNumberOfMessages = 1 };
while (true)
{
try
{
var receiveResponse = await this.sqsClient.ReceiveMessageAsync(receiveRequest).ConfigureAwait(false);
retryAttempts = 0;
if (receiveResponse.Messages.Count == 0)
{
await Task.Delay((int)(this.options.PollingInterval * 1000 * 60)).ConfigureAwait(false);
continue;
}
return receiveResponse.Messages[0];
}
catch (Exception)
{
retryAttempts++;
if (retryAttempts <= MAX_OPERATION_RETRY)
Task.Delay(1000 * 60).Wait();
else
throw;
}
}
}
async Task initiateJobAsync()
{
var request = new InitiateJobRequest()
{
AccountId = this.options.AccountId,
VaultName = this.vaultName,
JobParameters = new JobParameters()
{
ArchiveId = this.archiveId,
SNSTopic = topicArn,
Type = "archive-retrieval"
}
};
((Amazon.Runtime.Internal.IAmazonWebServiceRequest)request).AddBeforeRequestHandler(new ArchiveTransferManager.UserAgentPostFix("DownloadArchive").UserAgentRequestEventHandlerSync);
var response = await this.manager.GlacierClient.InitiateJobAsync(request).ConfigureAwait(false);
return response.JobId;
}
internal async Task setupTopicAndQueueAsync()
{
var guidStr = Guid.NewGuid().ToString("N");
this.topicArn = (await this.snsClient.CreateTopicAsync(new CreateTopicRequest() { Name = "GlacierDownload-" + guidStr }).ConfigureAwait(false)).TopicArn;
this.queueUrl = (await this.sqsClient.CreateQueueAsync(new CreateQueueRequest() { QueueName = "GlacierDownload-" + guidStr }).ConfigureAwait(false)).QueueUrl;
this.queueArn = (await this.sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest() { QueueUrl = this.queueUrl, AttributeNames = new List { SQSConstants.ATTRIBUTE_QUEUE_ARN } }).ConfigureAwait(false)).Attributes[SQSConstants.ATTRIBUTE_QUEUE_ARN];
await this.snsClient.SubscribeAsync(new SubscribeRequest()
{
Endpoint = this.queueArn,
Protocol = "sqs",
TopicArn = this.topicArn
}).ConfigureAwait(false);
var policy = SQS_POLICY.Replace("{QuereArn}", this.queueArn).Replace("{TopicArn}", this.topicArn);
var setQueueAttributesRequest = new SetQueueAttributesRequest()
{
QueueUrl = this.queueUrl
};
setQueueAttributesRequest.Attributes.Add("Policy", policy);
await this.sqsClient.SetQueueAttributesAsync(setQueueAttributesRequest).ConfigureAwait(false);
}
internal async Task tearDownTopicAndQueueAsync()
{
await this.snsClient.DeleteTopicAsync(new DeleteTopicRequest() { TopicArn = this.topicArn }).ConfigureAwait(false);
await this.sqsClient.DeleteQueueAsync(new DeleteQueueRequest() { QueueUrl = this.queueUrl }).ConfigureAwait(false);
}
}
}