[svn r63202] prepare for allowing for items to be sent to multiple hosts

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-22 02:50:16 +01:00
parent bda844b544
commit 5f0cabb295
2 changed files with 40 additions and 30 deletions

View File

@ -26,8 +26,8 @@ class LoopState(object):
self.testsfailed = False
def pyevent_itemtestreport(self, event):
if event.colitem in self.dsession.item2node:
self.dsession.removeitem(event.colitem)
if event.colitem in self.dsession.item2nodes:
self.dsession.removeitem(event.colitem, event.node)
if event.failed:
self.testsfailed = True
@ -59,7 +59,7 @@ class DSession(Session):
def __init__(self, config):
self.queue = Queue.Queue()
self.node2pending = {}
self.item2node = {}
self.item2nodes = {}
super(DSession, self).__init__(config=config)
def pytest_configure(self, config):
@ -106,7 +106,7 @@ class DSession(Session):
# termination conditions
if ((loopstate.testsfailed and self.config.option.exitfirst) or
(not self.item2node and not colitems and not self.queue.qsize())):
(not self.item2nodes and not colitems and not self.queue.qsize())):
self.triggershutdown()
loopstate.shuttingdown = True
elif not self.node2pending:
@ -166,7 +166,11 @@ class DSession(Session):
# this happens if we didn't receive a testnodeready event yet
return []
for item in pending:
del self.item2node[item]
l = self.item2nodes[item]
l.remove(node)
if not l:
del self.item2nodes[item]
return pending
def triggertesting(self, colitems):
@ -195,7 +199,7 @@ class DSession(Session):
#assert item not in self.item2node, (
# "sending same item %r to multiple "
# "not implemented" %(item,))
self.item2node[item] = node
self.item2nodes.setdefault(item, []).append(node)
self.bus.notify("itemstart", item, node)
pending.extend(sending)
tosend[:] = tosend[room:] # update inplace
@ -205,12 +209,14 @@ class DSession(Session):
# we have some left, give it to the main loop
self.queueevent("rescheduleitems", event.RescheduleItems(tosend))
def removeitem(self, item):
if item not in self.item2node:
raise AssertionError(item, self.item2node)
node = self.item2node.pop(item)
def removeitem(self, item, node):
if item not in self.item2nodes:
raise AssertionError(item, self.item2nodes)
nodes = self.item2nodes[item]
nodes.remove(node)
if not nodes:
del self.item2nodes[item]
self.node2pending[node].remove(item)
#self.config.bus.notify("removeitem", item, host.hostid)
def handle_crashitem(self, item, node):
longrepr = "!!! Node %r crashed during running of test %r" %(node, item)

View File

@ -6,9 +6,11 @@ import py
XSpec = py.execnet.XSpec
def run(item):
def run(item, node):
runner = item._getrunner()
return runner(item)
rep = runner(item)
rep.node = node
return rep
class MockNode:
def __init__(self):
@ -27,31 +29,31 @@ def dumpqueue(queue):
class TestDSession:
def test_add_remove_node(self, testdir):
item = testdir.getitem("def test_func(): pass")
rep = run(item)
session = DSession(item.config)
node = MockNode()
rep = run(item, node)
session = DSession(item.config)
assert not session.node2pending
session.addnode(node)
assert len(session.node2pending) == 1
session.senditems([item])
pending = session.removenode(node)
assert pending == [item]
assert item not in session.item2node
assert item not in session.item2nodes
l = session.removenode(node)
assert not l
def test_senditems_removeitems(self, testdir):
item = testdir.getitem("def test_func(): pass")
rep = run(item)
session = DSession(item.config)
node = MockNode()
rep = run(item, node)
session = DSession(item.config)
session.addnode(node)
session.senditems([item])
assert session.node2pending[node] == [item]
assert session.item2node[item] == node
session.removeitem(item)
assert session.item2nodes[item] == [node]
session.removeitem(item, node)
assert not session.node2pending[node]
assert not session.item2node
assert not session.item2nodes
def test_triggertesting_collect(self, testdir):
modcol = testdir.getmodulecol("""
@ -117,7 +119,7 @@ class TestDSession:
session.queueevent("anonymous", event.NOP())
session.loop_once(loopstate)
assert node.sent == [[item]]
session.queueevent("itemtestreport", run(item))
session.queueevent("itemtestreport", run(item, node))
session.loop_once(loopstate)
assert loopstate.shuttingdown
assert not loopstate.testsfailed
@ -155,10 +157,10 @@ class TestDSession:
# have one test pending for a node that goes down
session.senditems([item1, item2])
node = session.item2node[item1]
node = session.item2nodes[item1] [0]
session.queueevent("testnodedown", node, None)
evrec = EventRecorder(session.bus)
print session.item2node
print session.item2nodes
loopstate = session._initloopstate([])
session.loop_once(loopstate)
@ -200,7 +202,7 @@ class TestDSession:
session.loop_once(loopstate)
assert node.sent == [[item]]
ev = run(item)
ev = run(item, node)
session.queueevent("itemtestreport", ev)
session.loop_once(loopstate)
assert loopstate.shuttingdown
@ -236,8 +238,8 @@ class TestDSession:
session.triggertesting(items)
# run tests ourselves and produce reports
ev1 = run(items[0])
ev2 = run(items[1])
ev1 = run(items[0], node)
ev2 = run(items[1], node)
session.queueevent("itemtestreport", ev1) # a failing one
session.queueevent("itemtestreport", ev2)
# now call the loop
@ -254,7 +256,7 @@ class TestDSession:
loopstate = session._initloopstate([])
loopstate.shuttingdown = True
evrec = EventRecorder(session.bus)
session.queueevent("itemtestreport", run(item))
session.queueevent("itemtestreport", run(item, node))
session.loop_once(loopstate)
assert not evrec.getfirstnamed("testnodedown")
session.queueevent("testnodedown", node, None)
@ -297,7 +299,7 @@ class TestDSession:
node = MockNode()
session.addnode(node)
session.senditems([item])
session.queueevent("itemtestreport", run(item))
session.queueevent("itemtestreport", run(item, node))
loopstate = session._initloopstate([])
session.loop_once(loopstate)
assert node._shutdown is True
@ -322,7 +324,9 @@ class TestDSession:
item1, item2 = colreport.result
session.senditems([item1])
# node2pending will become empty when the loop sees the report
session.queueevent("itemtestreport", run(item1))
rep = run(item1, node)
session.queueevent("itemtestreport", run(item1, node))
# but we have a collection pending
session.queueevent("collectionreport", colreport)