148 lines
5.0 KiB
Python
148 lines
5.0 KiB
Python
|
|
import py
|
|
import execnet
|
|
from _py.test.dist.txnode import TXNode
|
|
queue = py.builtin._tryimport("queue", "Queue")
|
|
Queue = queue.Queue
|
|
|
|
class EventQueue:
|
|
def __init__(self, registry, queue=None):
|
|
if queue is None:
|
|
queue = Queue()
|
|
self.queue = queue
|
|
registry.register(self)
|
|
|
|
def geteventargs(self, eventname, timeout=2.0):
|
|
events = []
|
|
while 1:
|
|
try:
|
|
eventcall = self.queue.get(timeout=timeout)
|
|
except queue.Empty:
|
|
#print "node channel", self.node.channel
|
|
#print "remoteerror", self.node.channel._getremoteerror()
|
|
py.builtin.print_("seen events", events)
|
|
raise IOError("did not see %r events" % (eventname))
|
|
else:
|
|
name, args, kwargs = eventcall
|
|
assert isinstance(name, str)
|
|
if name == eventname:
|
|
if args:
|
|
return args
|
|
return kwargs
|
|
events.append(name)
|
|
if name == "pytest_internalerror":
|
|
py.builtin.print_(str(kwargs["excrepr"]))
|
|
|
|
class MySetup:
|
|
def __init__(self, request):
|
|
self.id = 0
|
|
self.request = request
|
|
|
|
def geteventargs(self, eventname, timeout=2.0):
|
|
eq = EventQueue(self.config.pluginmanager, self.queue)
|
|
return eq.geteventargs(eventname, timeout=timeout)
|
|
|
|
def makenode(self, config=None):
|
|
if config is None:
|
|
config = py.test.config._reparse([])
|
|
self.config = config
|
|
self.queue = Queue()
|
|
self.xspec = execnet.XSpec("popen")
|
|
self.gateway = execnet.makegateway(self.xspec)
|
|
self.id += 1
|
|
self.gateway.id = str(self.id)
|
|
self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
|
|
assert not self.node.channel.isclosed()
|
|
return self.node
|
|
|
|
def xfinalize(self):
|
|
if hasattr(self, 'node'):
|
|
gw = self.node.gateway
|
|
py.builtin.print_("exiting:", gw)
|
|
gw.exit()
|
|
|
|
def pytest_funcarg__mysetup(request):
|
|
mysetup = MySetup(request)
|
|
#pyfuncitem.addfinalizer(mysetup.finalize)
|
|
return mysetup
|
|
|
|
def test_node_hash_equality(mysetup):
|
|
node = mysetup.makenode()
|
|
node2 = mysetup.makenode()
|
|
assert node != node2
|
|
assert node == node
|
|
assert not (node != node)
|
|
|
|
class TestMasterSlaveConnection:
|
|
def test_crash_invalid_item(self, mysetup):
|
|
node = mysetup.makenode()
|
|
node.send(123) # invalid item
|
|
kwargs = mysetup.geteventargs("pytest_testnodedown")
|
|
assert kwargs['node'] is node
|
|
assert str(kwargs['error']).find("AttributeError") != -1
|
|
|
|
def test_crash_killed(self, testdir, mysetup):
|
|
if not hasattr(py.std.os, 'kill'):
|
|
py.test.skip("no os.kill")
|
|
item = testdir.getitem("""
|
|
def test_func():
|
|
import os
|
|
os.kill(os.getpid(), 15)
|
|
""")
|
|
node = mysetup.makenode(item.config)
|
|
node.send(item)
|
|
kwargs = mysetup.geteventargs("pytest_testnodedown")
|
|
assert kwargs['node'] is node
|
|
assert str(kwargs['error']).find("Not properly terminated") != -1
|
|
|
|
def test_node_down(self, mysetup):
|
|
node = mysetup.makenode()
|
|
node.shutdown()
|
|
kwargs = mysetup.geteventargs("pytest_testnodedown")
|
|
assert kwargs['node'] is node
|
|
assert not kwargs['error']
|
|
node.callback(node.ENDMARK)
|
|
excinfo = py.test.raises(IOError,
|
|
"mysetup.geteventargs('testnodedown', timeout=0.01)")
|
|
|
|
def test_send_on_closed_channel(self, testdir, mysetup):
|
|
item = testdir.getitem("def test_func(): pass")
|
|
node = mysetup.makenode(item.config)
|
|
node.channel.close()
|
|
py.test.raises(IOError, "node.send(item)")
|
|
#ev = self.getcalls(pytest_internalerror)
|
|
#assert ev.excinfo.errisinstance(IOError)
|
|
|
|
def test_send_one(self, testdir, mysetup):
|
|
item = testdir.getitem("def test_func(): pass")
|
|
node = mysetup.makenode(item.config)
|
|
node.send(item)
|
|
kwargs = mysetup.geteventargs("pytest_runtest_logreport")
|
|
rep = kwargs['report']
|
|
assert rep.passed
|
|
py.builtin.print_(rep)
|
|
assert rep.item == item
|
|
|
|
def test_send_some(self, testdir, mysetup):
|
|
items = testdir.getitems("""
|
|
def test_pass():
|
|
pass
|
|
def test_fail():
|
|
assert 0
|
|
def test_skip():
|
|
import py
|
|
py.test.skip("x")
|
|
""")
|
|
node = mysetup.makenode(items[0].config)
|
|
for item in items:
|
|
node.send(item)
|
|
for outcome in "passed failed skipped".split():
|
|
kwargs = mysetup.geteventargs("pytest_runtest_logreport")
|
|
report = kwargs['report']
|
|
assert getattr(report, outcome)
|
|
|
|
node.sendlist(items)
|
|
for outcome in "passed failed skipped".split():
|
|
rep = mysetup.geteventargs("pytest_runtest_logreport")['report']
|
|
assert getattr(rep, outcome)
|