B u `@sxdZyddlmZWn ek r4ddlmZYnXddlmZddlmZm Z eddgddZ Gdd d eZ d S) )PyroAsynchronousActionManager) OrderedDict)attempt_import)AsynchronousActionManager ActionStatusz pyutilib.pyropyu_pyro)Z alt_namescseZdZdfdd ZfddZddZd d Zd d Zd dZddZ ddZ ddZ d ddZ ddZ ddZddZddZZS)!rNrcsJ||_||_||_d|_i|_i|_i|_d|_tt | t |_ dS)NF) hostport_verbose_paused_paused_task_dict_dispatcher_name_to_client_dispatcher_proxies_last_extracted_ah_idsuperr__init__rresults)selfrr verbose) __class__;/tmp/pip-unpacked-wheel-n62dbgi3/pyomo/opt/parallel/pyro.pyr sz&PyroAsynchronousActionManager.__init__cstt|t|_dS)z% Clear manager state N)rrclearrr)r)rrrr6sz#PyroAsynchronousActionManager.clearcCst|jr&tdt|jt|jft|jrLtdt|jt|jft|_d|_i|_x|j D] }| qlWi|_i|_ dS)zClose the manager.zEWARNING: %s is closing with %s local results waiting to be processed.zAWARNING: %s is closing with %s paused tasks waiting to be queued.FN) lenrprinttype__name__r rr r valuescloser)rclientrrrr=s   z#PyroAsynchronousActionManager.closecCs d|_dS)NT)r )rrrrpausePsz#PyroAsynchronousActionManager.pausecCsNd|_t|jrDx2|jD](}|j|}|j|j||jdkdqWi|_dS)NF)r)r rr r Z add_tasksr )rdispatcher_namerrrrunpauseSs    z%PyroAsynchronousActionManager.unpausecCs|j|jdS)N)rpopid)rahrrr get_results\sz)PyroAsynchronousActionManager.get_resultscsjj|}t|rNxRt|dkrJ|fdd|Dt|rqWnxjdkrdqPWdS)a& Wait for all actions to complete. The arguments to this method are expected to be ActionHandle objects or iterators that return ActionHandle objects. If no arguments are provided, then this method will terminate after all queued actions are complete. rcsg|]}|jjkr|qSr)r%r).0r&)rrr jsz:PyroAsynchronousActionManager.wait_all..N)_flattenrdifference_update_download_resultsqueued_action_counter)rargsahsr)rrwait_all_s  z&PyroAsynchronousActionManager.wait_allcGs|j|}t|r@xx|D]}|j|jkr|SqW|qWnvxt|jdkrZ|qBW|jjdd\}}||jkr||_|||j|<|jjdd\}}||j|<|j|SdS)NrF)last)r*rr%rr,popitemr event_handle)rr.r/r&Zah_idresultrrrwait_anyqs"       z&PyroAsynchronousActionManager.wait_anycCs(x|j|jkrPq|qW||S)z< Wait for the specified action to complete. )r%rr,r')rr&rrrwait_fors   z&PyroAsynchronousActionManager.wait_forcCsT|dkrtj|j|jd}n tj|d}|j|jkrD|j|j||j|j<|S)N)rr ) dispatcher)rZClientrr URIr r)rr7rrrr_create_clients   z,PyroAsynchronousActionManager._create_clientc Os|dd}|dd}||}|j|f||}tj||j|d}|jr||jkrbt|j|<||j|kr~g|j||<|j|| |n |j |} | j ||j dk|d|st j|_|j|j||jd8_|S)N queue_namegenerate_responseT)datar%ZgenerateResponser!)rZ override_type)r$_get_dispatcher_name_get_task_datarZTaskr%r r dictappendr Zadd_taskr rdonestatusr3updater-) rr&r.kwdsr:r;r"Z task_dataZtaskrrrr_perform_queues,      z,PyroAsynchronousActionManager._perform_queuecCstt|jddS)Nz: This method is abstract)NotImplementedErrorrr)rr:rrrr=sz2PyroAsynchronousActionManager._get_dispatcher_namecKstt|jddS)Nz: This method is abstract)rFrr)rr&rDrrrr>sz,PyroAsynchronousActionManager._get_task_datacCstt|jddS)Nz: This method is abstract)rFrr)rrrrr,sz/PyroAsynchronousActionManager._download_results)NNr)N)r __module__ __qualname__rrrr r#r'r0r5r6r9rEr=r>r, __classcell__rr)rrrs   'rN) __all__ collectionsr ImportErrorZ ordereddictZpyomo.common.dependenciesrZpyomo.opt.parallel.managerrrrrrrrr s