/*******************************************************************************
 *  Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not use
 *  this file except in compliance with the License. A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 *  or in the "license" file accompanying this file.
 *  This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 *  CONDITIONS OF ANY KIND, either express or implied. See the License for the
 *  specific language governing permissions and limitations under the License.
 * *****************************************************************************
 *    __  _    _  ___
 *   (  )( \/\/ )/ __)
 *   /__\ \    / \__ \
 *  (_)(_) \/\/  (___/
 *
 *  AWS SDK for .NET
 *  API Version: 2006-03-01
 *
 */
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;

using Amazon.Runtime.Internal.Util;
using Amazon.S3.Model;
using Amazon.S3.Util;
using Amazon.S3.Encryption;
using Amazon.Runtime;

namespace Amazon.S3.Transfer.Internal
{
    /// <summary>
    /// The command to manage an upload using the S3 multipart API.
    /// </summary>
    internal partial class MultipartUploadCommand : BaseCommand
    {
        // Guards _uploadResponses and is the monitor the coordinating thread waits on.
        readonly object WAIT_FOR_COMPLETION_LOCK = new object();

        // Guards the _partsToUpload queue shared by the worker threads.
        readonly object QUEUE_ACCESS_LOCK = new object();

        // Worker threads and their invokers; both stay null until startInvokerPool() runs.
        Thread[] _executedThreads;
        UploadPartInvoker[] _invokers;

        /// <summary>
        /// Runs the multipart upload: initiates the upload, queues one
        /// <see cref="UploadPartRequest"/> per part, starts the worker threads,
        /// waits for every part to be uploaded and then completes the upload.
        /// </summary>
        /// <remarks>
        /// If anything fails after the upload has been initiated, the failure is
        /// logged, the worker threads and the multipart upload are aborted, and the
        /// exception is rethrown to the caller.
        /// </remarks>
        public override void Execute()
        {
            var initRequest = ConstructInitiateMultipartUploadRequest();
            var initResponse = this._s3Client.InitiateMultipartUpload(initRequest);
            Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);

            try
            {
                Logger.DebugFormat("Queue up the UploadPartRequests to be executed");
                long filePosition = 0;
                for (int partNumber = 1; filePosition < this._contentLength; partNumber++)
                {
                    var uploadRequest = ConstructUploadPartRequest(partNumber, filePosition, initResponse);
                    this._partsToUpload.Enqueue(uploadRequest);
                    filePosition += this._partSize;
                }

                this._totalNumberOfParts = this._partsToUpload.Count;

                Logger.DebugFormat("Starting threads to execute the {0} UploadPartRequests in the queue", this._totalNumberOfParts);
                startInvokerPool();

                Logger.DebugFormat("Waiting for threads to complete. ({0})", initResponse.UploadId);
                waitTillAllThreadsComplete();

                Logger.DebugFormat("Beginning completing multipart. ({0})", initResponse.UploadId);
                var compRequest = ConstructCompleteMultipartUploadRequest(initResponse);
                this._s3Client.CompleteMultipartUpload(compRequest);
                Logger.DebugFormat("Done completing multipart. ({0})", initResponse.UploadId);
            }
            catch (Exception e)
            {
                Logger.Error(e, "Exception while uploading. ({0})", initResponse.UploadId);
                shutdown(initResponse.UploadId);
                throw;
            }
            finally
            {
                // Only close a caller-supplied stream (no file path set), and only
                // when the request asks for it via AutoCloseStream.
                if (this._fileTransporterRequest.InputStream != null
                    && !this._fileTransporterRequest.IsSetFilePath()
                    && this._fileTransporterRequest.AutoCloseStream)
                {
                    this._fileTransporterRequest.InputStream.Close();
                }

                if (Logger != null)
                {
                    Logger.Flush();
                }
            }
        }

        /// <summary>
        /// Sends an AbortMultipartUpload request for the given upload. This is
        /// best effort: any exception is logged at info level and swallowed.
        /// </summary>
        /// <param name="uploadId">The id of the multipart upload to abort.</param>
        private void AbortMultipartUpload(string uploadId)
        {
            try
            {
                this._s3Client.AbortMultipartUpload(new AbortMultipartUploadRequest()
                {
                    BucketName = this._fileTransporterRequest.BucketName,
                    Key = this._fileTransporterRequest.Key,
                    UploadId = uploadId
                });
            }
            catch (Exception e)
            {
                Logger.InfoFormat("Error attempting to abort multipart for key {0}: {1}", this._fileTransporterRequest.Key, e.Message);
            }
        }

        /// <summary>
        /// Creates and starts one background thread per concurrent service request,
        /// each running its own <see cref="UploadPartInvoker"/>.
        /// </summary>
        private void startInvokerPool()
        {
            int threadCount = CalculateConcurrentServiceRequests();
            this._executedThreads = new Thread[threadCount];
            this._invokers = new UploadPartInvoker[threadCount];

            for (int i = 0; i < threadCount; i++)
            {
                this._invokers[i] = new UploadPartInvoker(this);
                Thread thread = new Thread(new ThreadStart(this._invokers[i].Execute));
                thread.Name = "Uploader " + i;
                thread.IsBackground = true;
                this._executedThreads[i] = thread;
                thread.Start();
            }
        }

