B ÔÂ_Ö‰ã@s¾dZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z ddl mZddlmZddlmZddlmZdd lmZdd lmZydd lmZWn ek rÜdd lmZYnXdd lmZdd lmZyddl m!Z!Wnek r$ddl Z YnXdZ"e #e$¡Z%da&a'a(a)a*a+Gdd„de,ƒZ-dd„Z.Gdd„dƒZ/Gdd„deƒZ0dd„Z1Gdd„de ƒZ2dd„Z3dd„Z4Gdd „d eƒZ5dS)!a­ Overview ======== The multiprocess plugin enables you to distribute your test run among a set of worker processes that run tests in parallel. This can speed up CPU-bound test runs (as long as the number of work processeses is around the number of processors or cores available), but is mainly useful for IO-bound tests that spend most of their time waiting for data to arrive from someplace else. .. note :: See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional documentation and examples. Use of this plugin on python 2.5 or earlier requires the multiprocessing_ module, also available from PyPI. .. _multiprocessing : http://code.google.com/p/python-multiprocessing/ How tests are distributed ========================= The ideal case would be to dispatch each test to a worker process separately. This ideal is not attainable in all cases, however, because many test suites depend on context (class, module or package) fixtures. The plugin can't know (unless you tell it -- see below!) if a context fixture can be called many times concurrently (is re-entrant), or if it can be shared among tests running in different processes. Therefore, if a context has fixtures, the default behavior is to dispatch the entire suite to a worker as a unit. Controlling distribution ^^^^^^^^^^^^^^^^^^^^^^^^ There are two context-level variables that you can use to control this default behavior. If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True`` in the context, and the plugin will dispatch tests in suites bound to that context as if the context had no fixtures. This means that the fixtures will execute concurrently and multiple times, typically once per test. If a context's fixtures can be shared by tests running in different processes -- such as a package-level fixture that starts an external http server or initializes a shared database -- then set ``_multiprocess_shared_ = True`` in the context. These fixtures will then execute in the primary nose process, and tests in those contexts will be individually dispatched to run in parallel. How results are collected and reported ====================================== As each test or suite executes in a worker process, results (failures, errors, and specially handled exceptions like SkipTest) are collected in that process. When the worker process finishes, it returns results to the main nose process. There, any progress output is printed (dots!), and the results from the test run are combined into a consolidated result set. When results have been received for all dispatched tests, or all workers have died, the result summary is output as normal. Beware! ======= Not all test suites will benefit from, or even operate correctly using, this plugin. For example, CPU-bound tests will run more slowly if you don't have multiple processors. There are also some differences in plugin interactions and behaviors due to the way in which tests are dispatched and loaded. In general, test loading under this plugin operates as if it were always in directed mode instead of discovered mode. For instance, doctests in test modules will always be found when using this plugin with the doctest plugin. But the biggest issue you will face is probably concurrency. Unless you have kept your tests as religiously pure unit tests, with no side-effects, no ordering issues, and no external dependencies, chances are you will experience odd, intermittent and unexplainable failures and errors when using this plugin. This doesn't necessarily mean the plugin is broken; it may mean that your test suite is not safe for concurrency. New Features in 1.1.0 ===================== * functions generated by test generators are now added to the worker queue making them multi-threaded. * fixed timeout functionality, now functions will be terminated with a TimedOutException exception when they exceed their execution time. The worker processes are not terminated. * added ``--process-restartworker`` option to restart workers once they are done, this helps control memory usage. Sometimes memory leaks can accumulate making long runs very difficult. * added global _instantiate_plugins to configure which plugins are started on the worker processes. éN)ÚTextTestRunner)Úfailure)Úloader)ÚPlugin)Úbytes_)ÚTextTestResult)Ú ContextSuite)Ú test_address)Ú_WritelnDecorator)ÚEmpty)Úwarn)ÚStringIOc@seZdZddd„Zdd„ZdS)ÚTimedOutExceptionú Timed OutcCs ||_dS)N)Úvalue)Úselfr©rú=/tmp/pip-unpacked-wheel-8n4943v7/nose/plugins/multiprocess.pyÚ__init__‡szTimedOutException.__init__cCs t|jƒS)N)Úreprr)rrrrÚ__str__‰szTimedOutException.__str__N)r)Ú__name__Ú __module__Ú __qualname__rrrrrrr†s rcCs~yZddlm}mat tjtj¡}|ƒ}t tj|¡|j|j|j|j |j f\aaaa a Wnt k rxt dt ƒYnXdS)Nr)ÚManagerÚProcesszKmultiprocessing module is not available, multiprocess plugin cannot be used)ÚmultiprocessingrrÚsignalÚSIGINTÚSIG_IGNÚQueueÚPoolÚEventÚValueÚArrayÚ ImportErrorr ÚRuntimeWarning)rÚoldÚmrrrÚ _import_mpŒs&r)c@s,eZdZdd„Zdd„Zdd„Zdd„Zd S) ÚTestLetcCs<y| ¡|_Wntk r"YnX| ¡|_t|ƒ|_dS)N)ÚidÚ_idÚAttributeErrorÚshortDescriptionÚ_short_descriptionÚstrÚ_str)rÚcaserrrr£s  zTestLet.__init__cCs|jS)N)r,)rrrrr+«sz TestLet.idcCs|jS)N)r/)rrrrr.®szTestLet.shortDescriptioncCs|jS)N)r1)rrrrr±szTestLet.__str__N)rrrrr+r.rrrrrr*¢sr*c@s8eZdZdZdZiZdd„Zdd„Zdd„Zd d „Z d S) Ú MultiProcesszF Run tests in multiple processes. Requires processing module. iècCs\|jdd| dd¡dddd|jd d| d d ¡d d dd|jdd| dd¡ddddS)z0 Register command-line options. z --processesÚstoreZNOSE_PROCESSESrÚmultiprocess_workersZNUMaNSpread test run among this many processes. Set a number equal to the number of processors or cores in your machine for best results. Pass a negative number to have the number of processes automatically set to the number of cores. Passing 0 means to disable parallel testing. Default is 0 unless NOSE_PROCESSES is set. [NOSE_PROCESSES])ÚactionÚdefaultÚdestÚmetavarÚhelpz--process-timeoutZNOSE_PROCESS_TIMEOUTé Úmultiprocess_timeoutZSECONDSzfSet timeout for return of results from each test runner process. Default is 10. [NOSE_PROCESS_TIMEOUT]z--process-restartworkerÚ store_trueZNOSE_PROCESS_RESTARTWORKERFÚmultiprocess_restartworkerz™If set, will restart each worker process once their tests are done, this helps control memory leaks from killing the system. [NOSE_PROCESS_RESTARTWORKER])r6r7r8r:N)Ú add_optionÚget)rÚparserÚenvrrrÚoptions»s    zMultiProcess.optionsc Cs y|j d¡Wntk r$YnXt|dƒs:d|_dS|jrDdS||_yt|jƒ}Wnt t fk rtd}YnX|rt ƒt dkr”d|_dS|dkrÌyddl }| ¡}Wntk rÊd|_dSXd|_||j_t|jƒ}||j_t|jƒ}||j_d|jd<dS)z# Configure plugin. Úactiver5FNrT)ÚstatusÚpopÚKeyErrorÚhasattrZenabledÚworkerÚconfigÚintr5Ú TypeErrorÚ ValueErrorr)rrÚ cpu_countÚNotImplementedErrorÚfloatr<r>)rrCrJÚworkersrÚtÚrrrrÚ configureÛsB     zMultiProcess.configurecCs |j|_dS)zbRemember loader class so MultiProcessTestRunner can instantiate the right loader. N)Ú __class__Ú loaderClass)rrrrrÚprepareTestLoaderszMultiProcess.prepareTestLoadercCst|j|jj|j|jdS)z9Replace test runner with MultiProcessTestRunner. )ÚstreamÚ verbosityrJrV)ÚMultiProcessTestRunnerrXrJrYrV)rÚrunnerrrrÚprepareTestRunner szMultiProcess.prepareTestRunnerN) rrrÚ__doc__ZscorerErCrTrWr\rrrrr3´s )r3cCs tƒ‚dS)N)r)ÚsigÚframerrrÚ signalhandlersr`cs€eZdZdZ‡fdd„Zdd„Zdd„Zdd „Zd d „Ze eƒZd d „Z e e ƒZ dd„Z dd„Z e e ƒZ dd„Z dd„Z‡ZS)rZg@c s&| dtj¡|_tt|ƒjf|ŽdS)NrV)rFrZdefaultTestLoaderrVÚsuperrZr)rÚkw)rUrrrszMultiProcessTestRunner.__init__c Cs„x|| |¡D]l}t d|t|ƒ¡t|tjjƒrVt|jt j ƒrVt d¡||ƒqt|t ƒr‚|j t j kr‚t d¡||ƒqqt|t ƒrZ|  |¡rZt d|¡y | ¡WnDttfk rÌ‚YnŒt dt ¡¡| |t ¡¡YnbX| |¡|jrF|jj  |g¡}x*|dd…D]}t|ddƒr(d|_q(W| |||||¡q| |||¡} t d t|ƒ| |¡qWdS) NzNext batch %s (%s)zCase is a Failurez%s has shared fixturesz%s setup failedéÚ_multiprocess_shared_FTzQueued test %s (%s) to %s)Ú nextBatchÚlogÚdebugÚtypeÚ isinstanceÚnoser2ÚTestÚtestrÚFailurerÚcontextÚsharedFixturesÚsetUpÚKeyboardInterruptÚ SystemExitÚsysÚexc_infoÚaddErrorÚappendÚfactoryr@ÚgetattrÚ_multiprocess_can_split_ÚcollectÚaddtaskÚlen) rrlÚ testQueueÚtasksÚ to_teardownÚresultr2Z ancestorsZanÚ test_addrrrrrzs<      zMultiProcessTestRunner.collectc Csˆtdtdƒƒ}tdt ¡ƒ}tƒ}tt||||||||j|jt  |j ¡f d} || _ || _ || _ t tjt¡} |  ¡t tj| ¡| S)NÚcÚÚd)ÚtargetÚargs)r#rÚtimer"rr[rVrUÚpickleÚdumpsrJÚ currentaddrÚ currentstartÚkeyboardCaughtrÚSIGILLr`Ústart) rÚiworkerr}Ú resultQueueÚ shouldStopr€rŠr‹rŒÚpr'rrrÚ startProcessEs(z#MultiProcessTestRunner.startProcessc Cs2t d||t ¡¡|jj |¡}|dk r.|}|jj |j¡}|dk rL||_t ƒ}t ƒ}g}g}g}g} t ƒ} |  ¡} t   ¡} |  |||| | ¡t d|jj¡x@t|jjƒD]0} | | ||| | ¡}| |¡t d| d¡q®Wt|ƒ}|jj}d}y€xb|r^t dt|ƒ||¡yL|j|d\}}}}t d||t|ƒ¡yNy| |¡Wn$tk rxt d ||¡YnX|t|ƒ7}| |¡Wn:tk rÌt d |¡t d tt|ƒd ƒ¡YnX| ||g¡| | |¡|jjr|  ¡s|  ¡P|jj rbt d |¡||j!dd|  "¡sb| #¡sbt d|¡| |||| | ¡||<Wnt$k röt dt|ƒ| #¡t|ƒ¡d}x:t%|ƒD],\}}| &¡r¤t'|j(j)dƒ}t   ¡|j*j)}t|ƒd kr||jjdkrt d|¡nd}t|ƒd kr¤||jjdkr¤t d||¡t'dƒ|j(_)|j+ ,¡t   ¡}xz|j+ "¡sÎ| &¡rÎt   ¡||j-kr°t .d|¡| /¡| |||| | ¡||<}Pt 0|j1t2j3¡t  4d¡qVWq¤W|sò| #¡ròt d¡PYnX|jj}xZ|D]R}| &¡rt|j(j)ƒd krt   ¡|j*j)}||jjkrt5||jj|ƒ}qWqþWt dt|ƒt|ƒ¡WnFt6t7fk r¾}z"t 8d¡|}|  9|t: ;¡¡Wdd}~XYnXyx^| D]V}t d|¡y | <¡Wn6t6t7fk r‚Yn|  9|t: ;¡¡YnXqÊWt   ¡}|  =¡|  >| |¡|jj ?| ¡|dkrŠt d¡x&|D]}| &¡rh|j@dddqhWxJt%|ƒD]>\}}| &¡r”t d |¡| !¡| &¡r”t d|¡q”WWnTt6t7fk r,t 8d ¡x |D]}| &¡rþ| /¡qþW|r&|‚n‚YnX| S)!a Execute the test (which may be a test suite). If the test is a suite, distribute it out among as many processes as have been configured, at as fine a level as is possible given the context fixtures defined in the suite or any sub-suites. z%s.run(%s) (%s)NzStarting %s workerszStarted worker process %séz5Waiting for results (%s/%s tasks), next timeout=%.3fs)Útimeoutz1Results received for worker %d, %s, new tasks: %dz)worker %s failed to remove from tasks: %szGot result for unknown task? %sz current: %srzjoining worker %sz!starting new process on worker %sz8Timed out with %s tasks pending (empty testQueue=%r): %sFÚasciigš™™™™™¹?zLworker %d has finished its work item, but is not exiting? do we wait for it?Tztimed out worker %s: %srƒzterminating worker %szAll workers deadzCompleted %s tasks (%s remain)z4parent received ctrl-c when waiting for test resultsz#Tearing down shared fixtures for %szTell all workers to stopÚSTOP)Úblockzfailed to join worker %sz=parent received ctrl-c when shutting down: stop all processes)ArfrgÚosÚgetpidrJÚpluginsZ prepareTestZsetOutputStreamrXr r"Z _makeResultr‡rzr5Úranger“rvr|r<r@ÚremoverMr ÚextendrGr0ÚlistÚ consolidateZ stopOnErrorZ wasSuccessfulÚsetr>ÚjoinÚis_setÚemptyr Ú enumerateÚis_aliverrŠrr‹rŒÚclearÚ waitkilltimeÚerrorÚ terminateÚkillÚpidrrÚsleepÚminrqrrÚinforursrtÚtearDownZ printErrorsZ printSummaryÚfinalizeÚput)rrlÚwrapperÚwrappedr}rr~Z completedrQrr‘r€rŽÚir’Z total_tasksZ nexttimeoutZ thrownErrorrÚaddrZ newtask_addrsÚ batch_resultZ any_aliveÚwZ worker_addrZtimeprocessingZ startkilltimeÚer2ÚstoprIrrrÚrun[s                  "              zMultiProcessTestRunner.runcCstd}t|tjjƒr.t|jdƒr.d|j_|jj}t  |¡}|j ||fdd|dk r^|t |ƒ7}|dk rp|  |¡|S)NÚargF)r˜) rirjr2rkrHrlZ descriptorr¼rZÚaddressr²r0rv)r}r~r2r¼rrrrr{s   zMultiProcessTestRunner.addtaskcCsºt|dƒr| ¡\}}}n(t|dƒr6t|jƒ\}}}n td|ƒ‚g}|dkrp|dkrdtd|ƒ‚q˜| |¡n(tj |¡\}}|  d¡rŽ|}| |¡|dk rª| |¡d  t t |ƒ¡S)Nr½rnzUnable to convert %s to addresszUnaddressable case %srú:) rHr½r rnÚ Exceptionrvr™ÚpathÚsplitÚ startswithr¢Úmapr0)r2ÚfileÚmodÚcallÚpartsÚdirnameÚbasenamerrrr½!s"       zMultiProcessTestRunner.addressccsºt|dƒrt|jddƒsdSt|tƒr2| |j¡sJt|ddƒrJt|tjƒsŽt|tƒr†t |ƒ}t |ƒdkr†t|dddƒ|jkr†|d}|Vn(x&|D]}x|  |¡D] }|Vq¤Wq”WdS)NrnZ_multiprocess_TZ can_splitr”r) rHrxrnrirZ hasFixturesÚ checkCanSplitÚunittestZ TestSuiterŸr|re)rrlZ containedr2Úbatchrrrre:s"        z MultiProcessTestRunner.nextBatchcCs|sdSt|ddƒrdSdS)a8 Callback that we use to check whether the fixtures found in a context or ancestor are ones we care about. Contexts can tell us that their fixtures are reentrant by setting _multiprocess_can_split_. So if we see that, we return False to disregard those fixtures. FryT)rx)rnZfixtrrrrÊZs  z$MultiProcessTestRunner.checkCanSplitcCs t|ddƒ}|sdSt|ddƒS)NrnFrd)rx)rr2rnrrrrojs z%MultiProcessTestRunner.sharedFixturescCsìt d|¡y|\}}}}}Wn2tk rPt d|¡tjt ¡Ž|ƒdSX|j |¡|j |7_ |j   |¡|j   |¡xRt | ¡ƒD]B\}\} } } ||jkrºg| | f|j|<|j|\} } } |   | ¡q’Wt d||j ¡dS)Nzbatch result is %szresult in unexpected format %szRan %s tests (total: %s))rfrgrMrrmrsrtrXÚwriteÚtestsRunÚfailuresržÚerrorsrŸÚitemsÚ errorClasses)rr€r·ÚoutputrÎrÏrÐrÒÚkeyÚstorageÚlabelÚisfailZ mystorageZ_junkrrrr ps"      z"MultiProcessTestRunner.consolidate)rrrr¨rrzr“r»r{Ú staticmethodr½rerÊror Ú __classcell__rr)rUrrZs (7 rZc Csfy@yt|||||||||| ƒ Stk r<t d|¡YnXWn tk r`t d|¡YnXdS)Nz&Worker %s keyboard interrupt, stoppingz%Worker %s timed out waiting for tasks)Ú__runnerrqrfrgr ) Úixr}rrŠr‹rŒr‘rVÚ resultClassrJrrrr[‡s r[c  sÒt ˆ¡‰ˆ ¡} tdk rFx*tD]"} | ƒ} |  | i¡ˆj | ¡q Wˆj ˆjˆ¡ˆj  ¡t   d|t   ¡¡|ˆd} t| j_‡‡fdd„}‡‡fdd„}dd„}xt|d ƒD]\}}| ¡rÖt  d |¡P|ƒ}|  |g¡}ˆ|_g|_||_t   d |||¡yZ|dk r"|t|ƒ}t|ƒ|_t ¡|_||ƒtd ƒ|_| |||j||ƒf¡WnPtk r>}zºt|tƒ}|r| ¡t |jƒrð|r¨d }nd}t  |||¡td ƒ|_t!j"t# $¡Ž|ƒ| |||j||ƒf¡n6|rüd}nd}t   |||¡| |||j||ƒf¡|s.‚Wdd}~XYnxt%k rjtd ƒ|_t  d|¡‚YnLtd ƒ|_t  d|¡t!j"t# $¡Ž|ƒ| |||j||ƒf¡YnXˆj&r¶Pq¶Wt   d|¡dS)NzWorker %s executing, pid=%d)rJcsˆjˆjdS)N)r•)r@r<r)rJr}rrr@¢sz__runner..getcs4ttƒƒ}ˆ|dˆjˆd}ˆj |¡}|r0|S|S)Nr”)Z descriptionsrYrJ)r r rYr›ZprepareTestResult)rXr€Z plug_result)rJrÜrrÚ makeResult¥s  z__runner..makeResultcSstdd„|jDƒ}dd„|jDƒ}i}x8t|j ¡ƒD]&\}\}}}dd„|Dƒ||f||<q4W|j ¡|j|||fS)NcSsg|]\}}t|ƒ|f‘qSr)r*)Ú.0r‚Úerrrrrú °sz+__runner..batch..cSsg|]\}}t|ƒ|f‘qSr)r*)rÞr‚rßrrrrà±scSsg|]\}}t|ƒ|f‘qSr)r*)rÞr‚rßrrrrà´s)rÏrÐrŸrÒrÑrXÚgetvaluerÎ)r€rÏrÐrÒrÔrÕrÖr×rrrr̯s z__runner..batchr—zWorker %d STOPPEDzWorker %s Test is %s (%s)rƒz,Worker %s timed out, failing current test %sz5Worker %s keyboard interrupt, failing current test %szWorker %s test %s timed outz$Worker %s test %s keyboard interruptzWorker %s system exitz1Worker %s error running test or returning resultszWorker %s ending)'rˆÚloadsZ parserClassÚ_instantiate_pluginsZ addOptionsr›Z addPluginrTrCÚbeginrfrgr™ršÚNoSharedFixtureContextSuiteZ suiteClassÚiterr£Ú exceptionZloadTestsFromNamesr}r~r¼r0rrr‡r²rqrirr¡r|rrmrsrtrrr>)rÛr}rrŠr‹rŒr‘rVrÜrJZ dummy_parserZ pluginclassZpluginrr@rÝrÌrr¼r€rlr¹r•Úmsgr)rJrÜr}rrÚ’s‚                    rÚcs@eZdZdZdZdZdZ‡fdd„Z‡fdd„Zdd„Z ‡Z S) råa Context suite that never fires shared fixtures. When a context sets _multiprocess_shared_, fixtures in that context are executed by the main process. Using this suite class prevents them from executing in the runner process as well. Ncs$t|ddƒrdStt|ƒ |¡dS)NrdF)rxraråÚ setupContext)rrn)rUrrréÿs z(NoSharedFixtureContextSuite.setupContextcs$t|ddƒrdStt|ƒ |¡dS)NrdF)rxraråÚteardownContext)rrn)rUrrrês z+NoSharedFixtureContextSuite.teardownContextcCsÊt dt|ƒ||j¡|jr0| ||¡|}}n ||}}y | ¡Wn4tk r\‚Yn d|_| ||  ¡¡dSzúxô|jD]ê}t |t j j ƒr®|jdk r®|j|j_n|j|_|j|_|j|_|jrØt d¡Py ||ƒWq†tk rn}zjt |tƒ}|rd}nd}t |||¡ttt|ƒƒt ¡df}|jj ||¡| ||¡|s^‚Wdd}~XYq†Xq†WWdd|_y | ¡Wn8tk r¢‚Yn"d |_| ||  ¡¡YnXXdS) z5Run tests in suite inside of suite fixtures. z#suite %s (%s) run called, tests: %sÚsetupNZstoppingz(Timeout when running test %s in suite %sz2KeyboardInterrupt when running test %s in suite %srcTZteardown)rfrgr+Z_testsZ resultProxyrprqZ error_contextruÚ _exc_inforirjr2rkr¼rlr}r~r‘rr0rsrtrJr›Zhas_runr°)rr€Úorigrlr¹r•rèrßrrrr»s\           zNoSharedFixtureContextSuite.run) rrrr]r}r~r¼rérêr»rÙrr)rUrråós  rå)6r]Úloggingr™rsr‡Ú tracebackrËrˆrZ nose.caserjZ nose.corerrrZnose.plugins.baserZnose.pyversionrZ nose.resultrZ nose.suiterZ nose.utilr Zunittest.runnerr r%Úqueuer Úwarningsr Úior rãÚ getLoggerrrfrr r!r"r#r$rqrr)r*r3r`rZr[rÚrårrrrÚ^sR           _s a