U n^m@szdZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl mZddlmZddlZddlmZmZdZdZGd d d ejZeeZeee jjZd Z e!Z"d%d dZ#ddZ$ddZ%Gddde&Z'Gddde!Z(Gddde!Z)Gddde!Z*Gddde!Z+Gdddej,Z-Gdd d e!Z.Gd!d"d"e!Z/Gd#d$d$e!Z0dS)&aAbstractions over S3's upload/download operations. This module provides high level abstractions for efficient uploads/downloads. It handles several things for the user: * Automatically switching to multipart transfers when a file is over a specific size threshold * Uploading/downloading a file in parallel * Throttling based on max bandwidth * Progress callbacks to monitor transfers * Retries. While botocore handles retries for streaming uploads, it is not possible for it to handle retries for streaming downloads. This module handles retries for both cases so you don't need to implement any retry logic yourself. This module has a reasonable set of defaults. It also allows you to configure many aspects of the transfer process including: * Multipart threshold size * Max parallel downloads * Max bandwidth * Socket timeouts * Retry amounts There is no support for s3->s3 multipart copies at this time. .. _ref_s3transfer_usage: Usage ===== The simplest way to use this module is: .. code-block:: python client = boto3.client('s3', 'us-west-2') transfer = S3Transfer(client) # Upload /tmp/myfile to s3://bucket/key transfer.upload_file('/tmp/myfile', 'bucket', 'key') # Download s3://bucket/key to /tmp/myfile transfer.download_file('bucket', 'key', '/tmp/myfile') The ``upload_file`` and ``download_file`` methods also accept ``**kwargs``, which will be forwarded through to the corresponding client operation. Here are a few examples using ``upload_file``:: # Making the object public transfer.upload_file('/tmp/myfile', 'bucket', 'key', extra_args={'ACL': 'public-read'}) # Setting metadata transfer.upload_file('/tmp/myfile', 'bucket', 'key', extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) # Setting content type transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', extra_args={'ContentType': "application/json"}) The ``S3Transfer`` clas also supports progress callbacks so you can provide transfer progress to users. Both the ``upload_file`` and ``download_file`` methods take an optional ``callback`` parameter. Here's an example of how to print a simple progress percentage to the user: .. code-block:: python class ProgressPercentage(object): def __init__(self, filename): self._filename = filename self._size = float(os.path.getsize(filename)) self._seen_so_far = 0 self._lock = threading.Lock() def __call__(self, bytes_amount): # To simplify we'll assume this is hooked up # to a single filename. with self._lock: self._seen_so_far += bytes_amount percentage = (self._seen_so_far / self._size) * 100 sys.stdout.write( " %s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far, self._size, percentage)) sys.stdout.flush() transfer = S3Transfer(boto3.client('s3', 'us-west-2')) # Upload /tmp/myfile to s3://bucket/key and print upload progress. transfer.upload_file('/tmp/myfile', 'bucket', 'key', callback=ProgressPercentage('/tmp/myfile')) You can also provide a TransferConfig object to the S3Transfer object that gives you more fine grained control over the transfer. For example: .. code-block:: python client = boto3.client('s3', 'us-west-2') config = TransferConfig( multipart_threshold=8 * 1024 * 1024, max_concurrency=10, num_download_attempts=10, ) transfer = S3Transfer(client, config) transfer.upload_file('/tmp/foo', 'bucket', 'key') N)six)ReadTimeoutError)IncompleteReadError)RetriesExceededErrorS3UploadFailedErrorzAmazon Web Servicesz0.2.1c@seZdZddZdS) NullHandlercCsdSN)selfrecordr r o/private/var/folders/sd/whlwsn6x1_qgglc0mjv25_695qk2gl/T/pip-install-4zq3fp6i/s3transfer/s3transfer/__init__.pyemitszNullHandler.emitN)__name__ __module__ __qualname__r r r r r rsricCsdddt|DS)Ncss|]}ttjVqdSr)randomchoicestring hexdigits).0_r r r sz(random_file_extension..)joinrange)Z num_digitsr r r random_file_extensionsrcKs"|dkrt|jdr|jdS)NZ PutObjectZ UploadPartdisable_callback)hasattrbodyrrequestZoperation_namekwargsr r r disable_upload_callbackss r$cKs"|dkrt|jdr|jdS)Nrenable_callback)rr r%r!r r r enable_upload_callbackss r&c@s eZdZdS)QueueShutdownErrorN)rrrr r r r r'sr'c@s~eZdZdddZedddZddZdd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZdS) ReadFileChunkNTcCsF||_||_|j|j|||d|_|j|jd|_||_||_dS)a Given a file object shown below: |___________________________________________________| 0 | | full_file_size |----chunk_size---| start_byte :type fileobj: file :param fileobj: File like object :type start_byte: int :param start_byte: The first byte from which to start reading. :type chunk_size: int :param chunk_size: The max chunk size to read. Trying to read pass the end of the chunk size will behave like you've reached the end of the file. :type full_file_size: int :param full_file_size: The entire content length associated with ``fileobj``. :type callback: function(amount_read) :param callback: Called whenever data is read from this object. )requested_size start_byteactual_file_sizerN)_fileobj _start_byte_calculate_file_size_sizeseek _amount_read _callback_callback_enabled)r fileobjr* chunk_sizeZfull_file_sizecallbackr%r r r __init__szReadFileChunk.__init__cCs,t|d}t|j}|||||||S)aWConvenience factory function to create from a filename. :type start_byte: int :param start_byte: The first byte from which to start reading. :type chunk_size: int :param chunk_size: The max chunk size to read. Trying to read pass the end of the chunk size will behave like you've reached the end of the file. :type full_file_size: int :param full_file_size: The entire content length associated with ``fileobj``. :type callback: function(amount_read) :param callback: Called whenever data is read from this object. :type enable_callback: bool :param enable_callback: Indicate whether to invoke callback during read() calls. :rtype: ``ReadFileChunk`` :return: A new instance of ``ReadFileChunk`` rb)openosfstatfilenost_size)clsfilenamer*r5r6r%f file_sizer r r from_filenames   zReadFileChunk.from_filenamecCs||}t||Sr)min)r r4r)r*r+Zmax_chunk_sizer r r r.sz"ReadFileChunk._calculate_file_sizecCsh|dkr|j|j}nt|j|j|}|j|}|jt|7_|jdk rd|jrd|t||Sr)r/r1rCr,readlenr2r3)r amountZamount_to_readdatar r r rDs zReadFileChunk.readcCs d|_dS)NTr3r r r r r%szReadFileChunk.enable_callbackcCs d|_dSNFrHrIr r r rszReadFileChunk.disable_callbackcCs<|j|j||jdk r2|jr2|||j||_dSr)r,r0r-r2r3r1)r wherer r r r0szReadFileChunk.seekcCs|jdSr)r,closerIr r r rLszReadFileChunk.closecCs|jSr)r1rIr r r tell!szReadFileChunk.tellcCs|jSr)r/rIr r r __len__$szReadFileChunk.__len__cCs|Srr rIr r r __enter__,szReadFileChunk.__enter__cOs |dSr)rL)r argsr#r r r __exit__/szReadFileChunk.__exit__cCstgSr)iterrIr r r __iter__2szReadFileChunk.__iter__)NT)NT)N)rrrr7 classmethodrBr.rDr%rr0rLrMrNrOrQrSr r r r r(s$ (  r(c@s"eZdZdZdddZddZdS)StreamReaderProgressz} |jjf||||| d|} | d} | |dW5QRSQRXdS)Nr)r{r|r} PartNumberBodyETag)rr)ror`rmZ upload_part) r r?rrxrrrwr6Z part_numberopen_chunk_readerr retagr r r rs"z"MultipartUploader._upload_one_partN) rrrrv concurrentfuturesThreadPoolExecutorr7ryrrrr r r r rhas  rhc@s(eZdZdZddZddZddZdS) ShutdownQueueaYA queue implementation that can be shutdown. Shutting down a queue means that this class adds a trigger_shutdown method that will trigger all subsequent calls to put() to fail with a ``QueueShutdownError``. It purposefully deviates from queue.Queue, and is *not* meant to be a drop in replacement for ``queue.Queue``. cCsd|_t|_tj||SrJ) _shutdown threadingLock_shutdown_lockqueueQueue_init)r maxsizer r r rs zShutdownQueue._initc Cs&|jd|_tdW5QRXdS)NTzThe IO queue is now shutdown.)rrrrrIr r r trigger_shutdownszShutdownQueue.trigger_shutdownc Cs.|j|jrtdW5QRXtj||S)Nz6Cannot put item to queue when queue has been shutdown.)rrr'rrput)r itemr r r rszShutdownQueue.putN)rrrrYrrrr r r r rs rc@sNeZdZejjfddZdddZddZdd Z d d Z d d Z ddZ dS)MultipartDownloadercCs*||_||_||_||_t|jj|_dSr)rmrnrorpr max_io_queue_ioqueuerqr r r r7s zMultipartDownloader.__init__Nc Csv|jdd`}t|j|||||}||} t|j|} || } tjj| | gtjj d} | | W5QRXdS)Nr)Z return_when) rprr_download_file_as_futureZsubmit_perform_io_writesrrwaitZFIRST_EXCEPTION_process_future_results) r rrxr? object_sizerwr6 controllerZdownload_parts_handlerZ parts_futureZio_writes_handlerZ io_futureresultsr r r download_files(  z!MultipartDownloader.download_filecCs|\}}|D] }|q dSr)result)r rfinished unfinishedfuturer r r rsz+MultipartDownloader._process_future_resultsc Cs|jj}tt|t|}|jj}t|j ||||||} z0|j |d} t| | t|W5QRXW5|j t XdS)Nr)rnrrrrrrrr_download_rangerrSHUTDOWN_SENTINELrplistrr) r rrxr?rr6rrrZdownload_partialrr r r rs "z,MultipartDownloader._download_file_as_futurecCs6||}||dkrd}n ||d}d||f}|S)Nrrz bytes=%s-%sr )r r part_indexrZ start_rangeZ end_range range_paramr r r _calculate_range_params    z*MultipartDownloader._calculate_range_paramc sz||||}|jj} d} t| D]} zxtd|jj|||d} t| d|d||} t fdddD] }|j | |f| t |7} qvWW`dSt jt jttfk r}z$tjd || | d d |} WYq&W5d}~XYq&Xq&t| W5td|XdS) Nz$EXITING _download_range for part: %szMaking get_object call.)r{r|ZRangeri@cs SrrDr  buffer_sizestreaming_bodyr r z5MultipartDownloader._download_range..rCRetrying exception caught (%s), retrying request, (attempt %s / %s)Tr~)rrrrnnum_download_attemptsrrm get_objectrUrRrrrEsockettimeouterrorrrr)r rrxr?rrr6rr max_attemptslast_exceptionirZ current_indexchunkrr rr rsT     z#MultipartDownloader._download_rangec Cs|j|d}|j}|tkr:tdW5QRdSz |\}}||||Wqt k r}z tjd|dd|j W5d}~XYqXqW5QRXdS)NwbzCShutdown sentinel received in IO handler, shutting down IO handler.z!Caught exception in IO thread: %sTr~) ror9rgetrrrr0writerr)r r?r@ZtaskoffsetrGrr r r r$s     z&MultipartDownloader._perform_io_writes)N) rrrrrrr7rrrrrrr r r r rs  !rc@s(eZdZdeddeddfddZdS)TransferConfigr dcCs"||_||_||_||_||_dSr)multipart_thresholdrrrr)r rrrrrr r r r79s zTransferConfig.__init__N)rrrMBr7r r r r r8s rc@seZdZdddddgZdddd d d d d dddddddddddgZd,ddZd-ddZddZd.ddZddZ d d!Z d"d#Z d$d%Z d&d'Z d(d)Zd*d+ZdS)/ S3TransferZ VersionIdrjrirkrlZACL CacheControlZContentDispositionZContentEncodingZContentLanguageZ ContentTypeZExpiresZGrantFullControlZ GrantReadZ GrantReadACPZ GrantWriteACLZMetadataZServerSideEncryptionZ StorageClassZ SSEKMSKeyIdNcCs2||_|dkrt}||_|dkr(t}||_dSr)rmrrnrZ_osutil)r rrrsrtr r r r7fszS3Transfer.__init__cCs|dkr i}|||j|jjj}|jdtdd|jdtdd|j ||j j krl| |||||n||||||dS)zUpload a file to an S3 object. Variants have also been injected into S3 client, Bucket and Object. You don't have to use S3Transfer.upload_file() directly. Nzrequest-created.s3zs3upload-callback-disable)Z unique_idzs3upload-callback-enable)_validate_all_known_argsALLOWED_UPLOAD_ARGSrmmetaeventsZregister_firstr$Z register_lastr&rr^rnr_multipart_upload _put_object)r r?rrxr6rwrr r r ros"  zS3Transfer.upload_filec CsJ|jj}||d|j||d }|jjf|||d|W5QRXdS)Nr)r6)r{r|r)rr`r^rmZ put_object)r r?rrxr6rwrr r r r rs zS3Transfer._put_objectcCs|dkr i}|||j||||}|tjt}z|||||||Wn2tk rtj d|dd|j |YnX|j ||dS)zDownload an S3 object to a file. Variants have also been injected into S3 client, Bucket and Object. You don't have to use S3Transfer.download_file() directly. Nz.r)rmrrUrr9rRr) r rrxr?rwr6rr@rr rr rszS3Transfer._do_get_objectcCs|jjf||d|dS)NrzZ ContentLength)rmZ head_object)r rrxrwr r r rszS3Transfer._object_sizecCs(t|j|j|j}||||||dSr)rhrmrnrr)r r?rrxr6rwZuploaderr r r rszS3Transfer._multipart_upload)NN)NN)NN)rrrrrr7rrrrrrrrrrr r r r rFsR     r)r)1rYr:rrloggingrrrrconcurrent.futuresrZbotocore.compatrZ6botocore.vendored.requests.packages.urllib3.exceptionsrZbotocore.exceptionsrZs3transfer.compatreZs3transfer.exceptionsrr __author__ __version__Handlerr getLoggerrr addHandlermovesrrobjectrrr$r&rr'r(rUrZrhrrrrrr r r r  sFq       K l