B _@sdZyddlmZWn ek r4ddlmZYnXddlmZddlmZm Z m Z m Z eddgddZ Gdd d eZ d S) )PyroAsynchronousActionManager) OrderedDict)attempt_import)AsynchronousActionManagerActionManagerError ActionHandle ActionStatusz pyutilib.pyropyu_pyro)Z alt_namescseZdZdfdd ZfddZddZd d Zd d Zd dZddZ ddZ ddZ d ddZ ddZ ddZddZddZZS)!rNrcsJ||_||_||_d|_i|_i|_i|_d|_tt | t |_ dS)NF) hostport_verbose_paused_paused_task_dict_dispatcher_name_to_client_dispatcher_proxies_last_extracted_ah_idsuperr__init__rresults)selfr r verbose) __class__;/tmp/pip-unpacked-wheel-d4p3hk07/pyomo/opt/parallel/pyro.pyr"sz&PyroAsynchronousActionManager.__init__cstt|t|_dS)z% Clear manager state N)rrclearrr)r)rrrr8sz#PyroAsynchronousActionManager.clearcCst|jr&tdt|jt|jft|jrLtdt|jt|jft|_d|_i|_x|j D] }| qlWi|_i|_ dS)zClose the manager.zEWARNING: %s is closing with %s local results waiting to be processed.zAWARNING: %s is closing with %s paused tasks waiting to be queued.FN) lenrprinttype__name__rrr rvaluescloser)rclientrrrr ?s   z#PyroAsynchronousActionManager.closecCs d|_dS)NT)r )rrrrpauseRsz#PyroAsynchronousActionManager.pausecCsNd|_t|jrDx2|jD](}|j|}|j|j||jdkdqWi|_dS)NF)r)r rrrZ add_tasksr )rdispatcher_namer!rrrunpauseUs    z%PyroAsynchronousActionManager.unpausecCs|j|jdS)N)rpopid)rahrrr get_results^sz)PyroAsynchronousActionManager.get_resultscsjj|}t|rNxRt|dkrJ|fdd|Dt|rqWnxjdkrdqPWdS)a& Wait for all actions to complete. The arguments to this method are expected to be ActionHandle objects or iterators that return ActionHandle objects. If no arguments are provided, then this method will terminate after all queued actions are complete. rcsg|]}|jjkr|qSr)r'r).0r()rrr lsz:PyroAsynchronousActionManager.wait_all..N)_flattenrdifference_update_download_resultsqueued_action_counter)rargsahsr)rrwait_allas  z&PyroAsynchronousActionManager.wait_allcGs|j|}t|r@xx|D]}|j|jkr|SqW|qWnvxt|jdkrZ|qBW|jjdd\}}||jkr||_|||j|<|jjdd\}}||j|<|j|SdS)NrF)last)r,rr'rr.popitemr event_handle)rr0r1r(Zah_idresultrrrwait_anyss"       z&PyroAsynchronousActionManager.wait_anycCs(x|j|jkrPq|qW||S)z< Wait for the specified action to complete. )r'rr.r))rr(rrrwait_fors   z&PyroAsynchronousActionManager.wait_forcCsT|dkrtj|j|jd}n tj|d}|j|jkrD|j|j||j|j<|S)N)r r ) dispatcher)r ZClientr r URIrr )rr9r!rrr_create_clients   z,PyroAsynchronousActionManager._create_clientc Os|dd}|dd}||}|j|f||}tj||j|d}|jr||jkrbt|j|<||j|kr~g|j||<|j|| |n |j |} | j ||j dk|d|st j|_|j|j||jd8_|S)N queue_namegenerate_responseT)datar'ZgenerateResponser#)rZ override_type)r&_get_dispatcher_name_get_task_datar ZTaskr'r rdictappendrZadd_taskr rdonestatusr5updater/) rr(r0kwdsr<r=r$Z task_dataZtaskr!rrr_perform_queues,      z,PyroAsynchronousActionManager._perform_queuecCstt|jddS)Nz: This method is abstract)NotImplementedErrorrr)rr<rrrr?sz2PyroAsynchronousActionManager._get_dispatcher_namecKstt|jddS)Nz: This method is abstract)rHrr)rr(rFrrrr@sz,PyroAsynchronousActionManager._get_task_datacCstt|jddS)Nz: This method is abstract)rHrr)rrrrr.sz/PyroAsynchronousActionManager._download_results)NNr)N)r __module__ __qualname__rrr r"r%r)r2r7r8r;rGr?r@r. __classcell__rr)rrr s   'rN)__all__ collectionsr ImportErrorZ ordereddictZpyomo.common.dependenciesrZpyomo.opt.parallel.managerrrrrr rrrrr s