[svn r63014] streamlining multichannel interface, fixing test work with -n 3
--HG-- branch : trunk
This commit is contained in:
parent
9db14e19e8
commit
c3e5ca560a
py
|
@ -19,7 +19,9 @@ import py
|
||||||
import sys, os
|
import sys, os
|
||||||
from py.__.test.dsession.masterslave import MasterNode
|
from py.__.test.dsession.masterslave import MasterNode
|
||||||
from py.__.test import event
|
from py.__.test import event
|
||||||
|
from py.__.execnet.channel import RemoteError
|
||||||
|
|
||||||
|
NO_ENDMARKER_WANTED = object()
|
||||||
|
|
||||||
class GatewaySpec(object):
|
class GatewaySpec(object):
|
||||||
def __init__(self, spec, defaultjoinpath="pyexecnetcache"):
|
def __init__(self, spec, defaultjoinpath="pyexecnetcache"):
|
||||||
|
@ -81,20 +83,49 @@ class MultiChannel:
|
||||||
def __init__(self, channels):
|
def __init__(self, channels):
|
||||||
self._channels = channels
|
self._channels = channels
|
||||||
|
|
||||||
def receive_items(self):
|
def send_each(self, item):
|
||||||
items = []
|
|
||||||
for ch in self._channels:
|
for ch in self._channels:
|
||||||
items.append((ch, ch.receive()))
|
ch.send(item)
|
||||||
return items
|
|
||||||
|
def receive_each(self, withchannel=False):
|
||||||
|
assert not hasattr(self, '_queue')
|
||||||
|
l = []
|
||||||
|
for ch in self._channels:
|
||||||
|
obj = ch.receive()
|
||||||
|
if withchannel:
|
||||||
|
l.append((ch, obj))
|
||||||
|
else:
|
||||||
|
l.append(obj)
|
||||||
|
return l
|
||||||
|
|
||||||
|
def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
|
||||||
|
try:
|
||||||
|
return self._queue
|
||||||
|
except AttributeError:
|
||||||
|
self._queue = py.std.Queue.Queue()
|
||||||
|
for ch in self._channels:
|
||||||
|
def putreceived(obj, channel=ch):
|
||||||
|
self._queue.put((channel, obj))
|
||||||
|
if endmarker is NO_ENDMARKER_WANTED:
|
||||||
|
ch.setcallback(putreceived)
|
||||||
|
else:
|
||||||
|
ch.setcallback(putreceived, endmarker=endmarker)
|
||||||
|
return self._queue
|
||||||
|
|
||||||
def receive(self):
|
|
||||||
return [x[1] for x in self.receive_items()]
|
|
||||||
|
|
||||||
def waitclose(self):
|
def waitclose(self):
|
||||||
|
first = None
|
||||||
for ch in self._channels:
|
for ch in self._channels:
|
||||||
|
try:
|
||||||
ch.waitclose()
|
ch.waitclose()
|
||||||
|
except ch.RemoteError:
|
||||||
|
if first is None:
|
||||||
|
first = py.std.sys.exc_info()
|
||||||
|
if first:
|
||||||
|
raise first[0], first[1], first[2]
|
||||||
|
|
||||||
class MultiGateway:
|
class MultiGateway:
|
||||||
|
RemoteError = RemoteError
|
||||||
def __init__(self, gateways):
|
def __init__(self, gateways):
|
||||||
self.gateways = gateways
|
self.gateways = gateways
|
||||||
def remote_exec(self, source):
|
def remote_exec(self, source):
|
||||||
|
@ -104,6 +135,8 @@ class MultiGateway:
|
||||||
return MultiChannel(channels)
|
return MultiChannel(channels)
|
||||||
|
|
||||||
class GatewayManager:
|
class GatewayManager:
|
||||||
|
RemoteError = RemoteError
|
||||||
|
|
||||||
def __init__(self, specs):
|
def __init__(self, specs):
|
||||||
self.specs = [GatewaySpec(spec) for spec in specs]
|
self.specs = [GatewaySpec(spec) for spec in specs]
|
||||||
self.gateways = []
|
self.gateways = []
|
||||||
|
@ -118,6 +151,8 @@ class GatewayManager:
|
||||||
self.gateways.append(spec.makegateway())
|
self.gateways.append(spec.makegateway())
|
||||||
|
|
||||||
def getgateways(self, remote=True, inplacelocal=True):
|
def getgateways(self, remote=True, inplacelocal=True):
|
||||||
|
if not self.gateways and self.specs:
|
||||||
|
self.makegateways()
|
||||||
l = []
|
l = []
|
||||||
for gw in self.gateways:
|
for gw in self.gateways:
|
||||||
if gw.spec.inplacelocal():
|
if gw.spec.inplacelocal():
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
"""
|
"""
|
||||||
tests for
|
tests for
|
||||||
- host specifications
|
- gateway specifications
|
||||||
- managing hosts
|
- multi channels and multi gateways
|
||||||
|
- gateway management
|
||||||
- manage rsyncing of hosts
|
- manage rsyncing of hosts
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -25,10 +26,6 @@ class TestGatewaySpec:
|
||||||
assert spec.type == "popen"
|
assert spec.type == "popen"
|
||||||
spec2 = GatewaySpec("popen" + joinpath)
|
spec2 = GatewaySpec("popen" + joinpath)
|
||||||
self._equality(spec, spec2)
|
self._equality(spec, spec2)
|
||||||
if joinpath == "":
|
|
||||||
assert spec.inplacelocal()
|
|
||||||
else:
|
|
||||||
assert not spec.inplacelocal()
|
|
||||||
|
|
||||||
def test_ssh(self):
|
def test_ssh(self):
|
||||||
for prefix in ('ssh:', ''): # ssh is default
|
for prefix in ('ssh:', ''): # ssh is default
|
||||||
|
@ -44,7 +41,6 @@ class TestGatewaySpec:
|
||||||
assert spec.type == "ssh"
|
assert spec.type == "ssh"
|
||||||
spec2 = GatewaySpec(specstring)
|
spec2 = GatewaySpec(specstring)
|
||||||
self._equality(spec, spec2)
|
self._equality(spec, spec2)
|
||||||
assert not spec.inplacelocal()
|
|
||||||
|
|
||||||
def test_socket(self):
|
def test_socket(self):
|
||||||
for hostpart in ('x.y', 'x', 'popen'):
|
for hostpart in ('x.y', 'x', 'popen'):
|
||||||
|
@ -59,7 +55,6 @@ class TestGatewaySpec:
|
||||||
assert spec.type == "socket"
|
assert spec.type == "socket"
|
||||||
spec2 = GatewaySpec("socket:" + hostpart + port + joinpath)
|
spec2 = GatewaySpec("socket:" + hostpart + port + joinpath)
|
||||||
self._equality(spec, spec2)
|
self._equality(spec, spec2)
|
||||||
assert not spec.inplacelocal()
|
|
||||||
|
|
||||||
def _equality(self, spec1, spec2):
|
def _equality(self, spec1, spec2):
|
||||||
assert spec1 != spec2
|
assert spec1 != spec2
|
||||||
|
@ -155,16 +150,13 @@ class TestGatewayManagerPopen:
|
||||||
testdir.tmpdir.chdir()
|
testdir.tmpdir.chdir()
|
||||||
hellopath = testdir.tmpdir.mkdir("hello")
|
hellopath = testdir.tmpdir.mkdir("hello")
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
l = [x[1] for x in hm.multi_exec(
|
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
||||||
"import os ; channel.send(os.getcwd())"
|
|
||||||
).receive_items()
|
|
||||||
]
|
|
||||||
paths = [x[1] for x in l]
|
paths = [x[1] for x in l]
|
||||||
assert l == [str(hellopath)] * 2
|
assert l == [str(hellopath)] * 2
|
||||||
py.test.raises(Exception, 'hm.multi_chdir("world", inplacelocal=False)')
|
py.test.raises(hm.RemoteError, 'hm.multi_chdir("world", inplacelocal=False)')
|
||||||
worldpath = hellopath.mkdir("world")
|
worldpath = hellopath.mkdir("world")
|
||||||
hm.multi_chdir("world", inplacelocal=False)
|
hm.multi_chdir("world", inplacelocal=False)
|
||||||
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
|
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
||||||
assert len(l) == 2
|
assert len(l) == 2
|
||||||
assert l[0] == l[1]
|
assert l[0] == l[1]
|
||||||
curwd = os.getcwd()
|
curwd = os.getcwd()
|
||||||
|
@ -178,12 +170,12 @@ class TestGatewayManagerPopen:
|
||||||
hellopath = testdir.tmpdir.mkdir("hello")
|
hellopath = testdir.tmpdir.mkdir("hello")
|
||||||
hm.makegateways()
|
hm.makegateways()
|
||||||
hm.multi_chdir("hello", inplacelocal=False)
|
hm.multi_chdir("hello", inplacelocal=False)
|
||||||
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
|
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
||||||
assert len(l) == 2
|
assert len(l) == 2
|
||||||
assert l == [os.getcwd()] * 2
|
assert l == [os.getcwd()] * 2
|
||||||
|
|
||||||
hm.multi_chdir("hello")
|
hm.multi_chdir("hello")
|
||||||
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
|
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
||||||
assert len(l) == 2
|
assert len(l) == 2
|
||||||
assert l[0] == l[1]
|
assert l[0] == l[1]
|
||||||
curwd = os.getcwd()
|
curwd = os.getcwd()
|
||||||
|
@ -192,7 +184,7 @@ class TestGatewayManagerPopen:
|
||||||
|
|
||||||
from py.__.execnet.gwmanage import MultiChannel
|
from py.__.execnet.gwmanage import MultiChannel
|
||||||
class TestMultiChannel:
|
class TestMultiChannel:
|
||||||
def test_multichannel_receive_items(self):
|
def test_multichannel_receive_each(self):
|
||||||
class pseudochannel:
|
class pseudochannel:
|
||||||
def receive(self):
|
def receive(self):
|
||||||
return 12
|
return 12
|
||||||
|
@ -200,9 +192,35 @@ class TestMultiChannel:
|
||||||
pc1 = pseudochannel()
|
pc1 = pseudochannel()
|
||||||
pc2 = pseudochannel()
|
pc2 = pseudochannel()
|
||||||
multichannel = MultiChannel([pc1, pc2])
|
multichannel = MultiChannel([pc1, pc2])
|
||||||
l = multichannel.receive_items()
|
l = multichannel.receive_each(withchannel=True)
|
||||||
assert len(l) == 2
|
assert len(l) == 2
|
||||||
assert l == [(pc1, 12), (pc2, 12)]
|
assert l == [(pc1, 12), (pc2, 12)]
|
||||||
|
l = multichannel.receive_each(withchannel=False)
|
||||||
|
assert l == [12,12]
|
||||||
|
|
||||||
|
def test_multichannel_send_each(self):
|
||||||
|
gm = GatewayManager(['popen'] * 2)
|
||||||
|
mc = gm.multi_exec("""
|
||||||
|
import os
|
||||||
|
channel.send(channel.receive() + 1)
|
||||||
|
""")
|
||||||
|
mc.send_each(41)
|
||||||
|
l = mc.receive_each()
|
||||||
|
assert l == [42,42]
|
||||||
|
|
||||||
|
def test_multichannel_receive_queue(self):
|
||||||
|
gm = GatewayManager(['popen'] * 2)
|
||||||
|
mc = gm.multi_exec("""
|
||||||
|
import os
|
||||||
|
channel.send(os.getpid())
|
||||||
|
""")
|
||||||
|
queue = mc.make_receive_queue()
|
||||||
|
ch, item = queue.get(timeout=0.5)
|
||||||
|
ch2, item2 = queue.get(timeout=0.5)
|
||||||
|
assert ch != ch2
|
||||||
|
assert ch.gateway != ch2.gateway
|
||||||
|
assert item != item2
|
||||||
|
mc.waitclose()
|
||||||
|
|
||||||
def test_multichannel_waitclose(self):
|
def test_multichannel_waitclose(self):
|
||||||
l = []
|
l = []
|
||||||
|
|
|
@ -51,7 +51,7 @@ class HostManager(object):
|
||||||
for ch, result in self.gwmanager.multi_exec("""
|
for ch, result in self.gwmanager.multi_exec("""
|
||||||
import sys, os
|
import sys, os
|
||||||
channel.send((sys.executable, os.getcwd(), sys.path))
|
channel.send((sys.executable, os.getcwd(), sys.path))
|
||||||
""").receive_items():
|
""").receive_each(withchannel=True):
|
||||||
self.trace("spec %r, execuable %r, cwd %r, syspath %r" %(
|
self.trace("spec %r, execuable %r, cwd %r, syspath %r" %(
|
||||||
ch.gateway.spec, result[0], result[1], result[2]))
|
ch.gateway.spec, result[0], result[1], result[2]))
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ class TestHostManager:
|
||||||
hm = HostManager(config, hosts=["popen:%s" % dest])
|
hm = HostManager(config, hosts=["popen:%s" % dest])
|
||||||
assert hm.config.topdir == source == config.topdir
|
assert hm.config.topdir == source == config.topdir
|
||||||
hm.rsync_roots()
|
hm.rsync_roots()
|
||||||
p, = hm.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive()
|
p, = hm.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
|
||||||
p = py.path.local(p)
|
p = py.path.local(p)
|
||||||
print "remote curdir", p
|
print "remote curdir", p
|
||||||
assert p == dest.join(config.topdir.basename)
|
assert p == dest.join(config.topdir.basename)
|
||||||
|
|
Loading…
Reference in New Issue