diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index 6e70387fb..c978ffdef 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -1,22 +1,15 @@ +""" +gateway code for initiating popen, socket and ssh connections. +(c) 2004-2009, Holger Krekel and others +""" + import sys, os, inspect, socket, atexit, weakref import py -from py.__.execnet.gateway_base import BaseGateway, Message, Popen2IO, SocketIO -from py.__.execnet.gateway_base import ExecnetAPI - -# the list of modules that must be send to the other side -# for bootstrapping gateways -# XXX we'd like to have a leaner and meaner bootstrap mechanism - -startup_modules = [ - 'py.__.execnet.gateway_base', -] +from py.__.execnet.gateway_base import Message, Popen2IO, SocketIO +from py.__.execnet import gateway_base debug = False -def getsource(dottedname): - mod = __import__(dottedname, None, None, ['__doc__']) - return inspect.getsource(mod) - class GatewayCleanup: def __init__(self): self._activegateways = weakref.WeakKeyDictionary() @@ -37,15 +30,22 @@ class GatewayCleanup: gw.exit() #gw.join() # should work as well +class ExecnetAPI: + def pyexecnet_gateway_init(self, gateway): + """ signal initialisation of new gateway. """ + def pyexecnet_gateway_exit(self, gateway): + """ signal exitting of gateway. """ -class InitiatingGateway(BaseGateway): +class InitiatingGateway(gateway_base.BaseGateway): """ initialize gateways on both sides of a inputoutput object. """ + # XXX put the next two global variables into an Execnet object + # which intiaties gateways and passes in appropriate values. _cleanup = GatewayCleanup() + hook = ExecnetAPI() def __init__(self, io): self._remote_bootstrap_gateway(io) super(InitiatingGateway, self).__init__(io=io, _startcount=1) - # XXX we dissallow execution form the other side self._initreceive() self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry) self.hook.pyexecnet_gateway_init(gateway=self) @@ -87,10 +87,10 @@ class InitiatingGateway(BaseGateway): uniquely identify channels across both sides. """ bootstrap = [extra] - bootstrap += [getsource(x) for x in startup_modules] + bootstrap += [inspect.getsource(gateway_base)] bootstrap += [io.server_stmt, "io.write('1'.encode('ascii'))", - "BaseGateway(io=io, _startcount=2)._servemain()", + "SlaveGateway(io=io, _startcount=2).serve()", ] source = "\n".join(bootstrap) self._trace("sending gateway bootstrap code") @@ -136,7 +136,7 @@ class InitiatingGateway(BaseGateway): execpool.shutdown() execpool.join() raise gw._StopExecLoop - execpool.dispatch(gw._executetask, task) + execpool.dispatch(gw.executetask, task) """ % num) self._remotechannelthread = self.remote_exec(source) diff --git a/py/execnet/gateway_base.py b/py/execnet/gateway_base.py index af182f9af..0125412e5 100644 --- a/py/execnet/gateway_base.py +++ b/py/execnet/gateway_base.py @@ -1,15 +1,37 @@ """ -base gateway code +base execnet gateway code, a quick overview. -# note that the whole code of this module (as well as some -# other modules) execute not only on the local side but -# also on any gateway's remote side. On such remote sides -# we cannot assume the py library to be there and -# InstallableGateway._remote_bootstrap_gateway() (located -# in register.py) will take care to send source fragments -# to the other side. Yes, it is fragile but we have a -# few tests that try to catch when we mess up. +the code of this module is sent to the "other side" +as a means of bootstrapping a Gateway object +capable of receiving and executing code, +and routing data through channels. +Gateways operate on InputOutput objects offering +a write and a read(n) method. + +Once bootstrapped a higher level protocol +based on Messages is used. Messages are serialized +to and from InputOutput objects. The details of this protocol +are locally defined in this module. There is no need +for standardizing or versioning the protocol. + +After bootstrapping the BaseGateway opens a receiver thread which +accepts encoded messages and triggers actions to interpret them. +Sending of channel data items happens directly through +write operations to InputOutput objects so there is no +separate thread. + +Code execution messages are put into an execqueue from +which they will be taken for execution. gateway.serve() +will take and execute such items, one by one. This means +that by incoming default execution is single-threaded. + +The receiver thread terminates if the remote side sends +a gateway termination message or if the IO-connection drops. +It puts an end symbol into the execqueue so +that serve() can cleanly finish as well. + +(C) 2004-2009 Holger Krekel, Armin Rigo and others """ import sys, os, weakref import threading, traceback, socket, struct @@ -89,7 +111,6 @@ class SocketIO: class Popen2IO: server_stmt = """ import os, sys, tempfile -#io = Popen2IO(os.fdopen(1, 'wb'), os.fdopen(0, 'rb')) io = Popen2IO(sys.stdout, sys.stdin) sys.stdout = tempfile.TemporaryFile('w') sys.stdin = tempfile.TemporaryFile('r') @@ -97,37 +118,36 @@ sys.stdin = tempfile.TemporaryFile('r') error = (IOError, OSError, EOFError) def __init__(self, outfile, infile): + # we need raw byte streams + if hasattr(infile, 'buffer'): + infile = infile.buffer + if hasattr(outfile, 'buffer'): + outfile = outfile.buffer self.outfile, self.infile = outfile, infile if sys.platform == "win32": import msvcrt msvcrt.setmode(infile.fileno(), os.O_BINARY) msvcrt.setmode(outfile.fileno(), os.O_BINARY) - self.readable = self.writeable = True def read(self, numbytes): - """Read exactly 'bytes' bytes from the pipe. """ - infile = self.infile - if hasattr(infile, 'buffer'): - infile = infile.buffer - data = infile.read(numbytes) + """Read exactly 'numbytes' bytes from the pipe. """ + data = self.infile.read(numbytes) if len(data) < numbytes: raise EOFError return data def write(self, data): - """write out all bytes to the pipe. """ + """write out all data bytes. """ assert isinstance(data, bytes) - outfile = self.outfile - if hasattr(outfile, 'buffer'): - outfile = outfile.buffer - outfile.write(data) - outfile.flush() + self.outfile.write(data) + self.outfile.flush() def close_read(self): if self.readable: self.infile.close() self.readable = None + def close_write(self): try: self.outfile.close() @@ -201,7 +221,6 @@ class Message: return "" %(self.__class__.__name__, self.channelid, self.data) - def _setupmessages(): class CHANNEL_OPEN(Message): def received(self, gateway): @@ -234,7 +253,7 @@ def _setupmessages(): classes = [CHANNEL_OPEN, CHANNEL_NEW, CHANNEL_DATA, CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE] - + for i, cls in enumerate(classes): Message._types[i] = cls cls.msgtype = i @@ -359,9 +378,9 @@ class Channel(object): def makefile(self, mode='w', proxyclose=False): """ return a file-like object. - mode: 'w' for binary writes, 'r' for binary reads - proxyclose: set to true if you want to have a - subsequent file.close() automatically close the channel. + mode: 'w' for writes, 'r' for reads + proxyclose: if true file.close() will + trigger a channel.close() call. """ if mode == "w": return ChannelFileWrite(channel=self, proxyclose=proxyclose) @@ -581,18 +600,8 @@ class ChannelFileRead(ChannelFile): break line += c return line - - - -class ExecnetAPI: - def pyexecnet_gateway_init(self, gateway): - """ signal initialisation of new gateway. """ - def pyexecnet_gateway_exit(self, gateway): - """ signal exitting of gateway. """ - class BaseGateway(object): - hook = ExecnetAPI() exc_info = sys.exc_info class _StopExecLoop(Exception): @@ -671,18 +680,37 @@ class BaseGateway(object): self._send(None) def _stopexec(self): - if hasattr(self, '_execqueue'): - self._execqueue.put(None) + pass def _local_schedulexec(self, channel, sourcetask): - if hasattr(self, '_execqueue'): - self._execqueue.put((channel, sourcetask)) - else: - # we will not execute, let's send back an error - # to inform the other side - channel.close("execution disallowed") + channel.close("execution disallowed") - def _servemain(self, joining=True): + # _____________________________________________________________________ + # + # High Level Interface + # _____________________________________________________________________ + # + def newchannel(self): + """ return new channel object. """ + return self._channelfactory.new() + + def join(self, joinexec=True): + """ Wait for all IO (and by default all execution activity) + to stop. the joinexec parameter is obsolete. + """ + current = threading.currentThread() + if self._receiverthread.isAlive(): + self._trace("joining receiver thread") + self._receiverthread.join() + +class SlaveGateway(BaseGateway): + def _stopexec(self): + self._execqueue.put(None) + + def _local_schedulexec(self, channel, sourcetask): + self._execqueue.put((channel, sourcetask)) + + def serve(self, joining=True): self._execqueue = queue.Queue() self._initreceive() try: @@ -692,15 +720,15 @@ class BaseGateway(object): self._stopsend() break try: - self._executetask(item) + self.executetask(item) except self._StopExecLoop: break finally: - self._trace("_servemain finished") + self._trace("serve") if joining: self.join() - def _executetask(self, item): + def executetask(self, item): """ execute channel/source items. """ channel, source = item try: @@ -725,22 +753,3 @@ class BaseGateway(object): else: channel.close() - # _____________________________________________________________________ - # - # High Level Interface - # _____________________________________________________________________ - # - - def newchannel(self): - """ return new channel object. """ - return self._channelfactory.new() - - def join(self, joinexec=True): - """ Wait for all IO (and by default all execution activity) - to stop. the joinexec parameter is obsolete. - """ - current = threading.currentThread() - if self._receiverthread.isAlive(): - self._trace("joining receiver thread") - self._receiverthread.join() - diff --git a/py/execnet/multi.py b/py/execnet/multi.py index e1a9857cb..0c8009136 100644 --- a/py/execnet/multi.py +++ b/py/execnet/multi.py @@ -1,5 +1,7 @@ """ - Support for working with multiple channels and gateways +Support for working with multiple channels and gateways + +(c) 2008-2009, Holger Krekel and others """ import py try: diff --git a/py/execnet/rsync.py b/py/execnet/rsync.py index 0892f4cec..f2fceb2af 100644 --- a/py/execnet/rsync.py +++ b/py/execnet/rsync.py @@ -1,3 +1,8 @@ +""" +1:N rsync implemenation on top of execnet. + +(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski +""" import py, os, stat md5 = py.builtin._tryimport('hashlib', 'md5').md5 diff --git a/py/execnet/rsync_remote.py b/py/execnet/rsync_remote.py index ea08f8150..945cb8204 100644 --- a/py/execnet/rsync_remote.py +++ b/py/execnet/rsync_remote.py @@ -1,4 +1,3 @@ - def f(): import os, stat, shutil try: diff --git a/py/execnet/xspec.py b/py/execnet/xspec.py index 3831e1077..da741de16 100644 --- a/py/execnet/xspec.py +++ b/py/execnet/xspec.py @@ -1,4 +1,6 @@ - +""" +(c) 2008-2009, holger krekel +""" import py class XSpec: @@ -9,7 +11,7 @@ class XSpec: * keys are not allowed to start with underscore * if no "=value" is given, assume a boolean True value """ - # XXX for now we are very restrictive about actually allowed key-values + # XXX allow customization, for only allow specific key names popen = ssh = socket = python = chdir = nice = None def __init__(self, string): diff --git a/testing/execnet/test_basics.py b/testing/execnet/test_basics.py index a87165bed..edba7cbdd 100644 --- a/testing/execnet/test_basics.py +++ b/testing/execnet/test_basics.py @@ -143,23 +143,6 @@ def test_geterrortext(anypython, tmpdir): print (out) assert "all passed" in out -def test_getsource_import_modules(): - for dottedname in gateway.startup_modules: - yield gateway.getsource, dottedname - -def test_getsource_no_colision(): - seen = {} - for dottedname in gateway.startup_modules: - mod = __import__(dottedname, None, None, ['__doc__']) - for name, value in vars(mod).items(): - if py.std.inspect.isclass(value): - if name in seen: - olddottedname, oldval = seen[name] - if oldval is not value: - py.test.fail("duplicate class %r in %s and %s" % - (name, dottedname, olddottedname)) - seen[name] = (dottedname, value) - def test_stdouterrin_setnull(): cap = py.io.StdCaptureFD() from py.__.execnet.gateway import stdouterrin_setnull diff --git a/testing/execnet/test_gateway.py b/testing/execnet/test_gateway.py index e62cdb5dc..90ad29926 100644 --- a/testing/execnet/test_gateway.py +++ b/testing/execnet/test_gateway.py @@ -449,7 +449,7 @@ class TestPopenGateway: assert rinfo.version_info == py.std.sys.version_info def test_gateway_init_event(self, _pytest): - rec = _pytest.gethookrecorder(gateway_base.ExecnetAPI) + rec = _pytest.gethookrecorder(gateway.ExecnetAPI) gw = py.execnet.PopenGateway() call = rec.popcall("pyexecnet_gateway_init") assert call.gateway == gw