remove dist-testing and looponfail code from core. there remain some (pytest_runner particularly) tests that test both plain and dist modes which cannot be easily dis-entangled. food for thought.

--HG--
branch : trunk
This commit is contained in:
holger krekel 2010-01-13 16:00:33 +01:00
parent d4f5073076
commit 40f41496d8
63 changed files with 343 additions and 3170 deletions

View File

@ -1,6 +1,9 @@
Changes between 1.X and 1.1.1
=====================================
- moved dist/looponfailing from py.test core into a new
separately released pytest-xdist plugin.
- new junitxml plugin: --xml=path will generate a junit style xml file
which is parseable e.g. by the Hudson continuous integration server.

View File

@ -6,8 +6,8 @@ plugins = [
('advanced python testing',
'skipping mark pdb figleaf coverage '
'monkeypatch capture recwarn tmpdir',),
('testing domains',
'oejskit django genscript'),
('other testing domains, misc',
'oejskit django xdist genscript'),
('reporting and failure logging',
'pastebin logxml xmlresult resultlog terminal',),
('other testing conventions',
@ -22,7 +22,6 @@ plugins = [
externals = {
'oejskit': "run javascript tests in real life browsers",
'figleaf': "for testing with Titus' figleaf coverage module",
'django': "for testing django applications",
'coverage': "for testing with Ned's coverage module ",
'xmlresult': "for generating xml reports "
@ -159,7 +158,7 @@ class PluginDoc(RestWriter):
config.pluginmanager.import_plugin(name)
plugin = config.pluginmanager.getplugin(name)
assert plugin is not None, plugin
print plugin
doc = plugin.__doc__.strip()
i = doc.find("\n")
if i == -1:
@ -169,12 +168,13 @@ class PluginDoc(RestWriter):
oneliner = doc[:i].strip()
moduledoc = doc[i+1:].strip()
self.name = plugin.__name__.split(".")[-1]
self.name = oneliner # plugin.__name__.split(".")[-1]
self.oneliner = oneliner
self.moduledoc = moduledoc
self.h1("%s plugin" % self.name) # : %s" %(self.name, self.oneliner))
self.Print(self.oneliner)
#self.h1("%s plugin" % self.name) # : %s" %(self.name, self.oneliner))
self.h1(oneliner)
#self.Print(self.oneliner)
self.Print()
self.Print(".. contents::")
self.Print(" :local:")

View File

@ -175,7 +175,8 @@ def test_cmdline_entrypoints(monkeypatch):
for script in unversioned_scripts:
assert script in points
def test_slave_popen_needs_no_pylib(testdir, venv):
def test_slave_popen_needs_no_pylib(testdir, venv, pytestconfig):
pytestconfig.pluginmanager.skipifmissing("xdist")
venv.ensure()
#xxx execnet optimizes popen
#ch = venv.makegateway().remote_exec("import execnet")
@ -192,8 +193,10 @@ def test_slave_popen_needs_no_pylib(testdir, venv):
"*1 passed*"
])
def test_slave_needs_no_execnet(testdir, specssh):
gw = execnet.makegateway(specssh)
def test_slave_needs_no_execnet(testdir, sshhost, pytestconfig):
pytestconfig.pluginmanager.skipifmissing("xdist")
xspec = "ssh=%s" % sshhost
gw = execnet.makegateway("ssh=%s" % sshhost)
ch = gw.remote_exec("""
import os, subprocess
subprocess.call(["virtualenv", "--no-site-packages", "subdir"])
@ -207,7 +210,7 @@ def test_slave_needs_no_execnet(testdir, specssh):
e = sys.exc_info()[1]
py.test.skip("could not prepare ssh slave:%s" % str(e))
gw.exit()
newspec = "%s//python=%s//chdir=%s" % (specssh, path, chdir)
newspec = "%s//python=%s//chdir=%s" % (xspec, path, chdir)
gw = execnet.makegateway(newspec)
ch = gw.remote_exec("import execnet")
py.test.raises(ch.RemoteError, ch.waitclose)

View File

@ -5,14 +5,7 @@ pytest_plugins = '_pytest doctest pytester'.split()
collect_ignore = ['build', 'doc/_build']
rsyncdirs = ['conftest.py', 'bin', 'py', 'doc', 'testing']
try:
import execnet
except ImportError:
pass
else:
rsyncdirs.append(str(py.path.local(execnet.__file__).dirpath()))
import py
def pytest_addoption(parser):
@ -20,42 +13,16 @@ def pytest_addoption(parser):
group.addoption('--sshhost',
action="store", dest="sshhost", default=None,
help=("ssh xspec for ssh functional tests. "))
group.addoption('--gx',
action="append", dest="gspecs", default=None,
help=("add a global test environment, XSpec-syntax. "))
group.addoption('--runslowtests',
action="store_true", dest="runslowtests", default=False,
help=("run slow tests"))
def pytest_funcarg__specssh(request):
return getspecssh(request.config)
def getgspecs(config):
return [execnet.XSpec(spec)
for spec in config.getvalueorskip("gspecs")]
# configuration information for tests
def getgspecs(config):
return [execnet.XSpec(spec)
for spec in config.getvalueorskip("gspecs")]
def getspecssh(config):
xspecs = getgspecs(config)
for spec in xspecs:
if spec.ssh:
if not py.path.local.sysfind("ssh"):
py.test.skip("command not found: ssh")
return spec
py.test.skip("need '--gx ssh=...'")
def getsocketspec(config):
xspecs = getgspecs(config)
for spec in xspecs:
if spec.socket:
return spec
py.test.skip("need '--gx socket=...'")
def pytest_funcarg__sshhost(request):
    """Funcarg: return the host given via --sshhost, or skip the test."""
    host = request.config.getvalue("sshhost")
    if not host:
        py.test.skip("need --sshhost option")
    return host
def pytest_generate_tests(metafunc):
multi = getattr(metafunc.function, 'multi', None)
if multi is not None:

18
doc/test/dist.html Normal file
View File

@ -0,0 +1,18 @@
<html>
<head>
<meta http-equiv="refresh" content=" 1 ; URL=plugin/xdist.html" />
</head>
<body>
<script type="text/javascript">
var gaJsHost = (("https:" == document.location.protocol) ? "https://ssl." : "http://www.");
document.write(unescape("%3Cscript src='" + gaJsHost + "google-analytics.com/ga.js' type='text/javascript'%3E%3C/script%3E"));
</script>
<script type="text/javascript">
try {
var pageTracker = _gat._getTracker("UA-7597274-3");
pageTracker._trackPageview();
} catch(err) {}</script>
</body>
</html>

View File

@ -13,8 +13,6 @@ funcargs_: powerful parametrized test function setup
`plugins`_: list of available plugins with usage examples and feature details.
`distributed testing`_: ad-hoc run tests on multiple CPUs and platforms
customize_: configuration, customization, extensions
changelog_: history of changes covering last releases

View File

@ -1,8 +1,7 @@
pytest_capture plugin
=====================
configurable per-test stdout/stderr capturing mechanisms.
=========================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_doctest plugin
=====================
collect and execute doctests from modules and test files.
=========================================================
.. contents::
:local:

View File

@ -1,17 +1,41 @@
pytest_figleaf plugin
=====================
add options to drive and report python test coverage using the 'figleaf' package.
report test coverage using the 'figleaf' package.
=================================================
Install the `pytest-figleaf`_ plugin to use figleaf coverage testing::
easy_install pytest-figleaf
.. contents::
:local:
or::
pip install pytest-figleaf
Usage
---------------
This will make py.test have figleaf related options.
after pip or easy_install mediated installation of ``pytest-figleaf`` you can type::
.. _`pytest-figleaf`: http://bitbucket.org/hpk42/pytest-figleaf/
py.test --figleaf [...]
to enable figleaf coverage in your test run. A default ".figleaf" data file
and "html" directory will be created. You can use ``--fig-data``
and ``fig-html`` to modify the paths.
command line options
--------------------
``--figleaf``
trace python coverage with figleaf and write HTML for files below the current working dir
``--fig-data=dir``
set tracing file, default: ".figleaf".
``--fig-html=dir``
set html reporting dir, default "html".
Start improving this plugin in 30 seconds
=========================================
1. Download `pytest_figleaf.py`_ plugin source code
2. put it somewhere as ``pytest_figleaf.py`` into your import path
3. a subsequent ``py.test`` run will use your local version
Checkout customize_, other plugins_ or `get in contact`_.
.. include:: links.txt

View File

@ -1,8 +1,7 @@
pytest_genscript plugin
=======================
generate standalone test script to be distributed along with an application.
============================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_helpconfig plugin
========================
provide version info, conftest/environment config names.
========================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_hooklog plugin
=====================
log invocations of extension hooks to a file.
=============================================
.. contents::
:local:

View File

@ -8,7 +8,7 @@ mark_ generic mechanism for marking python functions.
pdb_ interactive debugging with the Python Debugger.
figleaf_ (external) for testing with Titus' figleaf coverage module
figleaf_ report test coverage using the 'figleaf' package.
coverage_ (external) for testing with Ned's coverage module
@ -21,13 +21,15 @@ recwarn_ helpers for asserting deprecation and other warnings.
tmpdir_ provide temporary directories to test functions.
testing domains
===============
other testing domains, misc
===========================
oejskit_ (external) run javascript tests in real life browsers
django_ (external) for testing django applications
xdist_ loop on failing tests, distribute test runs to CPUs and hosts.
genscript_ generate standalone test script to be distributed along with an application.

View File

@ -1,5 +1,5 @@
.. _`pytest_logxml.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_logxml.py
.. _`helpconfig`: helpconfig.html
.. _`terminal`: terminal.html
.. _`pytest_recwarn.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_recwarn.py
.. _`unittest`: unittest.html
.. _`pytest_monkeypatch.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_monkeypatch.py
@ -15,11 +15,14 @@
.. _`pytest_nose.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_nose.py
.. _`pytest_restdoc.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_restdoc.py
.. _`restdoc`: restdoc.html
.. _`xdist`: xdist.html
.. _`pytest_pastebin.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_pastebin.py
.. _`pytest_tmpdir.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_tmpdir.py
.. _`terminal`: terminal.html
.. _`pytest_figleaf.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_figleaf.py
.. _`pytest_hooklog.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_hooklog.py
.. _`logxml`: logxml.html
.. _`helpconfig`: helpconfig.html
.. _`plugin.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/plugin.py
.. _`pytest_skipping.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_skipping.py
.. _`checkout the py.test development version`: ../../install.html#checkout
.. _`pytest_helpconfig.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_helpconfig.py

View File

@ -1,8 +1,7 @@
pytest_logxml plugin
====================
logging of test results in JUnit-XML format, for use with Hudson
================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_mark plugin
==================
generic mechanism for marking python functions.
===============================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_monkeypatch plugin
=========================
safely patch object attributes, dicts and environment variables.
================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_nose plugin
==================
nose-compatibility plugin: allow to run nose test suites natively.
==================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_pastebin plugin
======================
submit failure or test session information to a pastebin service.
=================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_pdb plugin
=================
interactive debugging with the Python Debugger.
===============================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_recwarn plugin
=====================
helpers for asserting deprecation and other warnings.
=====================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_restdoc plugin
=====================
perform ReST syntax, local and remote reference tests on .rst/.txt files.
=========================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_resultlog plugin
=======================
non-xml machine-readable logging of test results.
=================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_skipping plugin
======================
advanced skipping for python test functions, classes or modules.
================================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_terminal plugin
======================
Implements terminal reporting of the full testing process.
==========================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_tmpdir plugin
====================
provide temporary directories to test functions.
================================================
.. contents::
:local:

View File

@ -1,8 +1,7 @@
pytest_unittest plugin
======================
automatically discover and run traditional "unittest.py" style tests.
=====================================================================
.. contents::
:local:

181
doc/test/plugin/xdist.txt Normal file
View File

@ -0,0 +1,181 @@
loop on failing tests, distribute test runs to CPUs and hosts.
==============================================================
.. contents::
:local:
The `pytest-xdist`_ plugin extends py.test with some unique
test execution modes:
* Looponfail: run your tests in a subprocess. After it finishes py.test
waits until a file in your project changes and then re-runs only the
failing tests. This is repeated until all tests pass after which again
a full run is performed.
* Load-balancing: if you have multiple CPUs or hosts you can use
those for a combined test run. This allows you to speed up
development or to use special resources of remote machines.
* Multi-Platform coverage: you can specify different Python interpreters
or different platforms and run tests in parallel on all of them.
Before running tests remotely, ``py.test`` efficiently synchronizes your
program source code to the remote place. All test results
are reported back and displayed to your local test session.
You may specify different Python versions and interpreters.
Usage examples
---------------------
Speed up test runs by sending tests to multiple CPUs
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
To send tests to multiple CPUs, type::
py.test -n NUM
Especially for longer running tests or tests requiring
a lot of IO this can lead to considerable speed ups.
Running tests in a Python subprocess
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
To instantiate a python2.4 sub process and send tests to it, you may type::
py.test -d --tx popen//python=python2.4
This will start a subprocess which is run with the "python2.4"
Python interpreter, found in your system binary lookup path.
If you prefix the --tx option value like this::
--tx 3*popen//python=python2.4
then three subprocesses would be created and tests
will be load-balanced across these three processes.
Sending tests to remote SSH accounts
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Suppose you have a package ``mypkg`` which contains some
tests that you can successfully run locally. And you
have a ssh-reachable machine ``myhost``. Then
you can ad-hoc distribute your tests by typing::
py.test -d --tx ssh=myhost --rsyncdir mypkg mypkg
This will synchronize your ``mypkg`` package directory
to a remote ssh account and then locally collect tests
and send them to remote places for execution.
You can specify multiple ``--rsyncdir`` directories
to be sent to the remote side.
**NOTE:** For py.test to collect and send tests correctly
you not only need to make sure all code and tests
directories are rsynced, but that any test (sub) directory
also has an ``__init__.py`` file because internally
py.test references tests as a fully qualified python
module path. **You will otherwise get strange errors**
during setup of the remote side.
Sending tests to remote Socket Servers
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Download the single-module `socketserver.py`_ Python program
and run it like this::
python socketserver.py
It will tell you that it starts listening on the default
port. You can now on your home machine specify this
new socket host with something like this::
py.test -d --tx socket=192.168.1.102:8888 --rsyncdir mypkg mypkg
.. _`atonce`:
Running tests on many platforms at once
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
The basic command to run tests on multiple platforms is::
py.test --dist=each --tx=spec1 --tx=spec2
If you specify a windows host, an OSX host and a Linux
environment this command will send each test to all
platforms - and report back failures from all platforms
at once. The specifications strings use the `xspec syntax`_.
.. _`xspec syntax`: http://codespeak.net/execnet/trunk/basics.html#xspec
.. _`socketserver.py`: http://codespeak.net/svn/py/dist/py/execnet/script/socketserver.py
.. _`execnet`: http://codespeak.net/execnet
Specifying test exec environments in a conftest.py
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Instead of specifying command line options, you can
put options values in a ``conftest.py`` file like this::
pytest_option_tx = ['ssh=myhost//python=python2.5', 'popen//python=python2.5']
pytest_option_dist = True
Any commandline ``--tx`` specifications will add to the list of available execution
environments.
Specifying "rsync" dirs in a conftest.py
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
In your ``mypkg/conftest.py`` you may specify directories to synchronise
or to exclude::
rsyncdirs = ['.', '../plugins']
rsyncignore = ['_cache']
These directory specifications are relative to the directory
where the ``conftest.py`` is found.
command line options
--------------------
``-f, --looponfail``
run tests in subprocess, wait for modified files and re-run failing test set until all pass.
``-n numprocesses``
shortcut for '--dist=load --tx=NUM*popen'
``--boxed``
box each test run in a separate process (unix)
``--dist=distmode``
set mode for distributing tests to exec environments.
each: send each test to each available environment.
load: send each test to available environment.
(default) no: run tests inprocess, don't distribute.
``--tx=xspec``
add a test execution environment. some examples: --tx popen//python=python2.5 --tx socket=192.168.1.102:8888 --tx ssh=user@codespeak.net//chdir=testcache
``-d``
load-balance tests. shortcut for '--dist=load'
``--rsyncdir=dir1``
add directory for rsyncing to remote tx nodes.
Start improving this plugin in 30 seconds
=========================================
1. Download `plugin.py`_ plugin source code
2. put it somewhere as ``plugin.py`` into your import path
3. a subsequent ``py.test`` run will use your local version
Checkout customize_, other plugins_ or `get in contact`_.
.. include:: links.txt

View File

@ -198,8 +198,10 @@ class Config(object):
modpath = py.path.local(mod.__file__).dirpath()
l = []
for relroot in relroots:
relroot = relroot.replace("/", py.path.local.sep)
l.append(modpath.join(relroot, abs=True))
if not isinstance(relroot, py.path.local):
relroot = relroot.replace("/", py.path.local.sep)
relroot = modpath.join(relroot, abs=True)
l.append(relroot)
return l
def addoptions(self, groupname, *specs):
@ -253,46 +255,10 @@ class Config(object):
self.trace("instantiated session %r" % session)
return session
    def getxspecs(self):
        """Return the list of execnet XSpec objects built from the --tx option.

        Each --tx value may carry a ``NUM*spec`` multiplier prefix, in which
        case the spec is repeated NUM times.  Raises self.Error when no
        tx nodes were specified at all.
        """
        xspeclist = []
        for xspec in self.getvalue("tx"):
            i = xspec.find("*")
            try:
                # NOTE(review): if xspec contains no "*", i is -1 and this
                # parses xspec[:-1]; that only matters for all-digit specs —
                # presumably never valid xspecs, but worth confirming.
                num = int(xspec[:i])
            except ValueError:
                # no multiplier prefix: take the spec as-is
                xspeclist.append(xspec)
            else:
                # "3*popen" -> three copies of "popen"
                xspeclist.extend([xspec[i+1:]] * num)
        if not xspeclist:
            raise self.Error("MISSING test execution (tx) nodes: please specify --tx")
        import execnet
        return [execnet.XSpec(x) for x in xspeclist]
    def getrsyncdirs(self):
        """Return the de-duplicated list of existing directories to rsync.

        Candidates are the py lib directory itself, any --rsyncdir command
        line values, and "rsyncdirs" entries from conftest files.  Each is
        resolved to a real path; a missing directory raises self.Error.
        """
        config = self
        candidates = [py._pydir] + config.option.rsyncdir
        conftestroots = config.getconftest_pathlist("rsyncdirs")
        if conftestroots:
            candidates.extend(conftestroots)
        roots = []
        for root in candidates:
            root = py.path.local(root).realpath()
            if not root.check():
                raise config.Error("rsyncdir doesn't exist: %r" %(root,))
            if root not in roots:
                # keep first occurrence only, preserving order
                roots.append(root)
        return roots
#
# helpers
#
def checkmarshal(name, value):
    """Raise a descriptive ValueError if *value* cannot be marshalled.

    Used to validate option values that must cross process boundaries
    via the marshal serialization format.
    """
    try:
        py.std.marshal.dumps(value)
    except ValueError:
        raise ValueError("%s=%r is not marshallable" %(name, value))
def gettopdir(args):
""" return the top directory for the given paths.
if the common base dir resides in a python package

View File

@ -1 +0,0 @@
#

View File

@ -1,280 +0,0 @@
import py
from py.impl.test.session import Session
from py.impl.test import outcome
from py.impl.test.dist.nodemanage import NodeManager
queue = py.builtin._tryimport('queue', 'Queue')
debug_file = None # open('/tmp/loop.log', 'w')
def debug(*args):
    """Write *args*, space-joined, as one line to the module's debug_file.

    A no-op while debug_file is None (the default).
    """
    if debug_file is None:
        return
    line = " ".join(str(arg) for arg in args)
    debug_file.write(line + "\n")
    debug_file.flush()
class LoopState(object):
    """Mutable state for one DSession main loop, registered as a plugin.

    The hook methods below are invoked via the plugin manager while the
    loop runs; they update the pending work (colitems) and termination
    flags that DSession.loop_once inspects.
    """
    def __init__(self, dsession, colitems):
        self.dsession = dsession
        self.colitems = colitems      # collection items still to be sent
        self.exitstatus = None        # set when the loop should terminate
        # loopstate.dowork is False after reschedule events
        # because otherwise we might very busily loop
        # waiting for a host to become ready.
        self.dowork = True
        self.shuttingdown = False
        self.testsfailed = False

    def __repr__(self):
        return "<LoopState exitstatus=%r shuttingdown=%r len(colitems)=%d>" % (
            self.exitstatus, self.shuttingdown, len(self.colitems))

    def pytest_runtest_logreport(self, report):
        # forget the item on the node that just reported it (teardown
        # reports were already handled earlier in the item's lifecycle)
        if report.item in self.dsession.item2nodes:
            if report.when != "teardown": # otherwise we already managed it
                self.dsession.removeitem(report.item, report.node)
        if report.failed:
            self.testsfailed = True

    def pytest_collectreport(self, report):
        # successful collection yields more items/collectors to process
        if report.passed:
            self.colitems.extend(report.result)

    def pytest_testnodeready(self, node):
        self.dsession.addnode(node)

    def pytest_testnodedown(self, node, error=None):
        # reclaim items that were pending on the dead node
        pending = self.dsession.removenode(node)
        if pending:
            if error:
                # the first pending item is assumed to have crashed the node
                crashitem = pending[0]
                debug("determined crashitem", crashitem)
                self.dsession.handle_crashitem(crashitem, node)
                # XXX recovery handling for "each"?
                # currently pending items are not retried
                if self.dsession.config.option.dist == "load":
                    self.colitems.extend(pending[1:])

    def pytest_rescheduleitems(self, items):
        self.colitems.extend(items)
        self.dowork = False # avoid busywait
class DSession(Session):
    """
    Session drives the collection and running of tests
    and generates test events for reporters.

    This distributed variant sends collected test items to remote
    test nodes (managed by a NodeManager) and processes results that
    the nodes put onto an internal event queue.
    """
    # maximum number of items queued per node at any time
    MAXITEMSPERHOST = 15

    def __init__(self, config):
        self.queue = queue.Queue()   # events arriving from test nodes
        self.node2pending = {}       # node -> list of items sent, not yet reported
        self.item2nodes = {}         # item -> list of nodes it was sent to
        super(DSession, self).__init__(config=config)

    #def pytest_configure(self, __multicall__, config):
    #    __multicall__.execute()
    #    try:
    #        config.getxspecs()
    #    except config.Error:
    #        print
    #        raise config.Error("dist mode %r needs test execution environments, "
    #                           "none found." %(config.option.dist))

    def main(self, colitems):
        """Run the full distributed session and return the exit status."""
        self.sessionstarts()
        self.setup()
        exitstatus = self.loop(colitems)
        self.teardown()
        self.sessionfinishes(exitstatus=exitstatus)
        return exitstatus

    def loop_once(self, loopstate):
        """Dispatch pending items, then process one queued event."""
        if loopstate.shuttingdown:
            return self.loop_once_shutdown(loopstate)
        colitems = loopstate.colitems
        if loopstate.dowork and colitems:
            self.triggertesting(loopstate.colitems)
            colitems[:] = []
        # we use a timeout here so that control-C gets through
        while 1:
            try:
                eventcall = self.queue.get(timeout=2.0)
                break
            except queue.Empty:
                continue
        loopstate.dowork = True
        callname, args, kwargs = eventcall
        if callname is not None:
            # replay the queued event as the corresponding hook call
            call = getattr(self.config.hook, callname)
            assert not args
            call(**kwargs)
        # termination conditions
        if ((loopstate.testsfailed and self.config.option.exitfirst) or
            (not self.item2nodes and not colitems and not self.queue.qsize())):
            self.triggershutdown()
            loopstate.shuttingdown = True
        elif not self.node2pending:
            loopstate.exitstatus = outcome.EXIT_NOHOSTS

    def loop_once_shutdown(self, loopstate):
        # once we are in shutdown mode we dont send
        # events other than HostDown upstream
        eventname, args, kwargs = self.queue.get()
        if eventname == "pytest_testnodedown":
            self.config.hook.pytest_testnodedown(**kwargs)
            self.removenode(kwargs['node'])
        elif eventname == "pytest_runtest_logreport":
            # might be some teardown report
            self.config.hook.pytest_runtest_logreport(**kwargs)
        elif eventname == "pytest_internalerror":
            self.config.hook.pytest_internalerror(**kwargs)
            loopstate.exitstatus = outcome.EXIT_INTERNALERROR
        elif eventname == "pytest__teardown_final_logerror":
            self.config.hook.pytest__teardown_final_logerror(**kwargs)
            loopstate.exitstatus = outcome.EXIT_TESTSFAILED
        if not self.node2pending:
            # finished
            if loopstate.testsfailed:
                loopstate.exitstatus = outcome.EXIT_TESTSFAILED
            else:
                loopstate.exitstatus = outcome.EXIT_OK
        #self.config.pluginmanager.unregister(loopstate)

    def _initloopstate(self, colitems):
        # register the LoopState as a plugin so its hook methods fire
        loopstate = LoopState(self, colitems)
        self.config.pluginmanager.register(loopstate)
        return loopstate

    def loop(self, colitems):
        """Drive loop_once until an exit status is determined."""
        try:
            loopstate = self._initloopstate(colitems)
            loopstate.dowork = False # first receive at least one HostUp events
            while 1:
                self.loop_once(loopstate)
                if loopstate.exitstatus is not None:
                    exitstatus = loopstate.exitstatus
                    break
        except KeyboardInterrupt:
            excinfo = py.code.ExceptionInfo()
            self.config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
            exitstatus = outcome.EXIT_INTERRUPTED
        except:
            self.config.pluginmanager.notify_exception()
            exitstatus = outcome.EXIT_INTERNALERROR
        self.config.pluginmanager.unregister(loopstate)
        if exitstatus == 0 and self._testsfailed:
            exitstatus = outcome.EXIT_TESTSFAILED
        return exitstatus

    def triggershutdown(self):
        """Ask every known node to shut down."""
        for node in self.node2pending:
            node.shutdown()

    def addnode(self, node):
        assert node not in self.node2pending
        self.node2pending[node] = []

    def removenode(self, node):
        """Forget a node and return the items that were pending on it."""
        try:
            pending = self.node2pending.pop(node)
        except KeyError:
            # this happens if we didn't receive a testnodeready event yet
            return []
        for item in pending:
            l = self.item2nodes[item]
            l.remove(node)
            if not l:
                del self.item2nodes[item]
        return pending

    def triggertesting(self, colitems):
        """Collect sub-collectors locally, send runnable items to nodes."""
        colitems = self.filteritems(colitems)
        senditems = []
        for next in colitems:
            if isinstance(next, py.test.collect.Item):
                senditems.append(next)
            else:
                # collectors are expanded locally; their results come back
                # through the queue as a pytest_collectreport event
                self.config.hook.pytest_collectstart(collector=next)
                colrep = self.config.hook.pytest_make_collect_report(collector=next)
                self.queueevent("pytest_collectreport", report=colrep)
        if self.config.option.dist == "each":
            self.senditems_each(senditems)
        else:
            # XXX assert self.config.option.dist == "load"
            self.senditems_load(senditems)

    def queueevent(self, eventname, **kwargs):
        self.queue.put((eventname, (), kwargs))

    def senditems_each(self, tosend):
        """Send the same item batch to every node ("each" dist mode)."""
        if not tosend:
            return
        # room is limited by the most-loaded node so all nodes get the
        # identical batch
        room = self.MAXITEMSPERHOST
        for node, pending in self.node2pending.items():
            room = min(self.MAXITEMSPERHOST - len(pending), room)
        sending = tosend[:room]
        if sending:
            for node, pending in self.node2pending.items():
                node.sendlist(sending)
                pending.extend(sending)
                for item in sending:
                    nodes = self.item2nodes.setdefault(item, [])
                    assert node not in nodes
                    nodes.append(node)
                    item.ihook.pytest_itemstart(item=item, node=node)
        tosend[:] = tosend[room:]  # update inplace
        if tosend:
            # we have some left, give it to the main loop
            self.queueevent("pytest_rescheduleitems", items=tosend)

    def senditems_load(self, tosend):
        """Distribute items across nodes up to each node's free capacity."""
        if not tosend:
            return
        for node, pending in self.node2pending.items():
            room = self.MAXITEMSPERHOST - len(pending)
            if room > 0:
                sending = tosend[:room]
                node.sendlist(sending)
                for item in sending:
                    #assert item not in self.item2node, (
                    #    "sending same item %r to multiple "
                    #    "not implemented" %(item,))
                    self.item2nodes.setdefault(item, []).append(node)
                    item.ihook.pytest_itemstart(item=item, node=node)
                pending.extend(sending)
                tosend[:] = tosend[room:]  # update inplace
                if not tosend:
                    break
        if tosend:
            # we have some left, give it to the main loop
            self.queueevent("pytest_rescheduleitems", items=tosend)

    def removeitem(self, item, node):
        """Mark *item* as reported by *node* and drop bookkeeping entries."""
        if item not in self.item2nodes:
            raise AssertionError(item, self.item2nodes)
        nodes = self.item2nodes[item]
        if node in nodes: # the node might have gone down already
            nodes.remove(node)
        if not nodes:
            del self.item2nodes[item]
        pending = self.node2pending[node]
        pending.remove(item)

    def handle_crashitem(self, item, node):
        """Report a failure for the item assumed to have crashed a node."""
        runner = item.config.pluginmanager.getplugin("runner")
        info = "!!! Node %r crashed during running of test %r" %(node, item)
        rep = runner.ItemTestReport(item=item, excinfo=info, when="???")
        rep.node = node
        item.ihook.pytest_runtest_logreport(report=rep)

    def setup(self):
        """ setup any necessary resources ahead of the test run. """
        self.nodemanager = NodeManager(self.config)
        self.nodemanager.setup_nodes(putevent=self.queue.put)
        if self.config.option.dist == "each":
            # "each" mode needs every node up before sending the first batch
            self.nodemanager.wait_nodesready(5.0)

    def teardown(self):
        """ teardown any resources after a test run. """
        self.nodemanager.teardown_nodes()

View File

@ -1,99 +0,0 @@
"""
instantiating, managing and rsyncing to test hosts
"""
import py
import sys, os.path
import execnet
from execnet.gateway_base import RemoteError
class GatewayManager:
    """Manage a group of execnet gateways built from XSpec strings.

    Handles gateway creation, rsyncing of source directories to the
    remote sides, and group teardown.  Hook calls notify plugins about
    gateway and rsync lifecycle events.
    """
    RemoteError = RemoteError

    def __init__(self, specs, hook, defaultchdir="pyexecnetcache"):
        self.specs = []
        self.hook = hook
        self.group = execnet.Group()
        for spec in specs:
            if not isinstance(spec, execnet.XSpec):
                spec = execnet.XSpec(spec)
            if not spec.chdir and not spec.popen:
                # non-popen gateways get a default remote working directory
                spec.chdir = defaultchdir
            self.specs.append(spec)

    def makegateways(self):
        """Instantiate one gateway per spec and announce each via hook."""
        assert not list(self.group)
        for spec in self.specs:
            gw = self.group.makegateway(spec)
            self.hook.pytest_gwmanage_newgateway(
                gateway=gw, platinfo=gw._rinfo())

    def rsync(self, source, notify=None, verbose=False, ignores=None):
        """ perform rsync to all remote hosts.
        """
        rsync = HostRSync(source, verbose=verbose, ignores=ignores)
        seen = py.builtin.set()
        gateways = []
        for gateway in self.group:
            spec = gateway.spec
            if spec.popen and not spec.chdir:
                # XXX this assumes that sources are python-packages
                # and that adding the basedir does not hurt
                gateway.remote_exec("""
import sys ; sys.path.insert(0, %r)
""" % os.path.dirname(str(source))).waitclose()
                continue
            if spec not in seen:
                # rsync only once per distinct spec
                def finished():
                    if notify:
                        notify("rsyncrootready", spec, source)
                rsync.add_target_host(gateway, finished=finished)
                seen.add(spec)
                gateways.append(gateway)
        if seen:
            self.hook.pytest_gwmanage_rsyncstart(
                source=source,
                gateways=gateways,
            )
            rsync.send()
            self.hook.pytest_gwmanage_rsyncfinish(
                source=source,
                gateways=gateways,
            )

    def exit(self):
        """Terminate all gateways in the group."""
        self.group.terminate()
class HostRSync(execnet.RSync):
    """ RSyncer that filters out common files
    (compiled bytecode, editor backups, dotfiles and explicit ignores).
    """
    def __init__(self, sourcedir, *args, **kwargs):
        self._synced = {}
        ignores= None
        if 'ignores' in kwargs:
            ignores = kwargs.pop('ignores')
        # paths (py.path.local-comparable) that are skipped during sync
        self._ignores = ignores or []
        super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)

    def filter(self, path):
        """Return True for paths that should be transferred."""
        path = py.path.local(path)
        if not path.ext in ('.pyc', '.pyo'):
            if not path.basename.endswith('~'):
                if path.check(dotfile=0):
                    for x in self._ignores:
                        if path == x:
                            break
                    else:
                        # not ignored: sync it (falls through to None/False
                        # for excluded paths)
                        return True

    def add_target_host(self, gateway, finished=None):
        # remote target is the source dir's basename under the spec's chdir
        remotepath = os.path.basename(self._sourcedir)
        super(HostRSync, self).add_target(gateway, remotepath,
                                          finishedcallback=finished,
                                          delete=True,)

    def _report_send_file(self, gateway, modified_rel_path):
        # verbose-mode progress line: "<spec>:<remotedir> <= <file>"
        if self._verbose:
            path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
            remotepath = gateway.spec.chdir
            py.builtin.print_('%s:%s <= %s' %
                              (gateway.spec, remotepath, path))

View File

@ -1,187 +0,0 @@
"""
Pickling support for two processes that want to exchange
*immutable* object instances. Immutable in the sense
that the receiving side of an object can modify its
copy but when it sends it back the original sending
side will continue to see its unmodified version
(and no actual state will go over the wire).
This module also implements an experimental
execnet pickling channel using this idea.
"""
import py
import sys, os, struct
#debug = open("log-mypickle-%d" % os.getpid(), 'w')

# Version compatibility shim: memo keys are ints on py3 but strings on
# py2's Pickler; makekey/fromkey convert between the two representations.
# On py3 we need the pure-python pickler classes so that memoize() can be
# overridden by MyPickler below.
if sys.version_info >= (3,0):
    makekey = lambda x: x
    fromkey = lambda x: x
    from pickle import _Pickler as Pickler
    from pickle import _Unpickler as Unpickler
else:
    makekey = str
    fromkey = int
    from pickle import Pickler, Unpickler
class MyPickler(Pickler):
    """ Pickler with a custom memoize()
    to take care of unique ID creation.
    See the usage in ImmutablePickler
    XXX we could probably extend Pickler
    and Unpickler classes to directly
    update the other's memos.
    """
    def __init__(self, file, protocol, uneven):
        # uneven is 0 or 1; it decides whether this pickler allocates
        # even or odd memo keys (see memoize below)
        Pickler.__init__(self, file, protocol)
        self.uneven = uneven

    def memoize(self, obj):
        # Replacement for Pickler.memoize(): instead of sequential ints,
        # allocate keys of the form 2*n + uneven so the two communicating
        # sides can never hand out the same memo key.
        if self.fast:
            return
        assert id(obj) not in self.memo
        memo_len = len(self.memo)
        key = memo_len * 2 + self.uneven
        self.write(self.put(key))
        self.memo[id(obj)] = key, obj

    #if sys.version_info < (3,0):
    #    def save_string(self, obj, pack=struct.pack):
    #        obj = unicode(obj)
    #        self.save_unicode(obj, pack=pack)
    #    Pickler.dispatch[str] = save_string
class ImmutablePickler:
    def __init__(self, uneven, protocol=0):
        """ ImmutablePicklers are instantiated in Pairs.
        The two sides need to create unique IDs
        while pickling their objects. This is
        done by using either even or uneven
        numbers, depending on the instantiation
        parameter.
        """
        # persistent memos shared across dumps()/loads() calls so that
        # already-seen objects travel as references, not copies
        self._picklememo = {}
        self._unpicklememo = {}
        self._protocol = protocol
        self.uneven = uneven and 1 or 0

    def selfmemoize(self, obj):
        # this is for feeding objects to ourselves
        # which can be the case e.g. if you want to pickle
        # from a forked process back to the original
        f = py.io.BytesIO()
        pickler = MyPickler(f, self._protocol, uneven=self.uneven)
        pickler.memo = self._picklememo
        pickler.memoize(obj)
        self._updateunpicklememo()

    def dumps(self, obj):
        """Pickle obj to bytes, remembering it in the shared memo."""
        f = py.io.BytesIO()
        pickler = MyPickler(f, self._protocol, uneven=self.uneven)
        pickler.memo = self._picklememo
        pickler.dump(obj)
        if obj is not None:
            self._updateunpicklememo()
        #print >>debug, "dumped", obj
        #print >>debug, "picklememo", self._picklememo
        return f.getvalue()

    def loads(self, string):
        """Unpickle bytes produced by the paired pickler on the other side."""
        f = py.io.BytesIO(string)
        unpickler = Unpickler(f)
        unpickler.memo = self._unpicklememo
        res = unpickler.load()
        self._updatepicklememo()
        #print >>debug, "loaded", res
        #print >>debug, "unpicklememo", self._unpicklememo
        return res

    def _updatepicklememo(self):
        # mirror objects learned while unpickling into the pickle memo
        for x, obj in self._unpicklememo.items():
            self._picklememo[id(obj)] = (fromkey(x), obj)

    def _updateunpicklememo(self):
        # mirror objects learned while pickling into the unpickle memo
        for key,obj in self._picklememo.values():
            key = makekey(key)
            if key in self._unpicklememo:
                assert self._unpicklememo[key] is obj
            self._unpicklememo[key] = obj
# sentinel meaning "caller passed no endmarker to setcallback()"
NO_ENDMARKER_WANTED = object()


class UnpickleError(Exception):
    """ Problems while unpickling. """

    def __init__(self, formatted):
        super(UnpickleError, self).__init__(formatted)
        # keep the pre-formatted remote traceback text for display
        self.formatted = formatted

    def __str__(self):
        return self.formatted
class PickleChannel(object):
    """ PickleChannels wrap execnet channels
    and allow to send/receive by using
    "immutable pickling".
    """
    # set when a callback failed to unpickle; reported via _getremoteerror()
    _unpicklingerror = None

    def __init__(self, channel):
        self._channel = channel
        # we use the fact that each side of a
        # gateway connection counts with uneven
        # or even numbers depending on which
        # side it is (for the purpose of creating
        # unique ids - which is what we need it here for)
        uneven = channel.gateway._channelfactory.count % 2
        self._ipickle = ImmutablePickler(uneven=uneven)
        self.RemoteError = channel.RemoteError

    def send(self, obj):
        # pickle with the memo-sharing pickler, then ship the raw bytes
        pickled_obj = self._ipickle.dumps(obj)
        self._channel.send(pickled_obj)

    def receive(self):
        pickled_obj = self._channel.receive()
        return self._unpickle(pickled_obj)

    def _unpickle(self, pickled_obj):
        # raw channel objects may arrive unpickled; pass them through
        if isinstance(pickled_obj, self._channel.__class__):
            return pickled_obj
        return self._ipickle.loads(pickled_obj)

    def _getremoteerror(self):
        return self._unpicklingerror or self._channel._getremoteerror()

    def close(self):
        return self._channel.close()

    def isclosed(self):
        return self._channel.isclosed()

    def waitclose(self, timeout=None):
        return self._channel.waitclose(timeout=timeout)

    def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
        """Register callback to receive unpickled objects.

        Wraps the user callback so it sees unpickled values.  NOTE: the
        wrapper runs in the gateway's receiver thread - it must avoid
        raising exceptions or doing heavy work.
        """
        if endmarker is NO_ENDMARKER_WANTED:
            def unpickle_callback(pickled_obj):
                obj = self._unpickle(pickled_obj)
                callback(obj)
            self._channel.setcallback(unpickle_callback)
            return
        # use a private endmarker on the wire so we can translate it
        # back to the caller-supplied one
        uniqueendmarker = object()
        def unpickle_callback(pickled_obj):
            if pickled_obj is uniqueendmarker:
                return callback(endmarker)
            try:
                obj = self._unpickle(pickled_obj)
            except KeyboardInterrupt:
                raise
            except:
                # record the failure and still deliver the endmarker so
                # the consumer does not hang waiting for more events
                excinfo = py.code.ExceptionInfo()
                formatted = str(excinfo.getrepr(showlocals=True,funcargs=True))
                self._unpicklingerror = UnpickleError(formatted)
                callback(endmarker)
            else:
                callback(obj)
        self._channel.setcallback(unpickle_callback, uniqueendmarker)

View File

@ -1,81 +0,0 @@
import py
import sys, os
from py.impl.test.dist.txnode import TXNode
from py.impl.test.dist.gwmanage import GatewayManager
class NodeManager(object):
    """Create, rsync to, and track the set of remote test nodes."""

    def __init__(self, config, specs=None):
        self.config = config
        if specs is None:
            specs = self.config.getxspecs()
        self.roots = self.config.getrsyncdirs()
        self.gwmanager = GatewayManager(specs, config.hook)
        self.nodes = []
        # set once every slave has reported ready (see _slaveready)
        self._nodesready = py.std.threading.Event()

    def trace(self, msg):
        self.config.hook.pytest_trace(category="nodemanage", msg=msg)

    def config_getignores(self):
        return self.config.getconftest_pathlist("rsyncignore")

    def rsync_roots(self):
        """ make sure that all remote gateways
        have the same set of roots in their
        current directory.
        """
        self.makegateways()
        options = {
            'ignores': self.config_getignores(),
            'verbose': self.config.option.verbose,
        }
        if self.roots:
            # send each rsync root
            for root in self.roots:
                self.gwmanager.rsync(root, **options)
        else:
            # NOTE(review): the bare XXX name below raises NameError at
            # runtime - this branch is deliberately unfinished
            XXX # do we want to care for situations without explicit rsyncdirs?
            # we transfer our topdir as the root
            self.gwmanager.rsync(self.config.topdir, **options)
            # and cd into it
            self.gwmanager.multi_chdir(self.config.topdir.basename, inplacelocal=False)

    def makegateways(self):
        # we change to the topdir so that
        # PopenGateways will have their cwd
        # such that unpickling configs will
        # pick it up as the right topdir
        # (for other gateways this chdir is irrelevant)
        self.trace("making gateways")
        old = self.config.topdir.chdir()
        try:
            self.gwmanager.makegateways()
        finally:
            old.chdir()

    def setup_nodes(self, putevent):
        """Rsync roots, then start one TXNode per gateway."""
        self.rsync_roots()
        self.trace("setting up nodes")
        for gateway in self.gwmanager.group:
            node = TXNode(gateway, self.config, putevent, slaveready=self._slaveready)
            gateway.node = node  # to keep node alive
            self.trace("started node %r" % node)

    def _slaveready(self, node):
        #assert node.gateway == node.gateway
        #assert node.gateway.node == node
        self.nodes.append(node)
        self.trace("%s slave node ready %r" % (node.gateway.id, node))
        if len(self.nodes) == len(list(self.gwmanager.group)):
            # every slave reported in - unblock wait_nodesready()
            self._nodesready.set()

    def wait_nodesready(self, timeout=None):
        """Block until all slaves are ready; raise IOError on timeout."""
        self._nodesready.wait(timeout)
        if not self._nodesready.isSet():
            raise IOError("nodes did not get ready for %r secs" % timeout)

    def teardown_nodes(self):
        # XXX do teardown nodes?
        self.gwmanager.exit()

View File

@ -1,164 +0,0 @@
"""
Manage setup, running and local representation of remote nodes/processes.
"""
import py
from py.impl.test.dist.mypickle import PickleChannel
from py.impl.test import outcome
class TXNode(object):
    """ Represents a Test Execution environment in the controlling process.
        - sets up a slave node through an execnet gateway
        - manages sending of test-items and receival of results and events
        - creates events when the remote side crashes
    """
    # endmarker delivered by the channel callback when the channel closes
    ENDMARK = -1

    def __init__(self, gateway, config, putevent, slaveready=None):
        self.config = config
        self.putevent = putevent
        self.gateway = gateway
        self.channel = install_slave(gateway, config)
        self._sendslaveready = slaveready
        self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
        self._down = False

    def __repr__(self):
        id = self.gateway.id
        status = self._down and 'true' or 'false'
        return "<TXNode %r down=%s>" %(id, status)

    def notify(self, eventname, *args, **kwargs):
        # events are queued as (name, args, kwargs); positional args unused
        assert not args
        self.putevent((eventname, args, kwargs))

    def callback(self, eventcall):
        """ this gets called for each object we receive from
        the other side and if the channel closes.

        Note that channel callbacks run in the receiver
        thread of execnet gateways - we need to
        avoid raising exceptions or doing heavy work.
        """
        try:
            if eventcall == self.ENDMARK:
                # channel closed: report a crash unless we shut down cleanly
                err = self.channel._getremoteerror()
                if not self._down:
                    if not err or isinstance(err, EOFError):
                        err = "Not properly terminated"
                    self.notify("pytest_testnodedown", node=self, error=err)
                    self._down = True
                return
            eventname, args, kwargs = eventcall
            if eventname == "slaveready":
                if self._sendslaveready:
                    self._sendslaveready(self)
                self.notify("pytest_testnodeready", node=self)
            elif eventname == "slavefinished":
                self._down = True
                self.notify("pytest_testnodedown", error=None, node=self)
            elif eventname in ("pytest_runtest_logreport",
                               "pytest__teardown_final_logerror"):
                # tag the report with the node that produced it
                kwargs['report'].node = self
                self.notify(eventname, **kwargs)
            else:
                self.notify(eventname, **kwargs)
        except KeyboardInterrupt:
            # should not land in receiver-thread
            raise
        except:
            excinfo = py.code.ExceptionInfo()
            py.builtin.print_("!" * 20, excinfo)
            self.config.pluginmanager.notify_exception(excinfo)

    def send(self, item):
        assert item is not None
        self.channel.send(item)

    def sendlist(self, itemlist):
        self.channel.send(itemlist)

    def shutdown(self):
        # None is the sentinel telling the slave loop to exit
        self.channel.send(None)
# setting up slave code
def install_slave(gateway, config):
    """Bootstrap a SlaveNode on the remote side of *gateway*.

    Returns a PickleChannel over which (config, basetemp, id) has
    been sent and test events will flow back.
    """
    channel = gateway.remote_exec(source="""
        import os, sys
        sys.path.insert(0, os.getcwd())
        from py.impl.test.dist.mypickle import PickleChannel
        from py.impl.test.dist.txnode import SlaveNode
        channel.send("basicimport")
        channel = PickleChannel(channel)
        slavenode = SlaveNode(channel)
        slavenode.run()
    """)
    # wait for the plain-bytes handshake before switching to pickling
    channel.receive()
    channel = PickleChannel(channel)
    basetemp = None
    if gateway.spec.popen:
        # local subprocesses get a fresh numbered tmpdir under ours
        popenbase = config.ensuretemp("popen")
        basetemp = py.path.local.make_numbered_dir(prefix="slave-",
            keep=0, rootdir=popenbase)
        basetemp = str(basetemp)
    channel.send((config, basetemp, gateway.id))
    return channel
class SlaveNode(object):
    """Remote-side counterpart of TXNode: receives items and runs them."""

    def __init__(self, channel):
        self.channel = channel

    def __repr__(self):
        return "<%s channel=%s>" %(self.__class__.__name__, self.channel)

    def sendevent(self, eventname, *args, **kwargs):
        # events travel back as (name, args, kwargs) tuples
        self.channel.send((eventname, args, kwargs))

    def pytest_runtest_logreport(self, report):
        # forward test reports to the controlling process
        self.sendevent("pytest_runtest_logreport", report=report)

    def pytest__teardown_final_logerror(self, report):
        self.sendevent("pytest__teardown_final_logerror", report=report)

    def run(self):
        """Main slave loop: configure, then run received items until None."""
        channel = self.channel
        self.config, basetemp, self.nodeid = channel.receive()
        if basetemp:
            self.config.basetemp = py.path.local(basetemp)
        self.config.pluginmanager.do_configure(self.config)
        self.config.pluginmanager.register(self)
        self.runner = self.config.pluginmanager.getplugin("pytest_runner")
        self.sendevent("slaveready")
        try:
            self.config.hook.pytest_sessionstart(session=self)
            while 1:
                task = channel.receive()
                if task is None:
                    # shutdown sentinel sent by TXNode.shutdown()
                    break
                if isinstance(task, list):
                    for item in task:
                        self.run_single(item=item)
                else:
                    self.run_single(item=task)
            self.config.hook.pytest_sessionfinish(
                session=self,
                exitstatus=outcome.EXIT_OK)
        except KeyboardInterrupt:
            raise
        except:
            er = py.code.ExceptionInfo().getrepr(funcargs=True, showlocals=True)
            self.sendevent("pytest_internalerror", excrepr=er)
            raise
        else:
            self.sendevent("slavefinished")

    def run_single(self, item):
        call = self.runner.CallInfo(item._checkcollectable, when='setup')
        if call.excinfo:
            # likely it is not collectable here because of
            # platform/import-dependency induced skips
            # we fake a setup-error report with the obtained exception
            # and do not care about capturing or non-runner hooks
            rep = self.runner.pytest_runtest_makereport(item=item, call=call)
            self.pytest_runtest_logreport(rep)
            return
        item.config.hook.pytest_runtest_protocol(item=item)

View File

@ -1 +0,0 @@
#

View File

@ -1,165 +0,0 @@
"""
LooponfailingSession and Helpers.
NOTE that one really has to avoid loading and depending on
application modules within the controlling process
(the one that starts repeatedly test processes)
otherwise changes to source code can crash
the controlling process which should never happen.
"""
import py
import sys
import execnet
from py.impl.test.session import Session
from py.impl.test.looponfail import util
class LooponfailingSession(Session):
    """Session that re-runs failing tests in a fresh subprocess whenever
    watched files change."""

    def __init__(self, config):
        super(LooponfailingSession, self).__init__(config=config)
        self.rootdirs = [self.config.topdir] # xxx dist_rsync_roots?
        self.statrecorder = util.StatRecorder(self.rootdirs)
        self.remotecontrol = RemoteControl(self.config)
        self.out = py.io.TerminalWriter()

    def main(self, initialitems):
        """Loop forever: run tests, wait for a file change, repeat."""
        try:
            self.loopstate = loopstate = LoopState([])
            self.remotecontrol.setup()
            while 1:
                self.loop_once(loopstate)
                if not loopstate.colitems and loopstate.wasfailing:
                    continue # the last failures passed, let's rerun all
                self.statrecorder.waitonchange(checkinterval=2.0)
        except KeyboardInterrupt:
            # NOTE: bare 'print' - Python 2 print statement, emits a newline
            print

    def loop_once(self, loopstate):
        """Run one slave session over the recorded failures (or everything)."""
        colitems = loopstate.colitems
        loopstate.wasfailing = colitems and len(colitems)
        loopstate.colitems = self.remotecontrol.runsession(colitems or ())
        # runsession tears the slave down (see RemoteControl.runsession);
        # set up a fresh one for the next round
        self.remotecontrol.setup()
class LoopState:
    # holds the collection items (failures) to re-run on the next iteration
    def __init__(self, colitems=None):
        self.colitems = colitems
class RemoteControl(object):
    """Drive a slave test process over an execnet popen gateway."""

    def __init__(self, config):
        self.config = config

    def trace(self, *args):
        # debug tracing, only active with --debug
        if self.config.option.debug:
            msg = " ".join([str(x) for x in args])
            py.builtin.print_("RemoteControl:", msg)

    def initgateway(self):
        return execnet.makegateway("popen")

    def setup(self, out=None):
        """Spawn the slave process and send it our config and terminal state.

        Raises ValueError if a gateway already exists (call
        ensure_teardown() first).
        """
        if out is None:
            out = py.io.TerminalWriter()
        if hasattr(self, 'gateway'):
            raise ValueError("already have gateway %r" % self.gateway)
        self.trace("setting up slave session")
        self.gateway = self.initgateway()
        self.channel = channel = self.gateway.remote_exec("""
            import os
            import py
            chdir = channel.receive()
            outchannel = channel.gateway.newchannel()
            channel.send(outchannel)
            os.chdir(chdir) # unpickling config uses cwd as topdir
            config_state = channel.receive()
            fullwidth, hasmarkup = channel.receive()
            py.test.config.__setstate__(config_state)

            import sys
            sys.stdout = sys.stderr = outchannel.makefile('w')

            from py.impl.test.looponfail.remote import slave_runsession
            slave_runsession(channel, py.test.config, fullwidth, hasmarkup)
        """)
        channel.send(str(self.config.topdir))
        remote_outchannel = channel.receive()
        def write(s):
            # mirror remote output to our terminal as it arrives
            out._file.write(s)
            out._file.flush()
        remote_outchannel.setcallback(write)
        channel.send(self.config.__getstate__())
        channel.send((out.fullwidth, out.hasmarkup))
        self.trace("set up of slave session complete")

    def ensure_teardown(self):
        # close channel and gateway if present; safe to call repeatedly
        if hasattr(self, 'channel'):
            if not self.channel.isclosed():
                self.trace("closing", self.channel)
                self.channel.close()
            del self.channel
        if hasattr(self, 'gateway'):
            self.trace("exiting", self.gateway)
            self.gateway.exit()
            del self.gateway

    def runsession(self, colitems=()):
        """Send failure trails to the slave; return the new failure trails."""
        try:
            self.trace("sending", colitems)
            trails = colitems
            self.channel.send(trails)
            try:
                return self.channel.receive()
            except self.channel.RemoteError:
                e = sys.exc_info()[1]
                self.trace("ERROR", e)
                raise
        finally:
            # always dispose of the slave; setup() creates a fresh one
            self.ensure_teardown()
def slave_runsession(channel, config, fullwidth, hasmarkup):
    """ we run this on the other side.

    Receives failure trails over *channel*, runs the corresponding
    items (or the whole initial collection), and sends back the
    trails of the reports that failed this time.
    """
    if config.option.debug:
        def DEBUG(*args):
            print(" ".join(map(str, args)))
    else:
        def DEBUG(*args): pass

    DEBUG("SLAVE: received configuration, using topdir:", config.topdir)
    #config.option.session = None
    # never recurse into looponfail/pdb inside the slave
    config.option.looponfail = False
    config.option.usepdb = False
    trails = channel.receive()
    config.pluginmanager.do_configure(config)
    DEBUG("SLAVE: initsession()")
    session = config.initsession()
    # XXX configure the reporter object's terminal writer more directly
    # XXX and write a test for this remote-terminal setting logic
    config.pytest_terminal_hasmarkup = hasmarkup
    config.pytest_terminal_fullwidth = fullwidth
    if trails:
        # map trails back to collection items; a trail may have gone
        # stale if its test disappeared meanwhile
        colitems = []
        for trail in trails:
            try:
                colitem = config._rootcol.fromtrail(trail)
            except ValueError:
                #XXX send info for "test disappeared" or so
                continue
            colitems.append(colitem)
    else:
        colitems = config.getinitialnodes()
    session.shouldclose = channel.isclosed

    class Failures(list):
        # plugin collecting every failing test/collect report
        def pytest_runtest_logreport(self, report):
            if report.failed:
                self.append(report)
        pytest_collectreport = pytest_runtest_logreport

    failreports = Failures()
    session.pluginmanager.register(failreports)
    DEBUG("SLAVE: starting session.main()")
    session.main(colitems)
    session.config.hook.pytest_looponfailinfo(
        failreports=list(failreports),
        rootdirs=[config.topdir])
    rootcol = session.config._rootcol
    channel.send([rootcol.totrail(rep.getnode()) for rep in failreports])

View File

@ -1,53 +0,0 @@
import py
class StatRecorder:
    """Watch a list of root directories for source-file changes via stat().

    check() compares a fresh stat() snapshot against the previous one and
    returns whether anything changed; waitonchange() polls check() until
    it reports a change.
    """
    def __init__(self, rootdirlist):
        self.rootdirlist = rootdirlist
        self.statcache = {}
        self.check() # snapshot state

    def fil(self, p):
        # file filter: only watch source-like files
        return p.ext in ('.py', '.txt', '.c', '.h')

    def rec(self, p):
        # recursion filter: do not descend into dot-directories
        return p.check(dotfile=0)

    def waitonchange(self, checkinterval=1.0):
        """Block until check() reports a change, polling every
        *checkinterval* seconds."""
        while 1:
            changed = self.check()
            if changed:
                return
            py.std.time.sleep(checkinterval)

    def check(self, removepycfiles=True):
        """Return True if any watched file was added, removed or modified.

        Also removes a stale .pyc next to a modified .py so the re-run
        imports fresh bytecode.
        """
        changed = False
        statcache = self.statcache
        newstat = {}
        for rootdir in self.rootdirlist:
            for path in rootdir.visit(self.fil, self.rec):
                # pop() instead of get()+del: the previous version deleted
                # the cache entry twice when a file had vanished, raising
                # KeyError from inside the ENOENT handler
                oldstat = statcache.pop(path, None)
                try:
                    newstat[path] = curstat = path.stat()
                except py.error.ENOENT:
                    # file vanished since the last snapshot
                    if oldstat:
                        changed = True
                else:
                    if oldstat:
                        if oldstat.mtime != curstat.mtime or \
                           oldstat.size != curstat.size:
                            changed = True
                            py.builtin.print_("# MODIFIED", path)
                            if removepycfiles and path.ext == ".py":
                                pycfile = path + "c"
                                if pycfile.check():
                                    pycfile.remove()
                    else:
                        # new file appeared
                        changed = True
        if statcache:
            # leftover entries: previously-seen files not visited now,
            # i.e. removed from disk
            changed = True
        self.statcache = newstat
        return changed

View File

@ -61,6 +61,18 @@ class PluginManager(object):
def getplugins(self):
return list(self.registry)
def skipifmissing(self, name):
    """ py.test.skip() the current test unless a plugin named *name* is registered. """
    if not self.hasplugin(name):
        py.test.skip("plugin %r is missing" % name)
def hasplugin(self, name):
    """Return True if a plugin named *name* is registered, False otherwise."""
    try:
        self.getplugin(name)
    except KeyError:
        return False
    return True
def getplugin(self, name):
try:
return self._name2plugin[name]

View File

@ -3,16 +3,6 @@
import sys
import py
try:
import execnet
if not py.path.local(py.__file__).check():
raise ImportError("")
except ImportError:
execnet = None
else:
if not hasattr(execnet, 'Group'):
execnet = None
def pytest_pyfunc_call(__multicall__, pyfuncitem):
if not __multicall__.execute():
testfunction = pyfuncitem.obj
@ -63,10 +53,6 @@ def pytest_addoption(parser):
"space separated keywords. precede a keyword with '-' to negate. "
"Terminate the expression with ':' to treat a match as a signal "
"to run all subsequent tests. ")
if execnet:
group._addoption('-f', '--looponfail',
action="store_true", dest="looponfail", default=False,
help="run tests, re-run failing test set until all pass.")
group = parser.getgroup("collect", "collection")
group.addoption('--collectonly',
@ -82,60 +68,15 @@ def pytest_addoption(parser):
"test process debugging and configuration")
group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir",
help="base temporary directory for this test run.")
if execnet:
add_dist_options(parser)
else:
parser.hints.append(
"'execnet>=1.0.0b4' required for --looponfailing / distributed testing."
)
def add_dist_options(parser):
# see http://pytest.org/help/dist")
group = parser.getgroup("dist", "distributed testing")
group._addoption('--dist', metavar="distmode",
action="store", choices=['load', 'each', 'no'],
type="choice", dest="dist", default="no",
help=("set mode for distributing tests to exec environments.\n\n"
"each: send each test to each available environment.\n\n"
"load: send each test to available environment.\n\n"
"(default) no: run tests inprocess, don't distribute."))
group._addoption('--tx', dest="tx", action="append", default=[], metavar="xspec",
help=("add a test execution environment. some examples: "
"--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 "
"--tx ssh=user@codespeak.net//chdir=testcache"))
group._addoption('-d',
action="store_true", dest="distload", default=False,
help="load-balance tests. shortcut for '--dist=load'")
group._addoption('-n', dest="numprocesses", metavar="numprocesses",
action="store", type="int",
help="shortcut for '--dist=load --tx=NUM*popen'")
group.addoption('--rsyncdir', action="append", default=[], metavar="dir1",
help="add directory for rsyncing to remote tx nodes.")
def pytest_configure(config):
fixoptions(config)
setsession(config)
def fixoptions(config):
if execnet:
if config.option.numprocesses:
config.option.dist = "load"
config.option.tx = ['popen'] * int(config.option.numprocesses)
if config.option.distload:
config.option.dist = "load"
def setsession(config):
val = config.getvalue
if val("collectonly"):
from py.impl.test.session import Session
config.setsessionclass(Session)
elif execnet:
if val("looponfail"):
from py.impl.test.looponfail.remote import LooponfailingSession
config.setsessionclass(LooponfailingSession)
elif val("dist") != "no":
from py.impl.test.dist.dsession import DSession
config.setsessionclass(DSession)
# pycollect related hooks and code, should move to pytest_pycollect.py

