adapt to new execnet.Group code (since execnet-1.0.0b4), strike superflous code
--HG-- branch : trunk
This commit is contained in:
parent
3adf6687c9
commit
6d9e3ac686
|
@ -10,9 +10,9 @@ from execnet.gateway_base import RemoteError
|
||||||
class GatewayManager:
|
class GatewayManager:
|
||||||
RemoteError = RemoteError
|
RemoteError = RemoteError
|
||||||
def __init__(self, specs, hook, defaultchdir="pyexecnetcache"):
|
def __init__(self, specs, hook, defaultchdir="pyexecnetcache"):
|
||||||
self.gateways = []
|
|
||||||
self.specs = []
|
self.specs = []
|
||||||
self.hook = hook
|
self.hook = hook
|
||||||
|
self.group = execnet.Group()
|
||||||
for spec in specs:
|
for spec in specs:
|
||||||
if not isinstance(spec, execnet.XSpec):
|
if not isinstance(spec, execnet.XSpec):
|
||||||
spec = execnet.XSpec(spec)
|
spec = execnet.XSpec(spec)
|
||||||
|
@ -21,48 +21,19 @@ class GatewayManager:
|
||||||
self.specs.append(spec)
|
self.specs.append(spec)
|
||||||
|
|
||||||
def makegateways(self):
|
def makegateways(self):
|
||||||
assert not self.gateways
|
assert not list(self.group)
|
||||||
for spec in self.specs:
|
for spec in self.specs:
|
||||||
gw = execnet.makegateway(spec)
|
gw = self.group.makegateway(spec)
|
||||||
self.gateways.append(gw)
|
|
||||||
gw.id = "[%s]" % len(self.gateways)
|
|
||||||
self.hook.pytest_gwmanage_newgateway(
|
self.hook.pytest_gwmanage_newgateway(
|
||||||
gateway=gw, platinfo=gw._rinfo())
|
gateway=gw, platinfo=gw._rinfo())
|
||||||
|
|
||||||
def getgateways(self, remote=True, inplacelocal=True):
|
|
||||||
if not self.gateways and self.specs:
|
|
||||||
self.makegateways()
|
|
||||||
l = []
|
|
||||||
for gw in self.gateways:
|
|
||||||
if gw.spec._samefilesystem():
|
|
||||||
if inplacelocal:
|
|
||||||
l.append(gw)
|
|
||||||
else:
|
|
||||||
if remote:
|
|
||||||
l.append(gw)
|
|
||||||
return execnet.MultiGateway(gateways=l)
|
|
||||||
|
|
||||||
def multi_exec(self, source, inplacelocal=True):
|
|
||||||
""" remote execute code on all gateways.
|
|
||||||
@param inplacelocal=False: don't send code to inplacelocal hosts.
|
|
||||||
"""
|
|
||||||
multigw = self.getgateways(inplacelocal=inplacelocal)
|
|
||||||
return multigw.remote_exec(source)
|
|
||||||
|
|
||||||
def multi_chdir(self, basename, inplacelocal=True):
|
|
||||||
""" perform a remote chdir to the given path, may be relative.
|
|
||||||
@param inplacelocal=False: don't send code to inplacelocal hosts.
|
|
||||||
"""
|
|
||||||
self.multi_exec("import os ; os.chdir(%r)" % basename,
|
|
||||||
inplacelocal=inplacelocal).waitclose()
|
|
||||||
|
|
||||||
def rsync(self, source, notify=None, verbose=False, ignores=None):
|
def rsync(self, source, notify=None, verbose=False, ignores=None):
|
||||||
""" perform rsync to all remote hosts.
|
""" perform rsync to all remote hosts.
|
||||||
"""
|
"""
|
||||||
rsync = HostRSync(source, verbose=verbose, ignores=ignores)
|
rsync = HostRSync(source, verbose=verbose, ignores=ignores)
|
||||||
seen = py.builtin.set()
|
seen = py.builtin.set()
|
||||||
gateways = []
|
gateways = []
|
||||||
for gateway in self.gateways:
|
for gateway in self.group:
|
||||||
spec = gateway.spec
|
spec = gateway.spec
|
||||||
if not spec._samefilesystem():
|
if not spec._samefilesystem():
|
||||||
if spec not in seen:
|
if spec not in seen:
|
||||||
|
@ -84,9 +55,7 @@ class GatewayManager:
|
||||||
)
|
)
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
while self.gateways:
|
self.group.terminate()
|
||||||
gw = self.gateways.pop()
|
|
||||||
gw.exit()
|
|
||||||
|
|
||||||
class HostRSync(execnet.RSync):
|
class HostRSync(execnet.RSync):
|
||||||
""" RSyncer that filters out common files
|
""" RSyncer that filters out common files
|
||||||
|
|
|
@ -57,7 +57,7 @@ class NodeManager(object):
|
||||||
def setup_nodes(self, putevent):
|
def setup_nodes(self, putevent):
|
||||||
self.rsync_roots()
|
self.rsync_roots()
|
||||||
self.trace("setting up nodes")
|
self.trace("setting up nodes")
|
||||||
for gateway in self.gwmanager.gateways:
|
for gateway in self.gwmanager.group:
|
||||||
node = TXNode(gateway, self.config, putevent, slaveready=self._slaveready)
|
node = TXNode(gateway, self.config, putevent, slaveready=self._slaveready)
|
||||||
gateway.node = node # to keep node alive
|
gateway.node = node # to keep node alive
|
||||||
self.trace("started node %r" % node)
|
self.trace("started node %r" % node)
|
||||||
|
@ -67,7 +67,7 @@ class NodeManager(object):
|
||||||
#assert node.gateway.node == node
|
#assert node.gateway.node == node
|
||||||
self.nodes.append(node)
|
self.nodes.append(node)
|
||||||
self.trace("%s slave node ready %r" % (node.gateway.id, node))
|
self.trace("%s slave node ready %r" % (node.gateway.id, node))
|
||||||
if len(self.nodes) == len(self.gwmanager.gateways):
|
if len(self.nodes) == len(list(self.gwmanager.group)):
|
||||||
self._nodesready.set()
|
self._nodesready.set()
|
||||||
|
|
||||||
def wait_nodesready(self, timeout=None):
|
def wait_nodesready(self, timeout=None):
|
||||||
|
|
|
@ -150,7 +150,7 @@ class TerminalReporter:
|
||||||
else:
|
else:
|
||||||
d['extra'] = ""
|
d['extra'] = ""
|
||||||
d['cwd'] = platinfo.cwd
|
d['cwd'] = platinfo.cwd
|
||||||
infoline = ("%(id)s %(spec)s -- platform %(platform)s, "
|
infoline = ("[%(id)s] %(spec)s -- platform %(platform)s, "
|
||||||
"Python %(version)s "
|
"Python %(version)s "
|
||||||
"cwd: %(cwd)s"
|
"cwd: %(cwd)s"
|
||||||
"%(extra)s" % d)
|
"%(extra)s" % d)
|
||||||
|
@ -158,14 +158,14 @@ class TerminalReporter:
|
||||||
self.gateway2info[gateway] = infoline
|
self.gateway2info[gateway] = infoline
|
||||||
|
|
||||||
def pytest_gwmanage_rsyncstart(self, source, gateways):
|
def pytest_gwmanage_rsyncstart(self, source, gateways):
|
||||||
targets = ", ".join([gw.id for gw in gateways])
|
targets = ", ".join(["[%s]" % gw.id for gw in gateways])
|
||||||
msg = "rsyncstart: %s -> %s" %(source, targets)
|
msg = "rsyncstart: %s -> %s" %(source, targets)
|
||||||
if not self.config.option.verbose:
|
if not self.config.option.verbose:
|
||||||
msg += " # use --verbose to see rsync progress"
|
msg += " # use --verbose to see rsync progress"
|
||||||
self.write_line(msg)
|
self.write_line(msg)
|
||||||
|
|
||||||
def pytest_gwmanage_rsyncfinish(self, source, gateways):
|
def pytest_gwmanage_rsyncfinish(self, source, gateways):
|
||||||
targets = ", ".join([gw.id for gw in gateways])
|
targets = ", ".join(["[%s]" % gw.id for gw in gateways])
|
||||||
self.write_line("rsyncfinish: %s -> %s" %(source, targets))
|
self.write_line("rsyncfinish: %s -> %s" %(source, targets))
|
||||||
|
|
||||||
def pytest_plugin_registered(self, plugin):
|
def pytest_plugin_registered(self, plugin):
|
||||||
|
@ -177,11 +177,11 @@ class TerminalReporter:
|
||||||
self.write_line(msg)
|
self.write_line(msg)
|
||||||
|
|
||||||
def pytest_testnodeready(self, node):
|
def pytest_testnodeready(self, node):
|
||||||
self.write_line("%s txnode ready to receive tests" %(node.gateway.id,))
|
self.write_line("[%s] txnode ready to receive tests" %(node.gateway.id,))
|
||||||
|
|
||||||
def pytest_testnodedown(self, node, error):
|
def pytest_testnodedown(self, node, error):
|
||||||
if error:
|
if error:
|
||||||
self.write_line("%s node down, error: %s" %(node.gateway.id, error))
|
self.write_line("[%s] node down, error: %s" %(node.gateway.id, error))
|
||||||
|
|
||||||
def pytest_trace(self, category, msg):
|
def pytest_trace(self, category, msg):
|
||||||
if self.config.option.debug or \
|
if self.config.option.debug or \
|
||||||
|
@ -203,7 +203,7 @@ class TerminalReporter:
|
||||||
line = self._reportinfoline(item)
|
line = self._reportinfoline(item)
|
||||||
extra = ""
|
extra = ""
|
||||||
if node:
|
if node:
|
||||||
extra = "-> " + str(node.gateway.id)
|
extra = "-> [%s]" % node.gateway.id
|
||||||
self.write_ensure_prefix(line, extra)
|
self.write_ensure_prefix(line, extra)
|
||||||
else:
|
else:
|
||||||
if self.config.option.verbose:
|
if self.config.option.verbose:
|
||||||
|
@ -238,7 +238,7 @@ class TerminalReporter:
|
||||||
else:
|
else:
|
||||||
self.ensure_newline()
|
self.ensure_newline()
|
||||||
if hasattr(rep, 'node'):
|
if hasattr(rep, 'node'):
|
||||||
self._tw.write("%s " % rep.node.gateway.id)
|
self._tw.write("[%s] " % rep.node.gateway.id)
|
||||||
self._tw.write(word, **markup)
|
self._tw.write(word, **markup)
|
||||||
self._tw.write(" " + line)
|
self._tw.write(" " + line)
|
||||||
self.currentfspath = -2
|
self.currentfspath = -2
|
||||||
|
|
|
@ -130,16 +130,16 @@ class TestTerminal:
|
||||||
|
|
||||||
rep.pytest_gwmanage_newgateway(gw1, rinfo)
|
rep.pytest_gwmanage_newgateway(gw1, rinfo)
|
||||||
linecomp.assert_contains_lines([
|
linecomp.assert_contains_lines([
|
||||||
"X1*popen*xyz*2.5*"
|
"*X1*popen*xyz*2.5*"
|
||||||
])
|
])
|
||||||
|
|
||||||
rep.pytest_gwmanage_rsyncstart(source="hello", gateways=[gw1, gw2])
|
rep.pytest_gwmanage_rsyncstart(source="hello", gateways=[gw1, gw2])
|
||||||
linecomp.assert_contains_lines([
|
linecomp.assert_contains_lines([
|
||||||
"rsyncstart: hello -> X1, X2"
|
"rsyncstart: hello -> [X1], [X2]"
|
||||||
])
|
])
|
||||||
rep.pytest_gwmanage_rsyncfinish(source="hello", gateways=[gw1, gw2])
|
rep.pytest_gwmanage_rsyncfinish(source="hello", gateways=[gw1, gw2])
|
||||||
linecomp.assert_contains_lines([
|
linecomp.assert_contains_lines([
|
||||||
"rsyncfinish: hello -> X1, X2"
|
"rsyncfinish: hello -> [X1], [X2]"
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_writeline(self, testdir, linecomp):
|
def test_writeline(self, testdir, linecomp):
|
||||||
|
|
|
@ -37,26 +37,26 @@ class TestGatewayManagerPopen:
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
|
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
|
||||||
assert call.gateway.spec == execnet.XSpec("popen")
|
assert call.gateway.spec == execnet.XSpec("popen")
|
||||||
assert call.gateway.id == "[1]"
|
assert call.gateway.id == "1"
|
||||||
assert call.platinfo.executable == call.gateway._rinfo().executable
|
assert call.platinfo.executable == call.gateway._rinfo().executable
|
||||||
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
|
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
|
||||||
assert call.gateway.id == "[2]"
|
assert call.gateway.id == "2"
|
||||||
assert len(hm.gateways) == 2
|
assert len(hm.group) == 2
|
||||||
hm.exit()
|
hm.exit()
|
||||||
assert not len(hm.gateways)
|
assert not len(hm.group)
|
||||||
|
|
||||||
def test_popens_rsync(self, hook, mysetup):
|
def test_popens_rsync(self, hook, mysetup):
|
||||||
source = mysetup.source
|
source = mysetup.source
|
||||||
hm = GatewayManager(["popen"] * 2, hook)
|
hm = GatewayManager(["popen"] * 2, hook)
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
assert len(hm.gateways) == 2
|
assert len(hm.group) == 2
|
||||||
for gw in hm.gateways:
|
for gw in hm.group:
|
||||||
gw.remote_exec = None
|
gw.remote_exec = None
|
||||||
l = []
|
l = []
|
||||||
hm.rsync(source, notify=lambda *args: l.append(args))
|
hm.rsync(source, notify=lambda *args: l.append(args))
|
||||||
assert not l
|
assert not l
|
||||||
hm.exit()
|
hm.exit()
|
||||||
assert not len(hm.gateways)
|
assert not len(hm.group)
|
||||||
|
|
||||||
def test_rsync_popen_with_path(self, hook, mysetup):
|
def test_rsync_popen_with_path(self, hook, mysetup):
|
||||||
source, dest = mysetup.source, mysetup.dest
|
source, dest = mysetup.source, mysetup.dest
|
||||||
|
@ -66,7 +66,7 @@ class TestGatewayManagerPopen:
|
||||||
l = []
|
l = []
|
||||||
hm.rsync(source, notify=lambda *args: l.append(args))
|
hm.rsync(source, notify=lambda *args: l.append(args))
|
||||||
assert len(l) == 1
|
assert len(l) == 1
|
||||||
assert l[0] == ("rsyncrootready", hm.gateways[0].spec, source)
|
assert l[0] == ("rsyncrootready", hm.group['1'].spec, source)
|
||||||
hm.exit()
|
hm.exit()
|
||||||
dest = dest.join(source.basename)
|
dest = dest.join(source.basename)
|
||||||
assert dest.join("dir1").check()
|
assert dest.join("dir1").check()
|
||||||
|
@ -82,49 +82,9 @@ class TestGatewayManagerPopen:
|
||||||
call = hookrecorder.popcall("pytest_gwmanage_rsyncstart")
|
call = hookrecorder.popcall("pytest_gwmanage_rsyncstart")
|
||||||
assert call.source == source
|
assert call.source == source
|
||||||
assert len(call.gateways) == 1
|
assert len(call.gateways) == 1
|
||||||
assert hm.gateways[0] == call.gateways[0]
|
assert hm.group["1"] == call.gateways[0]
|
||||||
call = hookrecorder.popcall("pytest_gwmanage_rsyncfinish")
|
call = hookrecorder.popcall("pytest_gwmanage_rsyncfinish")
|
||||||
|
|
||||||
def test_multi_chdir_popen_with_path(self, hook, testdir):
|
|
||||||
hm = GatewayManager(["popen//chdir=hello"] * 2, hook)
|
|
||||||
testdir.tmpdir.chdir()
|
|
||||||
hellopath = testdir.tmpdir.mkdir("hello").realpath()
|
|
||||||
hm.makegateways()
|
|
||||||
l = hm.multi_exec(
|
|
||||||
"import os ; channel.send(os.getcwd())").receive_each()
|
|
||||||
paths = [x[1] for x in l]
|
|
||||||
assert l == [str(hellopath)] * 2
|
|
||||||
py.test.raises(hm.RemoteError,
|
|
||||||
'hm.multi_chdir("world", inplacelocal=False)')
|
|
||||||
worldpath = hellopath.mkdir("world")
|
|
||||||
hm.multi_chdir("world", inplacelocal=False)
|
|
||||||
l = hm.multi_exec(
|
|
||||||
"import os ; channel.send(os.getcwd())").receive_each()
|
|
||||||
assert len(l) == 2
|
|
||||||
assert l[0] == l[1]
|
|
||||||
curwd = os.getcwd()
|
|
||||||
assert l[0].startswith(curwd)
|
|
||||||
assert l[0].endswith("world")
|
|
||||||
|
|
||||||
def test_multi_chdir_popen(self, testdir, hook):
|
|
||||||
import os
|
|
||||||
hm = GatewayManager(["popen"] * 2, hook)
|
|
||||||
testdir.tmpdir.chdir()
|
|
||||||
hellopath = testdir.tmpdir.mkdir("hello")
|
|
||||||
hm.makegateways()
|
|
||||||
hm.multi_chdir("hello", inplacelocal=False)
|
|
||||||
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
|
||||||
assert len(l) == 2
|
|
||||||
curwd = os.path.realpath(os.getcwd())
|
|
||||||
assert l == [curwd] * 2
|
|
||||||
|
|
||||||
hm.multi_chdir("hello")
|
|
||||||
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
|
||||||
assert len(l) == 2
|
|
||||||
assert l[0] == l[1]
|
|
||||||
assert l[0].startswith(curwd)
|
|
||||||
assert l[0].endswith("hello")
|
|
||||||
|
|
||||||
class pytest_funcarg__mysetup:
|
class pytest_funcarg__mysetup:
|
||||||
def __init__(self, request):
|
def __init__(self, request):
|
||||||
tmp = request.getfuncargvalue('tmpdir')
|
tmp = request.getfuncargvalue('tmpdir')
|
||||||
|
|
Loading…
Reference in New Issue