fix/work around some corner cases for dist-testing

--HG--
branch : 1.0.x
This commit is contained in:
holger krekel 2009-07-31 14:22:02 +02:00
parent 737c32c783
commit a7382df5e9
7 changed files with 108 additions and 10 deletions

View File

@ -11,12 +11,12 @@ def test_waitfinish_removes_tempdir():
ff.waitfinish()
assert not ff.tempdir.check()
def test_tempdir_gets_gc_collected():
def test_tempdir_gets_gc_collected(monkeypatch):
monkeypatch.setattr(os, 'fork', lambda: os.getpid())
ff = py.process.ForkedFunc(boxf1)
assert ff.tempdir.check()
ff.__del__()
assert not ff.tempdir.check()
os.waitpid(ff.pid, 0)
def test_basic_forkedfunc():
result = py.process.ForkedFunc(boxf1).waitfinish()

View File

@ -11,6 +11,13 @@ from py.__.test.dist.nodemanage import NodeManager
import Queue
debug_file = None # open('/tmp/loop.log', 'w')
def debug(*args):
if debug_file is not None:
s = " ".join(map(str, args))
debug_file.write(s+"\n")
debug_file.flush()
class LoopState(object):
def __init__(self, dsession, colitems):
self.dsession = dsession
@ -23,9 +30,14 @@ class LoopState(object):
self.shuttingdown = False
self.testsfailed = False
def __repr__(self):
return "<LoopState exitstatus=%r shuttingdown=%r len(colitems)=%d>" % (
self.exitstatus, self.shuttingdown, len(self.colitems))
def pytest_runtest_logreport(self, rep):
if rep.item in self.dsession.item2nodes:
self.dsession.removeitem(rep.item, rep.node)
if rep.when != "teardown": # otherwise we have already managed it
self.dsession.removeitem(rep.item, rep.node)
if rep.failed:
self.testsfailed = True
@ -39,9 +51,14 @@ class LoopState(object):
def pytest_testnodedown(self, node, error=None):
pending = self.dsession.removenode(node)
if pending:
crashitem = pending[0]
self.dsession.handle_crashitem(crashitem, node)
self.colitems.extend(pending[1:])
if error:
crashitem = pending[0]
debug("determined crashitem", crashitem)
self.dsession.handle_crashitem(crashitem, node)
# XXX recovery handling for "each"?
# currently pending items are not retried
if self.dsession.config.option.dist == "load":
self.colitems.extend(pending[1:])
def pytest_rescheduleitems(self, items):
self.colitems.extend(items)
@ -115,6 +132,9 @@ class DSession(Session):
if eventname == "pytest_testnodedown":
self.config.hook.pytest_testnodedown(**kwargs)
self.removenode(kwargs['node'])
elif eventname == "pytest_runtest_logreport":
# might be some teardown report
self.config.hook.pytest_runtest_logreport(**kwargs)
if not self.node2pending:
# finished
if loopstate.testsfailed:
@ -200,7 +220,9 @@ class DSession(Session):
node.sendlist(sending)
pending.extend(sending)
for item in sending:
self.item2nodes.setdefault(item, []).append(node)
nodes = self.item2nodes.setdefault(item, [])
assert node not in nodes
nodes.append(node)
self.config.hook.pytest_itemstart(item=item, node=node)
tosend[:] = tosend[room:] # update inplace
if tosend:
@ -237,7 +259,8 @@ class DSession(Session):
nodes.remove(node)
if not nodes:
del self.item2nodes[item]
self.node2pending[node].remove(item)
pending = self.node2pending[node]
pending.remove(item)
def handle_crashitem(self, item, node):
runner = item.config.pluginmanager.getplugin("runner")

View File

@ -69,7 +69,8 @@ class ImmutablePickler:
pickler = MyPickler(f, self._protocol, uneven=self.uneven)
pickler.memo = self._picklememo
pickler.dump(obj)
self._updateunpicklememo()
if obj is not None:
self._updateunpicklememo()
#print >>debug, "dumped", obj
#print >>debug, "picklememo", self._picklememo
return f.getvalue()