View File

@ -4,10 +4,6 @@ interactive debugging with the Python Debugger.
import py
import pdb, sys, linecache
from py.impl.test.outcome import Skipped
try:
import execnet
except ImportError:
execnet = None
def pytest_addoption(parser):
group = parser.getgroup("general")
@ -15,16 +11,9 @@ def pytest_addoption(parser):
action="store_true", dest="usepdb", default=False,
help="start the interactive Python debugger on errors.")
def pytest_configure(__multicall__, config):
if config.option.usepdb:
if execnet:
__multicall__.execute()
if config.getvalue("looponfail"):
raise config.Error("--pdb incompatible with --looponfail.")
if config.option.dist != "no":
raise config.Error("--pdb incompatible with distributing tests.")
config.pluginmanager.register(PdbInvoke())
def pytest_configure(config):
if config.getvalue("usepdb"):
config.pluginmanager.register(PdbInvoke(), 'pdb')
class PdbInvoke:
def pytest_runtest_makereport(self, item, call):

View File

@ -8,12 +8,6 @@ from py.impl.test.outcome import Skipped
#
# pytest plugin hooks
def pytest_addoption(parser):
group = parser.getgroup("general")
group.addoption('--boxed',
action="store_true", dest="boxed", default=False,
help="box each test run in a separate process (unix)")
# XXX move to pytest_sessionstart and fix py.test owns tests
def pytest_configure(config):
config._setupstate = SetupState()
@ -36,12 +30,7 @@ def pytest_make_collect_report(collector):
return CollectReport(collector, result, excinfo)
def pytest_runtest_protocol(item):
if item.config.getvalue("boxed"):
reports = forked_run_report(item)
for rep in reports:
item.ihook.pytest_runtest_logreport(report=rep)
else:
runtestprotocol(item)
runtestprotocol(item)
return True
def runtestprotocol(item, log=True):
@ -116,38 +105,6 @@ class CallInfo:
status = "result: %r" % (self.result,)
return "<CallInfo when=%r %s>" % (self.when, status)
def forked_run_report(item):
# for now, we run setup/teardown in the subprocess
# XXX optionally allow sharing of setup/teardown
EXITSTATUS_TESTEXIT = 4
from py.impl.test.dist.mypickle import ImmutablePickler
ipickle = ImmutablePickler(uneven=0)
ipickle.selfmemoize(item.config)
# XXX workaround the issue that 2.6 cannot pickle
# instances of classes defined in global conftest.py files
ipickle.selfmemoize(item)
def runforked():
try:
reports = runtestprotocol(item, log=False)
except KeyboardInterrupt:
py.std.os._exit(EXITSTATUS_TESTEXIT)
return ipickle.dumps(reports)
ff = py.process.ForkedFunc(runforked)
result = ff.waitfinish()
if result.retval is not None:
return ipickle.loads(result.retval)
else:
if result.exitstatus == EXITSTATUS_TESTEXIT:
py.test.exit("forked test item %s raised Exit" %(item,))
return [report_process_crash(item, result)]
def report_process_crash(item, result):
path, lineno = item._getfslineno()
info = "%s:%s: running the test CRASHED with signal %d" %(
path, lineno, result.signal)
return ItemTestReport(item, excinfo=info, when="???")
class BaseReport(object):
def __repr__(self):
l = ["%s=%s" %(key, value)

View File

@ -55,8 +55,6 @@ def main():
'py.impl.path',
'py.impl.process',
'py.impl.test',
'py.impl.test.dist',
'py.impl.test.looponfail',
],
zip_safe=False,
)

