* move gateway management code to py/test/dist because it's not clear
how generally useful it is. * provide pytest_dist_makegateway(txspec) hook so that plugins can add their own interpretation/keywords. --HG-- branch : trunk
This commit is contained in:
parent
8ea2364039
commit
5df58c619d
|
@ -590,15 +590,6 @@ class ExecnetAPI:
|
||||||
def pyexecnet_gateway_exit(self, gateway):
|
def pyexecnet_gateway_exit(self, gateway):
|
||||||
""" signal exitting of gateway. """
|
""" signal exitting of gateway. """
|
||||||
|
|
||||||
def pyexecnet_gwmanage_newgateway(self, gateway, platinfo):
|
|
||||||
""" called when a manager has made a new gateway. """
|
|
||||||
|
|
||||||
def pyexecnet_gwmanage_rsyncstart(self, source, gateways):
|
|
||||||
""" called before rsyncing a directory to remote gateways takes place. """
|
|
||||||
|
|
||||||
def pyexecnet_gwmanage_rsyncfinish(self, source, gateways):
|
|
||||||
""" called after rsyncing a directory to remote gateways takes place. """
|
|
||||||
|
|
||||||
|
|
||||||
class BaseGateway(object):
|
class BaseGateway(object):
|
||||||
hook = ExecnetAPI()
|
hook = ExecnetAPI()
|
||||||
|
|
|
@ -57,7 +57,6 @@ def makegateway(spec):
|
||||||
hostport = spec.socket.split(":")
|
hostport = spec.socket.split(":")
|
||||||
gw = py.execnet.SocketGateway(*hostport)
|
gw = py.execnet.SocketGateway(*hostport)
|
||||||
gw.spec = spec
|
gw.spec = spec
|
||||||
# XXX events
|
|
||||||
if spec.chdir or spec.nice:
|
if spec.chdir or spec.nice:
|
||||||
channel = gw.remote_exec("""
|
channel = gw.remote_exec("""
|
||||||
import os
|
import os
|
||||||
|
|
|
@ -1,27 +1,23 @@
|
||||||
"""
|
"""
|
||||||
instantiating, managing and rsyncing to hosts
|
instantiating, managing and rsyncing to test hosts
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import py
|
import py
|
||||||
import sys, os
|
import sys, os
|
||||||
from py.__.execnet.gateway_base import RemoteError
|
from py.__.execnet.gateway_base import RemoteError
|
||||||
|
|
||||||
NO_ENDMARKER_WANTED = object()
|
|
||||||
|
|
||||||
class GatewayManager:
|
class GatewayManager:
|
||||||
RemoteError = RemoteError
|
RemoteError = RemoteError
|
||||||
def __init__(self, specs, defaultchdir="pyexecnetcache"):
|
def __init__(self, specs, hook, defaultchdir="pyexecnetcache"):
|
||||||
self.gateways = []
|
self.gateways = []
|
||||||
self.specs = []
|
self.specs = []
|
||||||
|
self.hook = hook
|
||||||
for spec in specs:
|
for spec in specs:
|
||||||
if not isinstance(spec, py.execnet.XSpec):
|
if not isinstance(spec, py.execnet.XSpec):
|
||||||
spec = py.execnet.XSpec(spec)
|
spec = py.execnet.XSpec(spec)
|
||||||
if not spec.chdir and not spec.popen:
|
if not spec.chdir and not spec.popen:
|
||||||
spec.chdir = defaultchdir
|
spec.chdir = defaultchdir
|
||||||
self.specs.append(spec)
|
self.specs.append(spec)
|
||||||
self.hook = py._com.HookRelay(
|
|
||||||
py.execnet._HookSpecs, py._com.comregistry)
|
|
||||||
|
|
||||||
def makegateways(self):
|
def makegateways(self):
|
||||||
assert not self.gateways
|
assert not self.gateways
|
||||||
|
@ -29,7 +25,7 @@ class GatewayManager:
|
||||||
gw = py.execnet.makegateway(spec)
|
gw = py.execnet.makegateway(spec)
|
||||||
self.gateways.append(gw)
|
self.gateways.append(gw)
|
||||||
gw.id = "[%s]" % len(self.gateways)
|
gw.id = "[%s]" % len(self.gateways)
|
||||||
self.hook.pyexecnet_gwmanage_newgateway(
|
self.hook.pytest_gwmanage_newgateway(
|
||||||
gateway=gw, platinfo=gw._rinfo())
|
gateway=gw, platinfo=gw._rinfo())
|
||||||
|
|
||||||
def getgateways(self, remote=True, inplacelocal=True):
|
def getgateways(self, remote=True, inplacelocal=True):
|
||||||
|
@ -76,12 +72,12 @@ class GatewayManager:
|
||||||
seen.add(spec)
|
seen.add(spec)
|
||||||
gateways.append(gateway)
|
gateways.append(gateway)
|
||||||
if seen:
|
if seen:
|
||||||
self.hook.pyexecnet_gwmanage_rsyncstart(
|
self.hook.pytest_gwmanage_rsyncstart(
|
||||||
source=source,
|
source=source,
|
||||||
gateways=gateways,
|
gateways=gateways,
|
||||||
)
|
)
|
||||||
rsync.send()
|
rsync.send()
|
||||||
self.hook.pyexecnet_gwmanage_rsyncfinish(
|
self.hook.pytest_gwmanage_rsyncfinish(
|
||||||
source=source,
|
source=source,
|
||||||
gateways=gateways,
|
gateways=gateways,
|
||||||
)
|
)
|
|
@ -1,7 +1,7 @@
|
||||||
import py
|
import py
|
||||||
import sys, os
|
import sys, os
|
||||||
from py.__.test.dist.txnode import TXNode
|
from py.__.test.dist.txnode import TXNode
|
||||||
from py.__.execnet.gwmanage import GatewayManager
|
from py.__.test.dist.gwmanage import GatewayManager
|
||||||
|
|
||||||
|
|
||||||
class NodeManager(object):
|
class NodeManager(object):
|
||||||
|
@ -10,7 +10,7 @@ class NodeManager(object):
|
||||||
if specs is None:
|
if specs is None:
|
||||||
specs = self.config.getxspecs()
|
specs = self.config.getxspecs()
|
||||||
self.roots = self.config.getrsyncdirs()
|
self.roots = self.config.getrsyncdirs()
|
||||||
self.gwmanager = GatewayManager(specs)
|
self.gwmanager = GatewayManager(specs, config.hook)
|
||||||
self.nodes = []
|
self.nodes = []
|
||||||
self._nodesready = py.std.threading.Event()
|
self._nodesready = py.std.threading.Event()
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,15 @@ pytest_doctest_prepare_content.firstresult = True
|
||||||
# distributed testing
|
# distributed testing
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def pytest_gwmanage_newgateway(gateway, platinfo):
|
||||||
|
""" called on new raw gateway creation. """
|
||||||
|
|
||||||
|
def pytest_gwmanage_rsyncstart(source, gateways):
|
||||||
|
""" called before rsyncing a directory to remote gateways takes place. """
|
||||||
|
|
||||||
|
def pytest_gwmanage_rsyncfinish(source, gateways):
|
||||||
|
""" called after rsyncing a directory to remote gateways takes place. """
|
||||||
|
|
||||||
def pytest_testnodeready(node):
|
def pytest_testnodeready(node):
|
||||||
""" Test Node is ready to operate. """
|
""" Test Node is ready to operate. """
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ class TerminalReporter:
|
||||||
for line in str(excrepr).split("\n"):
|
for line in str(excrepr).split("\n"):
|
||||||
self.write_line("INTERNALERROR> " + line)
|
self.write_line("INTERNALERROR> " + line)
|
||||||
|
|
||||||
def pyexecnet_gwmanage_newgateway(self, gateway, platinfo):
|
def pytest_gwmanage_newgateway(self, gateway, platinfo):
|
||||||
#self.write_line("%s instantiated gateway from spec %r" %(gateway.id, gateway.spec._spec))
|
#self.write_line("%s instantiated gateway from spec %r" %(gateway.id, gateway.spec._spec))
|
||||||
d = {}
|
d = {}
|
||||||
d['version'] = repr_pythonversion(platinfo.version_info)
|
d['version'] = repr_pythonversion(platinfo.version_info)
|
||||||
|
@ -126,14 +126,14 @@ class TerminalReporter:
|
||||||
self.write_line(infoline)
|
self.write_line(infoline)
|
||||||
self.gateway2info[gateway] = infoline
|
self.gateway2info[gateway] = infoline
|
||||||
|
|
||||||
def pyexecnet_gwmanage_rsyncstart(self, source, gateways):
|
def pytest_gwmanage_rsyncstart(self, source, gateways):
|
||||||
targets = ", ".join([gw.id for gw in gateways])
|
targets = ", ".join([gw.id for gw in gateways])
|
||||||
msg = "rsyncstart: %s -> %s" %(source, targets)
|
msg = "rsyncstart: %s -> %s" %(source, targets)
|
||||||
if not self.config.option.verbose:
|
if not self.config.option.verbose:
|
||||||
msg += " # use --verbose to see rsync progress"
|
msg += " # use --verbose to see rsync progress"
|
||||||
self.write_line(msg)
|
self.write_line(msg)
|
||||||
|
|
||||||
def pyexecnet_gwmanage_rsyncfinish(self, source, gateways):
|
def pytest_gwmanage_rsyncfinish(self, source, gateways):
|
||||||
targets = ", ".join([gw.id for gw in gateways])
|
targets = ", ".join([gw.id for gw in gateways])
|
||||||
self.write_line("rsyncfinish: %s -> %s" %(source, targets))
|
self.write_line("rsyncfinish: %s -> %s" %(source, targets))
|
||||||
|
|
||||||
|
|
|
@ -391,6 +391,7 @@ class TestChannelFile:
|
||||||
def test_join_blocked_execution_gateway():
|
def test_join_blocked_execution_gateway():
|
||||||
gateway = py.execnet.PopenGateway()
|
gateway = py.execnet.PopenGateway()
|
||||||
channel = gateway.remote_exec("""
|
channel = gateway.remote_exec("""
|
||||||
|
import time
|
||||||
time.sleep(5.0)
|
time.sleep(5.0)
|
||||||
""")
|
""")
|
||||||
def doit():
|
def doit():
|
||||||
|
|
|
@ -7,36 +7,46 @@
|
||||||
|
|
||||||
import py
|
import py
|
||||||
import os
|
import os
|
||||||
from py.__.execnet.gwmanage import GatewayManager, HostRSync
|
from py.__.test.dist.gwmanage import GatewayManager, HostRSync
|
||||||
|
from py.__.test.plugin import hookspec
|
||||||
|
|
||||||
|
def pytest_funcarg__hookrecorder(request):
|
||||||
|
_pytest = request.getfuncargvalue('_pytest')
|
||||||
|
hook = request.getfuncargvalue('hook')
|
||||||
|
return _pytest.gethookrecorder(hook._hookspecs, hook._registry)
|
||||||
|
|
||||||
|
def pytest_funcarg__hook(request):
|
||||||
|
registry = py._com.Registry()
|
||||||
|
return py._com.HookRelay(hookspec, registry)
|
||||||
|
|
||||||
class TestGatewayManagerPopen:
|
class TestGatewayManagerPopen:
|
||||||
def test_popen_no_default_chdir(self):
|
def test_popen_no_default_chdir(self, hook):
|
||||||
gm = GatewayManager(["popen"])
|
gm = GatewayManager(["popen"], hook)
|
||||||
assert gm.specs[0].chdir is None
|
assert gm.specs[0].chdir is None
|
||||||
|
|
||||||
def test_default_chdir(self):
|
def test_default_chdir(self, hook):
|
||||||
l = ["ssh=noco", "socket=xyz"]
|
l = ["ssh=noco", "socket=xyz"]
|
||||||
for spec in GatewayManager(l).specs:
|
for spec in GatewayManager(l, hook).specs:
|
||||||
assert spec.chdir == "pyexecnetcache"
|
assert spec.chdir == "pyexecnetcache"
|
||||||
for spec in GatewayManager(l, defaultchdir="abc").specs:
|
for spec in GatewayManager(l, hook, defaultchdir="abc").specs:
|
||||||
assert spec.chdir == "abc"
|
assert spec.chdir == "abc"
|
||||||
|
|
||||||
def test_popen_makegateway_events(self, _pytest):
|
def test_popen_makegateway_events(self, hook, hookrecorder, _pytest):
|
||||||
rec = _pytest.gethookrecorder(py.execnet._HookSpecs)
|
hm = GatewayManager(["popen"] * 2, hook)
|
||||||
hm = GatewayManager(["popen"] * 2)
|
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
call = rec.popcall("pyexecnet_gwmanage_newgateway")
|
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
|
||||||
assert call.gateway.id == "[1]"
|
assert call.gateway.spec == py.execnet.XSpec("popen")
|
||||||
|
assert call.gateway.id == "[1]"
|
||||||
assert call.platinfo.executable == call.gateway._rinfo().executable
|
assert call.platinfo.executable == call.gateway._rinfo().executable
|
||||||
call = rec.popcall("pyexecnet_gwmanage_newgateway")
|
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
|
||||||
assert call.gateway.id == "[2]"
|
assert call.gateway.id == "[2]"
|
||||||
assert len(hm.gateways) == 2
|
assert len(hm.gateways) == 2
|
||||||
hm.exit()
|
hm.exit()
|
||||||
assert not len(hm.gateways)
|
assert not len(hm.gateways)
|
||||||
|
|
||||||
def test_popens_rsync(self, mysetup):
|
def test_popens_rsync(self, hook, mysetup):
|
||||||
source = mysetup.source
|
source = mysetup.source
|
||||||
hm = GatewayManager(["popen"] * 2)
|
hm = GatewayManager(["popen"] * 2, hook)
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
assert len(hm.gateways) == 2
|
assert len(hm.gateways) == 2
|
||||||
for gw in hm.gateways:
|
for gw in hm.gateways:
|
||||||
|
@ -47,9 +57,9 @@ class TestGatewayManagerPopen:
|
||||||
hm.exit()
|
hm.exit()
|
||||||
assert not len(hm.gateways)
|
assert not len(hm.gateways)
|
||||||
|
|
||||||
def test_rsync_popen_with_path(self, mysetup):
|
def test_rsync_popen_with_path(self, hook, mysetup):
|
||||||
source, dest = mysetup.source, mysetup.dest
|
source, dest = mysetup.source, mysetup.dest
|
||||||
hm = GatewayManager(["popen//chdir=%s" %dest] * 1)
|
hm = GatewayManager(["popen//chdir=%s" %dest] * 1, hook)
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
source.ensure("dir1", "dir2", "hello")
|
source.ensure("dir1", "dir2", "hello")
|
||||||
l = []
|
l = []
|
||||||
|
@ -62,21 +72,20 @@ class TestGatewayManagerPopen:
|
||||||
assert dest.join("dir1", "dir2").check()
|
assert dest.join("dir1", "dir2").check()
|
||||||
assert dest.join("dir1", "dir2", 'hello').check()
|
assert dest.join("dir1", "dir2", 'hello').check()
|
||||||
|
|
||||||
def test_hostmanage_rsync_same_popen_twice(self, mysetup, _pytest):
|
def test_rsync_same_popen_twice(self, hook, mysetup, hookrecorder):
|
||||||
source, dest = mysetup.source, mysetup.dest
|
source, dest = mysetup.source, mysetup.dest
|
||||||
rec = _pytest.gethookrecorder(py.execnet._HookSpecs)
|
hm = GatewayManager(["popen//chdir=%s" %dest] * 2, hook)
|
||||||
hm = GatewayManager(["popen//chdir=%s" %dest] * 2)
|
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
source.ensure("dir1", "dir2", "hello")
|
source.ensure("dir1", "dir2", "hello")
|
||||||
hm.rsync(source)
|
hm.rsync(source)
|
||||||
call = rec.popcall("pyexecnet_gwmanage_rsyncstart")
|
call = hookrecorder.popcall("pytest_gwmanage_rsyncstart")
|
||||||
assert call.source == source
|
assert call.source == source
|
||||||
assert len(call.gateways) == 1
|
assert len(call.gateways) == 1
|
||||||
assert hm.gateways[0] == call.gateways[0]
|
assert hm.gateways[0] == call.gateways[0]
|
||||||
call = rec.popcall("pyexecnet_gwmanage_rsyncfinish")
|
call = hookrecorder.popcall("pytest_gwmanage_rsyncfinish")
|
||||||
|
|
||||||
def test_multi_chdir_popen_with_path(self, testdir):
|
def test_multi_chdir_popen_with_path(self, hook, testdir):
|
||||||
hm = GatewayManager(["popen//chdir=hello"] * 2)
|
hm = GatewayManager(["popen//chdir=hello"] * 2, hook)
|
||||||
testdir.tmpdir.chdir()
|
testdir.tmpdir.chdir()
|
||||||
hellopath = testdir.tmpdir.mkdir("hello").realpath()
|
hellopath = testdir.tmpdir.mkdir("hello").realpath()
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
|
@ -96,9 +105,9 @@ class TestGatewayManagerPopen:
|
||||||
assert l[0].startswith(curwd)
|
assert l[0].startswith(curwd)
|
||||||
assert l[0].endswith("world")
|
assert l[0].endswith("world")
|
||||||
|
|
||||||
def test_multi_chdir_popen(self, testdir):
|
def test_multi_chdir_popen(self, testdir, hook):
|
||||||
import os
|
import os
|
||||||
hm = GatewayManager(["popen"] * 2)
|
hm = GatewayManager(["popen"] * 2, hook)
|
||||||
testdir.tmpdir.chdir()
|
testdir.tmpdir.chdir()
|
||||||
hellopath = testdir.tmpdir.mkdir("hello")
|
hellopath = testdir.tmpdir.mkdir("hello")
|
||||||
hm.makegateways()
|
hm.makegateways()
|
|
@ -123,16 +123,16 @@ class TestTerminal:
|
||||||
platform = "xyz"
|
platform = "xyz"
|
||||||
cwd = "qwe"
|
cwd = "qwe"
|
||||||
|
|
||||||
rep.pyexecnet_gwmanage_newgateway(gw1, rinfo)
|
rep.pytest_gwmanage_newgateway(gw1, rinfo)
|
||||||
linecomp.assert_contains_lines([
|
linecomp.assert_contains_lines([
|
||||||
"X1*popen*xyz*2.5*"
|
"X1*popen*xyz*2.5*"
|
||||||
])
|
])
|
||||||
|
|
||||||
rep.pyexecnet_gwmanage_rsyncstart(source="hello", gateways=[gw1, gw2])
|
rep.pytest_gwmanage_rsyncstart(source="hello", gateways=[gw1, gw2])
|
||||||
linecomp.assert_contains_lines([
|
linecomp.assert_contains_lines([
|
||||||
"rsyncstart: hello -> X1, X2"
|
"rsyncstart: hello -> X1, X2"
|
||||||
])
|
])
|
||||||
rep.pyexecnet_gwmanage_rsyncfinish(source="hello", gateways=[gw1, gw2])
|
rep.pytest_gwmanage_rsyncfinish(source="hello", gateways=[gw1, gw2])
|
||||||
linecomp.assert_contains_lines([
|
linecomp.assert_contains_lines([
|
||||||
"rsyncfinish: hello -> X1, X2"
|
"rsyncfinish: hello -> X1, X2"
|
||||||
])
|
])
|
||||||
|
|
Loading…
Reference in New Issue