View File

@ -155,6 +155,45 @@ class TestDSession:
dumpqueue(session.queue)
assert loopstate.exitstatus == outcome.EXIT_NOHOSTS
def test_removeitem_from_failing_teardown(self, testdir):
# teardown reports only come in when they signal a failure
# internal session-management should basically ignore them
# XXX probably it'S best to invent a new error hook for
# teardown/setup related failures
modcol = testdir.getmodulecol("""
def test_one():
pass
def teardown_function(function):
assert 0
""")
item1, = modcol.collect()
# setup a session with two nodes
session = DSession(item1.config)
node1, node2 = MockNode(), MockNode()
session.addnode(node1)
session.addnode(node2)
# have one test pending for a node that goes down
session.senditems_each([item1])
nodes = session.item2nodes[item1]
class rep:
failed = True
item = item1
node = nodes[0]
when = "call"
session.queueevent("pytest_runtest_logreport", rep=rep)
reprec = testdir.getreportrecorder(session)
print session.item2nodes
loopstate = session._initloopstate([])
assert len(session.item2nodes[item1]) == 2
session.loop_once(loopstate)
assert len(session.item2nodes[item1]) == 1
rep.when = "teardown"
session.queueevent("pytest_runtest_logreport", rep=rep)
session.loop_once(loopstate)
assert len(session.item2nodes[item1]) == 1
def test_testnodedown_causes_reschedule_pending(self, testdir):
modcol = testdir.getmodulecol("""
def test_crash():
@ -173,7 +212,8 @@ class TestDSession:
# have one test pending for a node that goes down
session.senditems_load([item1, item2])
node = session.item2nodes[item1] [0]
session.queueevent("pytest_testnodedown", node=node, error=None)
item1.config.option.dist = "load"
session.queueevent("pytest_testnodedown", node=node, error="xyz")
reprec = testdir.getreportrecorder(session)
print session.item2nodes
loopstate = session._initloopstate([])
@ -385,3 +425,18 @@ def test_collected_function_causes_remote_skip(testdir):
result.stdout.fnmatch_lines([
"*2 skipped*"
])
def test_teardownfails_one_function(testdir):
p = testdir.makepyfile("""
def test_func():
pass
def teardown_function(function):
assert 0
""")
result = testdir.runpytest(p, '--dist=each', '--tx=popen')
result.stdout.fnmatch_lines([
"*def teardown_function(function):*",
"*1 passed*1 error*"
])

View File

@ -32,6 +32,7 @@ class EventQueue:
class MySetup:
def __init__(self, request):
self.id = 0
self.request = request
def geteventargs(self, eventname, timeout=2.0):
@ -45,6 +46,8 @@ class MySetup:
self.queue = py.std.Queue.Queue()
self.xspec = py.execnet.XSpec("popen")
self.gateway = py.execnet.makegateway(self.xspec)
self.id += 1
self.gateway.id = str(self.id)
self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
assert not self.node.channel.isclosed()
return self.node

View File

@ -21,6 +21,11 @@ class TXNode(object):
self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
self._down = False
def __repr__(self):
id = self.gateway.id
status = self._down and 'true' or 'false'
return "<TXNode %r down=%s>" %(id, status)
def notify(self, eventname, *args, **kwargs):
assert not args
self.putevent((eventname, args, kwargs))

View File

@ -198,6 +198,17 @@ class ItemTestReport(BaseReport):
self.shortrepr = shortrepr
self.longrepr = longrepr
def __repr__(self):
status = (self.passed and "passed" or
self.skipped and "skipped" or
self.failed and "failed" or
"CORRUPT")
l = [repr(self.item.name), "when=%r" % self.when, "outcome %r" % status,]
if hasattr(self, 'node'):
l.append("txnode=%s" % self.node.gateway.id)
info = " " .join(map(str, l))
return "<ItemTestReport %s>" % info
def getnode(self):
return self.item