View File

@ -1,21 +1,6 @@
import py
from py.plugin.pytest_default import pytest_report_iteminfo
def test_implied_different_sessions(testdir, tmpdir):
def x(*args):
config = testdir.reparseconfig([tmpdir] + list(args))
try:
config.pluginmanager.do_configure(config)
except ValueError:
return Exception
return getattr(config._sessionclass, '__name__', None)
assert x() == None
py.test.importorskip("execnet")
assert x('-d') == 'DSession'
assert x('--dist=each') == 'DSession'
assert x('-n3') == 'DSession'
assert x('-f') == 'LooponfailingSession'
def test_plugin_specify(testdir):
testdir.chdir()
config = py.test.raises(ImportError, """
@ -40,50 +25,6 @@ def test_exclude(testdir):
assert result.ret == 0
assert result.stdout.fnmatch_lines(["*1 passed*"])
class TestDistOptions:
def setup_method(self, method):
py.test.importorskip("execnet")
def test_getxspecs(self, testdir):
config = testdir.parseconfigure("--tx=popen", "--tx", "ssh=xyz")
xspecs = config.getxspecs()
assert len(xspecs) == 2
print(xspecs)
assert xspecs[0].popen
assert xspecs[1].ssh == "xyz"
def test_xspecs_multiplied(self, testdir):
xspecs = testdir.parseconfigure("--tx=3*popen",).getxspecs()
assert len(xspecs) == 3
assert xspecs[1].popen
def test_getrsyncdirs(self, testdir):
config = testdir.parseconfigure('--rsyncdir=' + str(testdir.tmpdir))
roots = config.getrsyncdirs()
assert len(roots) == 1 + 1 # pylib itself
assert testdir.tmpdir in roots
def test_getrsyncdirs_with_conftest(self, testdir):
p = py.path.local()
for bn in 'x y z'.split():
p.mkdir(bn)
testdir.makeconftest("""
rsyncdirs= 'x',
""")
config = testdir.parseconfigure(testdir.tmpdir, '--rsyncdir=y', '--rsyncdir=z')
roots = config.getrsyncdirs()
assert len(roots) == 3 + 1 # pylib itself
assert py.path.local('y') in roots
assert py.path.local('z') in roots
assert testdir.tmpdir.join('x') in roots
def test_dist_options(self, testdir):
config = testdir.parseconfigure("-n 2")
assert config.option.dist == "load"
assert config.option.tx == ['popen'] * 2
config = testdir.parseconfigure("-d")
assert config.option.dist == "load"
def test_pytest_report_iteminfo():
class FakeItem(object):

View File

@ -25,13 +25,14 @@ def test_gen(testdir, anypython, standalone):
"*imported from*mypytest"
])
def test_rundist(testdir, standalone):
def test_rundist(testdir, pytestconfig, standalone):
pytestconfig.pluginmanager.skipifmissing("xdist")
testdir.makepyfile("""
def test_one():
pass
""")
result = standalone.run(sys.executable, testdir, '-n', '3')
assert result.ret == 2
result.stderr.fnmatch_lines([
"*no such option*"
assert result.ret == 0
result.stdout.fnmatch_lines([
"*1 passed*",
])

View File

@ -43,14 +43,3 @@ class TestPDB:
child.expect("1 failed")
if child.isalive():
child.wait()
def test_dist_incompatibility_messages(self, testdir):
py.test.importorskip("execnet")
Error = py.test.config.Error
py.test.raises(Error, "testdir.parseconfigure('--pdb', '--looponfail')")
result = testdir.runpytest("--pdb", "-n", "3")
assert result.ret != 0
assert "incompatible" in result.stderr.str()
result = testdir.runpytest("--pdb", "-d", "--tx", "popen")
assert result.ret != 0
assert "incompatible" in result.stderr.str()

View File

@ -221,7 +221,9 @@ class TestExecutionForked(BaseFunctionalTests):
pytestmark = py.test.mark.skipif("not hasattr(os, 'fork')")
def getrunner(self):
return runner.forked_run_report
# XXX re-arrange this test to live in pytest-xdist
xplugin = py.test.importorskip("xdist.plugin")
return xplugin.forked_run_report
def test_suicide(self, testdir):
reports = testdir.runitem("""
@ -262,19 +264,6 @@ class TestCollectionReports:
assert not rep.passed
assert rep.skipped
@py.test.mark.skipif("not hasattr(os, 'fork')")
def test_functional_boxed(testdir):
p1 = testdir.makepyfile("""
import os
def test_function():
os.kill(os.getpid(), 15)
""")
result = testdir.runpytest(p1, "--boxed")
assert result.stdout.fnmatch_lines([
"*CRASHED*",
"*1 failed*"
])
def test_callinfo():
ci = runner.CallInfo(lambda: 0, '123')
assert ci.when == "123"

View File

@ -3,10 +3,6 @@ terminal reporting of the full testing process.
"""
import py
import sys
try:
import execnet
except ImportError:
execnet = None
# ===============================================================================
# plugin tests
@ -45,12 +41,13 @@ def pytest_generate_tests(metafunc):
id="verbose",
funcargs={'option': Option(verbose=True)}
)
nodist = getattr(metafunc.function, 'nodist', False)
if execnet and not nodist:
metafunc.addcall(
id="verbose-dist",
funcargs={'option': Option(dist='each', verbose=True)}
)
if metafunc.config.pluginmanager.hasplugin("xdist"):
nodist = getattr(metafunc.function, 'nodist', False)
if not nodist:
metafunc.addcall(
id="verbose-dist",
funcargs={'option': Option(dist='each', verbose=True)}
)
class TestTerminal:
def test_pass_skip_fail(self, testdir, option):
@ -545,7 +542,7 @@ class TestTerminalFunctional:
"y* = 'xxxxxx*"
])
def test_verbose_reporting(self, testdir):
def test_verbose_reporting(self, testdir, pytestconfig):
p1 = testdir.makepyfile("""
import py
def test_fail():
@ -568,12 +565,12 @@ class TestTerminalFunctional:
"*test_verbose_reporting.py:10: test_gen*FAIL*",
])
assert result.ret == 1
if execnet:
result = testdir.runpytest(p1, '-v', '-n 1')
result.stdout.fnmatch_lines([
"*FAIL*test_verbose_reporting.py:2: test_fail*",
])
assert result.ret == 1
pytestconfig.pluginmanager.skipifmissing("xdist")
result = testdir.runpytest(p1, '-v', '-n 1')
result.stdout.fnmatch_lines([
"*FAIL*test_verbose_reporting.py:2: test_fail*",
])
assert result.ret == 1
def test_getreportopt():

