test_ok1/py/test/dist/txnode.py

136 lines
4.8 KiB
Python

"""
Manage setup, running and local representation of remote nodes/processes.
"""
import py
from py.__.test.dist.mypickle import PickleChannel
class TXNode(object):
    """ Represents a Test Execution environment in the controlling process.
    - sets up a slave node through an execnet gateway
    - manages sending of test-items and receival of results and events
    - creates events when the remote side crashes
    """
    # value handed to callback() when the channel is closed
    ENDMARK = -1

    def __init__(self, gateway, config, putevent, slaveready=None):
        """ install a slave on ``gateway`` and start receiving its events.

        ``putevent`` is called with one ``(eventname, args, kwargs)``
        tuple per notification.  ``slaveready``, if given, is called
        with this node when the slave reports "slaveready".
        """
        self.config = config
        self.putevent = putevent
        self.gateway = gateway
        self.channel = install_slave(gateway, config)
        self._sendslaveready = slaveready
        # every attribute read by callback() has to exist before the
        # callback is registered: callbacks run in the receiver thread
        # and a channel-close (ENDMARK) may arrive before __init__ is done.
        self._down = False
        self.channel.setcallback(self.callback, endmarker=self.ENDMARK)

    def notify(self, eventname, *args, **kwargs):
        """ hand ``(eventname, args, kwargs)`` to the putevent callable.

        Only keyword arguments are accepted; positional ones trip the
        assertion.
        """
        assert not args
        self.putevent((eventname, args, kwargs))

    def callback(self, eventcall):
        """ this gets called for each object we receive from
            the other side and if the channel closes.

            Note that channel callbacks run in the receiver
            thread of execnet gateways - we need to
            avoid raising exceptions or doing heavy work.
        """
        try:
            if eventcall == self.ENDMARK:
                # channel closed: report "down" unless the slave already
                # announced a regular finish.
                err = self.channel._getremoteerror()
                if not self._down:
                    if not err:
                        err = "Not properly terminated"
                    self.notify("pytest_testnodedown", node=self, error=err)
                    self._down = True
                return
            eventname, args, kwargs = eventcall
            if eventname == "slaveready":
                if self._sendslaveready:
                    self._sendslaveready(self)
                self.notify("pytest_testnodeready", node=self)
            elif eventname == "slavefinished":
                self._down = True
                self.notify("pytest_testnodedown", error=None, node=self)
            elif eventname == "pytest_itemtestreport":
                # tag the report with the node it came from
                rep = kwargs['rep']
                rep.node = self
                self.notify("pytest_itemtestreport", rep=rep)
            else:
                # any other event is passed through unchanged
                self.notify(eventname, *args, **kwargs)
        except KeyboardInterrupt:
            # should not land in receiver-thread
            raise
        except:
            excinfo = py.code.ExceptionInfo()
            # same text the former print statement produced, written in a
            # form that parses on both Python 2 and Python 3
            import sys
            sys.stdout.write("%s %s\n" % ("!" * 20, excinfo))
            self.config.pluginmanager.notify_exception(excinfo)

    def send(self, item):
        """ send a single test item; None is reserved for shutdown(). """
        assert item is not None
        self.channel.send(item)

    def sendlist(self, itemlist):
        """ send a list of test items in one message. """
        self.channel.send(itemlist)

    def shutdown(self):
        """ send None, which the slave loop treats as the request to finish. """
        self.channel.send(None)
# setting up slave code
def install_slave(gateway, config):
    """ start the slave bootstrap code on ``gateway`` and return the
        pickle-wrapped channel after sending ``(config, basetemp)`` over it.

        For popen gateways ``basetemp`` is the path (as a string) of a
        new numbered "slave-" directory below ``config.ensuretemp("popen")``;
        for all other gateways it is None.
    """
    bootstrap = """
import os, sys
sys.path.insert(0, os.getcwd())
from py.__.test.dist.mypickle import PickleChannel
from py.__.test.dist.txnode import SlaveNode
channel = PickleChannel(channel)
slavenode = SlaveNode(channel)
slavenode.run()
"""
    rawchannel = gateway.remote_exec(source=bootstrap)
    picklechannel = PickleChannel(rawchannel)
    if gateway.spec.popen:
        rootdir = config.ensuretemp("popen")
        slavedir = py.path.local.make_numbered_dir(
            prefix="slave-", keep=0, rootdir=rootdir)
        basetemp = str(slavedir)
    else:
        basetemp = None
    # first message on the channel: what SlaveNode.run() receives first
    picklechannel.send((config, basetemp))
    return picklechannel
class SlaveNode(object):
    """ Slave-side end of a channel: receives a config and test items,
        runs the items and sends events back over the channel.
    """

    def __init__(self, channel):
        self.channel = channel

    def __repr__(self):
        clsname = self.__class__.__name__
        return "<%s channel=%s>" % (clsname, self.channel)

    def sendevent(self, eventname, *args, **kwargs):
        """ send one ``(eventname, args, kwargs)`` tuple over the channel. """
        event = (eventname, args, kwargs)
        self.channel.send(event)

    def pytest_itemtestreport(self, rep):
        """ forward an item test report as an event. """
        self.sendevent("pytest_itemtestreport", rep=rep)

    def run(self):
        """ receive ``(config, basetemp)``, configure and register with the
            plugin manager, announce "slaveready" and then run received
            tasks until None arrives, which is answered with "slavefinished".

            A task is either a single item or a list of items.  Any
            exception other than KeyboardInterrupt is reported through a
            "pytest_internalerror" event and then re-raised.
        """
        chan = self.channel
        config, basetemp = chan.receive()
        self.config = config
        if basetemp:
            config.basetemp = py.path.local(basetemp)
        config.pluginmanager.do_configure(config)
        config.pluginmanager.register(self)
        self.sendevent("slaveready")
        try:
            while True:
                task = chan.receive()
                if task is None:
                    # shutdown request
                    self.sendevent("slavefinished")
                    break
                if not isinstance(task, list):
                    task.config.pluginmanager.do_itemrun(item=task)
                    continue
                for entry in task:
                    entry.config.pluginmanager.do_itemrun(entry)
        except KeyboardInterrupt:
            raise
        except:
            excinfo = py.code.ExceptionInfo()
            excrepr = excinfo.getrepr(funcargs=True, showlocals=True)
            self.sendevent("pytest_internalerror", excrepr=excrepr)
            raise