        /// <summary>
        /// Blocks until a response has been recorded for every part. After each
        /// wake-up (pulse or 100 ms timeout) the invokers are checked and the first
        /// recorded exception, if any, is rethrown on the calling thread.
        /// </summary>
        private void waitTillAllThreadsComplete()
        {
            lock (this.WAIT_FOR_COMPLETION_LOCK)
            {
                while (this._uploadResponses.Count != this._totalNumberOfParts)
                {
                    Monitor.Wait(this.WAIT_FOR_COMPLETION_LOCK, 100);

                    // Look for any exceptions from the upload threads.
                    foreach (UploadPartInvoker invoker in this._invokers)
                    {
                        checkForLastException(invoker);
                    }
                }
            }
        }

        /// <summary>
        /// Rethrows the exception recorded by the given invoker, if there is one.
        /// </summary>
        /// <param name="invoker">The invoker to inspect.</param>
        private static void checkForLastException(UploadPartInvoker invoker)
        {
            // Read the property once so the null test and the throw see the same value.
            Exception lastException = invoker.LastException;
            if (lastException != null)
            {
                throw lastException;
            }
        }

        /// <summary>
        /// Aborts any worker threads that are still alive and then aborts the
        /// multipart upload itself.
        /// </summary>
        /// <param name="uploadId">The id of the multipart upload to abort.</param>
        private void shutdown(string uploadId)
        {
            // Execute() can fail before startInvokerPool() has run, in which case no
            // threads exist yet. Skip the thread sweep then, so the original exception
            // is not replaced by a NullReferenceException and the abort is still sent.
            Thread[] threads = this._executedThreads;
            if (threads != null)
            {
                // To get the AbortMultipartUpload the best chance of actually being able to
                // abort the upload look through the list of threads multiple times
                // to make sure they have been successfully aborted.
                bool anyAlive = true;
                for (int i = 0; anyAlive && i < 5; i++)
                {
                    anyAlive = false;
                    foreach (Thread thread in threads)
                    {
                        try
                        {
                            if (thread != null && thread.IsAlive)
                            {
                                thread.Abort();
                                anyAlive = true;
                            }
                        }
                        catch
                        {
                            // Deliberately ignored: aborting the workers is best effort
                            // and must not prevent the AbortMultipartUpload call below.
                        }
                    }
                }
            }

            AbortMultipartUpload(uploadId);
        }

        /// <summary>
        /// Records the response of a successfully uploaded part and wakes the thread
        /// waiting in <see cref="waitTillAllThreadsComplete"/>.
        /// </summary>
        /// <param name="response">The response returned for the uploaded part.</param>
        private void addResponse(UploadPartResponse response)
        {
            lock (this.WAIT_FOR_COMPLETION_LOCK)
            {
                this._uploadResponses.Add(response);

                // Wake the waiter so completion is noticed immediately instead of
                // on its next 100 ms poll.
                Monitor.Pulse(this.WAIT_FOR_COMPLETION_LOCK);
            }
        }

        /// <summary>
        /// Used as the ThreadStart for the threads doing the upload.
        /// </summary>
        partial class UploadPartInvoker
        {
            IAmazonS3 _s3Client;
            MultipartUploadCommand _uploader;
            Exception _lastException;

            /// <summary>
            /// Creates an invoker that pulls part requests from the given command
            /// and sends them with that command's S3 client.
            /// </summary>
            /// <param name="uploader">The owning multipart upload command.</param>
            internal UploadPartInvoker(MultipartUploadCommand uploader)
            {
                this._uploader = uploader;
                this._s3Client = this._uploader._s3Client;
            }

            /// <summary>
            /// The exception that stopped this invoker, or null if none has occurred.
            /// </summary>
            internal Exception LastException
            {
                get { return this._lastException; }
            }

            /// <summary>
            /// Dequeues the next part request under the queue lock.
            /// </summary>
            /// <returns>The next request, or null when the queue is empty.</returns>
            private UploadPartRequest getNextPartRequest()
            {
                lock (this._uploader.QUEUE_ACCESS_LOCK)
                {
                    if (this._uploader._partsToUpload.Count == 0)
                    {
                        return null;
                    }

                    return this._uploader._partsToUpload.Dequeue();
                }
            }

            /// <summary>
            /// Thread entry point: uploads queued parts until the queue is empty or
            /// an upload fails. A failure is stored in <see cref="LastException"/>
            /// and the coordinating thread is pulsed so it can react.
            /// </summary>
            internal void Execute()
            {
                UploadPartRequest request = null;
                while ((request = getNextPartRequest()) != null)
                {
                    this._lastException = null;
                    try
                    {
                        this._uploader.addResponse(this._s3Client.UploadPart(request));
                    }
                    catch (ThreadAbortException)
                    {
                        // Raised by shutdown(); let the abort continue.
                        throw;
                    }
                    catch (Exception e)
                    {
                        this._lastException = e;
                        lock (this._uploader.WAIT_FOR_COMPLETION_LOCK)
                        {
                            Monitor.Pulse(this._uploader.WAIT_FOR_COMPLETION_LOCK);
                        }
                        break;
                    }
                }
            }
        }
    }
}