View File

@ -1 +0,0 @@
#

View File

@ -1,119 +0,0 @@
import py
class TestDistribution:
    """Functional tests for distributed runs (-d/--dist/--tx) over popen gateways."""

    def test_manytests_to_one_popen(self, testdir):
        # four tests load-balanced over two local popen slaves
        p1 = testdir.makepyfile("""
                import py
                def test_fail0():
                    assert 0
                def test_fail1():
                    raise ValueError()
                def test_ok():
                    pass
                def test_skip():
                    py.test.skip("hello")
            """,
        )
        result = testdir.runpytest(p1, '-d', '--tx=popen', '--tx=popen')
        result.stdout.fnmatch_lines([
            "*0*popen*Python*",
            "*1*popen*Python*",
            "*2 failed, 1 passed, 1 skipped*",
        ])
        assert result.ret == 1

    def test_dist_conftest_specified(self, testdir):
        # --tx specs can also come from a conftest 'option_tx' setting
        p1 = testdir.makepyfile("""
                import py
                def test_fail0():
                    assert 0
                def test_fail1():
                    raise ValueError()
                def test_ok():
                    pass
                def test_skip():
                    py.test.skip("hello")
            """,
        )
        testdir.makeconftest("""
            option_tx = 'popen popen popen'.split()
        """)
        result = testdir.runpytest(p1, '-d')
        result.stdout.fnmatch_lines([
            "*0*popen*Python*",
            "*1*popen*Python*",
            "*2*popen*Python*",
            "*2 failed, 1 passed, 1 skipped*",
        ])
        assert result.ret == 1

    def test_dist_tests_with_crash(self, testdir):
        # a test killing its own process must show up as a node crash
        if not hasattr(py.std.os, 'kill'):
            py.test.skip("no os.kill")
        p1 = testdir.makepyfile("""
                import py
                def test_fail0():
                    assert 0
                def test_fail1():
                    raise ValueError()
                def test_ok():
                    pass
                def test_skip():
                    py.test.skip("hello")
                def test_crash():
                    import time
                    import os
                    time.sleep(0.5)
                    os.kill(os.getpid(), 15)
            """
        )
        result = testdir.runpytest(p1, '-d', '--tx=3*popen')
        result.stdout.fnmatch_lines([
            "*popen*Python*",
            "*popen*Python*",
            "*popen*Python*",
            "*node down*",
            "*3 failed, 1 passed, 1 skipped*"
        ])
        assert result.ret == 1

    def test_distribution_rsyncdirs_example(self, testdir):
        # --rsyncdir copies the package into the remote chdir
        source = testdir.mkdir("source")
        dest = testdir.mkdir("dest")
        subdir = source.mkdir("example_pkg")
        subdir.ensure("__init__.py")
        p = subdir.join("test_one.py")
        p.write("def test_5(): assert not __file__.startswith(%r)" % str(p))
        result = testdir.runpytest("-d", "--rsyncdir=%(subdir)s" % locals(),
            "--tx=popen//chdir=%(dest)s" % locals(), p)
        assert result.ret == 0
        result.stdout.fnmatch_lines([
            "*0* *popen*platform*",
            #"RSyncStart: [G1]",
            #"RSyncFinished: [G1]",
            "*1 passed*"
        ])
        assert dest.join(subdir.basename).check(dir=1)

    def test_dist_each(self, testdir):
        # --dist=each runs every test on every configured interpreter
        interpreters = []
        for name in ("python2.4", "python2.5"):
            interp = py.path.local.sysfind(name)
            if interp is None:
                py.test.skip("%s not found" % name)
            interpreters.append(interp)

        testdir.makepyfile(__init__="", test_one="""
            import sys
            def test_hello():
                print("%s...%s" % sys.version_info[:2])
                assert 0
        """)
        args = ["--dist=each"]
        args += ["--tx", "popen//python=%s" % interpreters[0]]
        args += ["--tx", "popen//python=%s" % interpreters[1]]
        result = testdir.runpytest(*args)
        s = result.stdout.str()
        assert "2.4" in s
        assert "2.5" in s

