[svn r63153] refactoring almost complete, apart from testnodeready info

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-20 20:04:36 +01:00
parent 10fb32ad37
commit afca655202
11 changed files with 186 additions and 213 deletions

View File

@ -25,7 +25,7 @@ class LoopState(object):
self.testsfailed = False self.testsfailed = False
def pyevent_itemtestreport(self, event): def pyevent_itemtestreport(self, event):
if event.colitem in self.dsession.item2host: if event.colitem in self.dsession.item2node:
self.dsession.removeitem(event.colitem) self.dsession.removeitem(event.colitem)
if event.failed: if event.failed:
self.testsfailed = True self.testsfailed = True
@ -34,14 +34,14 @@ class LoopState(object):
if event.passed: if event.passed:
self.colitems.extend(event.result) self.colitems.extend(event.result)
def pyevent_testnodeready(self, event): def pyevent_testnodeready(self, node):
self.dsession.addhost(event.host) self.dsession.addnode(node)
def pyevent_testnodedown(self, event): def pyevent_testnodedown(self, node, error=None):
pending = self.dsession.removehost(event.host) pending = self.dsession.removenode(node)
if pending: if pending:
crashitem = pending[0] crashitem = pending[0]
self.dsession.handle_crashitem(crashitem, event.host) self.dsession.handle_crashitem(crashitem, node)
self.colitems.extend(pending[1:]) self.colitems.extend(pending[1:])
def pyevent_rescheduleitems(self, event): def pyevent_rescheduleitems(self, event):
@ -59,8 +59,8 @@ class DSession(Session):
super(DSession, self).__init__(config=config) super(DSession, self).__init__(config=config)
self.queue = Queue.Queue() self.queue = Queue.Queue()
self.host2pending = {} self.node2pending = {}
self.item2host = {} self.item2node = {}
if self.config.getvalue("executable") and \ if self.config.getvalue("executable") and \
not self.config.getvalue("numprocesses"): not self.config.getvalue("numprocesses"):
self.config.option.numprocesses = 1 self.config.option.numprocesses = 1
@ -84,7 +84,7 @@ class DSession(Session):
if config.option.numprocesses: if config.option.numprocesses:
return return
try: try:
config.getvalue('hosts') config.getvalue('xspecs')
except KeyError: except KeyError:
print "Please specify test environments for distribution of tests:" print "Please specify test environments for distribution of tests:"
print "py.test --tx ssh=user@somehost --tx popen//python=python2.5" print "py.test --tx ssh=user@somehost --tx popen//python=python2.5"
@ -97,9 +97,9 @@ class DSession(Session):
def main(self, colitems=None): def main(self, colitems=None):
colitems = self.getinitialitems(colitems) colitems = self.getinitialitems(colitems)
self.sessionstarts() self.sessionstarts()
self.setup_hosts() self.setup_nodes()
exitstatus = self.loop(colitems) exitstatus = self.loop(colitems)
self.teardown_hosts() self.teardown_nodes()
self.sessionfinishes() self.sessionfinishes()
return exitstatus return exitstatus
@ -124,10 +124,10 @@ class DSession(Session):
# termination conditions # termination conditions
if ((loopstate.testsfailed and self.config.option.exitfirst) or if ((loopstate.testsfailed and self.config.option.exitfirst) or
(not self.item2host and not colitems and not self.queue.qsize())): (not self.item2node and not colitems and not self.queue.qsize())):
self.triggershutdown() self.triggershutdown()
loopstate.shuttingdown = True loopstate.shuttingdown = True
elif not self.host2pending: elif not self.node2pending:
loopstate.exitstatus = outcome.EXIT_NOHOSTS loopstate.exitstatus = outcome.EXIT_NOHOSTS
def loop_once_shutdown(self, loopstate): def loop_once_shutdown(self, loopstate):
@ -135,10 +135,10 @@ class DSession(Session):
# events other than HostDown upstream # events other than HostDown upstream
eventname, args, kwargs = self.queue.get() eventname, args, kwargs = self.queue.get()
if eventname == "testnodedown": if eventname == "testnodedown":
ev, = args node, error = args[0], args[1]
self.bus.notify("testnodedown", ev) self.bus.notify("testnodedown", node, error)
self.removehost(ev.host) self.removenode(node)
if not self.host2pending: if not self.node2pending:
# finished # finished
if loopstate.testsfailed: if loopstate.testsfailed:
loopstate.exitstatus = outcome.EXIT_TESTSFAILED loopstate.exitstatus = outcome.EXIT_TESTSFAILED
@ -170,21 +170,21 @@ class DSession(Session):
return exitstatus return exitstatus
def triggershutdown(self): def triggershutdown(self):
for host in self.host2pending: for node in self.node2pending:
host.node.shutdown() node.shutdown()
def addhost(self, host): def addnode(self, node):
assert host not in self.host2pending assert node not in self.node2pending
self.host2pending[host] = [] self.node2pending[node] = []
def removehost(self, host): def removenode(self, node):
try: try:
pending = self.host2pending.pop(host) pending = self.node2pending.pop(node)
except KeyError: except KeyError:
# this happens if we didn't receive a testnodeready event yet # this happens if we didn't receive a testnodeready event yet
return [] return []
for item in pending: for item in pending:
del self.item2host[item] del self.item2node[item]
return pending return pending
def triggertesting(self, colitems): def triggertesting(self, colitems):
@ -204,17 +204,17 @@ class DSession(Session):
def senditems(self, tosend): def senditems(self, tosend):
if not tosend: if not tosend:
return return
for host, pending in self.host2pending.items(): for node, pending in self.node2pending.items():
room = self.MAXITEMSPERHOST - len(pending) room = self.MAXITEMSPERHOST - len(pending)
if room > 0: if room > 0:
sending = tosend[:room] sending = tosend[:room]
host.node.sendlist(sending) node.sendlist(sending)
for item in sending: for item in sending:
#assert item not in self.item2host, ( #assert item not in self.item2node, (
# "sending same item %r to multiple hosts " # "sending same item %r to multiple "
# "not implemented" %(item,)) # "not implemented" %(item,))
self.item2host[item] = host self.item2node[item] = node
self.bus.notify("itemstart", event.ItemStart(item, host)) self.bus.notify("itemstart", event.ItemStart(item, node))
pending.extend(sending) pending.extend(sending)
tosend[:] = tosend[room:] # update inplace tosend[:] = tosend[room:] # update inplace
if not tosend: if not tosend:
@ -224,24 +224,24 @@ class DSession(Session):
self.queueevent("rescheduleitems", event.RescheduleItems(tosend)) self.queueevent("rescheduleitems", event.RescheduleItems(tosend))
def removeitem(self, item): def removeitem(self, item):
if item not in self.item2host: if item not in self.item2node:
raise AssertionError(item, self.item2host) raise AssertionError(item, self.item2node)
host = self.item2host.pop(item) node = self.item2node.pop(item)
self.host2pending[host].remove(item) self.node2pending[node].remove(item)
#self.config.bus.notify("removeitem", item, host.hostid) #self.config.bus.notify("removeitem", item, host.hostid)
def handle_crashitem(self, item, host): def handle_crashitem(self, item, node):
longrepr = "!!! Host %r crashed during running of test %r" %(host, item) longrepr = "!!! Node %r crashed during running of test %r" %(node, item)
rep = event.ItemTestReport(item, when="???", excinfo=longrepr) rep = event.ItemTestReport(item, when="???", excinfo=longrepr)
self.bus.notify("itemtestreport", rep) self.bus.notify("itemtestreport", rep)
def setup_hosts(self): def setup_nodes(self):
""" setup any neccessary resources ahead of the test run. """ """ setup any neccessary resources ahead of the test run. """
from py.__.test.dsession.hostmanage import HostManager from py.__.test.dsession.hostmanage import HostManager
self.hm = HostManager(self.config) self.hm = HostManager(self.config)
self.hm.setup_hosts(putevent=self.queue.put) self.hm.setup_hosts(putevent=self.queue.put)
def teardown_hosts(self): def teardown_nodes(self):
""" teardown any resources after a test run. """ """ teardown any resources after a test run. """
self.hm.teardown_hosts() self.hm.teardown_hosts()

