[svn r37822] added lots of docstrings, general cleanup

--HG--
branch : trunk
This commit is contained in:
hpk 2007-02-02 20:57:47 +01:00
parent 0600b1aa36
commit 59264d57ae
3 changed files with 70 additions and 142 deletions

View File

@ -33,23 +33,25 @@ sysex = (KeyboardInterrupt, SystemExit)
class Gateway(object): class Gateway(object):
_ThreadOut = ThreadOut _ThreadOut = ThreadOut
remoteaddress = "" remoteaddress = ""
def __init__(self, io, execthreads=None, _startcount=2):
def __init__(self, io, startcount=2, maxthreads=None): """ initialize core gateway, using the given
inputoutput object and 'execthreads' execution
threads.
"""
global registered_cleanup global registered_cleanup
self._execpool = WorkerPool(maxthreads=maxthreads) self._execpool = WorkerPool(maxthreads=execthreads)
## self.running = True
self._io = io self._io = io
self._outgoing = Queue.Queue() self._outgoing = Queue.Queue()
self._channelfactory = ChannelFactory(self, startcount) self._channelfactory = ChannelFactory(self, _startcount)
## self._exitlock = threading.Lock()
if not registered_cleanup: if not registered_cleanup:
atexit.register(cleanup_atexit) atexit.register(cleanup_atexit)
registered_cleanup = True registered_cleanup = True
_active_sendqueues[self._outgoing] = True _active_sendqueues[self._outgoing] = True
self._pool = NamedThreadPool(receiver = self._thread_receiver, self._pool = NamedThreadPool(receiver = self._thread_receiver,
sender = self._thread_sender) sender = self._thread_sender)
def __repr__(self): def __repr__(self):
""" return string representing gateway type and status. """
addr = self.remoteaddress addr = self.remoteaddress
if addr: if addr:
addr = '[%s]' % (addr,) addr = '[%s]' % (addr,)
@ -199,13 +201,12 @@ class Gateway(object):
return self._channelfactory.new() return self._channelfactory.new()
def remote_exec(self, source, stdout=None, stderr=None): def remote_exec(self, source, stdout=None, stderr=None):
""" return channel object for communicating with the asynchronously """ return channel object and connect it to a remote
executing 'source' code which will have a corresponding 'channel' execution thread where the given 'source' executes
object in its executing namespace. and has the sister 'channel' object in its global
namespace. The callback functions 'stdout' and
You may provide callback functions 'stdout' and 'stderr' 'stderr' get called on receival of remote
which will get called with the remote stdout/stderr output stdout/stderr output strings.
piece by piece.
""" """
try: try:
source = str(Source(source)) source = str(Source(source))
@ -252,29 +253,8 @@ class Gateway(object):
c.waitclose(1.0) c.waitclose(1.0)
return Handle() return Handle()
## def exit(self):
## """ initiate full gateway teardown.
## Note that the teardown of sender/receiver threads happens
## asynchronously and timeouts on stopping worker execution
## threads are ignored. You can issue join() or join(joinexec=False)
## if you want to wait for a full teardown (possibly excluding
## execution threads).
## """
## # note that threads may still be scheduled to start
## # during our execution!
## self._exitlock.acquire()
## try:
## if self.running:
## self.running = False
## if not self._pool.getstarted('sender'):
## raise IOError("sender thread not alive anymore!")
## self._outgoing.put(None)
## self._trace("exit procedure triggered, pid %d " % (os.getpid(),))
## _gateways.remove(self)
## finally:
## self._exitlock.release()
def exit(self): def exit(self):
""" Try to stop all IO activity. """
try: try:
del _active_sendqueues[self._outgoing] del _active_sendqueues[self._outgoing]
except KeyError: except KeyError:
@ -283,6 +263,9 @@ class Gateway(object):
self._outgoing.put(None) self._outgoing.put(None)
def join(self, joinexec=True): def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity)
to stop.
"""
current = threading.currentThread() current = threading.currentThread()
for x in self._pool.getstarted(): for x in self._pool.getstarted():
if x != current: if x != current:

View File

@ -7,9 +7,7 @@ import py
# the list of modules that must be send to the other side # the list of modules that must be send to the other side
# for bootstrapping gateways # for bootstrapping gateways
# XXX we want to have a cleaner bootstrap mechanism # XXX we'd like to have a leaner and meaner bootstrap mechanism
# by making sure early that we have the py lib available
# in a sufficient version
startup_modules = [ startup_modules = [
'py.__.thread.io', 'py.__.thread.io',
@ -30,7 +28,7 @@ class InstallableGateway(gateway.Gateway):
""" initialize gateways on both sides of a inputoutput object. """ """ initialize gateways on both sides of a inputoutput object. """
def __init__(self, io): def __init__(self, io):
self._remote_bootstrap_gateway(io) self._remote_bootstrap_gateway(io)
super(InstallableGateway, self).__init__(io=io, startcount=1) super(InstallableGateway, self).__init__(io=io, _startcount=1)
def _remote_bootstrap_gateway(self, io, extra=''): def _remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely """ return Gateway with a asynchronously remotely
@ -42,7 +40,9 @@ class InstallableGateway(gateway.Gateway):
""" """
bootstrap = [extra] bootstrap = [extra]
bootstrap += [getsource(x) for x in startup_modules] bootstrap += [getsource(x) for x in startup_modules]
bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",] bootstrap += [io.server_stmt,
"Gateway(io=io, _startcount=2).join(joinexec=False)",
]
source = "\n".join(bootstrap) source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code") self._trace("sending gateway bootstrap code")
io.write('%r\n' % source) io.write('%r\n' % source)
@ -52,45 +52,20 @@ class PopenCmdGateway(InstallableGateway):
infile, outfile = os.popen2(cmd) infile, outfile = os.popen2(cmd)
io = inputoutput.Popen2IO(infile, outfile) io = inputoutput.Popen2IO(infile, outfile)
super(PopenCmdGateway, self).__init__(io=io) super(PopenCmdGateway, self).__init__(io=io)
## self._pidchannel = self.remote_exec("""
## import os
## channel.send(os.getpid())
## """)
## def exit(self):
## try:
## self._pidchannel.waitclose(timeout=0.5)
## pid = self._pidchannel.receive()
## except IOError:
## self._trace("IOError: could not receive child PID:")
## self._traceex(sys.exc_info())
## pid = None
## super(PopenCmdGateway, self).exit()
## if pid is not None:
## self._trace("waiting for pid %s" % pid)
## try:
## os.waitpid(pid, 0)
## except KeyboardInterrupt:
## if sys.platform != "win32":
## os.kill(pid, 15)
## raise
## except OSError, e:
## self._trace("child process %s already dead? error:%s" %
## (pid, str(e)))
class PopenGateway(PopenCmdGateway): class PopenGateway(PopenCmdGateway):
# use sysfind/sysexec/subprocess instead of os.popen? """ This Gateway provides interaction with a newly started
python subprocess.
"""
def __init__(self, python=sys.executable): def __init__(self, python=sys.executable):
""" instantiate a gateway to a subprocess
started with the given 'python' executable.
"""
cmd = '%s -u -c "exec input()"' % python cmd = '%s -u -c "exec input()"' % python
super(PopenGateway, self).__init__(cmd) super(PopenGateway, self).__init__(cmd)
def _remote_bootstrap_gateway(self, io, extra=''): def _remote_bootstrap_gateway(self, io, extra=''):
# XXX the following hack helps us to import the same version # have the subprocess use the same PYTHONPATH and py lib
# of the py lib and other dependcies, but only works for
# PopenGateways because we can assume to have access to
# the same filesystem
# --> we definitely need proper remote imports working
# across any kind of gateway!
x = py.path.local(py.__file__).dirpath().dirpath() x = py.path.local(py.__file__).dirpath().dirpath()
ppath = os.environ.get('PYTHONPATH', '') ppath = os.environ.get('PYTHONPATH', '')
plist = [str(x)] + ppath.split(':') plist = [str(x)] + ppath.split(':')
@ -104,42 +79,60 @@ class PopenGateway(PopenCmdGateway):
super(PopenGateway, self)._remote_bootstrap_gateway(io, s) super(PopenGateway, self)._remote_bootstrap_gateway(io, s)
class SocketGateway(InstallableGateway): class SocketGateway(InstallableGateway):
""" This Gateway provides interaction with a remote process
by connecting to a specified socket. On the remote
side you need to manually start a small script
(py/execnet/script/socketserver.py) that accepts
SocketGateway connections.
"""
def __init__(self, host, port): def __init__(self, host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) """ instantiate a gateway to a processed accessed
via a host/port specified socket.
"""
self.host = host = str(host) self.host = host = str(host)
self.port = port = int(port) self.port = port = int(port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port)) sock.connect((host, port))
io = inputoutput.SocketIO(sock) io = inputoutput.SocketIO(sock)
super(SocketGateway, self).__init__(io=io) super(SocketGateway, self).__init__(io=io)
self.remoteaddress = '%s:%d' % (self.host, self.port) self.remoteaddress = '%s:%d' % (self.host, self.port)
def remote_install(cls, gateway, hostport=None): def new_remote(cls, gateway, hostport=None):
""" return a connected socket gateway through the """ return a new (connected) socket gateway, instatiated
given gateway. indirectly through the given 'gateway'.
""" """
if hostport is None: if hostport is None:
host, port = ('', 0) # XXX works on all platforms? host, port = ('', 0) # XXX works on all platforms?
else: else:
host, port = hostport host, port = hostport
socketserverbootstrap = py.code.Source( socketserverbootstrap = py.code.Source(
mypath.dirpath('script', 'socketserver.py').read('rU'), mypath.dirpath('script', 'socketserver.py').read('rU'), """
"""
import socket import socket
sock = bind_and_listen((%r, %r)) sock = bind_and_listen((%r, %r))
port = sock.getsockname() port = sock.getsockname()
channel.send(port) channel.send(port)
startserver(sock) startserver(sock)
""" % (host, port)) """ % (host, port)
)
# execute the above socketserverbootstrap on the other side # execute the above socketserverbootstrap on the other side
channel = gateway.remote_exec(socketserverbootstrap) channel = gateway.remote_exec(socketserverbootstrap)
(realhost, realport) = channel.receive() (realhost, realport) = channel.receive()
#gateway._trace("remote_install received" #gateway._trace("new_remote received"
# "port=%r, hostname = %r" %(realport, hostname)) # "port=%r, hostname = %r" %(realport, hostname))
return py.execnet.SocketGateway(host, realport) return py.execnet.SocketGateway(host, realport)
remote_install = classmethod(remote_install) new_remote = classmethod(new_remote)
class SshGateway(PopenCmdGateway): class SshGateway(PopenCmdGateway):
""" This Gateway provides interaction with a remote process,
established via the 'ssh' command line binary.
The remote side needs to have a Python interpreter executable.
"""
def __init__(self, sshaddress, remotepython='python', identity=None): def __init__(self, sshaddress, remotepython='python', identity=None):
""" instantiate a remote ssh process with the
given 'sshaddress' and remotepython version.
you may specify an 'identity' filepath.
"""
self.remoteaddress = sshaddress self.remoteaddress = sshaddress
remotecmd = '%s -u -c "exec input()"' % (remotepython,) remotecmd = '%s -u -c "exec input()"' % (remotepython,)
cmdline = [sshaddress, remotecmd] cmdline = [sshaddress, remotecmd]
@ -160,8 +153,11 @@ class SshGateway(PopenCmdGateway):
]) ])
super(SshGateway, self)._remote_bootstrap_gateway(io, extra) super(SshGateway, self)._remote_bootstrap_gateway(io, extra)
def stdouterrin_setnull(): def stdouterrin_setnull():
# redirect file descriptors 0 and 1 to /dev/null, to avoid """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null.
note that this function may run remotely without py lib support.
"""
# complete confusion (this is independent from the sys.stdout # complete confusion (this is independent from the sys.stdout
# and sys.stderr redirection that gateway.remote_exec() can do) # and sys.stderr redirection that gateway.remote_exec() can do)
# note that we redirect fd 2 on win too, since for some reason that # note that we redirect fd 2 on win too, since for some reason that
@ -188,55 +184,3 @@ def stdouterrin_setnull():
os.dup2(fd, 2) os.dup2(fd, 2)
os.close(fd) os.close(fd)
# XXX
# XXX unusued code below
# XXX
class ExecGateway(PopenGateway):
def remote_exec_sync_stdcapture(self, lines, callback):
# hack: turn the content of the cell into
#
# if 1:
# line1
# line2
# ...
#
lines = [' ' + line for line in lines]
lines.insert(0, 'if 1:')
lines.append('')
sourcecode = '\n'.join(lines)
try:
callbacks = self.callbacks
except AttributeError:
callbacks = self.callbacks = {}
answerid = id(callback)
self.callbacks[answerid] = callback
self.exec_remote('''
import sys, StringIO
try:
execns
except:
execns = {}
oldout, olderr = sys.stdout, sys.stderr
try:
buffer = StringIO.StringIO()
sys.stdout = sys.stderr = buffer
try:
exec compile(%(sourcecode)r, 'single') in execns
except:
import traceback
traceback.print_exc()
finally:
sys.stdout=oldout
sys.stderr=olderr
# fiddle us (the caller) into executing the callback on remote answers
gateway.exec_remote(
"gateway.invoke_callback(%(answerid)r, %%r)" %% buffer.getvalue())
''' % locals())
def invoke_callback(self, answerid, value):
callback = self.callbacks[answerid]
del self.callbacks[answerid]
callback(value)

View File

@ -302,10 +302,11 @@ class BasicRemoteExecution:
def test_remote_exec_redirect_multi(self): def test_remote_exec_redirect_multi(self):
num = 3 num = 3
l = [[] for x in range(num)] l = [[] for x in range(num)]
channels = [self.gw.remote_exec("print %d" % i, stdout=l[i].append) channels = [self.gw.remote_exec("print %d" % i,
stdout=l[i].append)
for i in range(num)] for i in range(num)]
for x in channels: for x in channels:
x.waitclose(1.0) x.waitclose(5.0)
for i in range(num): for i in range(num):
subl = l[i] subl = l[i]
@ -451,9 +452,9 @@ class SocketGatewaySetup:
def setup_class(cls): def setup_class(cls):
# open a gateway to a fresh child process # open a gateway to a fresh child process
cls.proxygw = py.execnet.PopenGateway() cls.proxygw = py.execnet.PopenGateway()
cls.gw = py.execnet.SocketGateway.remote_install(cls.proxygw, cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw,
("127.0.0.1", 0) ("127.0.0.1", 0)
) )
## def teardown_class(cls): ## def teardown_class(cls):
## cls.gw.exit() ## cls.gw.exit()