Speeds up S3 throughput by using processes

Getting Started
===============

The :class:`ProcessPoolDownloader` can be used to download a single file by
calling :meth:`ProcessPoolDownloader.download_file`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file('mybucket', 'mykey', 'myfile')


This snippet downloads the S3 object located in the bucket ``mybucket`` at the
key ``mykey`` to the local file ``myfile``. Any errors encountered during the
transfer are not propagated. To determine if a transfer succeeded or
failed, use the `Futures`_ interface.


The :class:`ProcessPoolDownloader` can be used to download multiple files as
well:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file('mybucket', 'mykey', 'myfile')
          downloader.download_file('mybucket', 'myotherkey', 'myotherfile')


When running this snippet, the downloads of ``mykey`` and ``myotherkey``
happen in parallel. The first ``download_file`` call does not block the
second ``download_file`` call. The snippet blocks when exiting the context
manager until both downloads are complete.

Alternatively, the ``ProcessPoolDownloader`` can be instantiated and
explicitly shut down using :meth:`ProcessPoolDownloader.shutdown`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     downloader = ProcessPoolDownloader()
     downloader.download_file('mybucket', 'mykey', 'myfile')
     downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
     downloader.shutdown()


For this code snippet, the call to ``shutdown`` blocks until both
downloads are complete.

Additional Parameters
=====================

Additional parameters can be provided to the ``download_file`` method:

* ``extra_args``: A dictionary containing any additional client arguments
  to include in the GetObject API request. For example:

  .. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          downloader.download_file(
               'mybucket', 'mykey', 'myfile',
               extra_args={'VersionId': 'myversion'})


* ``expected_size``: By default, the downloader will make a HeadObject
  call to determine the size of the object. To opt out of this additional
  API call, you can provide the size of the object in bytes:

  .. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     MB = 1024 * 1024
     with ProcessPoolDownloader() as downloader:
          downloader.download_file(
               'mybucket', 'mykey', 'myfile', expected_size=2 * MB)


Futures
=======

When ``download_file`` is called, it immediately returns a
:class:`ProcessPoolTransferFuture`. The future can be used to poll the state
of a particular transfer. To get the result of the download,
call :meth:`ProcessPoolTransferFuture.result`. The method blocks
until the transfer completes, whether it succeeds or fails. For example:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader

     with ProcessPoolDownloader() as downloader:
          future = downloader.download_file('mybucket', 'mykey', 'myfile')
          print(future.result())


If the download succeeds, the future returns ``None``:

.. code:: python

     None


If the download fails, the exception causing the failure is raised. For
example, if ``mykey`` did not exist, the following error would be raised:


.. code:: python

     botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found


.. note::

    :meth:`ProcessPoolTransferFuture.result` can only be called while the
    ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or
    inside the context manager).

Process Pool Configuration
==========================

By default, the downloader has the following configuration options:

* ``multipart_threshold``: The threshold size for performing ranged downloads
  in bytes. By default, ranged downloads happen for S3 objects that are
  greater than or equal to 8 MB in size.

* ``multipart_chunksize``: The size of each ranged download in bytes. By
  default, the size of each ranged download is 8 MB.

* ``max_request_processes``: The maximum number of processes used to download
  S3 objects. By default, the maximum is 10 processes.


To change the default configuration, use the :class:`ProcessTransferConfig`:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader
     from s3transfer.processpool import ProcessTransferConfig

     config = ProcessTransferConfig(
          multipart_threshold=64 * 1024 * 1024,  # 64 MB
          max_request_processes=50
     )
     downloader = ProcessPoolDownloader(config=config)


Client Configuration
====================

The process pool downloader creates ``botocore`` clients on your behalf. To
affect how the clients are created, pass the keyword arguments that would
have been used in the :meth:`botocore.session.Session.create_client` call:

.. code:: python

     from s3transfer.processpool import ProcessPoolDownloader
     from s3transfer.processpool import ProcessTransferConfig

     downloader = ProcessPoolDownloader(
          client_kwargs={'region_name': 'us-west-2'})


This snippet ensures that all clients created by the ``ProcessPoolDownloader``
are using ``us-west-2`` as their region.

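To make the interaction between ``multipart_threshold`` and ``multipart_chunksize`` described under Process Pool Configuration more concrete, the sketch below computes which ``Range`` values a download of a given size would be split into. It is an illustration under stated assumptions, not the library's implementation: ``plan_range_headers`` is a hypothetical helper, and it always writes an explicit end byte, which the real code may not do for the final part.

```python
import math

MB = 1024 * 1024


def plan_range_headers(size, multipart_threshold=8 * MB,
                       multipart_chunksize=8 * MB):
    """Return the Range header values for an object of ``size`` bytes.

    Hypothetical helper for illustration. Objects smaller than the
    threshold are fetched with a single unranged request, represented
    here by ``[None]``; larger objects are split into chunk-sized ranges.
    """
    if size < multipart_threshold:
        return [None]

    num_parts = math.ceil(size / multipart_chunksize)
    headers = []
    for part_index in range(num_parts):
        start = part_index * multipart_chunksize
        # Byte ranges are inclusive, so the last byte is one before the
        # start of the next chunk (or the end of the object).
        end = min(start + multipart_chunksize, size) - 1
        headers.append(f'bytes={start}-{end}')
    return headers


if __name__ == '__main__':
    for label, size in (('2 MB', 2 * MB), ('20 MB', 20 * MB)):
        print(label, plan_range_headers(size))
```

Raising ``multipart_threshold`` keeps more objects on the single-request path, while a larger ``multipart_chunksize`` yields fewer, bigger ranged requests for the worker processes to share.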
N)deepcopy)Config)MAXINT BaseManager)ALLOWED_DOWNLOAD_ARGSMBPROCESS_USER_AGENT)CancelledErrorRetriesExceededError)BaseTransferFutureBaseTransferMeta)S3_RETRYABLE_DOWNLOAD_ERRORSCallArgsOSUtilscalculate_num_partscalculate_range_parameterSHUTDOWNDownloadFileRequest transfer_idbucketkeyfilename extra_args expected_size GetObjectJob)rrr temp_filenameroffsetrccs t}dVttj|dSN)"_add_ignore_handler_for_interruptssignalSIGINT)Zoriginal_handlerr"q/private/var/folders/v1/_jykv66s6qd26_69j1njbrl80000gr/T/pip-target-p1gutpg6/lib/python/s3transfer/processpool.py ignore_ctrl_csr$cCsttjtjSr)r r!SIG_IGNr"r"r"r#rsrc@s$eZdZdededfddZdS)ProcessTransferConfig cCs||_||_||_dS)auConfiguration for the ProcessPoolDownloader :param multipart_threshold: The threshold for which ranged downloads occur. :param multipart_chunksize: The chunk size of each ranged download. :param max_request_processes: The maximum number of processes that will be making S3 API transfer-related requests at a time. N)multipart_thresholdmultipart_chunksizemax_request_processes)selfr)r*r+r"r"r#__init__s zProcessTransferConfig.__init__N)__name__ __module__ __qualname__rr-r"r"r"r#r& s r&c@seZdZd$ddZ d$ddZddZdd Zd d Zd d ZddZ ddZ ddZ ddZ ddZ ddZddZddZddZd d!Zd"d#ZdS)%ProcessPoolDownloaderNcCsx|duri}t||_||_|durt|_td|_td|_t|_ d|_ t |_ d|_d|_d|_g|_dS)aDownloads S3 objects using process pools :type client_kwargs: dict :param client_kwargs: The keyword arguments to provide when instantiating S3 clients. The arguments must match the keyword arguments provided to the `botocore.session.Session.create_client()` method. 
:type config: ProcessTransferConfig :param config: Configuration for the downloader NiF) ClientFactory_client_factory_transfer_configr&multiprocessingQueue_download_request_queue _worker_queuer_osutil_started threadingLock _start_lock_manager_transfer_monitor _submitter_workers)r, client_kwargsconfigr"r"r#r-$s      zProcessPoolDownloader.__init__c Csv||dur i}|||j}t||||||d}td||j|t |||||d}| ||} | S)asDownloads the object's contents to a file :type bucket: str :param bucket: The name of the bucket to download from :type key: str :param key: The name of the key to download from :type filename: str :param filename: The name of a file to download to. :type extra_args: dict :param extra_args: Extra arguments that may be passed to the client operation :type expected_size: int :param expected_size: The expected size in bytes of the download. If provided, the downloader will not call HeadObject to determine the object's size and use the provided value instead. The size is needed to determine whether to do a multipart download. :rtype: s3transfer.futures.TransferFuture :returns: Transfer future representing the download Nrz%Submitting download file request: %s.)rrrrr) _start_if_needed_validate_all_known_argsr?notify_new_transferrloggerdebugr7putr_get_transfer_future) r,rrrrrrdownload_file_request call_argsfuturer"r"r# download_fileEs4    z#ProcessPoolDownloader.download_filecCs |dS)zhShutdown the downloader It will wait till all downloads are complete before returning. 
N)_shutdown_if_neededr,r"r"r#shutdown{s zProcessPoolDownloader.shutdowncCs|Srr"rPr"r"r# __enter__szProcessPoolDownloader.__enter__cGs*t|tr|jdur|j|dSr) isinstanceKeyboardInterruptr?notify_cancel_all_in_progressrQ)r,exc_type exc_valueargsr"r"r#__exit__s    zProcessPoolDownloader.__exit__cCsJ|j|js|WddSWddS1swYdSr)r=r:_startrPr"r"r#rD  "z&ProcessPoolDownloader._start_if_neededcC"|||d|_dS)NT)_start_transfer_monitor_manager_start_submitter_start_get_object_workersr:rPr"r"r#rZ zProcessPoolDownloader._startcCs4|D]}|tvrdt}td|d|qdS)Nz, zInvalid extra_args key 'z', must be one of: )rjoin ValueError)r,providedkwargZ download_argsr"r"r#rEs z.ProcessPoolDownloader._validate_all_known_argscCst||d}t|j|d}|S)N)rLr)monitormeta)ProcessPoolTransferMetaProcessPoolTransferFuturer?)r,rrLrfrMr"r"r#rJsz*ProcessPoolDownloader._get_transfer_futurecCs.tdt|_|jt|j|_dS)Nz$Starting the TransferMonitorManager.)rGrHTransferMonitorManagerr>startrTransferMonitorr?rPr"r"r#r]s  z5ProcessPoolDownloader._start_transfer_monitor_managercCs:tdt|j|j|j|j|j|jd|_ |j dS)Nz Starting the GetObjectSubmitter.)transfer_configclient_factorytransfer_monitorosutildownload_request_queue worker_queue) rGrHGetObjectSubmitterr4r3r?r9r7r8r@rjrPr"r"r#r^s z&ProcessPoolDownloader._start_submittercCsRtd|jjt|jjD]}t|j|j|j|j d}| |j |qdS)NzStarting %s GetObjectWorkers.)queuermrnro) rGrHr4r+rangeGetObjectWorkerr8r3r?r9rjrAappendr,_Zworkerr"r"r#r_sz/ProcessPoolDownloader._start_get_object_workerscCsJ|j|jr|WddSWddS1swYdSr)r=r: _shutdownrPr"r"r#rOr[z)ProcessPoolDownloader._shutdown_if_neededcCr\)NF)_shutdown_submitter_shutdown_get_object_workers"_shutdown_transfer_monitor_managerr:rPr"r"r#ryr`zProcessPoolDownloader._shutdowncCstd|jdS)Nz)Shutting down the TransferMonitorManager.)rGrHr>rQrPr"r"r#r|s z8ProcessPoolDownloader._shutdown_transfer_monitor_managercCs$td|jt|jdS)Nz%Shutting down the GetObjectSubmitter.)rGrHr7rISHUTDOWN_SIGNALr@rarPr"r"r#rzs  z)ProcessPoolDownloader._shutdown_submittercCs:td|jD]}|jtq|jD]}|qdS)Nz#Shutting down the 
GetObjectWorkers.)rGrHrAr8rIr}rarwr"r"r#r{s    z2ProcessPoolDownloader._shutdown_get_object_workers)NN)r.r/r0r-rNrQrRrYrDrZrErJr]r^r_rOryr|rzr{r"r"r"r#r1#s& " 6     r1c@s8eZdZddZeddZddZddZd d Zd S) rhcCs||_||_dS)a`The future associated to a submitted process pool transfer request :type monitor: TransferMonitor :param monitor: The monitor associated to the process pool downloader :type meta: ProcessPoolTransferMeta :param meta: The metadata associated to the request. This object is visible to the requester. N)_monitor_meta)r,rerfr"r"r#r-s z"ProcessPoolTransferFuture.__init__cC|jSr)rrPr"r"r#rfzProcessPoolTransferFuture.metacCs|j|jjSr)r~is_donerrrPr"r"r#donezProcessPoolTransferFuture.donecCs6z |j|jjWSty|j|wr)r~poll_for_resultrrrT_connectcancelrPr"r"r#results  z ProcessPoolTransferFuture.resultcCs|j|jjtdSr)r~notify_exceptionrrr rPr"r"r#rs z ProcessPoolTransferFuture.cancelN) r.r/r0r-propertyrfrrrr"r"r"r#rhs  rhc@s<eZdZdZddZeddZeddZedd Zd S) rgz2Holds metadata about the ProcessPoolTransferFuturecCs||_||_i|_dSr) _transfer_id _call_args _user_context)r,rrLr"r"r#r-s z ProcessPoolTransferMeta.__init__cCrr)rrPr"r"r#rL!rz!ProcessPoolTransferMeta.call_argscCrr)rrPr"r"r#r%rz#ProcessPoolTransferMeta.transfer_idcCrr)rrPr"r"r# user_context)rz$ProcessPoolTransferMeta.user_contextN) r.r/r0__doc__r-rrLrrr"r"r"r#rgs  rgc@seZdZdddZddZdS)r2NcCsX||_|jdur i|_t|jdt}|jst|_n |jdt7_||jd<dS)zCreates S3 clients for processes Botocore sessions and clients are not pickleable so they cannot be inherited across Process boundaries. Instead, they must be instantiated once a process is running. 
NrC )_client_kwargsrgetrZuser_agent_extrar)r,rBZ client_configr"r"r#r-/s zClientFactory.__init__cCstjj di|jS)zCreate a botocore S3 clients3N)r)botocoresessionSession create_clientrrPr"r"r#rAs zClientFactory.create_clientr)r.r/r0r-rr"r"r"r#r2.s  r2c@s\eZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ dS)rkcCsi|_d|_t|_dS)a@Monitors transfers for cross-process communication Notifications can be sent to the monitor and information can be retrieved from the monitor for a particular transfer. This abstraction is ran in a ``multiprocessing.managers.BaseManager`` in order to be shared across processes. rN)_transfer_states _id_countr;r< _init_lockrPr"r"r#r-Is zTransferMonitor.__init__cCsN|j|j}t|j|<|jd7_|WdS1s wYdSN)rr TransferStaterr,rr"r"r#rFYs  $z#TransferMonitor.notify_new_transfercC |j|jS)zDetermine a particular transfer is complete :param transfer_id: Unique identifier for the transfer :return: True, if done. False, otherwise. )rrrr"r"r#r`s zTransferMonitor.is_donecCs|j|dS)zqNotify a particular transfer is complete :param transfer_id: Unique identifier for the transfer N)rset_donerr"r"r# notify_donehszTransferMonitor.notify_donecCs&|j||j|j}|r|dS)aPoll for the result of a transfer :param transfer_id: Unique identifier for the transfer :return: If the transfer succeeded, it will return the result. If the transfer failed, it will raise the exception associated to the failure. N)rwait_till_done exceptionr,rrr"r"r#ros  zTransferMonitor.poll_for_resultcC||j|_dS)zNotify an exception was encountered for a transfer :param transfer_id: Unique identifier for the transfer :param exception: The exception encountered for that transfer Nrrrr"r"r#r}s z TransferMonitor.notify_exceptioncCs"|jD] }|jst|_qdSr)rvaluesrr r)r,Ztransfer_stater"r"r#rUs z-TransferMonitor.notify_cancel_all_in_progresscCr)zRetrieve the exception encountered for the transfer :param transfer_id: Unique identifier for the transfer :return: The exception encountered for that transfer. 
Otherwise if there were no exceptions, returns None. rrr"r"r# get_exceptions zTransferMonitor.get_exceptioncCr)zNotify the amount of jobs expected for a transfer :param transfer_id: Unique identifier for the transfer :param num_jobs: The number of jobs to complete the transfer N)rjobs_to_complete)r,rZnum_jobsr"r"r# notify_expected_jobs_to_completesz0TransferMonitor.notify_expected_jobs_to_completecCs|j|S)zNotify that a single job is completed for a transfer :param transfer_id: Unique identifier for the transfer :return: The number of jobs remaining to complete the transfer )rdecrement_jobs_to_completerr"r"r#notify_job_completesz#TransferMonitor.notify_job_completeN) r.r/r0r-rFrrrrrUrrrr"r"r"r#rkHs  rkc@speZdZdZddZeddZddZdd Zed d Z e j d d Z ed dZ e j ddZ ddZ dS)rz6Represents the current state of an individual transfercCs$d|_t|_t|_d|_dS)Nr) _exceptionr;Event _done_eventr< _job_lock_jobs_to_completerPr"r"r#r-s   zTransferState.__init__cCs |jSr)ris_setrPr"r"r#r zTransferState.donecCs|jdSr)rsetrPr"r"r#rszTransferState.set_donecCs|jtdSr)rwaitrrPr"r"r#rrzTransferState.wait_till_donecCrrrrPr"r"r#rrzTransferState.exceptioncC ||_dSrrr,valr"r"r#rrcCrrrrPr"r"r#rrzTransferState.jobs_to_completecCrrrrr"r"r#rrcCs>|j|jd8_|jWdS1swYdSr)rrrPr"r"r#rs$z(TransferState.decrement_jobs_to_completeN) r.r/r0rr-rrrrrsetterrrr"r"r"r#rs        rc@s eZdZdS)riN)r.r/r0r"r"r"r#risrics,eZdZfddZddZddZZS)BaseS3TransferProcesscst||_d|_dSr)superr-r3_client)r,rm __class__r"r#r-s  zBaseS3TransferProcess.__init__cCs@|j|_t |WddS1swYdSr)r3rrr$_do_runrPr"r"r#runs  "zBaseS3TransferProcess.runcCstd)Nz _do_run())NotImplementedErrorrPr"r"r#rszBaseS3TransferProcess._do_run)r.r/r0r-rr __classcell__r"r"rr#rs rcs\eZdZfddZddZddZddZd d Zd d Zd dZ ddZ ddZ Z S)rrcs.t|||_||_||_||_||_dS)aTSubmit GetObjectJobs to fulfill a download file request :param transfer_config: Configuration for transfers. :param client_factory: ClientFactory for creating S3 clients. 
:param transfer_monitor: Monitor for notifying and retrieving state of transfer. :param osutil: OSUtils object to use for os-related behavior when performing the transfer. :param download_request_queue: Queue to retrieve download file requests. :param worker_queue: Queue to submit GetObjectJobs for workers to perform. N)rr-r4r?r9r7r8)r,rlrmrnrorprqrr"r#r-s  zGetObjectSubmitter.__init__c Cs |j}|tkrtddSz||Wn*tyB}ztjd||dd|j|j ||j |j WYd}~nd}~wwq)NTz#Submitter shutdown signal received.zFException caught when submitting jobs for download file request %s: %sexc_info) r7rr}rGrH_submit_get_object_jobs Exceptionr?rrr)r,rKer"r"r#rs.  zGetObjectSubmitter._do_runcCsD||}|||}||jjkr|||dS||||dSr) _get_size_allocate_temp_filer4r)_submit_single_get_object_job_submit_ranged_get_object_jobsr,rKsizerr"r"r#r/s   z*GetObjectSubmitter._submit_get_object_jobscCs4|j}|dur|jjd|j|jd|jd}|S)NZBucketKeyZ ContentLengthr")rrZ head_objectrrr)r,rKrr"r"r#r;szGetObjectSubmitter._get_sizecCs |j|j}|j|||Sr)r9Zget_temp_filenamerallocaterr"r"r#rEs z&GetObjectSubmitter._allocate_temp_filec Cs4||jd|j|j|j|j|d|j|jddS)Nrrrrrrrrr)_notify_jobs_to_completer_submit_get_object_jobrrrr)r,rKrr"r"r#rLs z0GetObjectSubmitter._submit_single_get_object_jobc Csz|jj}t||}||j|t|D]&}||}t|||}d|i} | |j|j |j|j |j ||| |j dqdS)NRanger) r4r*rrrrtrupdaterrrrr) r,rKrrZ part_sizeZ num_partsirZrange_parameterZget_object_kwargsr"r"r#rZs,   z1GetObjectSubmitter._submit_ranged_get_object_jobscKs|jtdi|dS)Nr")r8rIr)r,Zget_object_job_kwargsr"r"r#rssz)GetObjectSubmitter._submit_get_object_jobcCs td|||j||dS)Nz3Notifying %s job(s) to complete for transfer_id %s.)rGrHr?r)r,rrr"r"r#rvsz+GetObjectSubmitter._notify_jobs_to_complete) r.r/r0r-rrrrrrrrrr"r"rr#rrs   rrcsXeZdZdZdeZfddZddZddZd d Z d d Z d dZ ddZ Z S)rucs(t|||_||_||_||_dS)aFulfills GetObjectJobs Downloads the S3 object, writes it to the specified file, and renames the file to its final location if it completes the final job for a particular transfer. 
:param queue: Queue for retrieving GetObjectJob's :param client_factory: ClientFactory for creating S3 clients :param transfer_monitor: Monitor for notifying :param osutil: OSUtils object to use for os-related behavior when performing the transfer. N)rr-_queuer3r?r9)r,rsrmrnrorr"r#r-s  zGetObjectWorker.__init__cCs |j}|tkrtddS|j|js||ntd||j |j}td||j|s?| |j|j |j q)NTz Worker shutdown signal received.zBSkipping get object job %s because there was a previous exception.z%%s jobs remaining for transfer_id %s.) rrr}rGrHr?rr_run_get_object_jobr_finalize_downloadrr)r,job remainingr"r"r#rs0    zGetObjectWorker._do_runc Csnz|j|j|j|j|j|jdWdSty6}ztjd||dd|j |j |WYd}~dSd}~ww)N)rrrrrzBException caught when downloading object for get object job %s: %sTr) _do_get_objectrrrrrrrGrHr?rr)r,rrr"r"r#rs$ z#GetObjectWorker._run_get_object_jobc Csd}t|jD]<}z|jjd||d|}||||dWdStyC} ztjd| |d|jdd| }WYd} ~ qd} ~ wwt|)NrZBodyzCRetrying exception caught (%s), retrying request, (attempt %s / %s)rTrr") rt _MAX_ATTEMPTSrZ get_object_write_to_filer rGrHr ) r,rrrrrZlast_exceptionrresponserr"r"r#rs, zGetObjectWorker._do_get_objectcsbt|d"}||tfddd}|D]}||qWddS1s*wYdS)Nzrb+cs jSr)read _IO_CHUNKSIZEr"bodyr,r"r#s z0GetObjectWorker._write_to_file..)openseekiterwrite)r,rrrfchunkschunkr"rr#rs   "zGetObjectWorker._write_to_filecCs8|j|r |j|n|||||j|dSr)r?rr9 remove_file_do_file_renamer)r,rrrr"r"r#rs z"GetObjectWorker._finalize_downloadc CsVz |j||WdSty*}z|j|||j|WYd}~dSd}~wwr)r9Z rename_filerr?rr)r,rrrrr"r"r#rszGetObjectWorker._do_file_rename)r.r/r0rrrr-rrrrrrrr"r"rr#rus ru)7r collections contextlibloggingr5r r;copyrZbotocore.sessionrZbotocore.configrZs3transfer.compatrrZs3transfer.constantsrrrZs3transfer.exceptionsr r Zs3transfer.futuresr r Zs3transfer.utilsr rrrr getLoggerr.rGr} namedtuplerrcontextmanagerr$rr&r1rhrgr2rkrriregisterProcessrrrrur"r"r"r#sV 7    G0`0