Speeds up S3 throughput by using processes

Getting Started
===============

The :class:`ProcessPoolDownloader` can be used to download a single file by
calling :meth:`ProcessPoolDownloader.download_file`:

.. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    with ProcessPoolDownloader() as downloader:
        downloader.download_file('mybucket', 'mykey', 'myfile')


This snippet downloads the S3 object located in the bucket ``mybucket`` at the
key ``mykey`` to the local file ``myfile``. Errors encountered during the
transfer are not propagated to the caller. To determine whether a transfer
succeeded or failed, use the `Futures`_ interface.

The :class:`ProcessPoolDownloader` can be used to download multiple files as
well:

.. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    with ProcessPoolDownloader() as downloader:
        downloader.download_file('mybucket', 'mykey', 'myfile')
        downloader.download_file('mybucket', 'myotherkey', 'myotherfile')


When running this snippet, the downloads of ``mykey`` and ``myotherkey``
happen in parallel. The first ``download_file`` call does not block the
second ``download_file`` call. When exiting the context manager, the snippet
blocks until both downloads are complete.

Alternatively, the ``ProcessPoolDownloader`` can be instantiated and
explicitly shut down using :meth:`ProcessPoolDownloader.shutdown`:

.. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    downloader = ProcessPoolDownloader()
    downloader.download_file('mybucket', 'mykey', 'myfile')
    downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
    downloader.shutdown()


For this code snippet, the call to ``shutdown`` blocks until both
downloads are complete.


Additional Parameters
=====================

Additional parameters can be provided to the ``download_file`` method:

* ``extra_args``: A dictionary containing any additional client arguments
  to include in the `GetObject `_ API request. For example:

  .. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    with ProcessPoolDownloader() as downloader:
        downloader.download_file(
            'mybucket', 'mykey', 'myfile',
            extra_args={'VersionId': 'myversion'})


* ``expected_size``: By default, the downloader makes a HeadObject call to
  determine the size of the object. To opt out of this additional API call,
  provide the size of the object in bytes:

  .. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    MB = 1024 * 1024
    with ProcessPoolDownloader() as downloader:
        downloader.download_file(
            'mybucket', 'mykey', 'myfile', expected_size=2 * MB)


Futures
=======

When ``download_file`` is called, it immediately returns a
:class:`ProcessPoolTransferFuture`. The future can be used to poll the state
of a particular transfer. To get the result of the download, call
:meth:`ProcessPoolTransferFuture.result`. The method blocks until the
transfer completes, whether it succeeds or fails. For example:

.. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    with ProcessPoolDownloader() as downloader:
        future = downloader.download_file('mybucket', 'mykey', 'myfile')
        print(future.result())


If the download succeeds, the future returns ``None``:

.. code:: python

    None


If the download fails, the exception causing the failure is raised.
For example, if ``mykey`` did not exist, the following error would be raised:


.. code:: python

    botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found


.. note::

    :meth:`ProcessPoolTransferFuture.result` can only be called while the
    ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or
    inside the context manager).

Process Pool Configuration
==========================

The downloader supports the following configuration options:

* ``multipart_threshold``: The threshold size, in bytes, for performing
  ranged downloads. By default, ranged downloads happen for S3 objects that
  are greater than or equal to 8 MB in size.

* ``multipart_chunksize``: The size of each ranged download in bytes. By
  default, the size of each ranged download is 8 MB.

* ``max_request_processes``: The maximum number of processes used to download
  S3 objects. By default, the maximum is 10 processes.


To change the default configuration, use the :class:`ProcessTransferConfig`:

.. code:: python

    from s3transfer.processpool import ProcessPoolDownloader
    from s3transfer.processpool import ProcessTransferConfig

    config = ProcessTransferConfig(
        multipart_threshold=64 * 1024 * 1024,  # 64 MB
        max_request_processes=50
    )
    downloader = ProcessPoolDownloader(config=config)


Client Configuration
====================

The process pool downloader creates ``botocore`` clients on your behalf. To
control how the clients are created, pass the keyword arguments that would
have been used in the :meth:`botocore.Session.create_client` call:

.. code:: python

    from s3transfer.processpool import ProcessPoolDownloader

    downloader = ProcessPoolDownloader(
        client_kwargs={'region_name': 'us-west-2'})


This snippet ensures that all clients created by the ``ProcessPoolDownloader``
use ``us-west-2`` as their region.
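Because ``download_file`` returns a future immediately and ``result`` re-raises
any failure, a common pattern is to submit every download first and then wait
on each future while the downloader is still running, recording failures
instead of stopping at the first one. The sketch below relies only on the
``download_file`` and ``result`` calls described above; ``download_all`` is a
hypothetical helper, not part of ``s3transfer``.

.. code:: python

    def download_all(downloader, requests):
        """Download several objects and collect the failures.

        ``downloader`` is a running ProcessPoolDownloader (or any object with
        a compatible ``download_file`` method). ``requests`` is an iterable of
        ``(bucket, key, filename)`` tuples. Returns a list of
        ``(bucket, key, filename, exception)`` tuples for failed downloads.
        """
        # Submit everything first; download_file returns a future right
        # away, so the transfers proceed in parallel.
        submitted = []
        for bucket, key, filename in requests:
            future = downloader.download_file(bucket, key, filename)
            submitted.append((bucket, key, filename, future))

        failures = []
        for bucket, key, filename, future in submitted:
            try:
                # Blocks until this transfer finishes; re-raises its error.
                future.result()
            except Exception as exc:
                failures.append((bucket, key, filename, exc))
        return failures

Call ``download_all`` inside the ``with ProcessPoolDownloader() as downloader:``
block, because ``result`` can only be called while the downloader is running.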
Internal Components
===================

* ``DownloadFileRequest``: A named tuple with the fields ``transfer_id``,
  ``bucket``, ``key``, ``filename``, ``extra_args``, and ``expected_size``.

* ``GetObjectJob``: A named tuple with the fields ``transfer_id``,
  ``bucket``, ``key``, ``temp_filename``, ``extra_args``, ``offset``, and
  ``filename``.

* ``ProcessPoolTransferMeta``: Holds metadata about the
  ``ProcessPoolTransferFuture``.

* ``ClientFactory``: Creates S3 clients for processes. Botocore sessions and
  clients are not pickleable, so they cannot be inherited across process
  boundaries. Instead, they must be instantiated once a process is running.
  Its ``create_client`` method creates a botocore S3 client.

* ``TransferMonitor``: Monitors transfers for cross-process communication.
  Notifications can be sent to the monitor, and information about a
  particular transfer can be retrieved from it. The monitor runs in a
  ``multiprocessing.managers.BaseManager`` so that it can be shared across
  processes.
``TransferMonitor`` provides the following methods for querying and updating
a transfer. Each takes ``transfer_id``, the unique identifier for the
transfer:

* ``is_done(transfer_id)``: Determines whether a particular transfer is
  complete. Returns ``True`` if done, ``False`` otherwise.

* ``notify_done(transfer_id)``: Notifies the monitor that a particular
  transfer is complete.

* ``poll_for_result(transfer_id)``: Polls for the result of a transfer. If
  the transfer succeeded, it returns the result. If the transfer failed, it
  raises the exception associated with the failure.

* ``notify_exception(transfer_id, exception)``: Notifies the monitor that
  ``exception`` was encountered for a transfer.

* ``get_exception(transfer_id)``: Retrieves the exception encountered for
  the transfer. If there were no exceptions, returns ``None``.
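The done/exception contract of these methods can be illustrated with a
minimal single-process sketch: each transfer pairs a ``threading.Event`` with
a stored exception, and ``poll_for_result`` waits on the event and then
re-raises the stored exception, if any. ``SimpleTransferMonitor`` is a
hypothetical class; unlike ``TransferMonitor``, it is not served through a
``multiprocessing`` manager, so it only coordinates threads within one
process.

.. code:: python

    import itertools
    import threading


    class SimpleTransferMonitor:
        """Thread-safe, single-process sketch of per-transfer tracking."""

        def __init__(self):
            self._lock = threading.Lock()
            self._ids = itertools.count()
            self._done = {}        # transfer_id -> threading.Event
            self._exceptions = {}  # transfer_id -> exception or None

        def notify_new_transfer(self):
            # Hand out a fresh id and create its state under the lock.
            with self._lock:
                transfer_id = next(self._ids)
                self._done[transfer_id] = threading.Event()
                self._exceptions[transfer_id] = None
            return transfer_id

        def is_done(self, transfer_id):
            return self._done[transfer_id].is_set()

        def notify_done(self, transfer_id):
            self._done[transfer_id].set()

        def notify_exception(self, transfer_id, exception):
            self._exceptions[transfer_id] = exception

        def get_exception(self, transfer_id):
            return self._exceptions[transfer_id]

        def poll_for_result(self, transfer_id):
            # Block until the transfer finishes, then surface any failure.
            self._done[transfer_id].wait()
            exception = self._exceptions[transfer_id]
            if exception is not None:
                raise exception
            return None

Recording the exception before signalling completion matters: a waiter woken
by ``notify_done`` must already be able to see the failure.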
``TransferMonitor`` also tracks the number of jobs remaining for each
transfer:

* ``notify_expected_jobs_to_complete(transfer_id, num_jobs)``: Notifies the
  monitor of the number of jobs (``num_jobs``) needed to complete a transfer.

* ``notify_job_complete(transfer_id)``: Notifies the monitor that a single job
  is complete for a transfer. Returns the number of jobs remaining to complete
  the transfer.

The module also defines these classes:

* ``TransferState``: Represents the current state of an individual transfer.

* ``GetObjectSubmitter``: Submits ``GetObjectJob`` instances to fulfill a
  download file request. Its parameters are:

  * ``transfer_config``: Configuration for transfers.
  * ``client_factory``: ``ClientFactory`` for creating S3 clients.
  * ``transfer_monitor``: Monitor for notifying and retrieving the state of
    transfers.
  * ``osutil``: ``OSUtils`` object to use for OS-related behavior when
    performing the transfer.
  * ``download_request_queue``: Queue from which to retrieve download file
    requests.
  * ``worker_queue``: Queue to which ``GetObjectJob`` instances are submitted
    for workers to perform.

* ``GetObjectWorker``: Fulfills ``GetObjectJob`` instances. It downloads the
  S3 object, writes it to the specified file, and, if it completes the final
  job for a particular transfer, renames the file to its final location. Its
  parameters are:

  * ``queue``: Queue from which to retrieve ``GetObjectJob`` instances.
  * ``client_factory``: ``ClientFactory`` for creating S3 clients.
  * ``transfer_monitor``: Monitor for notifying and retrieving the state of
    transfers.
  * ``osutil``: ``OSUtils`` object to use for OS-related behavior when
    performing the transfer.
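Only the worker that completes the final job of a transfer should rename the
file. One way to guarantee that hand-off is a lock-protected counter whose
decrement returns the remaining count, so exactly one worker observes zero.
This is a simplified sketch that uses threads instead of processes;
``JobCountdown`` and its ``finalize`` callback are hypothetical names, not
``s3transfer`` APIs.

.. code:: python

    import threading


    class JobCountdown:
        """Count down one transfer's jobs; run ``finalize`` exactly once."""

        def __init__(self, num_jobs, finalize):
            self._lock = threading.Lock()
            self._remaining = num_jobs
            self._finalize = finalize

        def job_complete(self):
            # Decrement and read under one lock, so no two workers can
            # observe the same remaining count.
            with self._lock:
                self._remaining -= 1
                remaining = self._remaining
            if remaining == 0:
                # Only the worker that finished the last job reaches this.
                self._finalize()
            return remaining

In the real downloader, the equivalent of ``finalize`` would also need to
check for a recorded exception and remove the temporary file instead of
renaming it.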
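The submitter chooses between a single ``GetObject`` request and several
ranged requests based on ``multipart_threshold`` and ``multipart_chunksize``
(see Process Pool Configuration). The following standalone sketch shows how
such a split can be computed. ``plan_get_object_ranges`` is a hypothetical
helper rather than the library's code, and it assumes every part is
``chunksize`` bytes except the last, which covers the remainder.

.. code:: python

    import math

    MB = 1024 * 1024


    def plan_get_object_ranges(size, threshold=8 * MB, chunksize=8 * MB):
        """Return a list of ``(offset, range_header)`` pairs, one per job.

        Objects smaller than ``threshold`` get a single job whose
        ``range_header`` is ``None`` (no Range argument). Larger objects are
        split into ``chunksize``-byte parts; the last part covers the rest.
        """
        if size < threshold:
            return [(0, None)]
        num_parts = int(math.ceil(size / float(chunksize)))
        jobs = []
        for index in range(num_parts):
            start = index * chunksize
            # HTTP byte ranges are inclusive, so each part ends one byte
            # before the next part starts (or at the object's last byte).
            end = min(start + chunksize, size) - 1
            jobs.append((start, 'bytes=%d-%d' % (start, end)))
        return jobs

With the 8 MB defaults, a 20 MB object yields three jobs at offsets 0, 8 MB,
and 16 MB, the last one requesting ``bytes=16777216-20971519``. Each offset
tells a worker where to seek in the preallocated temporary file before
writing its part.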