View File

@ -1,4 +0,0 @@
# conftest for the dist test directory: if execnet is not installed,
# none of these tests can run, so ignore the whole directory at collection.
try:
    import execnet
except ImportError:
    collect_ignore = ['.']

View File

@ -1,505 +0,0 @@
from py.impl.test.dist.dsession import DSession
from py.impl.test import outcome
import py
import execnet
XSpec = execnet.XSpec
def run(item, node, excinfo=None):
    """Build a "call"-phase ItemTestReport for *item*, tagged with *node*.

    Mimics what a slave node would report back to the master; *excinfo*
    marks the run as failed when given.
    """
    runner_plugin = item.config.pluginmanager.getplugin("runner")
    report = runner_plugin.ItemTestReport(item=item,
                                          excinfo=excinfo, when="call")
    report.node = node
    return report
class MockNode:
    """Minimal stand-in for a slave node: records batches, performs no I/O."""

    def __init__(self):
        self.sent = []

    def sendlist(self, items):
        # Remember each batch exactly as it was handed over.
        self.sent.append(items)

    def shutdown(self):
        # Only flag the request; tests inspect the attribute afterwards.
        self._shutdown = True
def dumpqueue(queue):
    """Print and discard every item currently waiting in *queue*."""
    while queue.qsize() > 0:
        item = queue.get()
        print(item)
class TestDSession:
    """Unit tests for the master-side distributed session event loop."""

    def test_add_remove_node(self, testdir):
        # removing a node returns its pending items exactly once
        item = testdir.getitem("def test_func(): pass")
        node = MockNode()
        rep = run(item, node)
        session = DSession(item.config)
        assert not session.node2pending
        session.addnode(node)
        assert len(session.node2pending) == 1
        session.senditems_load([item])
        pending = session.removenode(node)
        assert pending == [item]
        assert item not in session.item2nodes
        l = session.removenode(node)
        assert not l

    def test_senditems_each_and_receive_with_two_nodes(self, testdir):
        # "each" mode sends every item to every node; bookkeeping empties
        # out as each node reports the item done
        item = testdir.getitem("def test_func(): pass")
        node1 = MockNode()
        node2 = MockNode()
        session = DSession(item.config)
        session.addnode(node1)
        session.addnode(node2)
        session.senditems_each([item])
        assert session.node2pending[node1] == [item]
        assert session.node2pending[node2] == [item]
        assert node1 in session.item2nodes[item]
        assert node2 in session.item2nodes[item]
        session.removeitem(item, node1)
        assert session.item2nodes[item] == [node2]
        session.removeitem(item, node2)
        assert not session.node2pending[node1]
        assert not session.item2nodes

    def test_senditems_load_and_receive_one_node(self, testdir):
        # "load" mode sends each item to exactly one node
        item = testdir.getitem("def test_func(): pass")
        node = MockNode()
        rep = run(item, node)
        session = DSession(item.config)
        session.addnode(node)
        session.senditems_load([item])
        assert session.node2pending[node] == [item]
        assert session.item2nodes[item] == [node]
        session.removeitem(item, node)
        assert not session.node2pending[node]
        assert not session.item2nodes

    def test_triggertesting_collect(self, testdir):
        # collectors are expanded in-process and yield a collect report
        modcol = testdir.getmodulecol("""
            def test_func():
                pass
        """)
        session = DSession(modcol.config)
        session.triggertesting([modcol])
        name, args, kwargs = session.queue.get(block=False)
        assert name == 'pytest_collectreport'
        report = kwargs['report']
        assert len(report.result) == 1

    def test_triggertesting_item(self, testdir):
        # items beyond MAXITEMSPERHOST per node get queued for rescheduling
        item = testdir.getitem("def test_func(): pass")
        session = DSession(item.config)
        node1 = MockNode()
        node2 = MockNode()
        session.addnode(node1)
        session.addnode(node2)
        session.triggertesting([item] * (session.MAXITEMSPERHOST*2 + 1))
        sent1 = node1.sent[0]
        sent2 = node2.sent[0]
        assert sent1 == [item] * session.MAXITEMSPERHOST
        assert sent2 == [item] * session.MAXITEMSPERHOST
        assert session.node2pending[node1] == sent1
        assert session.node2pending[node2] == sent2
        name, args, kwargs = session.queue.get(block=False)
        assert name == "pytest_rescheduleitems"
        assert kwargs['items'] == [item]

    def test_keyboardinterrupt(self, testdir):
        # a KeyboardInterrupt while waiting on the queue exits cleanly
        item = testdir.getitem("def test_func(): pass")
        session = DSession(item.config)
        def raise_(timeout=None): raise KeyboardInterrupt()
        session.queue.get = raise_
        exitstatus = session.loop([])
        assert exitstatus == outcome.EXIT_INTERRUPTED

    def test_internalerror(self, testdir):
        # any other exception in the loop maps to EXIT_INTERNALERROR
        item = testdir.getitem("def test_func(): pass")
        session = DSession(item.config)
        def raise_(): raise ValueError()
        session.queue.get = raise_
        exitstatus = session.loop([])
        assert exitstatus == outcome.EXIT_INTERNALERROR

    def test_rescheduleevent(self, testdir):
        item = testdir.getitem("def test_func(): pass")
        session = DSession(item.config)
        node = MockNode()
        session.addnode(node)
        loopstate = session._initloopstate([])
        session.queueevent("pytest_rescheduleitems", items=[item])
        session.loop_once(loopstate)
        # check that RescheduleEvents are not immediately
        # rescheduled if there are no nodes
        assert loopstate.dowork == False
        session.queueevent(None)
        session.loop_once(loopstate)
        session.queueevent(None)
        session.loop_once(loopstate)
        assert node.sent == [[item]]
        session.queueevent("pytest_runtest_logreport", report=run(item, node))
        session.loop_once(loopstate)
        assert loopstate.shuttingdown
        assert not loopstate.testsfailed

    def test_no_node_remaining_for_tests(self, testdir):
        item = testdir.getitem("def test_func(): pass")
        # setup a session with one node
        session = DSession(item.config)
        node = MockNode()
        session.addnode(node)
        # setup a HostDown event
        session.queueevent("pytest_testnodedown", node=node, error=None)
        loopstate = session._initloopstate([item])
        loopstate.dowork = False
        session.loop_once(loopstate)
        dumpqueue(session.queue)
        assert loopstate.exitstatus == outcome.EXIT_NOHOSTS

    def test_removeitem_from_failing_teardown(self, testdir):
        # teardown reports only come in when they signal a failure
        # internal session-management should basically ignore them
        # XXX probably it'S best to invent a new error hook for
        # teardown/setup related failures
        modcol = testdir.getmodulecol("""
            def test_one():
                pass
            def teardown_function(function):
                assert 0
        """)
        item1, = modcol.collect()
        # setup a session with two nodes
        session = DSession(item1.config)
        node1, node2 = MockNode(), MockNode()
        session.addnode(node1)
        session.addnode(node2)
        # have one test pending for a node that goes down
        session.senditems_each([item1])
        nodes = session.item2nodes[item1]
        class rep:
            # minimal fake report object; attributes mirror ItemTestReport
            failed = True
            item = item1
            node = nodes[0]
            when = "call"
        session.queueevent("pytest_runtest_logreport", report=rep)
        reprec = testdir.getreportrecorder(session)
        print(session.item2nodes)
        loopstate = session._initloopstate([])
        assert len(session.item2nodes[item1]) == 2
        session.loop_once(loopstate)
        assert len(session.item2nodes[item1]) == 1
        rep.when = "teardown"
        session.queueevent("pytest_runtest_logreport", report=rep)
        session.loop_once(loopstate)
        # the teardown-phase report must not remove the item again
        assert len(session.item2nodes[item1]) == 1

    def test_testnodedown_causes_reschedule_pending(self, testdir):
        modcol = testdir.getmodulecol("""
            def test_crash():
                assert 0
            def test_fail():
                x
        """)
        item1, item2 = modcol.collect()
        # setup a session with two nodes
        session = DSession(item1.config)
        node1, node2 = MockNode(), MockNode()
        session.addnode(node1)
        session.addnode(node2)
        # have one test pending for a node that goes down
        session.senditems_load([item1, item2])
        node = session.item2nodes[item1] [0]
        item1.config.option.dist = "load"
        session.queueevent("pytest_testnodedown", node=node, error="xyz")
        reprec = testdir.getreportrecorder(session)
        print(session.item2nodes)
        loopstate = session._initloopstate([])
        session.loop_once(loopstate)
        assert loopstate.colitems == [item2] # do not reschedule crash item
        rep = reprec.matchreport(names="pytest_runtest_logreport")
        assert rep.failed
        assert rep.item == item1
        assert str(rep.longrepr).find("crashed") != -1
        #assert str(testrep.longrepr).find(node.gateway.spec) != -1

    def test_testnodeready_adds_to_available(self, testdir):
        item = testdir.getitem("def test_func(): pass")
        # setup a session with two nodes
        session = DSession(item.config)
        node1 = MockNode()
        session.queueevent("pytest_testnodeready", node=node1)
        loopstate = session._initloopstate([item])
        loopstate.dowork = False
        assert len(session.node2pending) == 0
        session.loop_once(loopstate)
        assert len(session.node2pending) == 1

    def runthrough(self, item, excinfo=None):
        # helper: push one item through a full send/report/shutdown cycle
        session = DSession(item.config)
        node = MockNode()
        session.addnode(node)
        loopstate = session._initloopstate([item])
        session.queueevent(None)
        session.loop_once(loopstate)
        assert node.sent == [[item]]
        ev = run(item, node, excinfo=excinfo)
        session.queueevent("pytest_runtest_logreport", report=ev)
        session.loop_once(loopstate)
        assert loopstate.shuttingdown
        session.queueevent("pytest_testnodedown", node=node, error=None)
        session.loop_once(loopstate)
        dumpqueue(session.queue)
        return session, loopstate.exitstatus

    def test_exit_completed_tests_ok(self, testdir):
        item = testdir.getitem("def test_func(): pass")
        session, exitstatus = self.runthrough(item)
        assert exitstatus == outcome.EXIT_OK

    def test_exit_completed_tests_fail(self, testdir):
        item = testdir.getitem("def test_func(): 0/0")
        session, exitstatus = self.runthrough(item, excinfo="fail")
        assert exitstatus == outcome.EXIT_TESTSFAILED

    def test_exit_on_first_failing(self, testdir):
        # -x (exitfirst): shut down as soon as a failing report arrives
        modcol = testdir.getmodulecol("""
            def test_fail():
                assert 0
            def test_pass():
                pass
        """)
        modcol.config.option.exitfirst = True
        session = DSession(modcol.config)
        node = MockNode()
        session.addnode(node)
        items = modcol.config.hook.pytest_make_collect_report(collector=modcol).result
        # trigger testing - this sends tests to the node
        session.triggertesting(items)
        # run tests ourselves and produce reports
        ev1 = run(items[0], node, "fail")
        ev2 = run(items[1], node, None)
        session.queueevent("pytest_runtest_logreport", report=ev1) # a failing one
        session.queueevent("pytest_runtest_logreport", report=ev2)
        # now call the loop
        loopstate = session._initloopstate(items)
        session.loop_once(loopstate)
        assert loopstate.testsfailed
        assert loopstate.shuttingdown

    def test_shuttingdown_filters(self, testdir):
        # while shutting down, only node-down events are still processed
        item = testdir.getitem("def test_func(): pass")
        session = DSession(item.config)
        node = MockNode()
        session.addnode(node)
        loopstate = session._initloopstate([])
        loopstate.shuttingdown = True
        reprec = testdir.getreportrecorder(session)
        session.queueevent("pytest_runtest_logreport", report=run(item, node))
        session.loop_once(loopstate)
        assert not reprec.getcalls("pytest_testnodedown")
        session.queueevent("pytest_testnodedown", node=node, error=None)
        session.loop_once(loopstate)
        assert reprec.getcall('pytest_testnodedown').node == node

    def test_filteritems(self, testdir):
        # keyword filtering deselects items before distribution
        modcol = testdir.getmodulecol("""
            def test_fail():
                assert 0
            def test_pass():
                pass
        """)
        session = DSession(modcol.config)
        modcol.config.option.keyword = "nothing"
        dsel = session.filteritems([modcol])
        assert dsel == [modcol]
        items = modcol.collect()
        hookrecorder = testdir.getreportrecorder(session).hookrecorder
        remaining = session.filteritems(items)
        assert remaining == []
        event = hookrecorder.getcalls("pytest_deselected")[-1]
        assert event.items == items
        modcol.config.option.keyword = "test_fail"
        remaining = session.filteritems(items)
        assert remaining == [items[0]]
        event = hookrecorder.getcalls("pytest_deselected")[-1]
        assert event.items == [items[1]]

    def test_testnodedown_shutdown_after_completion(self, testdir):
        # after all work is done the loop asks nodes to shut down and
        # only exits once their testnodedown events came in
        item = testdir.getitem("def test_func(): pass")
        session = DSession(item.config)
        node = MockNode()
        session.addnode(node)
        session.senditems_load([item])
        session.queueevent("pytest_runtest_logreport", report=run(item, node))
        loopstate = session._initloopstate([])
        session.loop_once(loopstate)
        assert node._shutdown is True
        assert loopstate.exitstatus is None, "loop did not wait for testnodedown"
        assert loopstate.shuttingdown
        session.queueevent("pytest_testnodedown", node=node, error=None)
        session.loop_once(loopstate)
        assert loopstate.exitstatus == 0

    def test_nopending_but_collection_remains(self, testdir):
        # the loop must not shut down while collect reports are pending
        modcol = testdir.getmodulecol("""
            def test_fail():
                assert 0
            def test_pass():
                pass
        """)
        session = DSession(modcol.config)
        node = MockNode()
        session.addnode(node)
        colreport = modcol.config.hook.pytest_make_collect_report(collector=modcol)
        item1, item2 = colreport.result
        session.senditems_load([item1])
        # node2pending will become empty when the loop sees the report
        rep = run(item1, node)
        session.queueevent("pytest_runtest_logreport", report=run(item1, node))
        # but we have a collection pending
        session.queueevent("pytest_collectreport", report=colreport)
        loopstate = session._initloopstate([])
        session.loop_once(loopstate)
        assert loopstate.exitstatus is None, "loop did not care for collection report"
        assert not loopstate.colitems
        session.loop_once(loopstate)
        assert loopstate.colitems == colreport.result
        assert loopstate.exitstatus is None, "loop did not care for colitems"

    def test_dist_some_tests(self, testdir):
        # integration: drive a small module through a real popen node
        p1 = testdir.makepyfile(test_one="""
            def test_1():
                pass
            def test_x():
                import py
                py.test.skip("aaa")
            def test_fail():
                assert 0
        """)
        config = testdir.parseconfig('-d', p1, '--tx=popen')
        dsession = DSession(config)
        hookrecorder = testdir.getreportrecorder(config).hookrecorder
        dsession.main([config.getnode(p1)])
        rep = hookrecorder.popcall("pytest_runtest_logreport").report
        assert rep.passed
        rep = hookrecorder.popcall("pytest_runtest_logreport").report
        assert rep.skipped
        rep = hookrecorder.popcall("pytest_runtest_logreport").report
        assert rep.failed
        # see that the node is really down
        node = hookrecorder.popcall("pytest_testnodedown").node
        assert node.gateway.spec.popen
        #XXX eq.geteventargs("pytest_sessionfinish")