View File

@ -47,6 +47,7 @@ class HostManager(object):
hosts = getxspecs(self.config) hosts = getxspecs(self.config)
self.roots = getconfigroots(config) self.roots = getconfigroots(config)
self.gwmanager = GatewayManager(hosts) self.gwmanager = GatewayManager(hosts)
self.nodes = []
def makegateways(self): def makegateways(self):
# we change to the topdir sot that # we change to the topdir sot that
@ -116,11 +117,9 @@ class HostManager(object):
""").waitclose() """).waitclose()
for gateway in self.gwmanager.gateways: for gateway in self.gwmanager.gateways:
host = gateway.spec node = MasterNode(gateway, self.config, putevent)
host.node = MasterNode(host, self.nodes.append(node)
gateway,
self.config,
putevent)
def teardown_hosts(self): def teardown_hosts(self):
# XXX teardown nodes?
self.gwmanager.exit() self.gwmanager.exit()

View File

@ -6,13 +6,14 @@ from py.__.test import event
from py.__.test.dsession.mypickle import PickleChannel from py.__.test.dsession.mypickle import PickleChannel
class MasterNode(object): class MasterNode(object):
""" Install slave code, manage sending test tasks & receiving results """
ENDMARK = -1 ENDMARK = -1
def __init__(self, host, gateway, config, putevent): def __init__(self, gateway, config, putevent):
self.host = host
self.config = config self.config = config
self.putevent = putevent self.putevent = putevent
self.channel = install_slave(host, gateway, config) self.gateway = gateway
self.channel = install_slave(gateway, config)
self.channel.setcallback(self.callback, endmarker=self.ENDMARK) self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
self._down = False self._down = False
@ -32,23 +33,25 @@ class MasterNode(object):
err = self.channel._getremoteerror() err = self.channel._getremoteerror()
if not self._down: if not self._down:
if not err: if not err:
err = "TERMINATED" err = "Not properly terminated"
self.notify("testnodedown", event.HostDown(self.host, err)) self.notify("testnodedown", self, err)
self._down = True
return return
elif eventcall is None: eventname, args, kwargs = eventcall
if eventname == "slaveready":
self.notify("testnodeready", self)
elif eventname == "slavefinished":
self._down = True self._down = True
self.notify("testnodedown", event.HostDown(self.host, None)) self.notify("testnodedown", self, None)
return else:
except KeyboardInterrupt: self.notify(eventname, *args, **kwargs)
except KeyboardInterrupt:
# should not land in receiver-thread
raise raise
except: except:
excinfo = py.code.ExceptionInfo() excinfo = py.code.ExceptionInfo()
print "!" * 20, excinfo print "!" * 20, excinfo
self.notify("internalerror", event.InternalException(excinfo)) self.notify("internalerror", event.InternalException(excinfo))
else:
# XXX we need to have the proper event name
eventname, args, kwargs = eventcall
self.notify(eventname, *args, **kwargs)
def send(self, item): def send(self, item):
assert item is not None assert item is not None
@ -61,7 +64,7 @@ class MasterNode(object):
self.channel.send(None) self.channel.send(None)
# setting up slave code # setting up slave code
def install_slave(host, gateway, config): def install_slave(gateway, config):
channel = gateway.remote_exec(source=""" channel = gateway.remote_exec(source="""
from py.__.test.dsession.mypickle import PickleChannel from py.__.test.dsession.mypickle import PickleChannel
from py.__.test.dsession.masterslave import SlaveNode from py.__.test.dsession.masterslave import SlaveNode
@ -71,12 +74,12 @@ def install_slave(host, gateway, config):
""") """)
channel = PickleChannel(channel) channel = PickleChannel(channel)
basetemp = None basetemp = None
if host.popen: if gateway.spec.popen:
popenbase = config.ensuretemp("popen") popenbase = config.ensuretemp("popen")
basetemp = py.path.local.make_numbered_dir(prefix="slave-", basetemp = py.path.local.make_numbered_dir(prefix="slave-",
keep=0, rootdir=popenbase) keep=0, rootdir=popenbase)
basetemp = str(basetemp) basetemp = str(basetemp)
channel.send((host, config, basetemp)) channel.send((config, basetemp))
return channel return channel
class SlaveNode(object): class SlaveNode(object):
@ -91,17 +94,16 @@ class SlaveNode(object):
def run(self): def run(self):
channel = self.channel channel = self.channel
host, self.config, basetemp = channel.receive() self.config, basetemp = channel.receive()
if basetemp: if basetemp:
self.config.basetemp = py.path.local(basetemp) self.config.basetemp = py.path.local(basetemp)
self.config.pytestplugins.do_configure(self.config) self.config.pytestplugins.do_configure(self.config)
self.sendevent("testnodeready", maketestnodeready(host)) self.sendevent("slaveready")
try: try:
while 1: while 1:
task = channel.receive() task = channel.receive()
self.config.bus.notify("masterslave_receivedtask", task) if task is None:
if task is None: # shutdown self.sendevent("slavefinished")
self.channel.send(None)
break break
if isinstance(task, list): if isinstance(task, list):
for item in task: for item in task:
@ -119,10 +121,3 @@ class SlaveNode(object):
testrep = runner(item) testrep = runner(item)
self.sendevent("itemtestreport", testrep) self.sendevent("itemtestreport", testrep)
def maketestnodeready(host="INPROCESS"):
import sys
platinfo = {}
for name in 'platform', 'executable', 'version_info':
platinfo["sys."+name] = getattr(sys, name)
return event.HostUp(host, platinfo)

