B @` @sRdZddlmZddlmZddlmZmZeddgddZGdd d eZ d S) )PyroAsynchronousActionManager) OrderedDict)attempt_import)AsynchronousActionManager ActionStatusz pyutilib.pyropyu_pyro)Z alt_namescseZdZdfdd ZfddZddZd d Zd d Zd dZddZ ddZ ddZ d ddZ ddZ ddZddZddZZS)!rNrcsJ||_||_||_d|_i|_i|_i|_d|_tt | t |_ dS)NF) hostport_verbose_paused_paused_task_dict_dispatcher_name_to_client_dispatcher_proxies_last_extracted_ah_idsuperr__init__rresults)selfrr verbose) __class__;/tmp/pip-unpacked-wheel-bi3529v6/pyomo/opt/parallel/pyro.pyrsz&PyroAsynchronousActionManager.__init__cstt|t|_dS)z% Clear manager state N)rrclearrr)r)rrrr2sz#PyroAsynchronousActionManager.clearcCst|jr&tdt|jt|jft|jrLtdt|jt|jft|_d|_i|_x|j D] }| qlWi|_i|_ dS)zClose the manager.zEWARNING: %s is closing with %s local results waiting to be processed.zAWARNING: %s is closing with %s paused tasks waiting to be queued.FN) lenrprinttype__name__r rr r valuescloser)rclientrrrr9s   z#PyroAsynchronousActionManager.closecCs d|_dS)NT)r )rrrrpauseLsz#PyroAsynchronousActionManager.pausecCsNd|_t|jrDx2|jD](}|j|}|j|j||jdkdqWi|_dS)NF)r)r rr r Z add_tasksr )rdispatcher_namerrrrunpauseOs    z%PyroAsynchronousActionManager.unpausecCs|j|jdS)N)rpopid)rahrrr get_resultsXsz)PyroAsynchronousActionManager.get_resultscsjj|}t|rNxRt|dkrJ|fdd|Dt|rqWnxjdkrdqPWdS)a& Wait for all actions to complete. The arguments to this method are expected to be ActionHandle objects or iterators that return ActionHandle objects. If no arguments are provided, then this method will terminate after all queued actions are complete. rcsg|]}|jjkr|qSr)r%r).0r&)rrr fsz:PyroAsynchronousActionManager.wait_all..N)_flattenrdifference_update_download_resultsqueued_action_counter)rargsahsr)rrwait_all[s  z&PyroAsynchronousActionManager.wait_allcGs|j|}t|r@xx|D]}|j|jkr|SqW|qWnvxt|jdkrZ|qBW|jjdd\}}||jkr||_|||j|<|jjdd\}}||j|<|j|SdS)NrF)last)r*rr%rr,popitemr event_handle)rr.r/r&Zah_idresultrrrwait_anyms"       z&PyroAsynchronousActionManager.wait_anycCs(x|j|jkrPq|qW||S)z< Wait for the specified action to complete. )r%rr,r')rr&rrrwait_fors   z&PyroAsynchronousActionManager.wait_forcCsT|dkrtj|j|jd}n tj|d}|j|jkrD|j|j||j|j<|S)N)rr ) dispatcher)rZClientrr URIr r)rr7rrrr_create_clients   z,PyroAsynchronousActionManager._create_clientc Os|dd}|dd}||}|j|f||}tj||j|d}|jr||jkrbt|j|<||j|kr~g|j||<|j|| |n |j |} | j ||j dk|d|st j|_|j|j||jd8_|S)N queue_namegenerate_responseT)datar%ZgenerateResponser!)rZ override_type)r$_get_dispatcher_name_get_task_datarZTaskr%r r dictappendr Zadd_taskr rdonestatusr3updater-) rr&r.kwdsr:r;r"Z task_dataZtaskrrrr_perform_queues,      z,PyroAsynchronousActionManager._perform_queuecCstt|jddS)Nz: This method is abstract)NotImplementedErrorrr)rr:rrrr=sz2PyroAsynchronousActionManager._get_dispatcher_namecKstt|jddS)Nz: This method is abstract)rFrr)rr&rDrrrr>sz,PyroAsynchronousActionManager._get_task_datacCstt|jddS)Nz: This method is abstract)rFrr)rrrrr,sz/PyroAsynchronousActionManager._download_results)NNr)N)r __module__ __qualname__rrrr r#r'r0r5r6r9rEr=r>r, __classcell__rr)rrrs   'rN) __all__Zpyomo.common.collectionsrZpyomo.common.dependenciesrZpyomo.opt.parallel.managerrrrrrrrr s