test_ok1/py/test/dist/dsession.py

266 lines
9.1 KiB
Python

"""
EXPERIMENTAL dsession session (for dist/non-dist unification)
"""
import py
from py.__.test import event
from py.__.test.runner import basic_run_report, basic_collect_report
from py.__.test.session import Session
from py.__.test import outcome
from py.__.test.dist.nodemanage import NodeManager
import Queue
class LoopState(object):
def __init__(self, dsession, colitems):
self.dsession = dsession
self.colitems = colitems
self.exitstatus = None
# loopstate.dowork is False after reschedule events
# because otherwise we might very busily loop
# waiting for a host to become ready.
self.dowork = True
self.shuttingdown = False
self.testsfailed = False
def pyevent__itemtestreport(self, event):
if event.colitem in self.dsession.item2nodes:
self.dsession.removeitem(event.colitem, event.node)
if event.failed:
self.testsfailed = True
def pyevent__collectionreport(self, event):
if event.passed:
self.colitems.extend(event.result)
def pyevent__testnodeready(self, node):
self.dsession.addnode(node)
def pyevent__testnodedown(self, node, error=None):
pending = self.dsession.removenode(node)
if pending:
crashitem = pending[0]
self.dsession.handle_crashitem(crashitem, node)
self.colitems.extend(pending[1:])
def pyevent__rescheduleitems(self, items):
self.colitems.extend(items)
self.dowork = False # avoid busywait
class DSession(Session):
"""
Session drives the collection and running of tests
and generates test events for reporters.
"""
MAXITEMSPERHOST = 15
def __init__(self, config):
self.queue = Queue.Queue()
self.node2pending = {}
self.item2nodes = {}
super(DSession, self).__init__(config=config)
def pytest_configure(self, __call__, config):
__call__.execute()
try:
config.getxspecs()
except config.Error:
print
raise config.Error("dist mode %r needs test execution environments, "
"none found." %(config.option.dist))
def main(self, colitems=None):
colitems = self.getinitialitems(colitems)
self.sessionstarts()
self.setup()
exitstatus = self.loop(colitems)
self.teardown()
self.sessionfinishes()
return exitstatus
def loop_once(self, loopstate):
if loopstate.shuttingdown:
return self.loop_once_shutdown(loopstate)
colitems = loopstate.colitems
if loopstate.dowork and colitems:
self.triggertesting(loopstate.colitems)
colitems[:] = []
# we use a timeout here so that control-C gets through
while 1:
try:
eventcall = self.queue.get(timeout=2.0)
break
except Queue.Empty:
continue
loopstate.dowork = True
eventname, args, kwargs = eventcall
self.bus.notify(eventname, *args, **kwargs)
# termination conditions
if ((loopstate.testsfailed and self.config.option.exitfirst) or
(not self.item2nodes and not colitems and not self.queue.qsize())):
self.triggershutdown()
loopstate.shuttingdown = True
elif not self.node2pending:
loopstate.exitstatus = outcome.EXIT_NOHOSTS
def loop_once_shutdown(self, loopstate):
# once we are in shutdown mode we dont send
# events other than HostDown upstream
eventname, args, kwargs = self.queue.get()
if eventname == "testnodedown":
node, error = args[0], args[1]
self.bus.notify("testnodedown", node, error)
self.removenode(node)
if not self.node2pending:
# finished
if loopstate.testsfailed:
loopstate.exitstatus = outcome.EXIT_TESTSFAILED
else:
loopstate.exitstatus = outcome.EXIT_OK
#self.config.bus.unregister(loopstate)
def _initloopstate(self, colitems):
loopstate = LoopState(self, colitems)
self.config.bus.register(loopstate)
return loopstate
def loop(self, colitems):
try:
loopstate = self._initloopstate(colitems)
loopstate.dowork = False # first receive at least one HostUp events
while 1:
self.loop_once(loopstate)
if loopstate.exitstatus is not None:
exitstatus = loopstate.exitstatus
break
except KeyboardInterrupt:
exitstatus = outcome.EXIT_INTERRUPTED
except:
self.config.pytestplugins.notify_exception()
exitstatus = outcome.EXIT_INTERNALERROR
self.config.bus.unregister(loopstate)
if exitstatus == 0 and self._testsfailed:
exitstatus = outcome.EXIT_TESTSFAILED
return exitstatus
def triggershutdown(self):
for node in self.node2pending:
node.shutdown()
def addnode(self, node):
assert node not in self.node2pending
self.node2pending[node] = []
def removenode(self, node):
try:
pending = self.node2pending.pop(node)
except KeyError:
# this happens if we didn't receive a testnodeready event yet
return []
for item in pending:
l = self.item2nodes[item]
l.remove(node)
if not l:
del self.item2nodes[item]
return pending
def triggertesting(self, colitems):
colitems = self.filteritems(colitems)
senditems = []
for next in colitems:
if isinstance(next, py.test.collect.Item):
senditems.append(next)
else:
self.bus.notify("collectionstart", next)
self.queueevent("collectionreport", basic_collect_report(next))
if self.config.option.dist == "each":
self.senditems_each(senditems)
else:
# XXX assert self.config.option.dist == "load"
self.senditems_load(senditems)
def queueevent(self, eventname, *args, **kwargs):
self.queue.put((eventname, args, kwargs))
def senditems_each(self, tosend):
if not tosend:
return
room = self.MAXITEMSPERHOST
for node, pending in self.node2pending.items():
room = min(self.MAXITEMSPERHOST - len(pending), room)
sending = tosend[:room]
for node, pending in self.node2pending.items():
node.sendlist(sending)
pending.extend(sending)
for item in sending:
self.item2nodes.setdefault(item, []).append(node)
self.bus.notify("itemstart", item, node)
tosend[:] = tosend[room:] # update inplace
if tosend:
# we have some left, give it to the main loop
self.queueevent("rescheduleitems", tosend)
def senditems_load(self, tosend):
if not tosend:
return
for node, pending in self.node2pending.items():
room = self.MAXITEMSPERHOST - len(pending)
if room > 0:
sending = tosend[:room]
node.sendlist(sending)
for item in sending:
#assert item not in self.item2node, (
# "sending same item %r to multiple "
# "not implemented" %(item,))
self.item2nodes.setdefault(item, []).append(node)
self.bus.notify("itemstart", item, node)
pending.extend(sending)
tosend[:] = tosend[room:] # update inplace
if not tosend:
break
if tosend:
# we have some left, give it to the main loop
self.queueevent("rescheduleitems", tosend)
def removeitem(self, item, node):
if item not in self.item2nodes:
raise AssertionError(item, self.item2nodes)
nodes = self.item2nodes[item]
if node in nodes: # the node might have gone down already
nodes.remove(node)
if not nodes:
del self.item2nodes[item]
self.node2pending[node].remove(item)
def handle_crashitem(self, item, node):
longrepr = "!!! Node %r crashed during running of test %r" %(node, item)
rep = event.ItemTestReport(item, when="???", excinfo=longrepr)
rep.node = node
self.bus.notify("itemtestreport", rep)
def setup(self):
""" setup any neccessary resources ahead of the test run. """
self.nodemanager = NodeManager(self.config)
self.nodemanager.setup_nodes(putevent=self.queue.put)
if self.config.option.dist == "each":
self.nodemanager.wait_nodesready(5.0)
def teardown(self):
""" teardown any resources after a test run. """
self.nodemanager.teardown_nodes()
# debugging function
def dump_picklestate(item):
l = []
while 1:
state = item.__getstate__()
l.append(state)
item = state[-1]
if len(state) != 2:
break
return l