View File

@ -1,10 +1,11 @@
from py.__.test.dsession.dsession import DSession from py.__.test.dsession.dsession import DSession
from py.__.test.dsession.masterslave import maketestnodeready
from py.__.test.runner import basic_collect_report from py.__.test.runner import basic_collect_report
from py.__.test import event from py.__.test import event
from py.__.test import outcome from py.__.test import outcome
import py import py
XSpec = py.execnet.XSpec
def run(item): def run(item):
runner = item._getrunner() runner = item._getrunner()
return runner(item) return runner(item)
@ -33,35 +34,33 @@ class TestDSession:
config.initsession().fixoptions() config.initsession().fixoptions()
assert config.option.numprocesses == 3 assert config.option.numprocesses == 3
def test_add_remove_host(self, testdir): def test_add_remove_node(self, testdir):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
rep = run(item) rep = run(item)
session = DSession(item.config) session = DSession(item.config)
host = py.execnet.XSpec("popen") node = MockNode()
host.node = MockNode() assert not session.node2pending
assert not session.host2pending session.addnode(node)
session.addhost(host) assert len(session.node2pending) == 1
assert len(session.host2pending) == 1
session.senditems([item]) session.senditems([item])
pending = session.removehost(host) pending = session.removenode(node)
assert pending == [item] assert pending == [item]
assert item not in session.item2host assert item not in session.item2node
l = session.removehost(host) l = session.removenode(node)
assert not l assert not l
def test_senditems_removeitems(self, testdir): def test_senditems_removeitems(self, testdir):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
rep = run(item) rep = run(item)
session = DSession(item.config) session = DSession(item.config)
host = py.execnet.XSpec("popen") node = MockNode()
host.node = MockNode() session.addnode(node)
session.addhost(host)
session.senditems([item]) session.senditems([item])
assert session.host2pending[host] == [item] assert session.node2pending[node] == [item]
assert session.item2host[item] == host assert session.item2node[item] == node
session.removeitem(item) session.removeitem(item)
assert not session.host2pending[host] assert not session.node2pending[node]
assert not session.item2host assert not session.item2node
def test_triggertesting_collect(self, testdir): def test_triggertesting_collect(self, testdir):
modcol = testdir.getmodulecol(""" modcol = testdir.getmodulecol("""
@ -78,19 +77,17 @@ class TestDSession:
def test_triggertesting_item(self, testdir): def test_triggertesting_item(self, testdir):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
session = DSession(item.config) session = DSession(item.config)
host1 = py.execnet.XSpec("popen") node1 = MockNode()
host1.node = MockNode() node2 = MockNode()
host2 = py.execnet.XSpec("popen") session.addnode(node1)
host2.node = MockNode() session.addnode(node2)
session.addhost(host1)
session.addhost(host2)
session.triggertesting([item] * (session.MAXITEMSPERHOST*2 + 1)) session.triggertesting([item] * (session.MAXITEMSPERHOST*2 + 1))
host1_sent = host1.node.sent[0] sent1 = node1.sent[0]
host2_sent = host2.node.sent[0] sent2 = node2.sent[0]
assert host1_sent == [item] * session.MAXITEMSPERHOST assert sent1 == [item] * session.MAXITEMSPERHOST
assert host2_sent == [item] * session.MAXITEMSPERHOST assert sent2 == [item] * session.MAXITEMSPERHOST
assert session.host2pending[host1] == host1_sent assert session.node2pending[node1] == sent1
assert session.host2pending[host2] == host2_sent assert session.node2pending[node2] == sent2
name, args, kwargs = session.queue.get(block=False) name, args, kwargs = session.queue.get(block=False)
assert name == "rescheduleitems" assert name == "rescheduleitems"
ev, = args ev, = args
@ -115,37 +112,34 @@ class TestDSession:
def test_rescheduleevent(self, testdir): def test_rescheduleevent(self, testdir):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
session = DSession(item.config) session = DSession(item.config)
host1 = GatewaySpec("localhost") node = MockNode()
host1.node = MockNode() session.addnode(node)
session.addhost(host1)
ev = event.RescheduleItems([item]) ev = event.RescheduleItems([item])
loopstate = session._initloopstate([]) loopstate = session._initloopstate([])
session.queueevent("rescheduleitems", ev) session.queueevent("rescheduleitems", ev)
session.loop_once(loopstate) session.loop_once(loopstate)
# check that RescheduleEvents are not immediately # check that RescheduleEvents are not immediately
# rescheduled if there are no hosts # rescheduled if there are no nodes
assert loopstate.dowork == False assert loopstate.dowork == False
session.queueevent("anonymous", event.NOP()) session.queueevent("anonymous", event.NOP())
session.loop_once(loopstate) session.loop_once(loopstate)
session.queueevent("anonymous", event.NOP()) session.queueevent("anonymous", event.NOP())
session.loop_once(loopstate) session.loop_once(loopstate)
assert host1.node.sent == [[item]] assert node.sent == [[item]]
session.queueevent("itemtestreport", run(item)) session.queueevent("itemtestreport", run(item))
session.loop_once(loopstate) session.loop_once(loopstate)
assert loopstate.shuttingdown assert loopstate.shuttingdown
assert not loopstate.testsfailed assert not loopstate.testsfailed
def test_no_hosts_remaining_for_tests(self, testdir): def test_no_node_remaining_for_tests(self, testdir):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
# setup a session with one host # setup a session with one node
session = DSession(item.config) session = DSession(item.config)
host1 = GatewaySpec("localhost") node = MockNode()
host1.node = MockNode() session.addnode(node)
session.addhost(host1)
# setup a HostDown event # setup a HostDown event
ev = event.HostDown(host1, None) session.queueevent("testnodedown", node, None)
session.queueevent("testnodedown", ev)
loopstate = session._initloopstate([item]) loopstate = session._initloopstate([item])
loopstate.dowork = False loopstate.dowork = False
@ -162,22 +156,18 @@ class TestDSession:
""") """)
item1, item2 = modcol.collect() item1, item2 = modcol.collect()
# setup a session with two hosts # setup a session with two nodes
session = DSession(item1.config) session = DSession(item1.config)
host1 = GatewaySpec("localhost") node1, node2 = MockNode(), MockNode()
host1.node = MockNode() session.addnode(node1)
session.addhost(host1) session.addnode(node2)
host2 = GatewaySpec("localhost")
host2.node = MockNode()
session.addhost(host2)
# have one test pending for a host that goes down # have one test pending for a node that goes down
session.senditems([item1, item2]) session.senditems([item1, item2])
host = session.item2host[item1] node = session.item2node[item1]
ev = event.HostDown(host, None) session.queueevent("testnodedown", node, None)
session.queueevent("testnodedown", ev)
evrec = EventRecorder(session.bus) evrec = EventRecorder(session.bus)
print session.item2host print session.item2node
loopstate = session._initloopstate([]) loopstate = session._initloopstate([])
session.loop_once(loopstate) session.loop_once(loopstate)
@ -186,20 +176,19 @@ class TestDSession:
assert testrep.failed assert testrep.failed
assert testrep.colitem == item1 assert testrep.colitem == item1
assert str(testrep.longrepr).find("crashed") != -1 assert str(testrep.longrepr).find("crashed") != -1
assert str(testrep.longrepr).find(host.address) != -1 #assert str(testrep.longrepr).find(node.gateway.spec) != -1
def test_testnodeready_adds_to_available(self, testdir): def test_testnodeready_adds_to_available(self, testdir):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
# setup a session with two hosts # setup a session with two nodes
session = DSession(item.config) session = DSession(item.config)
host1 = GatewaySpec("localhost") node1 = MockNode()
testnodeready = maketestnodeready(host1) session.queueevent("testnodeready", node1)
session.queueevent("testnodeready", testnodeready)
loopstate = session._initloopstate([item]) loopstate = session._initloopstate([item])
loopstate.dowork = False loopstate.dowork = False
assert len(session.host2pending) == 0 assert len(session.node2pending) == 0
session.loop_once(loopstate) session.loop_once(loopstate)
assert len(session.host2pending) == 1 assert len(session.node2pending) == 1
def test_event_propagation(self, testdir, EventRecorder): def test_event_propagation(self, testdir, EventRecorder):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
@ -212,20 +201,19 @@ class TestDSession:
def runthrough(self, item): def runthrough(self, item):
session = DSession(item.config) session = DSession(item.config)
host1 = GatewaySpec("localhost") node = MockNode()
host1.node = MockNode() session.addnode(node)
session.addhost(host1)
loopstate = session._initloopstate([item]) loopstate = session._initloopstate([item])
session.queueevent("NOP") session.queueevent("NOP")
session.loop_once(loopstate) session.loop_once(loopstate)
assert host1.node.sent == [[item]] assert node.sent == [[item]]
ev = run(item) ev = run(item)
session.queueevent("itemtestreport", ev) session.queueevent("itemtestreport", ev)
session.loop_once(loopstate) session.loop_once(loopstate)
assert loopstate.shuttingdown assert loopstate.shuttingdown
session.queueevent("testnodedown", event.HostDown(host1, None)) session.queueevent("testnodedown", node, None)
session.loop_once(loopstate) session.loop_once(loopstate)
dumpqueue(session.queue) dumpqueue(session.queue)
return session, loopstate.exitstatus return session, loopstate.exitstatus
@ -249,12 +237,11 @@ class TestDSession:
""") """)
modcol.config.option.exitfirst = True modcol.config.option.exitfirst = True
session = DSession(modcol.config) session = DSession(modcol.config)
host1 = GatewaySpec("localhost") node = MockNode()
host1.node = MockNode() session.addnode(node)
session.addhost(host1)
items = basic_collect_report(modcol).result items = basic_collect_report(modcol).result
# trigger testing - this sends tests to host1 # trigger testing - this sends tests to the node
session.triggertesting(items) session.triggertesting(items)
# run tests ourselves and produce reports # run tests ourselves and produce reports
@ -271,18 +258,17 @@ class TestDSession:
def test_shuttingdown_filters_events(self, testdir, EventRecorder): def test_shuttingdown_filters_events(self, testdir, EventRecorder):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
session = DSession(item.config) session = DSession(item.config)
host = GatewaySpec("localhost") node = MockNode()
session.addhost(host) session.addnode(node)
loopstate = session._initloopstate([]) loopstate = session._initloopstate([])
loopstate.shuttingdown = True loopstate.shuttingdown = True
evrec = EventRecorder(session.bus) evrec = EventRecorder(session.bus)
session.queueevent("itemtestreport", run(item)) session.queueevent("itemtestreport", run(item))
session.loop_once(loopstate) session.loop_once(loopstate)
assert not evrec.getfirstnamed("testnodedown") assert not evrec.getfirstnamed("testnodedown")
ev = event.HostDown(host) session.queueevent("testnodedown", node, None)
session.queueevent("testnodedown", ev)
session.loop_once(loopstate) session.loop_once(loopstate)
assert evrec.getfirstnamed('testnodedown') == ev assert evrec.getfirstnamed('testnodedown') == node
def test_filteritems(self, testdir, EventRecorder): def test_filteritems(self, testdir, EventRecorder):
modcol = testdir.getmodulecol(""" modcol = testdir.getmodulecol("""
@ -317,17 +303,16 @@ class TestDSession:
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
session = DSession(item.config) session = DSession(item.config)
host = GatewaySpec("localhost") node = MockNode()
host.node = MockNode() session.addnode(node)
session.addhost(host)
session.senditems([item]) session.senditems([item])
session.queueevent("itemtestreport", run(item)) session.queueevent("itemtestreport", run(item))
loopstate = session._initloopstate([]) loopstate = session._initloopstate([])
session.loop_once(loopstate) session.loop_once(loopstate)
assert host.node._shutdown is True assert node._shutdown is True
assert loopstate.exitstatus is None, "loop did not wait for testnodedown" assert loopstate.exitstatus is None, "loop did not wait for testnodedown"
assert loopstate.shuttingdown assert loopstate.shuttingdown
session.queueevent("testnodedown", event.HostDown(host, None)) session.queueevent("testnodedown", node, None)
session.loop_once(loopstate) session.loop_once(loopstate)
assert loopstate.exitstatus == 0 assert loopstate.exitstatus == 0
@ -339,15 +324,13 @@ class TestDSession:
pass pass
""") """)
session = DSession(modcol.config) session = DSession(modcol.config)
host = GatewaySpec("localhost") node = MockNode()
host.node = MockNode() session.addnode(node)
session.addhost(host)
colreport = basic_collect_report(modcol) colreport = basic_collect_report(modcol)
item1, item2 = colreport.result item1, item2 = colreport.result
session.senditems([item1]) session.senditems([item1])
# host2pending will become empty when the loop sees # node2pending will become empty when the loop sees the report
# the report
session.queueevent("itemtestreport", run(item1)) session.queueevent("itemtestreport", run(item1))
# but we have a collection pending # but we have a collection pending

