/** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include namespace Aws { namespace Utils { template < typename T > class Array; } namespace Transfer { class TransferHandle; typedef std::function CreateDownloadStreamCallback; static const char CLASS_TAG[] = "TransferManager"; struct DownloadConfiguration { DownloadConfiguration() : versionId("") {} Aws::String versionId; // TBI: controls for in-memory parts vs. resumable file-based parts with state serialization to/from file }; class AWS_TRANSFER_API PartState { public: PartState(); PartState(int partId, uint64_t bestProgressInBytes, uint64_t sizeInBytes, bool lastPart = false); int GetPartId() const { return m_partId; } uint64_t GetBestProgressInBytes() const { return m_bestProgressInBytes; } void SetBestProgressInBytes(uint64_t progressInBytes) { m_bestProgressInBytes = progressInBytes; } uint64_t GetSizeInBytes() const { return m_sizeInBytes; } void SetSizeInBytes(uint64_t sizeInBytes) { m_sizeInBytes = sizeInBytes; } void Reset(); void OnDataTransferred(uint64_t amount, const std::shared_ptr &transferHandle); void SetETag(const Aws::String& eTag) { m_eTag = eTag; } const Aws::String& GetETag() const { return m_eTag; } Aws::IOStream *GetDownloadPartStream() const { return m_downloadPartStream; } void SetDownloadPartStream(Aws::IOStream *downloadPartStream) { m_downloadPartStream = downloadPartStream; } unsigned char* GetDownloadBuffer() const { return m_downloadBuffer; } void SetDownloadBuffer(unsigned char* downloadBuffer) { m_downloadBuffer = downloadBuffer; } void SetRangeBegin(uint64_t rangeBegin) { m_rangeBegin = rangeBegin; } uint64_t GetRangeBegin() const { return m_rangeBegin; } bool IsLastPart() { return m_lastPart; } void SetLastPart() { m_lastPart = true; } private: int m_partId; Aws::String m_eTag; uint64_t m_currentProgressInBytes; uint64_t m_bestProgressInBytes; uint64_t m_sizeInBytes; uint64_t m_rangeBegin; std::atomic m_downloadPartStream; std::atomic m_downloadBuffer; bool m_lastPart; }; using PartPointer = std::shared_ptr< PartState >; using PartStateMap = Aws::Map< int, PartPointer >; enum class TransferStatus { //this value is only used for directory synchronization EXACT_OBJECT_ALREADY_EXISTS, //Operation is still queued and has not begun processing NOT_STARTED, //Operation is now running IN_PROGRESS, //Operation was canceled. A Canceled operation can still be retried CANCELED, //Operation failed, A failed operation can still be retried. FAILED, //Operation was successful COMPLETED, //Operation either failed or was canceled and a user deleted the multipart upload from S3. ABORTED }; enum class TransferDirection { UPLOAD, DOWNLOAD }; /** * This is the interface for interacting with an in-process transfer. All operations from TransferManager return an instance of this class. * In addition to the status of the transfer and details about what operation is being performed, this class also has the Cancel() operation which is * used to cancel a transfer, and WaitUntilCompleted() which will cause the calling thread to block until the transfer is finished. * * In the context that by the time you are using this class, it is thread safe. */ class AWS_TRANSFER_API TransferHandle { public: /** * Initialize with required information for an UPLOAD */ TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, uint64_t totalSize, const Aws::String& targetFilePath = ""); /** * Initialize with required information for a DOWNLOAD */ TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, const Aws::String& targetFilePath = ""); /** * Alternate DOWNLOAD constructor */ TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, CreateDownloadStreamCallback createDownloadStreamFn, const Aws::String& targetFilePath = ""); /** * Alternate DOWNLOAD constructor */ TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, const uint64_t fileOffset, const uint64_t downloadBytes, CreateDownloadStreamCallback createDownloadStreamFn, const Aws::String& targetFilePath = ""); ~TransferHandle(); /** * Whether or not this transfer is being performed using parallel parts via a multi-part s3 api. */ inline bool IsMultipart() const { return m_isMultipart.load(); } /** * Whether or not this transfer is being performed using parallel parts via a multi-part s3 api. */ inline void SetIsMultipart(bool value) { m_isMultipart.store(value); } /** * If this is a multi-part transfer, this is the ID of it. e.g. UploadId for UploadPart */ inline const Aws::String GetMultiPartId() const { std::lock_guard locker(m_getterSetterLock); return m_multipartId; } /** * If this is a multi-part transfer, this is the ID of it. e.g. UploadId for UploadPart */ inline void SetMultipartId(const Aws::String& value) { std::lock_guard locker(m_getterSetterLock); m_multipartId = value; } /** * Returns a copy of the completed parts, in the structure of . Used for all transfers. */ PartStateMap GetCompletedParts() const; /** * Set a pending part to completed along with its etag. Used fore all transfers. */ void ChangePartToCompleted(const PartPointer& partState, const Aws::String &eTag); /** * Returns a copy of the pending parts. Used for all transfers. */ PartStateMap GetPendingParts() const; /** * Returns true or false if there are currently any pending parts. */ bool HasPendingParts() const; /** * Set a part to pending. Used for all transfers. */ void AddPendingPart(const PartPointer& partState); /** * Returns a copy of the queued parts. Used for all transfers. */ PartStateMap GetQueuedParts() const; /** * Returns true or false if there are currently any queued parts. */ bool HasQueuedParts() const; /** * Set a part to queued. Used for all transfers. */ void AddQueuedPart(const PartPointer& partState); /** * Returns a copy of the failed parts. Used for all transfers. */ PartStateMap GetFailedParts() const; /** * Returns true or false if there are currently any failed parts. */ bool HasFailedParts() const; /** * Set a pending part to failed. Used for all transfers. */ void ChangePartToFailed(const PartPointer& partState); /** * Get all parts using transactions, mostly for internal purposes. */ void GetAllPartsTransactional(PartStateMap& queuedParts, PartStateMap& pendingParts, PartStateMap& failedParts, PartStateMap& completedParts); /** * Returns true or false if any parts have been created for this transfer */ bool HasParts() const; /** * Returns false if Cancel has been called. Largely for internal use. */ bool ShouldContinue() const; /** * Cancel the transfer. This will happen asynchronously, so if you need to wait for it to be canceled, either handle the callbacks, * or call WaitUntilFinished. */ void Cancel(); /** * Reset the cancellation status for a retry. This will be done automatically by Transfer Manager. */ void Restart(); /** * Total bytes transferred successfully on this transfer operation. * We implement transfer progress with two invariants: * (1) Never lock; given a callback that can happen hundreds of times a second or more on a solid connection, it isn't acceptable to lock each time * (2) Never go backwards, in spite of part upload/download failures. Negative progress (canceling a highly concurrent transfer can * lead to an enormous step backwards if many parts are aborted at once) is a confusing and undesirable user experience. * In this sense, progress represents a high-water mark, and in the presence of heavy failures or cancellation, it may appear to pause until the * necessary retries exceed the previous high-water mark. */ inline uint64_t GetBytesTransferred() const { return m_bytesTransferred.load(); } /** * Total bytes transferred successfully on this transfer operation. */ void UpdateBytesTransferred(uint64_t amount) { m_bytesTransferred.fetch_add(amount); } /** * The offset from which to start downloading */ inline uint64_t GetBytesOffset() const { return m_offset; } /** * The calculated total size of the object being transferred. */ inline uint64_t GetBytesTotalSize() const { return m_bytesTotalSize.load(); } /** * Sets the total size of the object being transferred. */ inline void SetBytesTotalSize(uint64_t value) { m_bytesTotalSize.store(value); } /** * Gets the total bytes that is already transferred and available to users starting from the beginning of the file. * For multiple-part downloads, it's guaranteed that these bytes are committed to the underlying stream already. * For single-part downloads, it's also true since we write() directly to the underlying stream. * A potential use case is to poll and stream bytes to users as we are still doing multi-part downloading. */ inline uint64_t GetBytesAvailableFromStart() const { return m_bytesAvailableFromStart.load(std::memory_order_relaxed); } /** * Bucket portion of the object location in Amazon S3. */ inline const Aws::String& GetBucketName() const { return m_bucket; } /** * Key of the object location in Amazon S3. */ inline const Aws::String& GetKey() const { return m_key; } /** * If known, this is the location of the local file being uploaded from, or downloaded to. If you use the stream api however, this will * always be blank. */ inline const Aws::String& GetTargetFilePath() const { return m_fileName; } /** * (Download only) version id of the object to retrieve; if not specified in constructor, then latest is used */ const Aws::String GetVersionId() const { std::lock_guard locker(m_getterSetterLock); return m_versionId; } void SetVersionId(const Aws::String& versionId) { std::lock_guard locker(m_getterSetterLock); m_versionId = versionId; } /** * Upload or Download? */ inline TransferDirection GetTransferDirection() const { return m_direction; } /** * Content type of the object being transferred */ inline const Aws::String GetContentType() const { std::lock_guard locker(m_getterSetterLock); return m_contentType; } /** * Content type of the object being transferred */ inline void SetContentType(const Aws::String& value) { std::lock_guard locker(m_getterSetterLock); m_contentType = value; } /** * In case of an upload, this is the metadata that was placed on the object when it was uploaded. * In the case of a download, this is the object metadata from the GetObject operation. */ inline const Aws::Map GetMetadata() const { std::lock_guard locker(m_getterSetterLock); return m_metadata; } /** * In case of an upload, this is the metadata that was placed on the object when it was uploaded. * In the case of a download, this is the object metadata from the GetObject operation. */ inline void SetMetadata(const Aws::Map& value) { std::lock_guard locker(m_getterSetterLock); m_metadata = value; } /** * Add a new entry to or update an existed entry of m_metadata, useful when users want to get ETag directly from metadata. */ inline void AddMetadataEntry(const Aws::String& key, const Aws::String& value) { std::lock_guard locker(m_getterSetterLock); m_metadata[key] = value; } /** * Arbitrary user context that can be accessed from the callbacks */ inline void SetContext(const std::shared_ptr& context) { std::lock_guard locker(m_getterSetterLock); m_context = context; } /** * Returns arbitrary user context or nullptr if it's not set. */ inline std::shared_ptr GetContext() const { std::lock_guard locker(m_getterSetterLock); return m_context; } /** * The current status of the operation */ TransferStatus GetStatus() const; /** * The current status of the operation */ void UpdateStatus(TransferStatus value); /** * The last error that was encountered by the transfer. You can handle each error individually via the errorCallback callback function * in the TransferConfiguration. */ inline const Aws::Client::AWSError GetLastError() const { std::lock_guard locker(m_getterSetterLock); return m_lastError; } /** * The last error that was encountered by the transfer. You can handle each error individually via the errorCallback callback function * in the TransferConfiguration. */ inline void SetError(const Aws::Client::AWSError& error) { std::lock_guard locker(m_getterSetterLock); m_lastError = error; } /** * Blocks the calling thread until the operation has finished. This function does not busy wait. It is safe for your CPU. */ void WaitUntilFinished() const; const CreateDownloadStreamCallback& GetCreateDownloadStreamFunction() const { return m_createDownloadStreamFn; } void WritePartToDownloadStream(Aws::IOStream* partStream, uint64_t writeOffset); void ApplyDownloadConfiguration(const DownloadConfiguration& downloadConfig); bool LockForCompletion() { bool expected = false; return m_lastPart.compare_exchange_strong(expected, true/*desired*/); } /* * Returns a unique identifier tied to this particular transfer handle. */ Aws::String GetId() const; private: void CleanupDownloadStream(); std::atomic m_isMultipart; Aws::String m_multipartId; TransferDirection m_direction; PartStateMap m_completedParts; PartStateMap m_pendingParts; PartStateMap m_queuedParts; PartStateMap m_failedParts; std::atomic m_bytesTransferred; std::atomic m_lastPart; std::atomic m_bytesTotalSize; std::atomic m_bytesAvailableFromStart; /* The next part number to watch, that is able to grow m_bytesAvailableFromStart. */ uint32_t m_nextPartToWatch; uint64_t m_offset; Aws::String m_bucket; Aws::String m_key; Aws::String m_fileName; Aws::String m_contentType; Aws::String m_versionId; Aws::Map m_metadata; TransferStatus m_status; Aws::Client::AWSError m_lastError; std::atomic m_cancel; std::shared_ptr m_context; const Utils::UUID m_handleId; CreateDownloadStreamCallback m_createDownloadStreamFn; Aws::IOStream* m_downloadStream; /* in case customer stream is not based off 0 */ uint64_t m_downloadStreamBaseOffset; mutable std::mutex m_downloadStreamLock; mutable std::mutex m_partsLock; mutable std::mutex m_statusLock; mutable std::condition_variable m_waitUntilFinishedSignal; mutable std::mutex m_getterSetterLock; }; AWS_TRANSFER_API Aws::OStream& operator << (Aws::OStream& s, TransferStatus status); } }