def test_collected_function_causes_remote_skip(testdir):
    """A module that skips at import time on the slave (after the master
    already collected it) reports its collected functions as skipped."""
    sub = testdir.mkpydir("testing")
    # the marker file exists for the master's collection and is removed
    # then, so the slave's import hits the skip branch
    sub.join("test_module.py").write(py.code.Source("""
        import py
        path = py.path.local(%r)
        if path.check():
            path.remove()
        else:
            py.test.skip("remote skip")
        def test_func():
            pass
        def test_func2():
            pass
    """ % str(sub.ensure("somefile"))))
    result = testdir.runpytest('-v', '--dist=each', '--tx=popen')
    result.stdout.fnmatch_lines([
        "*2 skipped*"
    ])
def test_teardownfails_one_function(testdir):
    """A failing teardown on a dist run shows up as an error, not a failure."""
    testfile = testdir.makepyfile("""
        def test_func():
            pass
        def teardown_function(function):
            assert 0
    """)
    res = testdir.runpytest(testfile, '--dist=each', '--tx=popen')
    res.stdout.fnmatch_lines([
        "*def teardown_function(function):*",
        "*1 passed*1 error*"
    ])
@py.test.mark.xfail
def test_terminate_on_hangingnode(testdir):
    """A slave hanging in session teardown should be killed promptly
    (currently expected to fail, hence the xfail marker)."""
    p = testdir.makeconftest("""
        def pytest__teardown_final(session):
            if session.nodeid == "my": # running on slave
                import time
                time.sleep(3)
    """)
    result = testdir.runpytest(p, '--dist=each', '--tx=popen//id=my')
    # the run must finish well before the slave's 3s sleep would allow
    assert result.duration < 2.0
    result.stdout.fnmatch_lines([
        "*killed*my*",
    ])
def test_session_hooks(testdir):
    """sessionstart/sessionfinish hooks run on both master and slave; a
    failing slave sessionfinish must not break the master's reporting."""
    testdir.makeconftest("""
        import sys
        def pytest_sessionstart(session):
            sys.pytestsessionhooks = session
        def pytest_sessionfinish(session):
            f = open(session.nodeid or "master", 'w')
            f.write("xy")
            f.close()
            # let's fail on the slave
            if session.nodeid:
                raise ValueError(42)
    """)
    p = testdir.makepyfile("""
        import sys
        def test_hello():
            assert hasattr(sys, 'pytestsessionhooks')
    """)
    result = testdir.runpytest(p, "--dist=each", "--tx=popen//id=my1")
    result.stdout.fnmatch_lines([
        "*ValueError*",
        "*1 passed*",
    ])
    assert result.ret
    d = result.parseoutcomes()
    assert d['passed'] == 1
    # both slave ("my1") and master wrote their marker file
    assert testdir.tmpdir.join("my1").check()
    assert testdir.tmpdir.join("master").check()
def test_funcarg_teardown_failure(testdir):
    """An error raised by a funcarg teardown on a slave run is reported
    as an error alongside the passing test."""
    testfile = testdir.makepyfile("""
        def pytest_funcarg__myarg(request):
            def teardown(val):
                raise ValueError(val)
            return request.cached_setup(setup=lambda: 42, teardown=teardown,
                scope="module")
        def test_hello(myarg):
            pass
    """)
    res = testdir.runpytest(testfile, "-n1")
    assert res.ret
    res.stdout.fnmatch_lines([
        "*ValueError*42*",
        "*1 passed*1 error*",
    ])

View File

@ -1,127 +0,0 @@
"""
tests for
- gateway management
- manage rsyncing of hosts
"""
import py
import os
from py.impl.test.dist.gwmanage import GatewayManager, HostRSync
from py.impl.test.pluginmanager import HookRelay, Registry
from py.plugin import hookspec
import execnet
def pytest_funcarg__hookrecorder(request):
    """Funcarg: a recorder attached to the ``hook`` relay funcarg."""
    pytest_plugin = request.getfuncargvalue('_pytest')
    relay = request.getfuncargvalue('hook')
    return pytest_plugin.gethookrecorder(relay)
def pytest_funcarg__hook(request):
    """Funcarg: a fresh HookRelay wired to an empty plugin Registry."""
    registry = Registry()
    return HookRelay(hookspec, registry)
class TestGatewayManagerPopen:
    """GatewayManager behaviour with local popen gateways."""

    def test_popen_no_default_chdir(self, hook):
        # popen gateways run in-place: no remote chdir is set
        gm = GatewayManager(["popen"], hook)
        assert gm.specs[0].chdir is None

    def test_default_chdir(self, hook):
        # remote specs default to "pyexecnetcache" unless overridden
        l = ["ssh=noco", "socket=xyz"]
        for spec in GatewayManager(l, hook).specs:
            assert spec.chdir == "pyexecnetcache"
        for spec in GatewayManager(l, hook, defaultchdir="abc").specs:
            assert spec.chdir == "abc"

    def test_popen_makegateway_events(self, hook, hookrecorder, _pytest):
        # makegateways() announces each new gateway through the hook
        hm = GatewayManager(["popen"] * 2, hook)
        hm.makegateways()
        call = hookrecorder.popcall("pytest_gwmanage_newgateway")
        assert call.gateway.spec == execnet.XSpec("popen")
        assert call.gateway.id == "gw0"
        assert call.platinfo.executable == call.gateway._rinfo().executable
        call = hookrecorder.popcall("pytest_gwmanage_newgateway")
        assert call.gateway.id == "gw1"
        assert len(hm.group) == 2
        hm.exit()
        assert not len(hm.group)

    def test_popens_rsync(self, hook, mysetup):
        # popen gateways share the filesystem: rsync sends only a
        # sys.path setup snippet, no file-transfer notifications
        source = mysetup.source
        hm = GatewayManager(["popen"] * 2, hook)
        hm.makegateways()
        assert len(hm.group) == 2
        for gw in hm.group:
            class pseudoexec:
                # capture remote_exec arguments instead of really executing
                args = []
                def __init__(self, *args):
                    self.args.extend(args)
                def waitclose(self):
                    pass
            gw.remote_exec = pseudoexec
        l = []
        hm.rsync(source, notify=lambda *args: l.append(args))
        assert not l
        hm.exit()
        assert not len(hm.group)
        assert "sys.path.insert" in gw.remote_exec.args[0]

    def test_rsync_popen_with_path(self, hook, mysetup):
        # with an explicit chdir, files really get copied over
        source, dest = mysetup.source, mysetup.dest
        hm = GatewayManager(["popen//chdir=%s" %dest] * 1, hook)
        hm.makegateways()
        source.ensure("dir1", "dir2", "hello")
        l = []
        hm.rsync(source, notify=lambda *args: l.append(args))
        assert len(l) == 1
        assert l[0] == ("rsyncrootready", hm.group['gw0'].spec, source)
        hm.exit()
        dest = dest.join(source.basename)
        assert dest.join("dir1").check()
        assert dest.join("dir1", "dir2").check()
        assert dest.join("dir1", "dir2", 'hello').check()

    def test_rsync_same_popen_twice(self, hook, mysetup, hookrecorder):
        # a second gateway with an identical spec is not rsynced again
        source, dest = mysetup.source, mysetup.dest
        hm = GatewayManager(["popen//chdir=%s" %dest] * 2, hook)
        hm.makegateways()
        source.ensure("dir1", "dir2", "hello")
        hm.rsync(source)
        call = hookrecorder.popcall("pytest_gwmanage_rsyncstart")
        assert call.source == source
        assert len(call.gateways) == 1
        assert call.gateways[0] in hm.group
        call = hookrecorder.popcall("pytest_gwmanage_rsyncfinish")
class pytest_funcarg__mysetup:
    """Funcarg factory: per-test "source" and "dest" dirs under tmpdir."""
    def __init__(self, request):
        tmp = request.getfuncargvalue('tmpdir')
        self.source = tmp.mkdir("source")
        self.dest = tmp.mkdir("dest")
class TestHRSync:
    """HostRSync filtering rules and a single-host transfer."""

    def test_hrsync_filter(self, mysetup):
        # version-control dirs, dotfiles and editor backups are skipped
        source, dest = mysetup.source, mysetup.dest
        source.ensure("dir", "file.txt")
        source.ensure(".svn", "entries")
        source.ensure(".somedotfile", "moreentries")
        source.ensure("somedir", "editfile~")
        syncer = HostRSync(source)
        l = list(source.visit(rec=syncer.filter,
            fil=syncer.filter))
        assert len(l) == 3
        basenames = [x.basename for x in l]
        assert 'dir' in basenames
        assert 'file.txt' in basenames
        assert 'somedir' in basenames

    def test_hrsync_one_host(self, mysetup):
        # send() copies the source tree and fires the finished callback once
        source, dest = mysetup.source, mysetup.dest
        gw = execnet.makegateway("popen//chdir=%s" % dest)
        finished = []
        rsync = HostRSync(source)
        rsync.add_target_host(gw, finished=lambda: finished.append(1))
        source.join("hello.py").write("world")
        rsync.send()
        gw.exit()
        assert dest.join(source.basename, "hello.py").check()
        assert len(finished) == 1

View File

@ -1,254 +0,0 @@
import py
import sys
import execnet
Queue = py.builtin._tryimport('queue', 'Queue').Queue
from py.impl.test.dist.mypickle import ImmutablePickler, PickleChannel
from py.impl.test.dist.mypickle import UnpickleError, makekey
# first let's test some basic functionality
def pytest_generate_tests(metafunc):
    """Parametrize the pickle tests: either over available pickle modules,
    or over all (obj, protocol) combinations for round-trip tests."""
    if 'picklemod' in metafunc.funcargnames:
        import pickle
        metafunc.addcall(funcargs={'picklemod': pickle})
        try:
            import cPickle
        except ImportError:
            # Python 3 has no separate cPickle module
            pass
        else:
            metafunc.addcall(funcargs={'picklemod': cPickle})
    elif "obj" in metafunc.funcargnames and "proto" in metafunc.funcargnames:
        # plain containers plus objects with shared sub-objects, across
        # all pickle protocols (including the highest, -1)
        a1 = A()
        a2 = A()
        a2.a1 = a1
        for proto in (0,1,2, -1):
            for obj in {1:2}, [1,2,3], a1, a2:
                metafunc.addcall(funcargs=dict(obj=obj, proto=proto))
def test_underlying_basic_pickling_mechanisms(picklemod):
    """Two pickler/unpickler pairs whose memos are cross-translated can
    round-trip an object back to the *same* (identical) object."""
    f1 = py.io.BytesIO()
    f2 = py.io.BytesIO()
    pickler1 = picklemod.Pickler(f1)
    unpickler1 = picklemod.Unpickler(f2)
    pickler2 = picklemod.Pickler(f2)
    unpickler2 = picklemod.Unpickler(f1)
    #pickler1.memo = unpickler1.memo = {}
    #pickler2.memo = unpickler2.memo = {}
    d = {}
    pickler1.dump(d)
    f1.seek(0)
    d_other = unpickler2.load()
    # translate unpickler2 memo to pickler2
    pickler2.memo = dict([(id(obj), (int(x), obj))
        for x, obj in unpickler2.memo.items()])
    pickler2.dump(d_other)
    f2.seek(0)
    # translate pickler1 memo back so unpickler1 resolves to the original
    unpickler1.memo = dict([(makekey(x), y)
        for x, y in pickler1.memo.values()])
    d_back = unpickler1.load()
    assert d is d_back
class A:
    # Plain mutable object; identity preservation across pickling
    # round-trips is the property under test.
    pass
def test_pickle_and_back_IS_same(obj, proto):
    """A dump/load round-trip through two paired ImmutablePicklers
    returns the identical object, for every protocol."""
    p1 = ImmutablePickler(uneven=False, protocol=proto)
    p2 = ImmutablePickler(uneven=True, protocol=proto)
    s1 = p1.dumps(obj)
    d2 = p2.loads(s1)
    s2 = p2.dumps(d2)
    obj_back = p1.loads(s2)
    assert obj is obj_back
def test_pickling_twice_before_unpickling():
    """Dumping several interlinked objects before any loads happen still
    round-trips each of them back to its original identity."""
    p1 = ImmutablePickler(uneven=False)
    p2 = ImmutablePickler(uneven=True)
    a1 = A()
    a2 = A()
    a3 = A()
    a3.a1 = a1
    a2.a1 = a1
    s1 = p1.dumps(a1)
    # mutate a1 after it was already dumped once
    a1.a3 = a3
    s2 = p1.dumps(a2)
    other_a1 = p2.loads(s1)
    other_a2 = p2.loads(s2)
    back_a1 = p1.loads(p2.dumps(other_a1))
    other_a3 = p2.loads(p1.dumps(a3))
    back_a3 = p1.loads(p2.dumps(other_a3))
    back_a2 = p1.loads(p2.dumps(other_a2))
    back_a1 = p1.loads(p2.dumps(other_a1))
    assert back_a1 is a1
    assert back_a2 is a2
def test_pickling_concurrently():
    """Both sides may initiate a dump before receiving anything from the
    other side; the round-trip must still complete without error."""
    p1 = ImmutablePickler(uneven=False)
    p2 = ImmutablePickler(uneven=True)
    a1 = A()
    a1.hasattr = 42
    a2 = A()
    s1 = p1.dumps(a1)
    s2 = p2.dumps(a2)
    other_a1 = p2.loads(s1)
    other_a2 = p1.loads(s2)
    a1_back = p1.loads(p2.dumps(other_a1))
def test_self_memoize():
    """selfmemoize() pre-registers an object so it unpickles to itself."""
    p1 = ImmutablePickler(uneven=False)
    a1 = A()
    p1.selfmemoize(a1)
    x = p1.loads(p1.dumps(a1))
    assert x is a1
# generous upper bound (seconds) for queue.get() waits in the tests below
TESTTIMEOUT = 2.0

