[svn r63210] * implement "--dist=each" to distribute each to each available node

* improved node-management and nice showing of results
* streamline command line options

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-22 21:44:45 +01:00
parent ad6afe21ff
commit 2c2bfb5513
13 changed files with 123 additions and 84 deletions

View File

@ -12,11 +12,10 @@ class PylibTestconfigPlugin:
group = parser.addgroup("pylib", "py lib testing options")
group.addoption('--sshhost',
action="store", dest="sshhost", default=None,
help=("target to run tests requiring ssh, e.g. "
"user@codespeak.net"))
help=("ssh xspec for ssh functional tests. "))
group.addoption('--gx',
action="append", dest="gspecs", default=None,
help=("add a global test environment, XSpec-syntax. ")),
help=("add a global test environment, XSpec-syntax. "))
group.addoption('--runslowtests',
action="store_true", dest="runslowtests", default=False,
help=("run slow tests"))

View File

@ -17,10 +17,10 @@ To send tests to multiple CPUs, type::
Especially for longer running tests or tests requiring
a lot of IO this can lead to considerable speed ups.
Test on a different python interpreter
Test on a different python version
----------------------------------------------------------
To send tests to a python2.4 process, you may type::
To send tests to a python2.4 interpreter process, you may type::
py.test --tx popen//python=python2.4
@ -65,6 +65,7 @@ with something like this::
py.test -d --tx socket=192.168.1.102:8888 --rsyncdir mypkg mypkg
no remote installation requirements
++++++++++++++++++++++++++++++++++++++++++++

View File

@ -244,6 +244,9 @@ class Gateway(object):
def _rinfo(self, update=False):
""" return some sys/env information from remote. """
if update or not hasattr(self, '_cache_rinfo'):
class RInfo:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
self._cache_rinfo = RInfo(**self.remote_exec("""
import sys, os
channel.send(dict(
@ -251,6 +254,7 @@ class Gateway(object):
version_info = sys.version_info,
platform = sys.platform,
cwd = os.getcwd(),
pid = os.getpid(),
))
""").receive())
return self._cache_rinfo
@ -368,10 +372,6 @@ class Gateway(object):
if self._requestqueue is not None:
self._requestqueue.put(None)
class RInfo:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def getid(gw, cache={}):
name = gw.__class__.__name__
try:

View File

@ -3,19 +3,21 @@ import py
XSpec = py.execnet.XSpec
class TestXSpec:
def test_attributes(self):
def test_norm_attributes(self):
spec = XSpec("socket=192.168.102.2:8888//python=c:/this/python2.5//chdir=d:\hello")
assert spec.socket == "192.168.102.2:8888"
assert spec.python == "c:/this/python2.5"
assert spec.chdir == "d:\hello"
assert spec.nice is None
assert not hasattr(spec, 'xyz')
py.test.raises(AttributeError, "spec._hello")
spec = XSpec("socket=192.168.102.2:8888//python=python2.5")
spec = XSpec("socket=192.168.102.2:8888//python=python2.5//nice=3")
assert spec.socket == "192.168.102.2:8888"
assert spec.python == "python2.5"
assert spec.chdir is None
assert spec.nice == "3"
spec = XSpec("ssh=user@host//chdir=/hello/this//python=/usr/bin/python2.5")
assert spec.ssh == "user@host"
@ -53,6 +55,18 @@ class TestMakegateway:
assert rinfo.cwd == py.std.os.getcwd()
assert rinfo.version_info == py.std.sys.version_info
def test_popen_nice(self):
gw = py.execnet.makegateway("popen//nice=5")
remotenice = gw.remote_exec("""
import os
if hasattr(os, 'nice'):
channel.send(os.nice(0))
else:
channel.send(None)
""").receive()
if remotenice is not None:
assert remotenice == 5
def test_popen_explicit(self):
gw = py.execnet.makegateway("popen//python=%s" % py.std.sys.executable)
assert gw.spec.python == py.std.sys.executable

View File

