From 59264d57aeba65640317fbb5fe00b56c85999fb8 Mon Sep 17 00:00:00 2001 From: hpk Date: Fri, 2 Feb 2007 20:57:47 +0100 Subject: [PATCH] [svn r37822] added lots of docstrings, general cleanup --HG-- branch : trunk --- py/execnet/gateway.py | 55 ++++------- py/execnet/register.py | 146 +++++++++-------------------- py/execnet/testing/test_gateway.py | 11 ++- 3 files changed, 70 insertions(+), 142 deletions(-) diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index ac6b11041..b4d9f2ca1 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -33,23 +33,25 @@ sysex = (KeyboardInterrupt, SystemExit) class Gateway(object): _ThreadOut = ThreadOut remoteaddress = "" - - def __init__(self, io, startcount=2, maxthreads=None): + def __init__(self, io, execthreads=None, _startcount=2): + """ initialize core gateway, using the given + inputoutput object and 'execthreads' execution + threads. + """ global registered_cleanup - self._execpool = WorkerPool(maxthreads=maxthreads) -## self.running = True + self._execpool = WorkerPool(maxthreads=execthreads) self._io = io self._outgoing = Queue.Queue() - self._channelfactory = ChannelFactory(self, startcount) -## self._exitlock = threading.Lock() + self._channelfactory = ChannelFactory(self, _startcount) if not registered_cleanup: atexit.register(cleanup_atexit) registered_cleanup = True _active_sendqueues[self._outgoing] = True self._pool = NamedThreadPool(receiver = self._thread_receiver, - sender = self._thread_sender) + sender = self._thread_sender) def __repr__(self): + """ return string representing gateway type and status. """ addr = self.remoteaddress if addr: addr = '[%s]' % (addr,) @@ -199,13 +201,12 @@ class Gateway(object): return self._channelfactory.new() def remote_exec(self, source, stdout=None, stderr=None): - """ return channel object for communicating with the asynchronously - executing 'source' code which will have a corresponding 'channel' - object in its executing namespace. - - You may provide callback functions 'stdout' and 'stderr' - which will get called with the remote stdout/stderr output - piece by piece. + """ return channel object and connect it to a remote + execution thread where the given 'source' executes + and has the sister 'channel' object in its global + namespace. The callback functions 'stdout' and + 'stderr' get called on receival of remote + stdout/stderr output strings. """ try: source = str(Source(source)) @@ -252,29 +253,8 @@ class Gateway(object): c.waitclose(1.0) return Handle() -## def exit(self): -## """ initiate full gateway teardown. -## Note that the teardown of sender/receiver threads happens -## asynchronously and timeouts on stopping worker execution -## threads are ignored. You can issue join() or join(joinexec=False) -## if you want to wait for a full teardown (possibly excluding -## execution threads). -## """ -## # note that threads may still be scheduled to start -## # during our execution! -## self._exitlock.acquire() -## try: -## if self.running: -## self.running = False -## if not self._pool.getstarted('sender'): -## raise IOError("sender thread not alive anymore!") -## self._outgoing.put(None) -## self._trace("exit procedure triggered, pid %d " % (os.getpid(),)) -## _gateways.remove(self) -## finally: -## self._exitlock.release() - def exit(self): + """ Try to stop all IO activity. """ try: del _active_sendqueues[self._outgoing] except KeyError: @@ -283,6 +263,9 @@ class Gateway(object): self._outgoing.put(None) def join(self, joinexec=True): + """ Wait for all IO (and by default all execution activity) + to stop. + """ current = threading.currentThread() for x in self._pool.getstarted(): if x != current: diff --git a/py/execnet/register.py b/py/execnet/register.py index 040d72e6e..890cbca0e 100644 --- a/py/execnet/register.py +++ b/py/execnet/register.py @@ -7,9 +7,7 @@ import py # the list of modules that must be send to the other side # for bootstrapping gateways -# XXX we want to have a cleaner bootstrap mechanism -# by making sure early that we have the py lib available -# in a sufficient version +# XXX we'd like to have a leaner and meaner bootstrap mechanism startup_modules = [ 'py.__.thread.io', @@ -30,7 +28,7 @@ class InstallableGateway(gateway.Gateway): """ initialize gateways on both sides of a inputoutput object. """ def __init__(self, io): self._remote_bootstrap_gateway(io) - super(InstallableGateway, self).__init__(io=io, startcount=1) + super(InstallableGateway, self).__init__(io=io, _startcount=1) def _remote_bootstrap_gateway(self, io, extra=''): """ return Gateway with a asynchronously remotely @@ -42,7 +40,9 @@ class InstallableGateway(gateway.Gateway): """ bootstrap = [extra] bootstrap += [getsource(x) for x in startup_modules] - bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",] + bootstrap += [io.server_stmt, + "Gateway(io=io, _startcount=2).join(joinexec=False)", + ] source = "\n".join(bootstrap) self._trace("sending gateway bootstrap code") io.write('%r\n' % source) @@ -52,45 +52,20 @@ class PopenCmdGateway(InstallableGateway): infile, outfile = os.popen2(cmd) io = inputoutput.Popen2IO(infile, outfile) super(PopenCmdGateway, self).__init__(io=io) -## self._pidchannel = self.remote_exec(""" -## import os -## channel.send(os.getpid()) -## """) - -## def exit(self): -## try: -## self._pidchannel.waitclose(timeout=0.5) -## pid = self._pidchannel.receive() -## except IOError: -## self._trace("IOError: could not receive child PID:") -## self._traceex(sys.exc_info()) -## pid = None -## super(PopenCmdGateway, self).exit() -## if pid is not None: -## self._trace("waiting for pid %s" % pid) -## try: -## os.waitpid(pid, 0) -## except KeyboardInterrupt: -## if sys.platform != "win32": -## os.kill(pid, 15) -## raise -## except OSError, e: -## self._trace("child process %s already dead? error:%s" % -## (pid, str(e))) class PopenGateway(PopenCmdGateway): - # use sysfind/sysexec/subprocess instead of os.popen? + """ This Gateway provides interaction with a newly started + python subprocess. + """ def __init__(self, python=sys.executable): + """ instantiate a gateway to a subprocess + started with the given 'python' executable. + """ cmd = '%s -u -c "exec input()"' % python super(PopenGateway, self).__init__(cmd) def _remote_bootstrap_gateway(self, io, extra=''): - # XXX the following hack helps us to import the same version - # of the py lib and other dependcies, but only works for - # PopenGateways because we can assume to have access to - # the same filesystem - # --> we definitely need proper remote imports working - # across any kind of gateway! + # have the subprocess use the same PYTHONPATH and py lib x = py.path.local(py.__file__).dirpath().dirpath() ppath = os.environ.get('PYTHONPATH', '') plist = [str(x)] + ppath.split(':') @@ -98,48 +73,66 @@ class PopenGateway(PopenCmdGateway): "import sys ; sys.path[:0] = %r" % (plist,), "import os ; os.environ['PYTHONPATH'] = %r" % ppath, str(py.code.Source(stdouterrin_setnull)), - "stdouterrin_setnull()", + "stdouterrin_setnull()", "" ]) super(PopenGateway, self)._remote_bootstrap_gateway(io, s) class SocketGateway(InstallableGateway): + """ This Gateway provides interaction with a remote process + by connecting to a specified socket. On the remote + side you need to manually start a small script + (py/execnet/script/socketserver.py) that accepts + SocketGateway connections. + """ def __init__(self, host, port): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + """ instantiate a gateway to a processed accessed + via a host/port specified socket. + """ self.host = host = str(host) self.port = port = int(port) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) io = inputoutput.SocketIO(sock) super(SocketGateway, self).__init__(io=io) self.remoteaddress = '%s:%d' % (self.host, self.port) - def remote_install(cls, gateway, hostport=None): - """ return a connected socket gateway through the - given gateway. + def new_remote(cls, gateway, hostport=None): + """ return a new (connected) socket gateway, instatiated + indirectly through the given 'gateway'. """ if hostport is None: host, port = ('', 0) # XXX works on all platforms? else: host, port = hostport socketserverbootstrap = py.code.Source( - mypath.dirpath('script', 'socketserver.py').read('rU'), - """ + mypath.dirpath('script', 'socketserver.py').read('rU'), """ import socket sock = bind_and_listen((%r, %r)) port = sock.getsockname() channel.send(port) startserver(sock) - """ % (host, port)) + """ % (host, port) + ) # execute the above socketserverbootstrap on the other side channel = gateway.remote_exec(socketserverbootstrap) (realhost, realport) = channel.receive() - #gateway._trace("remote_install received" - # "port=%r, hostname = %r" %(realport, hostname)) + #gateway._trace("new_remote received" + # "port=%r, hostname = %r" %(realport, hostname)) return py.execnet.SocketGateway(host, realport) - remote_install = classmethod(remote_install) + new_remote = classmethod(new_remote) + class SshGateway(PopenCmdGateway): + """ This Gateway provides interaction with a remote process, + established via the 'ssh' command line binary. + The remote side needs to have a Python interpreter executable. + """ def __init__(self, sshaddress, remotepython='python', identity=None): + """ instantiate a remote ssh process with the + given 'sshaddress' and remotepython version. + you may specify an 'identity' filepath. + """ self.remoteaddress = sshaddress remotecmd = '%s -u -c "exec input()"' % (remotepython,) cmdline = [sshaddress, remotecmd] @@ -160,8 +153,11 @@ class SshGateway(PopenCmdGateway): ]) super(SshGateway, self)._remote_bootstrap_gateway(io, extra) + def stdouterrin_setnull(): - # redirect file descriptors 0 and 1 to /dev/null, to avoid + """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null. + note that this function may run remotely without py lib support. + """ # complete confusion (this is independent from the sys.stdout # and sys.stderr redirection that gateway.remote_exec() can do) # note that we redirect fd 2 on win too, since for some reason that @@ -188,55 +184,3 @@ def stdouterrin_setnull(): os.dup2(fd, 2) os.close(fd) -# XXX -# XXX unusued code below -# XXX - -class ExecGateway(PopenGateway): - def remote_exec_sync_stdcapture(self, lines, callback): - # hack: turn the content of the cell into - # - # if 1: - # line1 - # line2 - # ... - # - lines = [' ' + line for line in lines] - lines.insert(0, 'if 1:') - lines.append('') - sourcecode = '\n'.join(lines) - try: - callbacks = self.callbacks - except AttributeError: - callbacks = self.callbacks = {} - answerid = id(callback) - self.callbacks[answerid] = callback - - self.exec_remote(''' - import sys, StringIO - try: - execns - except: - execns = {} - oldout, olderr = sys.stdout, sys.stderr - try: - buffer = StringIO.StringIO() - sys.stdout = sys.stderr = buffer - try: - exec compile(%(sourcecode)r, 'single') in execns - except: - import traceback - traceback.print_exc() - finally: - sys.stdout=oldout - sys.stderr=olderr - # fiddle us (the caller) into executing the callback on remote answers - gateway.exec_remote( - "gateway.invoke_callback(%(answerid)r, %%r)" %% buffer.getvalue()) - ''' % locals()) - - def invoke_callback(self, answerid, value): - callback = self.callbacks[answerid] - del self.callbacks[answerid] - callback(value) - diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index 6ef7a719e..854d613b7 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -302,10 +302,11 @@ class BasicRemoteExecution: def test_remote_exec_redirect_multi(self): num = 3 l = [[] for x in range(num)] - channels = [self.gw.remote_exec("print %d" % i, stdout=l[i].append) + channels = [self.gw.remote_exec("print %d" % i, + stdout=l[i].append) for i in range(num)] for x in channels: - x.waitclose(1.0) + x.waitclose(5.0) for i in range(num): subl = l[i] @@ -451,9 +452,9 @@ class SocketGatewaySetup: def setup_class(cls): # open a gateway to a fresh child process cls.proxygw = py.execnet.PopenGateway() - cls.gw = py.execnet.SocketGateway.remote_install(cls.proxygw, - ("127.0.0.1", 0) - ) + cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw, + ("127.0.0.1", 0) + ) ## def teardown_class(cls): ## cls.gw.exit()