class TestPickleChannelFunctional:
    """Functional tests of PickleChannel over a real popen gateway."""

    def setup_class(cls):
        cls.gw = execnet.PopenGateway()
        cls.gw.remote_exec(
            "import py ; py.path.local(%r).pyimport()" %(__file__)
        )
        cls.gw.remote_init_threads(5)
        # we need the remote test code to import
        # the same test module here

    def test_popen_send_instance(self):
        # an instance sent back and forth keeps identity on the remote side
        channel = self.gw.remote_exec("""
            from py.impl.test.dist.mypickle import PickleChannel
            channel = PickleChannel(channel)
            from testing.pytest.dist.test_mypickle import A
            a1 = A()
            a1.hello = 10
            channel.send(a1)
            a2 = channel.receive()
            channel.send(a2 is a1)
        """)
        channel = PickleChannel(channel)
        a_received = channel.receive()
        assert isinstance(a_received, A)
        assert a_received.hello == 10
        channel.send(a_received)
        remote_a2_is_a1 = channel.receive()
        assert remote_a2_is_a1

    def test_send_concurrent(self):
        # interleaved sends/receives preserve object identity both ways
        channel = self.gw.remote_exec("""
            from py.impl.test.dist.mypickle import PickleChannel
            channel = PickleChannel(channel)
            from testing.pytest.dist.test_mypickle import A
            l = [A() for i in range(10)]
            channel.send(l)
            other_l = channel.receive()
            channel.send((l, other_l))
            channel.send(channel.receive())
            channel.receive()
        """)
        channel = PickleChannel(channel)
        l = [A() for i in range(10)]
        channel.send(l)
        other_l = channel.receive()
        channel.send(other_l)
        ret = channel.receive()
        assert ret[0] is other_l
        assert ret[1] is l
        back = channel.receive()
        assert other_l is other_l
        channel.send(None)
        #s1 = p1.dumps(a1)
        #s2 = p2.dumps(a2)
        #other_a1 = p2.loads(s1)
        #other_a2 = p1.loads(s2)
        #a1_back = p1.loads(p2.dumps(other_a1))

    def test_popen_with_callback(self):
        # setcallback() delivers unpickled objects via the queue
        channel = self.gw.remote_exec("""
            from py.impl.test.dist.mypickle import PickleChannel
            channel = PickleChannel(channel)
            from testing.pytest.dist.test_mypickle import A
            a1 = A()
            a1.hello = 10
            channel.send(a1)
            a2 = channel.receive()
            channel.send(a2 is a1)
        """)
        channel = PickleChannel(channel)
        queue = Queue()
        channel.setcallback(queue.put)
        a_received = queue.get(timeout=TESTTIMEOUT)
        assert isinstance(a_received, A)
        assert a_received.hello == 10
        channel.send(a_received)
        #remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
        #assert remote_a2_is_a1

    def test_popen_with_callback_with_endmarker(self):
        # the endmarker is queued once the remote side closes the channel
        channel = self.gw.remote_exec("""
            from py.impl.test.dist.mypickle import PickleChannel
            channel = PickleChannel(channel)
            from testing.pytest.dist.test_mypickle import A
            a1 = A()
            a1.hello = 10
            channel.send(a1)
            a2 = channel.receive()
            channel.send(a2 is a1)
        """)
        channel = PickleChannel(channel)
        queue = Queue()
        channel.setcallback(queue.put, endmarker=-1)
        a_received = queue.get(timeout=TESTTIMEOUT)
        assert isinstance(a_received, A)
        assert a_received.hello == 10
        channel.send(a_received)
        remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
        assert remote_a2_is_a1
        endmarker = queue.get(timeout=TESTTIMEOUT)
        assert endmarker == -1

    def test_popen_with_callback_with_endmarker_and_unpickling_error(self):
        channel = self.gw.remote_exec("""
            from py.impl.test.dist.mypickle import PickleChannel
            channel = PickleChannel(channel)
            from testing.pytest.dist.test_mypickle import A
            a1 = A()
            channel.send(a1)
            channel.send(a1)
        """)
        channel = PickleChannel(channel)
        queue = Queue()
        a = channel.receive()
        # wipe the unpickle memo so the second send cannot resolve its
        # memo reference, forcing an UnpickleError
        channel._ipickle._unpicklememo.clear()
        channel.setcallback(queue.put, endmarker=-1)
        next = queue.get(timeout=TESTTIMEOUT)
        assert next == -1
        error = channel._getremoteerror()
        assert isinstance(error, UnpickleError)

    def test_popen_with_various_methods(self):
        # smoke-test isclosed/_getremoteerror/send/waitclose
        channel = self.gw.remote_exec("""
            from py.impl.test.dist.mypickle import PickleChannel
            channel = PickleChannel(channel)
            channel.receive()
        """)
        channel = PickleChannel(channel)
        assert not channel.isclosed()
        assert not channel._getremoteerror()
        channel.send(2)
        channel.waitclose(timeout=2)

View File

@ -1,127 +0,0 @@
import py
from py.impl.test.dist.nodemanage import NodeManager
class pytest_funcarg__mysetup:
    """Funcarg factory: numbered per-test source/dest dirs via config.mktemp."""
    def __init__(self, request):
        basetemp = request.config.mktemp(
            "mysetup-%s" % request.function.__name__,
            numbered=True)
        self.source = basetemp.mkdir("source")
        self.dest = basetemp.mkdir("dest")
        # make sure the _pytest helper funcarg is initialized as well
        request.getfuncargvalue("_pytest")
class TestNodeManager:
    """NodeManager: node setup/teardown and rsync-root distribution."""

    @py.test.mark.xfail
    def test_rsync_roots_no_roots(self, testdir, mysetup):
        # NOTE(review): 'source' is undefined here (presumably meant to be
        # mysetup.source); the test is marked xfail.
        mysetup.source.ensure("dir1", "file1").write("hello")
        config = testdir.reparseconfig([source])
        nodemanager = NodeManager(config, ["popen//chdir=%s" % mysetup.dest])
        assert nodemanager.config.topdir == source == config.topdir
        nodemanager.rsync_roots()
        p, = nodemanager.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
        p = py.path.local(p)
        py.builtin.print_("remote curdir", p)
        assert p == mysetup.dest.join(config.topdir.basename)
        assert p.join("dir1").check()
        assert p.join("dir1", "file1").check()

    def test_popen_nodes_are_ready(self, testdir):
        # all three popen nodes report readiness within the timeout
        nodemanager = NodeManager(testdir.parseconfig(
            "--tx", "3*popen"))
        nodemanager.setup_nodes([].append)
        nodemanager.wait_nodesready(timeout=10.0)

    def test_popen_rsync_subdir(self, testdir, mysetup):
        # rsync works both for a subdirectory root and the whole source dir
        source, dest = mysetup.source, mysetup.dest
        dir1 = mysetup.source.mkdir("dir1")
        dir2 = dir1.mkdir("dir2")
        dir2.ensure("hello")
        for rsyncroot in (dir1, source):
            dest.remove()
            nodemanager = NodeManager(testdir.parseconfig(
                "--tx", "popen//chdir=%s" % dest,
                "--rsyncdir", rsyncroot,
                source,
            ))
            assert nodemanager.config.topdir == source
            nodemanager.rsync_roots()
            if rsyncroot == source:
                dest = dest.join("source")
            assert dest.join("dir1").check()
            assert dest.join("dir1", "dir2").check()
            assert dest.join("dir1", "dir2", 'hello').check()
            nodemanager.gwmanager.exit()

    def test_init_rsync_roots(self, testdir, mysetup):
        # only directories listed in conftest rsyncdirs are transferred
        source, dest = mysetup.source, mysetup.dest
        dir2 = source.ensure("dir1", "dir2", dir=1)
        source.ensure("dir1", "somefile", dir=1)
        dir2.ensure("hello")
        source.ensure("bogusdir", "file")
        source.join("conftest.py").write(py.code.Source("""
            rsyncdirs = ['dir1/dir2']
        """))
        session = testdir.reparseconfig([source]).initsession()
        nodemanager = NodeManager(session.config, ["popen//chdir=%s" % dest])
        nodemanager.rsync_roots()
        assert dest.join("dir2").check()
        assert not dest.join("dir1").check()
        assert not dest.join("bogus").check()

    def test_rsyncignore(self, testdir, mysetup):
        # rsyncignore entries are excluded from transferred roots
        source, dest = mysetup.source, mysetup.dest
        dir2 = source.ensure("dir1", "dir2", dir=1)
        dir5 = source.ensure("dir5", "dir6", "bogus")
        dirf = source.ensure("dir5", "file")
        dir2.ensure("hello")
        source.join("conftest.py").write(py.code.Source("""
            rsyncdirs = ['dir1', 'dir5']
            rsyncignore = ['dir1/dir2', 'dir5/dir6']
        """))
        session = testdir.reparseconfig([source]).initsession()
        nodemanager = NodeManager(session.config,
            ["popen//chdir=%s" % dest])
        nodemanager.rsync_roots()
        assert dest.join("dir1").check()
        assert not dest.join("dir1", "dir2").check()
        assert dest.join("dir5","file").check()
        assert not dest.join("dir6").check()

    def test_optimise_popen(self, testdir, mysetup):
        # pure-popen runs on the same filesystem skip rsync and chdir
        source, dest = mysetup.source, mysetup.dest
        specs = ["popen"] * 3
        source.join("conftest.py").write("rsyncdirs = ['a']")
        source.ensure('a', dir=1)
        config = testdir.reparseconfig([source])
        nodemanager = NodeManager(config, specs)
        nodemanager.rsync_roots()
        for gwspec in nodemanager.gwmanager.specs:
            assert gwspec._samefilesystem()
            assert not gwspec.chdir

    def test_setup_DEBUG(self, mysetup, testdir):
        # --debug makes node setup emit pytest_trace hook calls
        source = mysetup.source
        specs = ["popen"] * 2
        source.join("conftest.py").write("rsyncdirs = ['a']")
        source.ensure('a', dir=1)
        config = testdir.reparseconfig([source, '--debug'])
        assert config.option.debug
        nodemanager = NodeManager(config, specs)
        reprec = testdir.getreportrecorder(config).hookrecorder
        nodemanager.setup_nodes(putevent=[].append)
        for spec in nodemanager.gwmanager.specs:
            l = reprec.getcalls("pytest_trace")
            assert l
        nodemanager.teardown_nodes()

    def test_ssh_setup_nodes(self, specssh, testdir):
        # full inline run over an ssh gateway spec (specssh funcarg)
        testdir.makepyfile(__init__="", test_x="""
            def test_one():
                pass
        """)
        reprec = testdir.inline_run("-d", "--rsyncdir=%s" % testdir.tmpdir,
            "--tx", specssh, testdir.tmpdir)
        rep, = reprec.getreports("pytest_runtest_logreport")
        assert rep.passed

View File

@ -1,148 +0,0 @@
import py
import execnet
from py.impl.test.dist.txnode import TXNode
queue = py.builtin._tryimport("queue", "Queue")
Queue = queue.Queue
class EventQueue:
    """Registered plugin that lets tests wait for named hook events on a queue.

    The queue entries are (name, args, kwargs) triples; presumably they are
    fed by the node's putevent callback -- verify against the callers.
    """
    def __init__(self, registry, queue=None):
        if queue is None:
            queue = Queue()
        self.queue = queue
        registry.register(self)

    def geteventargs(self, eventname, timeout=2.0):
        """Block until *eventname* appears; return its args (or kwargs).

        Raises IOError if the timeout expires first, after printing the
        event names seen so far; internal-error reports encountered while
        waiting are printed for diagnosis.
        """
        events = []
        while 1:
            try:
                eventcall = self.queue.get(timeout=timeout)
            except queue.Empty:
                #print "node channel", self.node.channel
                #print "remoteerror", self.node.channel._getremoteerror()
                py.builtin.print_("seen events", events)
                raise IOError("did not see %r events" % (eventname))
            else:
                name, args, kwargs = eventcall
                assert isinstance(name, str)
                if name == eventname:
                    # positional args win over kwargs when both exist
                    if args:
                        return args
                    return kwargs
                events.append(name)
                if name == "pytest_internalerror":
                    py.builtin.print_(str(kwargs["excrepr"]))
class MySetup:
    """Test helper that creates popen TXNodes feeding events into a queue."""
    def __init__(self, request):
        # monotonically increasing gateway id per created node
        self.id = 0
        self.request = request

    def geteventargs(self, eventname, timeout=2.0):
        # delegate waiting to an EventQueue over this setup's queue
        eq = EventQueue(self.config.pluginmanager, self.queue)
        return eq.geteventargs(eventname, timeout=timeout)

    def makenode(self, config=None):
        """Create a TXNode over a fresh popen gateway; events go to self.queue."""
        if config is None:
            testdir = self.request.getfuncargvalue("testdir")
            config = testdir.reparseconfig([])
        self.config = config
        self.queue = Queue()
        self.xspec = execnet.XSpec("popen")
        self.gateway = execnet.makegateway(self.xspec)
        self.id += 1
        self.gateway.id = str(self.id)
        self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
        assert not self.node.channel.isclosed()
        return self.node

    def xfinalize(self):
        # not hooked up as a finalizer; call manually to close the gateway
        if hasattr(self, 'node'):
            gw = self.node.gateway
            py.builtin.print_("exiting:", gw)
            gw.exit()
def pytest_funcarg__mysetup(request):
    """Funcarg: a MySetup helper bound to the current test request."""
    setup = MySetup(request)
    #pyfuncitem.addfinalizer(mysetup.finalize)
    return setup
def test_node_hash_equality(mysetup):
    """A node equals itself and only itself."""
    first = mysetup.makenode()
    second = mysetup.makenode()
    assert first == first
    assert not (first != first)
    assert first != second
# Integration tests for the master<->slave TXNode channel: crash
# handling, shutdown notification, and sending items for execution.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
class TestMasterSlaveConnection:
# Sending a non-item must take the node down with an error report.
def test_crash_invalid_item(self, mysetup):
node = mysetup.makenode()
node.send(123) # invalid item
kwargs = mysetup.geteventargs("pytest_testnodedown")
assert kwargs['node'] is node
assert "Not properly terminated" in str(kwargs['error'])
# A test that SIGKILLs its own process must be reported as a
# non-properly-terminated node, not silently lost.
def test_crash_killed(self, testdir, mysetup):
if not hasattr(py.std.os, 'kill'):
py.test.skip("no os.kill")
item = testdir.getitem("""
def test_func():
import os
os.kill(os.getpid(), 9)
""")
node = mysetup.makenode(item.config)
node.send(item)
kwargs = mysetup.geteventargs("pytest_testnodedown")
assert kwargs['node'] is node
assert "Not properly terminated" in str(kwargs['error'])
# Clean shutdown: testnodedown fires with no error, and further
# waits on the dead node time out with IOError.
def test_node_down(self, mysetup):
node = mysetup.makenode()
node.shutdown()
kwargs = mysetup.geteventargs("pytest_testnodedown")
assert kwargs['node'] is node
assert not kwargs['error']
node.callback(node.ENDMARK)
excinfo = py.test.raises(IOError,
"mysetup.geteventargs('testnodedown', timeout=0.01)")
# send() on a closed channel must raise IOError immediately.
def test_send_on_closed_channel(self, testdir, mysetup):
item = testdir.getitem("def test_func(): pass")
node = mysetup.makenode(item.config)
node.channel.close()
py.test.raises(IOError, "node.send(item)")
#ev = self.getcalls(pytest_internalerror)
#assert ev.excinfo.errisinstance(IOError)
# A single passing item round-trips into a passed logreport that
# still references the original item.
def test_send_one(self, testdir, mysetup):
item = testdir.getitem("def test_func(): pass")
node = mysetup.makenode(item.config)
node.send(item)
kwargs = mysetup.geteventargs("pytest_runtest_logreport")
rep = kwargs['report']
assert rep.passed
py.builtin.print_(rep)
assert rep.item == item
# Mixed outcomes arrive in order, both via send() per item and via
# a single sendlist() batch.
def test_send_some(self, testdir, mysetup):
items = testdir.getitems("""
def test_pass():
pass
def test_fail():
assert 0
def test_skip():
import py
py.test.skip("x")
""")
node = mysetup.makenode(items[0].config)
for item in items:
node.send(item)
for outcome in "passed failed skipped".split():
kwargs = mysetup.geteventargs("pytest_runtest_logreport")
report = kwargs['report']
assert getattr(report, outcome)
node.sendlist(items)
for outcome in "passed failed skipped".split():
rep = mysetup.geteventargs("pytest_runtest_logreport")['report']
assert getattr(rep, outcome)

View File

@ -1 +0,0 @@
#

View File

