Speeds up S3 throughput by using processes

Getting Started
===============

The :class:`ProcessPoolDownloader` can be used to download a single file by
calling :meth:`ProcessPoolDownloader.download_file`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file('mybucket', 'mykey', 'myfile')


This snippet downloads the S3 object located in the bucket ``mybucket`` at the
key ``mykey`` to the local file ``myfile``. Any errors encountered during the
transfer are not propagated. To determine if a transfer succeeded or
failed, use the `Futures`_ interface.


The :class:`ProcessPoolDownloader` can be used to download multiple files as
well:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file('mybucket', 'mykey', 'myfile')
          downloader.download_file('mybucket', 'myotherkey', 'myotherfile')


When running this snippet, the downloads of ``mykey`` and ``myotherkey``
happen in parallel. The first ``download_file`` call does not block the
second ``download_file`` call. The snippet blocks when exiting
the context manager until both downloads are complete.

Alternatively, the ``ProcessPoolDownloader`` can be instantiated
and shut down explicitly using :meth:`ProcessPoolDownloader.shutdown`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     downloader = ProcessPoolDownloader()
     downloader.download_file('mybucket', 'mykey', 'myfile')
     downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
     downloader.shutdown()


For this code snippet, the call to ``shutdown`` blocks until both
downloads are complete.


Additional Parameters
=====================

Additional parameters can be provided to the ``download_file`` method:

* ``extra_args``: A dictionary containing any additional client arguments
  to include in the GetObject API request. For example:

  .. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file(
               'mybucket', 'mykey', 'myfile',
               extra_args={'VersionId': 'myversion'})


* ``expected_size``: By default, the downloader will make a HeadObject
  call to determine the size of the object. To opt out of this additional
  API call, you can provide the size of the object in bytes:

  .. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     MB = 1024 * 1024
     with ProcessPoolDownloader() as downloader:
          downloader.download_file(
               'mybucket', 'mykey', 'myfile', expected_size=2 * MB)


Futures
=======

When ``download_file`` is called, it immediately returns a
:class:`ProcessPoolTransferFuture`. The future can be used to poll the state
of a particular transfer. To get the result of the download,
call :meth:`ProcessPoolTransferFuture.result`. The method blocks
until the transfer completes, whether it succeeds or fails. For example:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          future = downloader.download_file('mybucket', 'mykey', 'myfile')
          print(future.result())


If the download succeeds, the future returns ``None``:

.. code:: python

     None


If the download fails, the exception causing the failure is raised. For
example, if ``mykey`` did not exist, the following error would be raised:

.. code:: python

     botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found


.. note::

    :meth:`ProcessPoolTransferFuture.result` can only be called while the
    ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or
    inside the context manager).


Process Pool Configuration
==========================

By default, the downloader has the following configuration options:

* ``multipart_threshold``: The threshold size for performing ranged downloads
  in bytes. By default, ranged downloads happen for S3 objects that are
  greater than or equal to 8 MB in size.

* ``multipart_chunksize``: The size of each ranged download in bytes. By
  default, the size of each ranged download is 8 MB.

* ``max_request_processes``: The maximum number of processes used to download
  S3 objects. By default, the maximum is 10 processes.


To change the default configuration, use the :class:`ProcessTransferConfig`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader
     from s3transfer.processpool import ProcessTransferConfig

     config = ProcessTransferConfig(
          multipart_threshold=64 * 1024 * 1024,  # 64 MB
          max_request_processes=50
     )
     downloader = ProcessPoolDownloader(config=config)


Client Configuration
====================

The process pool downloader creates ``botocore`` clients on your behalf. In
order to affect how the client is created, pass the keyword arguments that
would have been used in the :meth:`botocore.Session.create_client` call:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     downloader = ProcessPoolDownloader(
          client_kwargs={'region_name': 'us-west-2'})


This snippet ensures that all clients created by the ``ProcessPoolDownloader``
are using ``us-west-2`` as their region.
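To make the ``multipart_threshold`` and ``multipart_chunksize`` settings from the Process Pool Configuration section more concrete, the following is a minimal sketch of how an object of a given size could be split into ranged requests. The helper ``plan_ranges`` is hypothetical and is not part of ``s3transfer``; the library's own range calculation may differ in detail (for example, in how the final range is expressed).

```python
MB = 1024 * 1024


def plan_ranges(size, multipart_threshold=8 * MB, multipart_chunksize=8 * MB):
    """Return the HTTP Range values a ranged download might use.

    Hypothetical helper for illustration only; not part of s3transfer.
    """
    if size < multipart_threshold:
        # Below the threshold: one GetObject request with no Range header.
        return [None]
    ranges = []
    for start in range(0, size, multipart_chunksize):
        # Range headers use inclusive end offsets.
        end = min(start + multipart_chunksize, size) - 1
        ranges.append('bytes=%d-%d' % (start, end))
    return ranges


if __name__ == '__main__':
    for value in plan_ranges(20 * MB):
        print(value)
```

Under these assumptions, a 2 MB object is fetched with a single request, while a 20 MB object with the default 8 MB chunk size is split into three ranged requests that up to ``max_request_processes`` workers could fetch independently.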
.. class:: ProcessTransferConfig(multipart_threshold, multipart_chunksize, max_request_processes)

   Configuration for the ProcessPoolDownloader

   :param multipart_threshold: The threshold for which ranged downloads
       occur.

   :param multipart_chunksize: The chunk size of each ranged download.

   :param max_request_processes: The maximum number of processes that
       will be making S3 API transfer-related requests at a time.


.. class:: ProcessPoolDownloader(client_kwargs=None, config=None)

   Downloads S3 objects using process pools

   :type client_kwargs: dict
   :param client_kwargs: The keyword arguments to provide when
       instantiating S3 clients. The arguments must match the keyword
       arguments provided to the
       `botocore.session.Session.create_client()` method.

   :type config: ProcessTransferConfig
   :param config: Configuration for the downloader

   .. method:: download_file(bucket, key, filename, extra_args=None, expected_size=None)

      Downloads the object's contents to a file

      :type bucket: str
      :param bucket: The name of the bucket to download from

      :type key: str
      :param key: The name of the key to download from

      :type filename: str
      :param filename: The name of a file to download to.

      :type extra_args: dict
      :param extra_args: Extra arguments that may be passed to the
          client operation

      :type expected_size: int
      :param expected_size: The expected size in bytes of the download. If
          provided, the downloader will not call HeadObject to determine the
          object's size and use the provided value instead. The size is
          needed to determine whether to do a multipart download.

      :rtype: s3transfer.futures.TransferFuture
      :returns: Transfer future representing the download

   .. method:: shutdown()

      Shut down the downloader

      It waits until all downloads are complete before returning.


.. class:: ProcessPoolTransferFuture(monitor, meta)

   The future associated to a submitted process pool transfer request

   :type monitor: TransferMonitor
   :param monitor: The monitor associated to the process pool downloader

   :type meta: ProcessPoolTransferMeta
   :param meta: The metadata associated to the request. This object
       is visible to the requester.


.. class:: ProcessPoolTransferMeta

   Holds metadata about the ProcessPoolTransferFuture


.. class:: GetObjectWorker(queue, client_factory, transfer_monitor, osutil)

   Fulfills GetObjectJobs

   Downloads the S3 object, writes it to the specified file, and
   renames the file to its final location if it completes the final
   job for a particular transfer.

   :param queue: Queue for retrieving GetObjectJob's
   :param client_factory: ClientFactory for creating S3 clients
   :param transfer_monitor: Monitor for notifying
   :param osutil: OSUtils object to use for os-related behavior when
       performing the transfer.