"""Abstractions over S3's upload/download operations.

This module provides high level abstractions for efficient
uploads/downloads.  It handles several things for the user:

* Automatically switching to multipart transfers when
  a file is over a specific size threshold
* Uploading/downloading a file in parallel
* Throttling based on max bandwidth
* Progress callbacks to monitor transfers
* Retries.  While botocore handles retries for streaming uploads,
  it is not possible for it to handle retries for streaming
  downloads.  This module handles retries for both cases so
  you don't need to implement any retry logic yourself.

This module has a reasonable set of defaults.  It also allows you
to configure many aspects of the transfer process including:

* Multipart threshold size
* Max parallel downloads
* Max bandwidth
* Socket timeouts
* Retry amounts

There is no support for s3->s3 multipart copies at this
time.


.. _ref_s3transfer_usage:

Usage
=====

The simplest way to use this module is:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Upload /tmp/myfile to s3://bucket/key
    transfer.upload_file('/tmp/myfile', 'bucket', 'key')

    # Download s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept
``**kwargs``, which will be forwarded through to the corresponding
client operation.  Here are a few examples using ``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users.  Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s  %s / %s  (%.2f%%)" % (
                        self._filename, self._seen_so_far,
                        self._size, percentage))
                sys.stdout.flush()

    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))

You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine grained control over the
transfer.  For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
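
The ``extra_args`` and ``callback`` parameters work for downloads as well;
note that uploads and downloads accept different sets of ``extra_args``
keys (``S3Transfer.ALLOWED_UPLOAD_ARGS`` vs.
``S3Transfer.ALLOWED_DOWNLOAD_ARGS``).  For example, to download a specific
object version (a sketch; the version id shown is hypothetical)::

    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})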
"""
import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.6.0'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))


def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """
        Given a file object shown below:

            |___________________________________________________|
            0          |                 |                 full_file_size
                       |----chunk_size---|
                    start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
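
        For example, a minimal sketch that reads one 1 KB window of a
        local file (the path is hypothetical)::

            chunk = ReadFileChunk.from_filename(
                '/tmp/myfile', start_byte=1024, chunk_size=1024)
            data = chunk.read()  # at most 1 KB, starting at file offset 1024
            chunk.close()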

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length.  In the normal case
        # of a file it will just stat the file, but we need to change that
        # behavior.  By providing a __len__, requests will use that instead
        # of the content length of the file object (via stat).
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575.
        # Basically httplib will try to iterate over the contents, even
        # if it's a file like object.  This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])


class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist then this
        # method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shutdown.

    Shutting down a queue means that this class adds a
    trigger_shutdown method that will trigger all subsequent
    calls to put() to fail with a ``QueueShutdownError``.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop in replacement for ``queue.Queue``.
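
    For example (an illustrative sketch)::

        q = ShutdownQueue()
        q.put('item')            # succeeds
        q.trigger_shutdown()
        q.put('another item')    # raises QueueShutdownError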

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # queue.Queue sets up its internal state in _init, so delegate
        # to the parent implementation after adding our shutdown state.
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shutdown."
                )
        return queue.Queue.put(self, item)


class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts,
            # 1 thread for the future that manages the IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param
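
    # Illustration of _calculate_range_param (comment only, not executed):
    # with part_size=8 MB (8388608 bytes) and num_parts=3,
    #   part_index 0 -> 'bytes=0-8388607'
    #   part_index 1 -> 'bytes=8388608-16777215'
    #   part_index 2 -> 'bytes=16777216-'  (open-ended, so the final part
    #                                       absorbs any remainder)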

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:

    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACL',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage
        # of the progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object.  This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "download: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
            ) as e:
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(
            Bucket=bucket, Key=key, **extra_args
        )['ContentLength']

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)
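
# A minimal end-to-end sketch of how the pieces above compose (comment only,
# not executed on import; the client, bucket, key, and paths are
# hypothetical):
#
#     client = boto3.client('s3', 'us-west-2')
#     config = TransferConfig(multipart_threshold=8 * MB, max_concurrency=4)
#     transfer = S3Transfer(client, config, OSUtils())
#     transfer.upload_file('/tmp/big.bin', 'bucket', 'key')
#     transfer.download_file('bucket', 'key', '/tmp/big.copy')
#
# Files at or above multipart_threshold go through MultipartUploader /
# MultipartDownloader; smaller files use a single PutObject / GetObject call.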