B t `։@sdZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z ddl mZddlmZddlmZddlmZdd lmZdd lmZydd lmZWn ek rdd lmZYnXdd lmZdd lmZyddl m!Z!Wnek r$ddl Z YnXdZ"e#e$Z%da&a'a(a)a*a+Gddde,Z-ddZ.GdddZ/GdddeZ0ddZ1Gddde Z2ddZ3ddZ4Gdd d eZ5dS)!a Overview ======== The multiprocess plugin enables you to distribute your test run among a set of worker processes that run tests in parallel. This can speed up CPU-bound test runs (as long as the number of work processeses is around the number of processors or cores available), but is mainly useful for IO-bound tests that spend most of their time waiting for data to arrive from someplace else. .. note :: See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional documentation and examples. Use of this plugin on python 2.5 or earlier requires the multiprocessing_ module, also available from PyPI. .. _multiprocessing : http://code.google.com/p/python-multiprocessing/ How tests are distributed ========================= The ideal case would be to dispatch each test to a worker process separately. This ideal is not attainable in all cases, however, because many test suites depend on context (class, module or package) fixtures. The plugin can't know (unless you tell it -- see below!) if a context fixture can be called many times concurrently (is re-entrant), or if it can be shared among tests running in different processes. Therefore, if a context has fixtures, the default behavior is to dispatch the entire suite to a worker as a unit. Controlling distribution ^^^^^^^^^^^^^^^^^^^^^^^^ There are two context-level variables that you can use to control this default behavior. If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True`` in the context, and the plugin will dispatch tests in suites bound to that context as if the context had no fixtures. This means that the fixtures will execute concurrently and multiple times, typically once per test. If a context's fixtures can be shared by tests running in different processes -- such as a package-level fixture that starts an external http server or initializes a shared database -- then set ``_multiprocess_shared_ = True`` in the context. These fixtures will then execute in the primary nose process, and tests in those contexts will be individually dispatched to run in parallel. How results are collected and reported ====================================== As each test or suite executes in a worker process, results (failures, errors, and specially handled exceptions like SkipTest) are collected in that process. When the worker process finishes, it returns results to the main nose process. There, any progress output is printed (dots!), and the results from the test run are combined into a consolidated result set. When results have been received for all dispatched tests, or all workers have died, the result summary is output as normal. Beware! ======= Not all test suites will benefit from, or even operate correctly using, this plugin. For example, CPU-bound tests will run more slowly if you don't have multiple processors. There are also some differences in plugin interactions and behaviors due to the way in which tests are dispatched and loaded. In general, test loading under this plugin operates as if it were always in directed mode instead of discovered mode. For instance, doctests in test modules will always be found when using this plugin with the doctest plugin. But the biggest issue you will face is probably concurrency. Unless you have kept your tests as religiously pure unit tests, with no side-effects, no ordering issues, and no external dependencies, chances are you will experience odd, intermittent and unexplainable failures and errors when using this plugin. This doesn't necessarily mean the plugin is broken; it may mean that your test suite is not safe for concurrency. New Features in 1.1.0 ===================== * functions generated by test generators are now added to the worker queue making them multi-threaded. * fixed timeout functionality, now functions will be terminated with a TimedOutException exception when they exceed their execution time. The worker processes are not terminated. * added ``--process-restartworker`` option to restart workers once they are done, this helps control memory usage. Sometimes memory leaks can accumulate making long runs very difficult. * added global _instantiate_plugins to configure which plugins are started on the worker processes. N)TextTestRunner)failure)loader)Plugin)bytes_)TextTestResult) ContextSuite) test_address)_WritelnDecorator)Empty)warn)StringIOc@seZdZdddZddZdS)TimedOutException Timed OutcCs ||_dS)N)value)selfrr=/tmp/pip-unpacked-wheel-cjhnoqsi/nose/plugins/multiprocess.py__init__szTimedOutException.__init__cCs t|jS)N)reprr)rrrr__str__szTimedOutException.__str__N)r)__name__ __module__ __qualname__rrrrrrrs rcCs~yZddlm}mattjtj}|}ttj||j|j|j|j |j f\aaaa a Wnt k rxt dt YnXdS)Nr)ManagerProcesszKmultiprocessing module is not available, multiprocess plugin cannot be used)multiprocessingrrsignalSIGINTSIG_IGNQueuePoolEventValueArray ImportErrorr RuntimeWarning)roldmrrr _import_mps&r)c@s,eZdZddZddZddZddZd S) TestLetcCs<y||_Wntk r"YnX||_t||_dS)N)id_idAttributeErrorshortDescription_short_descriptionstr_str)rcaserrrrs  zTestLet.__init__cCs|jS)N)r,)rrrrr+sz TestLet.idcCs|jS)N)r/)rrrrr.szTestLet.shortDescriptioncCs|jS)N)r1)rrrrrszTestLet.__str__N)rrrrr+r.rrrrrr*sr*c@s8eZdZdZdZiZddZddZddZd d Z d S) MultiProcesszF Run tests in multiple processes. Requires processing module. icCs\|jdd|dddddd|jd d|d d d d dd|jdd|ddddddS)z0 Register command-line options. z --processesstoreZNOSE_PROCESSESrmultiprocess_workersZNUMaNSpread test run among this many processes. Set a number equal to the number of processors or cores in your machine for best results. Pass a negative number to have the number of processes automatically set to the number of cores. Passing 0 means to disable parallel testing. Default is 0 unless NOSE_PROCESSES is set. [NOSE_PROCESSES])actiondefaultdestmetavarhelpz--process-timeoutZNOSE_PROCESS_TIMEOUT multiprocess_timeoutZSECONDSzfSet timeout for return of results from each test runner process. Default is 10. [NOSE_PROCESS_TIMEOUT]z--process-restartworker store_trueZNOSE_PROCESS_RESTARTWORKERFmultiprocess_restartworkerzIf set, will restart each worker process once their tests are done, this helps control memory leaks from killing the system. [NOSE_PROCESS_RESTARTWORKER])r6r7r8r:N) add_optionget)rparserenvrrroptionss    zMultiProcess.optionsc Cs y|jdWntk r$YnXt|ds:d|_dS|jrDdS||_yt|j}Wnt t fk rtd}YnX|rt t dkrd|_dS|dkryddl }|}Wntk rd|_dSXd|_||j_t|j}||j_t|j}||j_d|jd<dS)z# Configure plugin. activer5FNrT)statuspopKeyErrorhasattrZenabledworkerconfigintr5 TypeError ValueErrorr)rr cpu_countNotImplementedErrorfloatr<r>)rrCrJworkersrtrrrr configuresB     zMultiProcess.configurecCs |j|_dS)zbRemember loader class so MultiProcessTestRunner can instantiate the right loader. N) __class__ loaderClass)rrrrrprepareTestLoaderszMultiProcess.prepareTestLoadercCst|j|jj|j|jdS)z9Replace test runner with MultiProcessTestRunner. )stream verbosityrJrV)MultiProcessTestRunnerrXrJrYrV)rrunnerrrrprepareTestRunner szMultiProcess.prepareTestRunnerN) rrr__doc__ZscorerErCrTrWr\rrrrr3s )r3cCs tdS)N)r)sigframerrr signalhandlersr`cseZdZdZfddZddZddZdd Zd d Ze eZd d Z e e Z ddZ ddZ e e Z ddZ ddZZS)rZg@c s&|dtj|_tt|jf|dS)NrV)rFrZdefaultTestLoaderrVsuperrZr)rkw)rUrrrszMultiProcessTestRunner.__init__c Csx|||D]l}td|t|t|tjjrVt|jt j rVtd||qt|t r|j t j krtd||qqt|t rZ| |rZtd|y |WnDttfk rYntdt||tYnbX|||jrF|jj |g}x*|ddD]}t|ddr(d|_q(W||||||q||||} td t|| |qWdS) NzNext batch %s (%s)zCase is a Failurez%s has shared fixturesz%s setup failed_multiprocess_shared_FTzQueued test %s (%s) to %s) nextBatchlogdebugtype isinstancenoser2TesttestrFailurercontextsharedFixturessetUpKeyboardInterrupt SystemExitsysexc_infoaddErrorappendfactoryr@getattr_multiprocess_can_split_collectaddtasklen) rrl testQueuetasks to_teardownresultr2Z ancestorsZan test_addrrrrrzs<      zMultiProcessTestRunner.collectc Cstdtd}tdt}t}tt||||||||j|jt |j f d} || _ || _ || _ ttjt} | ttj| | S)Ncd)targetargs)r#rtimer"rr[rVrUpickledumpsrJ currentaddr currentstartkeyboardCaughtrSIGILLr`start) riworkerr} resultQueue shouldStoprrrrpr'rrr startProcessEs(z#MultiProcessTestRunner.startProcessc Cs2td||t|jj|}|dk r.|}|jj|j}|dk rL||_t }t }g}g}g}g} t } | } t } | |||| | td|jjx@t|jjD]0} || ||| | }||td| dqWt|}|jj}d}yxb|r^tdt|||yL|j|d\}}}}td||t|yNy||Wn$tk rxtd ||YnX|t|7}||Wn:tk rtd |td tt|d YnX|||g|| ||jjr| s| P|jj rbtd |||j!dd| "sb|#sbtd|||||| | ||<Wnt$k rtdt||#t|d}x:t%|D],\}}|&rt'|j(j)d}t |j*j)}t|d kr||jjdkrtd|nd}t|d kr||jjdkrtd||t'd|j(_)|j+,t }xz|j+"s|&rt ||j-krt.d||/||||| | ||<}Pt0|j1t2j3t 4dqVWqW|s|#rtdPYnX|jj}xZ|D]R}|&rt|j(j)d krt |j*j)}||jjkrt5||jj|}qWqWtdt|t|WnFt6t7fk r}z"t8d|}| 9|t:;Wdd}~XYnXyx^| D]V}td|y |<Wn6t6t7fk rYn| 9|t:;YnXqWt }| =| >| ||jj?| |dkrtdx&|D]}|&rh|j@dddqhWxJt%|D]>\}}|&rtd ||!|&rtd|qWWnTt6t7fk r,t8d x |D]}|&r|/qW|r&|nYnX| S)!a Execute the test (which may be a test suite). If the test is a suite, distribute it out among as many processes as have been configured, at as fine a level as is possible given the context fixtures defined in the suite or any sub-suites. z%s.run(%s) (%s)NzStarting %s workerszStarted worker process %sz5Waiting for results (%s/%s tasks), next timeout=%.3fs)timeoutz1Results received for worker %d, %s, new tasks: %dz)worker %s failed to remove from tasks: %szGot result for unknown task? %sz current: %srzjoining worker %sz!starting new process on worker %sz8Timed out with %s tasks pending (empty testQueue=%r): %sFasciig?zLworker %d has finished its work item, but is not exiting? do we wait for it?Tztimed out worker %s: %srzterminating worker %szAll workers deadzCompleted %s tasks (%s remain)z4parent received ctrl-c when waiting for test resultsz#Tearing down shared fixtures for %szTell all workers to stopSTOP)blockzfailed to join worker %sz=parent received ctrl-c when shutting down: stop all processes)ArfrgosgetpidrJpluginsZ prepareTestZsetOutputStreamrXr r"Z _makeResultrrzr5rangerrvr|r<r@removerMr extendrGr0list consolidateZ stopOnErrorZ wasSuccessfulsetr>joinis_setemptyr enumerateis_aliverrrrrclear waitkilltimeerror terminatekillpidrrsleepminrqrrinforursrttearDownZ printErrorsZ printSummaryfinalizeput)rrlwrapperwrappedr}rr~Z completedrQrrrrirZ total_tasksZ nexttimeoutZ thrownErrorraddrZ newtask_addrs batch_resultZ any_alivewZ worker_addrZtimeprocessingZ startkilltimeer2stoprIrrrrun[s                  "              zMultiProcessTestRunner.runcCstd}t|tjjr.t|jdr.d|j_|jj}t |}|j ||fdd|dk r^|t |7}|dk rp| ||S)NargF)r) rirjr2rkrHrlZ descriptorrrZaddressrr0rv)r}r~r2rrrrrr{s   zMultiProcessTestRunner.addtaskcCst|dr|\}}}n(t|dr6t|j\}}}n td|g}|dkrp|dkrdtd|q||n(tj|\}}| dr|}|||dk r||d t t |S)NrrnzUnable to convert %s to addresszUnaddressable case %sr:) rHrr rn Exceptionrvrpathsplit startswithrmapr0)r2filemodcallpartsdirnamebasenamerrrr!s"       zMultiProcessTestRunner.addressccst|drt|jddsdSt|tr2||jsJt|ddrJt|tjst|trt |}t |dkrt|ddd|jkr|d}|Vn(x&|D]}x| |D] }|VqWqWdS)NrnZ_multiprocess_TZ can_splitrr) rHrxrnrirZ hasFixtures checkCanSplitunittestZ TestSuiterr|re)rrlZ containedr2batchrrrre:s"        z MultiProcessTestRunner.nextBatchcCs|sdSt|ddrdSdS)a8 Callback that we use to check whether the fixtures found in a context or ancestor are ones we care about. Contexts can tell us that their fixtures are reentrant by setting _multiprocess_can_split_. So if we see that, we return False to disregard those fixtures. FryT)rx)rnZfixtrrrrZs  z$MultiProcessTestRunner.checkCanSplitcCs t|dd}|sdSt|ddS)NrnFrd)rx)rr2rnrrrrojs z%MultiProcessTestRunner.sharedFixturescCstd|y|\}}}}}Wn2tk rPtd|tjt|dSX|j||j |7_ |j ||j |xRt |D]B\}\} } } ||jkrg| | f|j|<|j|\} } } | | qWtd||j dS)Nzbatch result is %szresult in unexpected format %szRan %s tests (total: %s))rfrgrMrrmrsrtrXwritetestsRunfailuresrerrorsritems errorClasses)rrroutputrrrrkeystoragelabelisfailZ mystorageZ_junkrrrrps"      z"MultiProcessTestRunner.consolidate)rrrrrrzrrr{ staticmethodrrerror __classcell__rr)rUrrZs (7 rZc Csfy@yt|||||||||| Stk r<td|YnXWn tk r`td|YnXdS)Nz&Worker %s keyboard interrupt, stoppingz%Worker %s timed out waiting for tasks)__runnerrqrfrgr ) ixr}rrrrrrV resultClassrJrrrr[s r[c  st} tdk rFx*tD]"} | } | | ij| q Wjjj t d|t |d} t| j_fdd}fdd}dd}xt|d D]\}}|rt d |P|}| |g}|_g|_||_t d |||yZ|dk r"|t|}t||_t|_||td |_||||j||fWnPtk r>}zt|t}|r|t |jr|rd }nd}t |||td |_t!j"t#$|||||j||fn6|rd}nd}t |||||||j||f|s.Wdd}~XYnxt%k rjtd |_t d|YnLtd |_t d|t!j"t#$|||||j||fYnXj&rPqWt d|dS)NzWorker %s executing, pid=%d)rJcsjjdS)N)r)r@r<r)rJr}rrr@sz__runner..getcs4tt}|djd}j|}|r0|S|S)Nr)Z descriptionsrYrJ)r r rYrZprepareTestResult)rXrZ plug_result)rJrrr makeResults  z__runner..makeResultcSstdd|jD}dd|jD}i}x8t|jD]&\}\}}}dd|D||f||<q4W|j|j|||fS)NcSsg|]\}}t||fqSr)r*).0rerrrrr sz+__runner..batch..cSsg|]\}}t||fqSr)r*)rrrrrrrscSsg|]\}}t||fqSr)r*)rrrrrrrs)rrrrrrXgetvaluer)rrrrrrrrrrrrs z__runner..batchrzWorker %d STOPPEDzWorker %s Test is %s (%s)rz,Worker %s timed out, failing current test %sz5Worker %s keyboard interrupt, failing current test %szWorker %s test %s timed outz$Worker %s test %s keyboard interruptzWorker %s system exitz1Worker %s error running test or returning resultszWorker %s ending)'rloadsZ parserClass_instantiate_pluginsZ addOptionsrZ addPluginrTrCbeginrfrgrrNoSharedFixtureContextSuiteZ suiteClassiterr exceptionZloadTestsFromNamesr}r~rr0rrrrrqrirrr|rrmrsrtrrr>)rr}rrrrrrVrrJZ dummy_parserZ pluginclassZpluginrr@rrrrrrlrrmsgr)rJrr}rrs                    rcs@eZdZdZdZdZdZfddZfddZddZ Z S) ra Context suite that never fires shared fixtures. When a context sets _multiprocess_shared_, fixtures in that context are executed by the main process. Using this suite class prevents them from executing in the runner process as well. Ncs$t|ddrdStt||dS)NrdF)rxrar setupContext)rrn)rUrrrs z(NoSharedFixtureContextSuite.setupContextcs$t|ddrdStt||dS)NrdF)rxrarteardownContext)rrn)rUrrrs z+NoSharedFixtureContextSuite.teardownContextcCstdt|||j|jr0||||}}n ||}}y |Wn4tk r\Yn d|_||| dSzx|jD]}t |t j j r|jdk r|j|j_n|j|_|j|_|j|_|jrtdPy ||Wqtk rn}zjt |t}|rd}nd}t|||ttt|tdf}|jj||||||s^Wdd}~XYqXqWWdd|_y |Wn8tk rYn"d |_||| YnXXdS) z5Run tests in suite inside of suite fixtures. z#suite %s (%s) run called, tests: %ssetupNZstoppingz(Timeout when running test %s in suite %sz2KeyboardInterrupt when running test %s in suite %srcTZteardown)rfrgr+Z_testsZ resultProxyrprqZ error_contextru _exc_inforirjr2rkrrlr}r~rrr0rsrtrJrZhas_runr)rrorigrlrrrrrrrrs\           zNoSharedFixtureContextSuite.run) rrrr]r}r~rrrrrrr)rUrrs  r)6r]loggingrrsr tracebackrrrZ nose.caserjZ nose.corerrrZnose.plugins.baserZnose.pyversionrZ nose.resultrZ nose.suiterZ nose.utilr Zunittest.runnerr r%queuer warningsr ior r getLoggerrrfrr r!r"r#r$rqrr)r*r3r`rZr[rrrrrr^sR           _s a