@ -10,7 +10,7 @@ class XSpec:
* if no "=value" is given, assume a boolean True value
"""
# XXX for now we are very restrictive about actually allowed key-values
popen = ssh = socket = python = chdir = None
popen = ssh = socket = python = chdir = nice = None
def __init__(self, string):
self._spec = string
@ -56,14 +56,18 @@ def makegateway(spec):
gw = py.execnet.SocketGateway(*hostport)
gw.spec = spec
# XXX events
if spec.chdir:
gw.remote_exec("""
if spec.chdir or spec.nice:
channel = gw.remote_exec("""
import os
path = %r
try:
path, nice = channel.receive()
if path:
if not os.path.exists(path):
os.mkdir(path)
os.chdir(path)
except OSError:
os.mkdir(path)
os.chdir(path)
""" % spec.chdir).waitclose()
if nice and hasattr(os, 'nice'):
os.nice(nice)
""")
nice = spec.nice and int(spec.nice) or 0
channel.send((spec.chdir, nice))
channel.waitclose()
return gw

View File

@ -177,8 +177,11 @@ class DSession(Session):
else:
self.bus.notify("collectionstart", event.CollectionStart(next))
self.queueevent("collectionreport", basic_collect_report(next))
self.senditems(senditems)
#self.senditems_each(senditems)
if self.config.option.dist == "each":
self.senditems_each(senditems)
else:
# XXX assert self.config.option.dist == "load"
self.senditems_load(senditems)
def queueevent(self, eventname, *args, **kwargs):
self.queue.put((eventname, args, kwargs))
@ -201,7 +204,7 @@ class DSession(Session):
# we have some left, give it to the main loop
self.queueevent("rescheduleitems", event.RescheduleItems(tosend))
def senditems(self, tosend):
def senditems_load(self, tosend):
if not tosend:
return
for node, pending in self.node2pending.items():
@ -242,6 +245,8 @@ class DSession(Session):
""" setup any neccessary resources ahead of the test run. """
self.nodemanager = NodeManager(self.config)
self.nodemanager.setup_nodes(putevent=self.queue.put)
if self.config.option.dist == "each":
self.nodemanager.wait_nodesready(5.0)
def teardown(self):
""" teardown any resources after a test run. """

View File

@ -1,6 +1,6 @@
import py
import sys, os
from py.__.test.dist.txnode import MasterNode
from py.__.test.dist.txnode import TXNode
from py.__.execnet.gwmanage import GatewayManager
@ -12,19 +12,11 @@ class NodeManager(object):
self.roots = self.config.getrsyncdirs()
self.gwmanager = GatewayManager(specs)
self.nodes = []
self._nodesready = py.std.threading.Event()
def trace(self, msg):
self.config.bus.notify("trace", "nodemanage", msg)
def trace_nodestatus(self):
if self.config.option.debug:
for ch, result in self.gwmanager.multi_exec("""
import sys, os
channel.send((sys.executable, os.getcwd(), sys.path))
""").receive_each(withchannel=True):
self.trace("spec %r, execuable %r, cwd %r, syspath %r" %(
ch.gateway.spec, result[0], result[1], result[2]))
def config_getignores(self):
return self.config.getconftest_pathlist("rsyncignore")
@ -55,20 +47,33 @@ class NodeManager(object):
# such that unpickling configs will
# pick it up as the right topdir
# (for other gateways this chdir is irrelevant)
self.trace("making gateways")
old = self.config.topdir.chdir()
try:
self.gwmanager.makegateways()
finally:
old.chdir()
self.trace_nodestatus()
def setup_nodes(self, putevent):
self.rsync_roots()
self.trace_nodestatus()
self.trace("setting up nodes")
for gateway in self.gwmanager.gateways:
node = MasterNode(gateway, self.config, putevent)
self.nodes.append(node)
node = TXNode(gateway, self.config, putevent, slaveready=self._slaveready)
gateway.node = node # to keep node alive
self.trace("started node %r" % node)
def _slaveready(self, node):
#assert node.gateway == node.gateway
#assert node.gateway.node == node
self.nodes.append(node)
self.trace("%s slave node ready %r" % (node.gateway.id, node))
if len(self.nodes) == len(self.gwmanager.gateways):
self._nodesready.set()
def wait_nodesready(self, timeout=None):
self._nodesready.wait(timeout)
if not self._nodesready.isSet():
raise IOError("nodes did not get ready for %r secs" % timeout)
def teardown_nodes(self):
# XXX do teardown nodes?

View File

@ -35,14 +35,14 @@ class TestDSession:
assert not session.node2pending
session.addnode(node)
assert len(session.node2pending) == 1
session.senditems([item])
session.senditems_load([item])
pending = session.removenode(node)
assert pending == [item]
assert item not in session.item2nodes
l = session.removenode(node)
assert not l
def test_send_remove_to_two_nodes(self, testdir):
def test_senditems_each_and_receive_with_two_nodes(self, testdir):
item = testdir.getitem("def test_func(): pass")
node1 = MockNode()
node2 = MockNode()
@ -60,13 +60,13 @@ class TestDSession:
assert not session.node2pending[node1]
assert not session.item2nodes
def test_senditems_removeitems(self, testdir):
def test_senditems_load_and_receive_one_node(self, testdir):
item = testdir.getitem("def test_func(): pass")
node = MockNode()
rep = run(item, node)
session = DSession(item.config)
session.addnode(node)
session.senditems([item])
session.senditems_load([item])
assert session.node2pending[node] == [item]
assert session.item2nodes[item] == [node]
session.removeitem(item, node)
@ -174,7 +174,7 @@ class TestDSession:
session.addnode(node2)
# have one test pending for a node that goes down
session.senditems([item1, item2])
session.senditems_load([item1, item2])
node = session.item2nodes[item1] [0]
session.queueevent("testnodedown", node, None)
evrec = EventRecorder(session.bus)
@ -316,7 +316,7 @@ class TestDSession:
node = MockNode()
session.addnode(node)
session.senditems([item])
session.senditems_load([item])
session.queueevent("itemtestreport", run(item, node))
loopstate = session._initloopstate([])
session.loop_once(loopstate)
@ -340,7 +340,7 @@ class TestDSession:
colreport = basic_collect_report(modcol)
item1, item2 = colreport.result
session.senditems([item1])
session.senditems_load([item1])
# node2pending will become empty when the loop sees the report
rep = run(item1, node)

View File

@ -28,6 +28,13 @@ class TestNodeManager:
assert p.join("dir1").check()
assert p.join("dir1", "file1").check()
def test_popen_nodes_are_ready(self, testdir):
nodemanager = NodeManager(testdir.parseconfig(
"--tx", "3*popen"))
nodemanager.setup_nodes([].append)
nodemanager.wait_nodesready(timeout=2.0)
def test_popen_rsync_subdir(self, testdir, source, dest):
dir1 = source.mkdir("dir1")
dir2 = dir1.mkdir("dir2")

View File

@ -1,6 +1,6 @@
import py
from py.__.test.dist.txnode import MasterNode
from py.__.test.dist.txnode import TXNode
class EventQueue:
def __init__(self, bus, queue=None):
@ -44,7 +44,7 @@ class MySetup:
self.queue = py.std.Queue.Queue()
self.xspec = py.execnet.XSpec("popen")
self.gateway = py.execnet.makegateway(self.xspec)
self.node = MasterNode(self.gateway, self.config, putevent=self.queue.put)
self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
assert not self.node.channel.isclosed()
return self.node

View File

@ -5,16 +5,22 @@ import py
from py.__.test import event
from py.__.test.dist.mypickle import PickleChannel
class MasterNode(object):
""" Install slave code, manage sending test tasks & receiving results """
class TXNode(object):
""" Represents a Test Execution environment in the controlling process.
- sets up a slave node through an execnet gateway
- manages sending of test-items and receival of results and events
- creates events when the remote side crashes
"""
ENDMARK = -1
def __init__(self, gateway, config, putevent):
def __init__(self, gateway, config, putevent, slaveready=None):
self.config = config
self.putevent = putevent
self.gateway = gateway
self.channel = install_slave(gateway, config)
self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
self._sendslaveready = slaveready
self._down = False
def notify(self, eventname, *args, **kwargs):
@ -39,6 +45,8 @@ class MasterNode(object):
return
eventname, args, kwargs = eventcall
if eventname == "slaveready":
if self._sendslaveready:
self._sendslaveready(self)
self.notify("testnodeready", self)
elif eventname == "slavefinished":
self._down = True

View File

@ -26,6 +26,7 @@ class TerminalReporter:
file = py.std.sys.stdout
self._tw = py.io.TerminalWriter(file)
self.currentfspath = None
self.gateway2info = {}
def write_fspath_result(self, fspath, res):
if fspath != self.currentfspath:
@ -96,10 +97,12 @@ class TerminalReporter:
else:
d['extra'] = ""
d['cwd'] = rinfo.cwd
self.write_line("%(id)s %(spec)s -- platform %(platform)s, "
infoline = ("%(id)s %(spec)s -- platform %(platform)s, "
"Python %(version)s "
"cwd: %(cwd)s"
"%(extra)s" % d)
self.write_line(infoline)
self.gateway2info[gateway] = infoline
def pyevent_gwmanage_rsyncstart(self, source, gateways):
targets = ", ".join([gw.id for gw in gateways])
@ -123,7 +126,6 @@ class TerminalReporter:
def pyevent_testnodeready(self, node):
self.write_line("%s node ready to receive tests" %(node.gateway.id,))
def pyevent_testnodedown(self, node, error):
if error:
self.write_line("%s node down, error: %s" %(node.gateway.id, error))
@ -244,7 +246,11 @@ class TerminalReporter:
if 'failed' in self.stats and self.config.option.tbstyle != "no":
self.write_sep("=", "FAILURES")
for ev in self.stats['failed']:
self.write_sep("_")
self.write_sep("_", "FAILURES")
if hasattr(ev, 'node'):
self.write_line(self.gateway2info.get(
ev.node.gateway, "node %r (platinfo not found? strange)")
[:self._tw.fullwidth-1])
ev.toterminal(self._tw)
def summary_stats(self):
@ -344,15 +350,6 @@ from py.__.test import event
from py.__.test.runner import basic_run_report
class TestTerminal:
@py.test.mark.xfail
def test_testnodeready(self, testdir, linecomp):
item = testdir.getitem("def test_func(): pass")
rep = TerminalReporter(item.config, linecomp.stringio)
XXX # rep.pyevent_testnodeready(maketestnodeready())
linecomp.assert_contains_lines([
"*INPROCESS* %s %s - Python %s" %(sys.platform,
sys.executable, repr_pythonversion(sys.version_info))
])
def test_pass_skip_fail(self, testdir, linecomp):
modcol = testdir.getmodulecol("""

