B t `;5 @sddgZddlZddlZddlZddlmZddlmZmZm Z ddlm Z ddlm Z m Z ejdkrpddlZnddlZdd lmZere jjZd d Zd d Zn*e reZe jZe jZneZd d Zdd ZddZGdddeZeeZdddZdS) DispatcherDispatcherServerN) defaultdict)get_nameserver using_pyro3 using_pyro4)Pyro)set_maxconnectionsget_dispatchers)r) iteritemscCs|S)N)methodr r rcCs|S)Nr )objr r rrrcCs|S)Nr )rr r rr&rcCs|S)Nr )rr r rr'rcCsBx<|sddZd d!Zd"d#Zd$d%Zd&d'Zd?d*d+Zd,d-Zd@d.d/Zd0d1ZdAd2d3ZdBd4d5Zd6d7Zd8d9Zd S)CrcKsxtdkrtdtr"tjj|ttj|_ttj|_ | dd|_ t |_ t |_| dd|_|j rttddS)NzPyro or Pyro4 is not availableverboseF worker_limitzVerbose output enabled...)_pyro ImportErrorrcoreObjBase__init__rr _task_queue _result_queuepop_verboseset_registered_workers_acquired_workers _worker_limitprint)selfkwdsr r rr 5s  zDispatcher.__init__cCs,t|}||jstd|j|dS)Nz?List contains one or more worker names that were not registered)r%issubsetr& ValueErrorr'difference_update)r*namesr r rrelease_acquired_workersGs z#Dispatcher.release_acquired_workerscCs4||jkrtd|jr$td||j|dS)Nz(Worker name '%s' has not been registeredz"Unregistering worker with name: %s)r&r-r$r)remove)r*namer r runregister_workerOs   zDispatcher.unregister_workercCs(tdtr|n |jdS)Nz8Dispatcher received request to shut down - initiating...)r)rZ getDaemonshutdownZ _pyroDaemon)r*r r rr4WszDispatcher.shutdownNcCs:|jr&tdt|ddt||j||dS)Nz&Received request to add task=; queue type=)r$r)strr!put)r*tasktyper r radd_task_szDispatcher.add_taskcs\|jr$tdtfddDx2D]*}|j|}x|D]}||qBWq*WdS)Nz2Received request to add bulk task set. Task ids=%sc3s$|]}|dd|DfVqdS)cSsg|] }|dqS)r5r ).0r8r r r msz2Dispatcher.add_tasks...Nr )r; task_type)tasksr r msz'Dispatcher.add_tasks..)r$r)dictr!r7)r*r>r=Z task_queuer8r )r>r add_tasksis   zDispatcher.add_taskscCs6|jr"tdt|dt||j||dS)Nz+Received request to add result with result=z ; queue type=)r$r)r6r"r7)r*resultr9r r r add_resulttszDispatcher.add_resultcs\|jr$tdtfddDx2D]*}|j|}x|D]}||qBWq*WdS)Nz7Received request to add bulk result set for task ids=%sc3s$|]}|dd|DfVqdS)cSsg|] }|dqS)r5r )r;rBr r rr<sz4Dispatcher.add_results...Nr )r; result_type)resultsr rr?sz)Dispatcher.add_results..)r$r)r@r"r7)r*rErD result_queuerBr )rEr add_results~s   zDispatcher.add_resultscCsj|jrtdt|yt|j|Wntk r<YnXyt|j|Wntk rdYnXdS)Nz@Received request to clear task and result queues for queue type=)r$r)r6rr!KeyErrorr")r*r9r r r clear_queues zDispatcher.clear_queuecCsx|D]}|j|dqWdS)N)r9)rI)r*typesr9r r r clear_queuess zDispatcher.clear_queuescCsttj|_ttj|_dS)N)rrr!r")r*r r rclear_all_queuess zDispatcher.clear_all_queuescCsB|jrtdt|yt|j|Wntk r<YnXdS)Nz4Received request to clear task queue for queue type=)r$r)r6rr!rH)r*r9r r rclear_task_queues zDispatcher.clear_task_queuecCsx|D]}|j|dqWdS)N)r9)rM)r*rJr9r r rclear_task_queuess zDispatcher.clear_task_queuescCsttj|_dS)N)rrr!)r*r r rclear_all_task_queuessz Dispatcher.clear_all_task_queuescCsB|jrtdt|yt|j|Wntk r<YnXdS)Nz6Received request to clear result queue for queue type=)r$r)r6rr"rH)r*r9r r rclear_result_queues zDispatcher.clear_result_queuecCsx|D]}|j|dqWdS)N)r9)rP)r*rJr9r r rclear_result_queuess zDispatcher.clear_result_queuescCsttj|_dS)N)rrr")r*r r rclear_all_result_queuessz"Dispatcher.clear_all_result_queuescCs|j|j}|j||S)N)r&r'update)r*Z worker_namesr r racquire_available_workerss  z$Dispatcher.acquire_available_workerscCs\||jkrtd|jdks,t|j|jkrX|j||jrTtdt|j|fdSdS)Nz,Worker name '%s' has already been registeredz#Registering worker %s with name: %sTF)r&r-r(lenaddr$r))r*r2r r rregister_workers   zDispatcher.register_workerTcCsf|jr2tdt|dt|dt|dy|j|j||d}|Stjk r`dSXdS)Nz/Received request to get a task from queue type=z; block=z ; timeout=z seconds)blocktimeout)r$r)r6r!rrr)r*r9rYrZr8r r rget_tasks*zDispatcher.get_taskc Cs|jrtdt|i}x|D]\}}}g}y||j|j||dWntjk rbYnLXxH|j|ry||j|j||dWqftjk rYqfXqfWt |dkr | |g |q W|S)Nz;Received request to get tasks in bulk. Queue request types=)rYrZr) r$r)r6appendr!rrrqsizerU setdefaultextend)r*type_block_timeout_listretr9rYrZZ task_listr r r get_taskss(   zDispatcher.get_taskscCs^|jr.tdt|dt|dt|y|j|j||dStjk rXdSXdS)Nz1Received request to get a result from queue type=z; block=z ; timeout=)rYrZ)r$r)r6r"rrr)r*r9rYrZr r r get_results&zDispatcher.get_resultc Cs|jrtdt|i}x|D]\}}}g}y||j|j||dWntjk rbYnLXxH|j|ry||j|j||dWqftjk rYqfXqfWt |dkr | |g |q W|S)Nz=Received request to get results in bulk. Queue request types=)rYrZr) r$r)r6r\r"rrrr]rUr^r_)r*r`ra type_namerYrZZ result_listr r r get_results s(   zDispatcher.get_resultscCs$|jrtdt||j|S)Nz8Received request for number of tasks in queue with type=)r$r)r6r!r])r*r9r r r num_tasks"s zDispatcher.num_taskscCs$|jrtdt||j|S)Nz:Received request for number of results in queue with type=)r$r)r6r"r])r*r9r r r num_results(s zDispatcher.num_resultscCsH|jrtdg}x0t|jD]\}}|dkr"||q"W|S)Nz3Received request for the set of queues with resultsr)r$r)listr"itemsr]r\)r*rE queue_namerFr r rqueues_with_results.s zDispatcher.queues_with_resultsc Cst|jrtdg}x\t|jD]J\}}x@|dkrjy||jdddWq,tj k rfYq,Xq,Wq"W|S)Nz@Received request to obtain all available results from all queuesrF)rYrZ) r$r)rhr"rir]r\rrr)r*rErjrFr r rget_results_all_queues=sz!Dispatcher.get_results_all_queues)N)N)N)N)N)NTrX)NTrX)N)N)__name__ __module__ __qualname__r onewayr0r3r4r:rArCrGrIrKrLrMrNrOrPrQrRrTrWr[rbrcrerfrgrkrlr r r rr3s:            :PyUtilibServerFTc Cst|dt||dd} |rHx*t|| dD]\} } tdtddSWtrhtjj||d} | | ntj||d} try| |Wntj j k rYnXy| |d Wntj j k rYnX|r:y| |d Wntj j k rYnXn4|r:y| |d Wntj j k r8YnXt||d } |d tt}trn| | |} n| | |} | || tr| n| td | S) N)max_allowed_connectionsr)hostportZ caller_name)groupnsz!Multiple dispatchers not allowed.z!dispatch_srvr is shutting down...)rsrtz .dispatcher)rrz .dispatcher.zDispatcher is ready.)r rr r)rrrZDaemonZ useNameServerZ createGrouperrorsZ NamingError unregisterr1rr6uuidZuuid4connectregisterrZ _pyroRelease_releaseZ requestLoop)ruZ daemon_hostZ daemon_portZnameserver_hostZnameserver_portrrrrZ clear_grouprvr2uridaemonZdispZ proxy_namer r rrTsT       ) rqNrNNFNNT)__all__ossysrz collectionsrZpyutilib.pyro.utilrrrrrr r version_infoqueuersixr rrbaserpZexposeobjectrrrr r r r sH