@ -1,151 +0,0 @@
import py
py.test.importorskip("execnet")
from py.impl.test.looponfail.remote import LooponfailingSession, LoopState, RemoteControl
# Tests for RemoteControl, the looponfail driver that runs a test
# session in a subprocess and reports back the set of failures.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
class TestRemoteControl:
# A passing module yields no failures.
def test_nofailures(self, testdir):
item = testdir.getitem("def test_func(): pass\n")
control = RemoteControl(item.config)
control.setup()
failures = control.runsession()
assert not failures
# After editing the file to pass, re-running only the previous
# failures must come back clean.  Stale .pyc files are removed so
# the edited source is actually re-imported.
def test_failures_somewhere(self, testdir):
item = testdir.getitem("def test_func(): assert 0\n")
control = RemoteControl(item.config)
control.setup()
failures = control.runsession()
assert failures
control.setup()
item.fspath.write("def test_func(): assert 1\n")
pyc = item.fspath.new(ext=".pyc")
if pyc.check():
pyc.remove()
failures = control.runsession(failures)
assert not failures
# Fixing the old failure while introducing a new one: the focused
# re-run passes, but a full session then reports the new failure.
def test_failure_change(self, testdir):
modcol = testdir.getitem("""
def test_func():
assert 0
""")
control = RemoteControl(modcol.config)
control.setup()
failures = control.runsession()
assert failures
control.setup()
modcol.fspath.write(py.code.Source("""
def test_func():
assert 1
def test_new():
assert 0
"""))
pyc = modcol.fspath.new(ext=".pyc")
if pyc.check():
pyc.remove()
failures = control.runsession(failures)
assert not failures
control.setup()
failures = control.runsession()
assert failures
assert str(failures).find("test_new") != -1
# Tests for the looponfail session loop: loopstate.colitems tracks the
# currently-failing items across file edits detected by statrecorder.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
class TestLooponFailing:
# One failing test; after the edit makes it pass, the failure set
# drains to empty.
def test_looponfail_from_fail_to_ok(self, testdir):
modcol = testdir.getmodulecol("""
def test_one():
x = 0
assert x == 1
def test_two():
assert 1
""")
session = LooponfailingSession(modcol.config)
loopstate = LoopState()
session.remotecontrol.setup()
session.loop_once(loopstate)
assert len(loopstate.colitems) == 1
modcol.fspath.write(py.code.Source("""
def test_one():
x = 15
assert x == 15
def test_two():
assert 1
"""))
# statrecorder must notice the rewrite before the next loop turn.
assert session.statrecorder.check()
session.loop_once(loopstate)
assert not loopstate.colitems
# Fixing the old failure while adding a new one: the focused re-run
# passes (0 items), then the following full run picks up the new
# failure (1 item).
def test_looponfail_from_one_to_two_tests(self, testdir):
modcol = testdir.getmodulecol("""
def test_one():
assert 0
""")
session = LooponfailingSession(modcol.config)
loopstate = LoopState()
session.remotecontrol.setup()
loopstate.colitems = []
session.loop_once(loopstate)
assert len(loopstate.colitems) == 1
modcol.fspath.write(py.code.Source("""
def test_one():
assert 1 # passes now
def test_two():
assert 0 # new and fails
"""))
assert session.statrecorder.check()
session.loop_once(loopstate)
assert len(loopstate.colitems) == 0
session.loop_once(loopstate)
assert len(loopstate.colitems) == 1
# Renaming a failing test away: the focused re-run finds nothing,
# the next full run reports the renamed (still failing) test.
def test_looponfail_removed_test(self, testdir):
modcol = testdir.getmodulecol("""
def test_one():
assert 0
def test_two():
assert 0
""")
session = LooponfailingSession(modcol.config)
loopstate = LoopState()
session.remotecontrol.setup()
loopstate.colitems = []
session.loop_once(loopstate)
assert len(loopstate.colitems) == 2
modcol.fspath.write(py.code.Source("""
def test_xxx(): # renamed test
assert 0
def test_two():
assert 1 # pass now
"""))
assert session.statrecorder.check()
session.loop_once(loopstate)
assert len(loopstate.colitems) == 0
session.loop_once(loopstate)
assert len(loopstate.colitems) == 1
# Functional end-to-end check via a spawned "py.test -f" process,
# driven through pexpect-style expectations.
def test_looponfail_functional_fail_to_ok(self, testdir):
p = testdir.makepyfile("""
def test_one():
x = 0
assert x == 1
""")
child = testdir.spawn_pytest("-f %s" % p)
child.expect("def test_one")
child.expect("x == 1")
child.expect("1 failed")
child.expect("### LOOPONFAILING ####")
child.expect("waiting for changes")
p.write(py.code.Source("""
def test_one():
x = 1
assert x == 1
"""))
child.expect(".*1 passed.*")
child.kill(15)
View File

@ -1,61 +0,0 @@
import py
from py.impl.test.looponfail.util import StatRecorder
def test_filechange(tmpdir):
    """StatRecorder.check() reports True exactly when something under
    its root changed since the previous check."""
    hello = tmpdir.ensure("hello.py")
    recorder = StatRecorder([tmpdir])
    assert not recorder.check()      # fresh recorder: nothing changed yet
    hello.write("world")             # content change
    assert recorder.check()
    tmpdir.ensure("new.py")          # new file appears
    assert recorder.check()
    tmpdir.join("new.py").remove()   # file removal
    assert recorder.check()
    tmpdir.join("a", "b", "c.py").ensure()   # nested new file
    assert recorder.check()
    tmpdir.join("a", "c.txt").ensure()       # non-python file counts too
    assert recorder.check()
    assert not recorder.check()      # quiescent again
    tmpdir.join("a").remove()        # whole-directory removal
    assert recorder.check()
def test_pycremoval(tmpdir):
    """A .pyc appearing is not a change, and rewriting the .py makes
    the recorder delete the now-stale .pyc."""
    hello = tmpdir.ensure("hello.py")
    recorder = StatRecorder([tmpdir])
    assert not recorder.check()
    stale_pyc = hello + "c"
    stale_pyc.ensure()
    assert not recorder.check()      # .pyc files are ignored as changes
    hello.write("world")
    recorder.check()
    assert not stale_pyc.check()     # stale .pyc got removed
def test_waitonchange(tmpdir, monkeypatch):
    """waitonchange() polls check() until it returns True."""
    recorder = StatRecorder([tmpdir])
    # Two scripted poll results: first False, then True -> two polls.
    polls = [True, False]
    monkeypatch.setattr(StatRecorder, 'check', lambda self: polls.pop())
    recorder.waitonchange(checkinterval=0.2)
    assert not polls

View File

@ -180,21 +180,6 @@ class TestConfigApi_getinitialnodes:
for col in col.listchain():
assert col.config is config
class TestOptionEffects:
    """The --boxed option must stay off unless explicitly requested."""

    def test_boxed_option_default(self, testdir):
        subdir = testdir.tmpdir.ensure("subdir", dir=1)
        config = testdir.reparseconfig()
        config.initsession()
        assert not config.option.boxed
        # Even a distributed (-d) run must not implicitly enable boxing.
        py.test.importorskip("execnet")
        config = testdir.reparseconfig(['-d', subdir])
        config.initsession()
        assert not config.option.boxed

    def test_is_not_boxed_by_default(self, testdir):
        config = testdir.reparseconfig([testdir.tmpdir])
        assert not config.option.boxed
class TestConfig_gettopdir:
def test_gettopdir(self, testdir):
from py.impl.test.config import gettopdir

View File

@ -265,42 +265,6 @@ def test_config_cmdline_options(recwarn, testdir):
recwarn.pop(DeprecationWarning)
assert config.option.gdest == 17
# Verify that conftest-declared options (via the deprecated
# py.test.config.addoptions API) survive into a distributed (-d) run:
# the conftest is rsynced, re-imported remotely, and --someopt is seen.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
def test_dist_conftest_options(testdir):
p1 = testdir.tmpdir.ensure("dir", 'p1.py')
p1.dirpath("__init__.py").write("")
# The conftest registers --someopt and declares its rsync roots.
p1.dirpath("conftest.py").write(py.code.Source("""
import py
from py.builtin import print_
print_("importing conftest", __file__)
Option = py.test.config.Option
option = py.test.config.addoptions("someopt",
Option('--someopt', action="store_true",
dest="someopt", default=False))
dist_rsync_roots = ['../dir']
print_("added options", option)
print_("config file seen from conftest", py.test.config)
"""))
# The test module asserts the option value as seen on the remote side.
p1.write(py.code.Source("""
import py
from %s import conftest
from py.builtin import print_
def test_1():
print_("config from test_1", py.test.config)
print_("conftest from test_1", conftest.__file__)
print_("test_1: py.test.config.option.someopt", py.test.config.option.someopt)
print_("test_1: conftest", conftest)
print_("test_1: conftest.option.someopt", conftest.option.someopt)
assert conftest.option.someopt
""" % p1.dirpath().purebasename ))
result = testdir.runpytest('-d', '--tx=popen', p1, '--someopt')
assert result.ret == 0
# addoptions is deprecated, so a warning must appear on stderr.
result.stderr.fnmatch_lines([
"*Deprecation*pytest_addoptions*",
])
result.stdout.fnmatch_lines([
"*1 passed*",
])
def test_conftest_non_python_items(recwarn, testdir):
testdir.makepyfile(conftest="""
import py

View File

@ -1,198 +0,0 @@
import py
import pickle
def setglobals(request):
    """Blank out the global py.test.config for the duration of a test,
    restoring the saved value via a request finalizer."""
    saved = py.test.config
    print("setting py.test.config to None")
    py.test.config = None

    def resetglobals():
        # Runs at teardown: put the original config object back.
        py.builtin.print_("setting py.test.config to", saved)
        py.test.config = saved

    request.addfinalizer(resetglobals)
def pytest_funcarg__testdir(request):
    """testdir variant that first neutralizes the global py.test.config."""
    setglobals(request)
    return request.getfuncargvalue("testdir")
class ImmutablePickleTransport:
    """Simulates the two ends of a pickle channel with a pair of
    ImmutablePicklers (uneven=0/1 mirror the master/slave sides)."""

    def __init__(self, request):
        from py.impl.test.dist.mypickle import ImmutablePickler
        self.p1 = ImmutablePickler(uneven=0)
        self.p2 = ImmutablePickler(uneven=1)
        setglobals(request)

    def p1_to_p2(self, obj):
        """Round-trip obj from side 1 over to side 2."""
        return self.p2.loads(self.p1.dumps(obj))

    def p2_to_p1(self, obj):
        """Round-trip obj from side 2 back to side 1."""
        return self.p1.loads(self.p2.dumps(obj))

    def unifyconfig(self, config):
        """Transfer a config to side 2 and re-anchor it at the same topdir."""
        remote_config = self.p1_to_p2(config)
        remote_config._initafterpickle(config.topdir)
        return remote_config

# Expose the transport directly as a funcarg factory.
pytest_funcarg__pickletransport = ImmutablePickleTransport
# ImmutablePickler identity guarantees: objects keep identity on the
# originating side across a round-trip, and repeated transfers of the
# same object yield the same remote object.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
class TestImmutablePickling:
# Config survives transfer (topdir preserved) and comes back as the
# very same object on side 1.
def test_pickle_config(self, testdir, pickletransport):
config1 = testdir.parseconfig()
assert config1.topdir == testdir.tmpdir
testdir.chdir()
p2config = pickletransport.p1_to_p2(config1)
assert p2config.topdir.realpath() == config1.topdir.realpath()
config_back = pickletransport.p2_to_p1(p2config)
assert config_back is config1
# Transferring the same collector twice must dedupe to one remote
# object (immutable pickling caches by id).
def test_pickle_modcol(self, testdir, pickletransport):
modcol1 = testdir.getmodulecol("def test_one(): pass")
modcol2a = pickletransport.p1_to_p2(modcol1)
modcol2b = pickletransport.p1_to_p2(modcol1)
assert modcol2a is modcol2b
modcol1_back = pickletransport.p2_to_p1(modcol2a)
assert modcol1_back
# A test item transfers as a distinct object but its parent maps
# back to the original module collector.
def test_pickle_func(self, testdir, pickletransport):
modcol1 = testdir.getmodulecol("def test_one(): pass")
item = modcol1.collect_by_name("test_one")
testdir.chdir()
item2a = pickletransport.p1_to_p2(item)
assert item is not item2a # of course
assert item2a.name == item.name
modback = pickletransport.p2_to_p1(item2a.parent)
assert modback is modcol1
# Config/collector pickling: __getstate__/__setstate__ round-trips,
# plain pickle round-trips, and re-anchoring paths relative to the
# current working directory on the receiving side.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
class TestConfigPickling:
# getstate/setstate restores args (relative to topdir), all option
# values, and conftest-provided values like x.
def test_config_getstate_setstate(self, testdir):
from py.impl.test.config import Config
testdir.makepyfile(__init__="", conftest="x=1; y=2")
hello = testdir.makepyfile(hello="")
tmp = testdir.tmpdir
testdir.chdir()
config1 = testdir.parseconfig(hello)
config2 = Config()
config2.__setstate__(config1.__getstate__())
assert config2.topdir == py.path.local()
config2_relpaths = [py.path.local(x).relto(config2.topdir)
for x in config2.args]
config1_relpaths = [py.path.local(x).relto(config1.topdir)
for x in config1.args]
assert config2_relpaths == config1_relpaths
for name, value in config1.option.__dict__.items():
assert getattr(config2.option, name) == value
assert config2.getvalue("x") == 1
# A conftest-registered custom option (-G) survives the round-trip;
# a fresh Config has no such attribute until state is restored.
def test_config_pickling_customoption(self, testdir):
testdir.makeconftest("""
def pytest_addoption(parser):
group = parser.getgroup("testing group")
group.addoption('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value.")
""")
config = testdir.parseconfig("-G", "11")
assert config.option.gdest == 11
repr = config.__getstate__()
config = testdir.Config()
py.test.raises(AttributeError, "config.option.gdest")
config2 = testdir.Config()
config2.__setstate__(repr)
assert config2.option.gdest == 11
# Same as above but through the deprecated addoptions API, which
# must still see the restored value.
def test_config_pickling_and_conftest_deprecated(self, testdir):
tmp = testdir.tmpdir.ensure("w1", "w2", dir=1)
tmp.ensure("__init__.py")
tmp.join("conftest.py").write(py.code.Source("""
def pytest_addoption(parser):
group = parser.getgroup("testing group")
group.addoption('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value.")
"""))
config = testdir.parseconfig(tmp, "-G", "11")
assert config.option.gdest == 11
repr = config.__getstate__()
config = testdir.Config()
py.test.raises(AttributeError, "config.option.gdest")
config2 = testdir.Config()
config2.__setstate__(repr)
assert config2.option.gdest == 11
option = config2.addoptions("testing group",
config2.Option('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value."))
assert option.gdest == 11
# Plain pickle.dumps/loads works and re-anchors topdir to cwd.
def test_config_picklability(self, testdir):
config = testdir.parseconfig()
s = pickle.dumps(config)
newconfig = pickle.loads(s)
assert hasattr(newconfig, "topdir")
assert newconfig.topdir == py.path.local()
# Pickling a collector implicitly carries its config along.
def test_collector_implicit_config_pickling(self, testdir):
tmpdir = testdir.tmpdir
testdir.chdir()
testdir.makepyfile(hello="def test_x(): pass")
config = testdir.parseconfig(tmpdir)
col = config.getnode(config.topdir)
io = py.io.BytesIO()
pickler = pickle.Pickler(io)
pickler.dump(col)
io.seek(0)
unpickler = pickle.Unpickler(io)
col2 = unpickler.load()
assert col2.name == col.name
assert col2.listnames() == col.listnames()
# Unpickling in a different cwd re-anchors topdir there; shared
# objects (config, parent collector) keep identity, and relative
# paths resolve against the new topdir.
def test_config_and_collector_pickling(self, testdir):
tmpdir = testdir.tmpdir
dir1 = tmpdir.ensure("somedir", dir=1)
config = testdir.parseconfig()
col = config.getnode(config.topdir)
col1 = col.join(dir1.basename)
assert col1.parent is col
io = py.io.BytesIO()
pickler = pickle.Pickler(io)
pickler.dump(col)
pickler.dump(col1)
pickler.dump(col)
io.seek(0)
unpickler = pickle.Unpickler(io)
topdir = tmpdir.ensure("newtopdir", dir=1)
topdir.ensure("somedir", dir=1)
old = topdir.chdir()
try:
newcol = unpickler.load()
newcol2 = unpickler.load()
newcol3 = unpickler.load()
assert newcol2.config is newcol.config
assert newcol2.parent == newcol
assert newcol2.config.topdir.realpath() == topdir.realpath()
assert newcol.fspath.realpath() == topdir.realpath()
assert newcol2.fspath.basename == dir1.basename
assert newcol2.fspath.relto(newcol2.config.topdir)
finally:
old.chdir()
# A config sent to a child process over a PickleChannel must arrive
# wired up as the remote process's global py.test.config.
# NOTE: indentation in this diff view is flattened; code left byte-identical.
def test_config__setstate__wired_correctly_in_childprocess(testdir):
execnet = py.test.importorskip("execnet")
from py.impl.test.dist.mypickle import PickleChannel
gw = execnet.makegateway()
# Remote side: receive the config and assert it equals the global.
channel = gw.remote_exec("""
import py
from py.impl.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
config = channel.receive()
assert py.test.config == config
""")
channel = PickleChannel(channel)
config = testdir.parseconfig()
channel.send(config)
channel.waitclose() # this will potentially raise
gw.exit()

View File

@ -30,7 +30,6 @@ def test_importall():
base.join('test', 'testing', 'data'),
base.join('path', 'gateway',),
base.join('code', 'oldmagic.py'),
base.join('execnet', 'script'),
base.join('compat', 'testing'),
]
if sys.version_info >= (3,0):
@ -41,11 +40,6 @@ def test_importall():
def recurse(p):
return p.check(dotfile=0) and p.basename != "attic"
try:
import execnet
except ImportError:
execnet = None
for p in base.visit('*.py', recurse):
if p.basename == '__init__.py':
continue
@ -57,10 +51,6 @@ def test_importall():
else:
relpath = relpath.replace(base.sep, '.')
modpath = 'py.impl.%s' % relpath
if modpath.startswith("py.impl.test.dist") or \
modpath.startswith("py.impl.test.looponfail"):
if not execnet:
continue
check_import(modpath)
def check_import(modpath):