View File

@ -52,10 +52,10 @@ class TestAsyncFunctional:
assert ev.skipped assert ev.skipped
ev, = eq.geteventargs("itemtestreport") ev, = eq.geteventargs("itemtestreport")
assert ev.failed assert ev.failed
# see that the host is really down # see that the node is really down
ev, = eq.geteventargs("testnodedown") node, error = eq.geteventargs("testnodedown")
assert ev.host.popen assert node.gateway.spec.popen
ev, = eq.geteventargs("testrunfinish") eq.geteventargs("testrunfinish")
def test_distribution_rsyncdirs_example(self, testdir): def test_distribution_rsyncdirs_example(self, testdir):
source = testdir.mkdir("source") source = testdir.mkdir("source")

View File

@ -47,7 +47,7 @@ class TestHostManager:
assert dest.join("dir1", "dir2").check() assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check() assert dest.join("dir1", "dir2", 'hello').check()
def test_hostmanager_init_rsync_roots(self, source, dest): def test_init_rsync_roots(self, source, dest):
dir2 = source.ensure("dir1", "dir2", dir=1) dir2 = source.ensure("dir1", "dir2", dir=1)
source.ensure("dir1", "somefile", dir=1) source.ensure("dir1", "somefile", dir=1)
dir2.ensure("hello") dir2.ensure("hello")
@ -63,7 +63,7 @@ class TestHostManager:
assert not dest.join("dir1").check() assert not dest.join("dir1").check()
assert not dest.join("bogus").check() assert not dest.join("bogus").check()
def test_hostmanager_rsyncignore(self, source, dest): def test_rsyncignore(self, source, dest):
dir2 = source.ensure("dir1", "dir2", dir=1) dir2 = source.ensure("dir1", "dir2", dir=1)
dir5 = source.ensure("dir5", "dir6", "bogus") dir5 = source.ensure("dir5", "dir6", "bogus")
dirf = source.ensure("dir5", "file") dirf = source.ensure("dir5", "file")
@ -81,7 +81,7 @@ class TestHostManager:
assert dest.join("dir5","file").check() assert dest.join("dir5","file").check()
assert not dest.join("dir6").check() assert not dest.join("dir6").check()
def test_hostmanage_optimise_popen(self, source, dest): def test_optimise_popen(self, source, dest):
hosts = ["popen"] * 3 hosts = ["popen"] * 3
source.join("conftest.py").write("rsyncdirs = ['a']") source.join("conftest.py").write("rsyncdirs = ['a']")
source.ensure('a', dir=1) source.ensure('a', dir=1)
@ -92,7 +92,7 @@ class TestHostManager:
assert gwspec._samefilesystem() assert gwspec._samefilesystem()
assert not gwspec.chdir assert not gwspec.chdir
def test_hostmanage_setup_hosts_DEBUG(self, source, EventRecorder): def test_setup_hosts_DEBUG(self, source, EventRecorder):
hosts = ["popen"] * 2 hosts = ["popen"] * 2
source.join("conftest.py").write("rsyncdirs = ['a']") source.join("conftest.py").write("rsyncdirs = ['a']")
source.ensure('a', dir=1) source.ensure('a', dir=1)
@ -107,7 +107,7 @@ class TestHostManager:
assert l assert l
hm.teardown_hosts() hm.teardown_hosts()
def test_hostmanage_ssh_setup_hosts(self, specssh, testdir): def test_ssh_setup_hosts(self, specssh, testdir):
testdir.makepyfile(__init__="", test_x=""" testdir.makepyfile(__init__="", test_x="""
def test_one(): def test_one():
pass pass

View File

@ -42,9 +42,9 @@ class MySetup:
config = py.test.config._reparse([]) config = py.test.config._reparse([])
self.config = config self.config = config
self.queue = py.std.Queue.Queue() self.queue = py.std.Queue.Queue()
self.host = py.execnet.XSpec("popen") self.xspec = py.execnet.XSpec("popen")
self.gateway = py.execnet.makegateway(self.host) self.gateway = py.execnet.makegateway(self.xspec)
self.node = MasterNode(self.host, self.gateway, self.config, putevent=self.queue.put) self.node = MasterNode(self.gateway, self.config, putevent=self.queue.put)
assert not self.node.channel.isclosed() assert not self.node.channel.isclosed()
return self.node return self.node
@ -64,14 +64,20 @@ def pytest_pyfuncarg_testdir(__call__, pyfuncitem):
testdir.chdir() testdir.chdir()
return testdir return testdir
class TestMasterSlaveConnection: def test_node_hash_equality(mysetup):
node = mysetup.makenode()
node2 = mysetup.makenode()
assert node != node2
assert node == node
assert not (node != node)
class TestMasterSlaveConnection:
def test_crash_invalid_item(self, mysetup): def test_crash_invalid_item(self, mysetup):
node = mysetup.makenode() node = mysetup.makenode()
node.send(123) # invalid item node.send(123) # invalid item
ev, = mysetup.geteventargs("testnodedown") n, error = mysetup.geteventargs("testnodedown")
assert ev.host == mysetup.host assert n is node
assert str(ev.error).find("AttributeError") != -1 assert str(error).find("AttributeError") != -1
def test_crash_killed(self, testdir, mysetup): def test_crash_killed(self, testdir, mysetup):
if not hasattr(py.std.os, 'kill'): if not hasattr(py.std.os, 'kill'):
@ -83,16 +89,16 @@ class TestMasterSlaveConnection:
""") """)
node = mysetup.makenode(item.config) node = mysetup.makenode(item.config)
node.send(item) node.send(item)
ev, = mysetup.geteventargs("testnodedown") n, error = mysetup.geteventargs("testnodedown")
assert ev.host == mysetup.host assert n is node
assert str(ev.error).find("TERMINATED") != -1 assert str(error).find("Not properly terminated") != -1
def test_node_down(self, mysetup): def test_node_down(self, mysetup):
node = mysetup.makenode() node = mysetup.makenode()
node.shutdown() node.shutdown()
ev, = mysetup.geteventargs("testnodedown") n, error = mysetup.geteventargs("testnodedown")
assert ev.host == mysetup.host assert n is node
assert not ev.error assert not error
node.callback(node.ENDMARK) node.callback(node.ENDMARK)
excinfo = py.test.raises(IOError, excinfo = py.test.raises(IOError,
"mysetup.geteventargs('testnodedown', timeout=0.01)") "mysetup.geteventargs('testnodedown', timeout=0.01)")