View File

@ -400,6 +400,27 @@ class TestDistribution:
])
assert dest.join(subdir.basename).check(dir=1)
def test_dist_each(self, testdir):
interpreters = []
for name in ("python2.4", "python2.5"):
interp = py.path.local.sysfind(name)
if interp is None:
py.test.skip("%s not found" % name)
interpreters.append(interp)
testdir.makepyfile(__init__="", test_one="""
import sys
def test_hello():
print "%s...%s" % sys.version_info[:2]
assert 0
""")
args = ["--dist=each"]
args += ["--tx", "popen//python=%s" % interpreters[0]]
args += ["--tx", "popen//python=%s" % interpreters[1]]
result = testdir.runpytest(*args)
result.stdout.fnmatch_lines(["2...4"])
result.stdout.fnmatch_lines(["2...5"])
class TestInteractive:
def getspawn(self, tmpdir):
@ -455,28 +476,6 @@ class TestInteractive:
child.expect("1 passed", timeout=5.0)
child.kill(15)
@py.test.mark.xfail("need new cmdline option")
def test_dist_each(self, testdir):
for name in ("python2.4", "python2.5"):
if not py.path.local.sysfind(name):
py.test.skip("%s not found" % name)
testdir.makepyfile(__init__="", test_one="""
import sys
def test_hello():
print sys.version_info[:2]
assert 0
""")
result = testdir.runpytest("--dist-each",
"--tx=popen//python2.4",
"--tx=popen//python2.5",
)
assert result.ret == 1
result.stdout.fnmatch_lines([
"*popen-python2.5*FAIL*",
"*popen-python2.4*FAIL*",
"*2 failed*"
])
class TestKeyboardInterrupt:
def test_raised_in_testfunction(self, testdir):
p1 = testdir.makepyfile("""