[svn r63089] factor out MultiChannel and MultiGateway helpers

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-19 18:05:41 +01:00
parent 92b2d4786d
commit 6f2cca80ae
3 changed files with 73 additions and 63 deletions

View File

@ -27,8 +27,8 @@ version = "1.0.0a1"
initpkg(__name__,
description = "pylib and py.test: agile development and test support library",
revision = int('$LastChangedRevision: 63027 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2009-03-18 12:18:39 +0100 (Wed, 18 Mar 2009) $',
revision = int('$LastChangedRevision: 63089 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2009-03-19 18:05:41 +0100 (Thu, 19 Mar 2009) $',
version = version,
url = "http://pylib.org",
download_url = "http://codespeak.net/py/0.9.2/download.html",
@ -152,8 +152,8 @@ initpkg(__name__,
'execnet.PopenGateway' : ('./execnet/register.py', 'PopenGateway'),
'execnet.SshGateway' : ('./execnet/register.py', 'SshGateway'),
'execnet.GatewaySpec' : ('./execnet/gwmanage.py', 'GatewaySpec'),
'execnet.MultiGateway' : ('./execnet/gwmanage.py', 'MultiGateway'),
'execnet.MultiChannel' : ('./execnet/gwmanage.py', 'MultiChannel'),
'execnet.MultiGateway' : ('./execnet/multi.py', 'MultiGateway'),
'execnet.MultiChannel' : ('./execnet/multi.py', 'MultiChannel'),
# execnet scripts
'execnet.RSync' : ('./execnet/rsync.py', 'RSync'),

View File

@ -94,64 +94,6 @@ class GatewaySpec(object):
gw.spec = self
return gw
class MultiChannel:
def __init__(self, channels):
self._channels = channels
def send_each(self, item):
for ch in self._channels:
ch.send(item)
def receive_each(self, withchannel=False):
assert not hasattr(self, '_queue')
l = []
for ch in self._channels:
obj = ch.receive()
if withchannel:
l.append((ch, obj))
else:
l.append(obj)
return l
def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
try:
return self._queue
except AttributeError:
self._queue = py.std.Queue.Queue()
for ch in self._channels:
def putreceived(obj, channel=ch):
self._queue.put((channel, obj))
if endmarker is NO_ENDMARKER_WANTED:
ch.setcallback(putreceived)
else:
ch.setcallback(putreceived, endmarker=endmarker)
return self._queue
def waitclose(self):
first = None
for ch in self._channels:
try:
ch.waitclose()
except ch.RemoteError:
if first is None:
first = py.std.sys.exc_info()
if first:
raise first[0], first[1], first[2]
class MultiGateway:
RemoteError = RemoteError
def __init__(self, gateways):
self.gateways = gateways
def remote_exec(self, source):
channels = []
for gw in self.gateways:
channels.append(gw.remote_exec(source))
return MultiChannel(channels)
def exit(self):
for gw in self.gateways:
gw.exit()
class GatewayManager:
RemoteError = RemoteError
@ -179,7 +121,7 @@ class GatewayManager:
else:
if remote:
l.append(gw)
return MultiGateway(gateways=l)
return py.execnet.MultiGateway(gateways=l)
def multi_exec(self, source, inplacelocal=True):
""" remote execute code on all gateways.

68
py/execnet/multi.py Normal file
View File

@ -0,0 +1,68 @@
"""
Working with multiple channels and gateways
"""
import py
from py.__.execnet.channel import RemoteError
NO_ENDMARKER_WANTED = object()
class MultiGateway:
RemoteError = RemoteError
def __init__(self, gateways):
self.gateways = gateways
def remote_exec(self, source):
channels = []
for gw in self.gateways:
channels.append(gw.remote_exec(source))
return MultiChannel(channels)
def exit(self):
for gw in self.gateways:
gw.exit()
class MultiChannel:
def __init__(self, channels):
self._channels = channels
def send_each(self, item):
for ch in self._channels:
ch.send(item)
def receive_each(self, withchannel=False):
assert not hasattr(self, '_queue')
l = []
for ch in self._channels:
obj = ch.receive()
if withchannel:
l.append((ch, obj))
else:
l.append(obj)
return l
def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
try:
return self._queue
except AttributeError:
self._queue = py.std.Queue.Queue()
for ch in self._channels:
def putreceived(obj, channel=ch):
self._queue.put((channel, obj))
if endmarker is NO_ENDMARKER_WANTED:
ch.setcallback(putreceived)
else:
ch.setcallback(putreceived, endmarker=endmarker)
return self._queue
def waitclose(self):
first = None
for ch in self._channels:
try:
ch.waitclose()
except ch.RemoteError:
if first is None:
first = py.std.sys.exc_info()
if first:
raise first[0], first[1], first[2]