View File

@ -146,20 +146,10 @@ class RescheduleItems(BaseEvent):
def __init__(self, items): def __init__(self, items):
self.items = items self.items = items
class HostGatewayReady(BaseEvent): #class HostGatewayReady(BaseEvent):
def __init__(self, host, roots): # def __init__(self, host, roots):
self.host = host # self.host = host
self.roots = roots # self.roots = roots
class HostUp(BaseEvent):
def __init__(self, host, platinfo):
self.host = host
self.platinfo = platinfo
class HostDown(BaseEvent):
def __init__(self, host, error=None):
self.host = host
self.error = error
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
# Events related to rsyncing # Events related to rsyncing

View File

@ -200,11 +200,11 @@ class PytestPluginHooks:
within the gateway manager context. within the gateway manager context.
""" """
def pyevent_testnodeready(self, event): def pyevent_testnodeready(self, node):
""" Host is up. """ """ Node is ready to operate. """
def pyevent_testnodedown(self, event): def pyevent_testnodedown(self, node, error):
""" Host is down. """ """ Node is down. """
def pyevent_rescheduleitems(self, event): def pyevent_rescheduleitems(self, event):
""" Items from a host that went down. """ """ Items from a host that went down. """

View File

@ -103,19 +103,20 @@ class TerminalReporter:
# which garbles our output if we use self.write_line # which garbles our output if we use self.write_line
self.write_line(msg) self.write_line(msg)
def pyevent_testnodeready(self, event): def pyevent_testnodeready(self, node):
d = event.platinfo.copy() # XXX
d['host'] = getattr(event.host, 'address', event.host) self.write_line("Node Ready: %r, spec %r" % (node,node.gateway.spec))
d['version'] = repr_pythonversion(d['sys.version_info'])
self.write_line("HOSTUP: %(host)s %(sys.platform)s " #d = event.platinfo.copy()
"%(sys.executable)s - Python %(version)s" % #d['host'] = getattr(event.host, 'address', event.host)
d) #d['version'] = repr_pythonversion(d['sys.version_info'])
#self.write_line("HOSTUP: %(host)s %(sys.platform)s "
# "%(sys.executable)s - Python %(version)s" %
# d)
def pyevent_testnodedown(self, event): def pyevent_testnodedown(self, node, error):
host = event.host
error = event.error
if error: if error:
self.write_line("HostDown %s: %s" %(host, error)) self.write_line("Node Down: %r: %r" %(node, error))
def pyevent_trace(self, category, msg): def pyevent_trace(self, category, msg):
if self.config.option.debug or \ if self.config.option.debug or \
@ -323,13 +324,13 @@ def repr_pythonversion(v=None):
from py.__.test import event from py.__.test import event
from py.__.test.runner import basic_run_report from py.__.test.runner import basic_run_report
from py.__.test.dsession.masterslave import maketestnodeready
class TestTerminal: class TestTerminal:
@py.test.mark.xfail
def test_testnodeready(self, testdir, linecomp): def test_testnodeready(self, testdir, linecomp):
item = testdir.getitem("def test_func(): pass") item = testdir.getitem("def test_func(): pass")
rep = TerminalReporter(item.config, linecomp.stringio) rep = TerminalReporter(item.config, linecomp.stringio)
rep.pyevent_testnodeready(maketestnodeready()) XXX # rep.pyevent_testnodeready(maketestnodeready())
linecomp.assert_contains_lines([ linecomp.assert_contains_lines([
"*INPROCESS* %s %s - Python %s" %(sys.platform, "*INPROCESS* %s %s - Python %s" %(sys.platform,
sys.executable, repr_pythonversion(sys.version_info)) sys.executable, repr_pythonversion(sys.version_info))
@ -428,10 +429,10 @@ class TestTerminal:
rep = TerminalReporter(modcol.config, file=linecomp.stringio) rep = TerminalReporter(modcol.config, file=linecomp.stringio)
class gw1: class gw1:
id = "X1" id = "X1"
spec = py.execnet.GatewaySpec("popen") spec = py.execnet.XSpec("popen")
class gw2: class gw2:
id = "X2" id = "X2"
spec = py.execnet.GatewaySpec("popen") spec = py.execnet.XSpec("popen")
rep.pyevent_gwmanage_newgateway(gateway=gw1) rep.pyevent_gwmanage_newgateway(gateway=gw1)
linecomp.assert_contains_lines([ linecomp.assert_contains_lines([
"X1 instantiated gateway from spec*", "X1 instantiated gateway from spec*",

View File

@ -12,7 +12,6 @@ from py.__.test import event, outcome
Item = py.test.collect.Item Item = py.test.collect.Item
Collector = py.test.collect.Collector Collector = py.test.collect.Collector
from runner import basic_collect_report from runner import basic_collect_report
from py.__.test.dsession.masterslave import maketestnodeready
class Session(object): class Session(object):
""" """
@ -117,7 +116,7 @@ class Session(object):
colitems = self.getinitialitems(colitems) colitems = self.getinitialitems(colitems)
self.shouldstop = False self.shouldstop = False
self.sessionstarts() self.sessionstarts()
self.bus.notify("testnodeready", maketestnodeready()) #self.bus.notify("testnodeready", maketestnodeready())
exitstatus = outcome.EXIT_OK exitstatus = outcome.EXIT_OK
captured_excinfo = None captured_excinfo = None
try: try: