* make Gateway interface more asymetric: remote_* methods
and cleanup/atexit handling now live exclusively with the "InitiatingGateway" * fix some cross-python io related handling --HG-- branch : trunk
This commit is contained in:
parent
c1fcf9c4d8
commit
f636ed8ced
|
@ -1,7 +1,8 @@
|
|||
import sys, os, inspect, socket
|
||||
import sys, os, inspect, socket, atexit, weakref
|
||||
import py
|
||||
from subprocess import Popen, PIPE
|
||||
from py.__.execnet.gateway_base import BaseGateway, Popen2IO, SocketIO
|
||||
from py.__.execnet.gateway_base import BaseGateway, Message, Popen2IO, SocketIO
|
||||
from py.__.execnet.gateway_base import ExecnetAPI
|
||||
|
||||
# the list of modules that must be send to the other side
|
||||
# for bootstrapping gateways
|
||||
|
@ -12,18 +13,63 @@ startup_modules = [
|
|||
'py.__.execnet.gateway_base',
|
||||
]
|
||||
|
||||
debug = False
|
||||
|
||||
def getsource(dottedname):
|
||||
mod = __import__(dottedname, None, None, ['__doc__'])
|
||||
return inspect.getsource(mod)
|
||||
|
||||
class GatewayCleanup:
|
||||
def __init__(self):
|
||||
self._activegateways = weakref.WeakKeyDictionary()
|
||||
atexit.register(self.cleanup_atexit)
|
||||
|
||||
def register(self, gateway):
|
||||
assert gateway not in self._activegateways
|
||||
self._activegateways[gateway] = True
|
||||
|
||||
def unregister(self, gateway):
|
||||
del self._activegateways[gateway]
|
||||
|
||||
def cleanup_atexit(self):
|
||||
if debug:
|
||||
debug.writeslines(["="*20, "cleaning up", "=" * 20])
|
||||
debug.flush()
|
||||
for gw in self._activegateways.keys():
|
||||
gw.exit()
|
||||
#gw.join() # should work as well
|
||||
|
||||
|
||||
class InitiatingGateway(BaseGateway):
|
||||
""" initialize gateways on both sides of a inputoutput object. """
|
||||
_cleanup = GatewayCleanup()
|
||||
|
||||
def __init__(self, io):
|
||||
self._remote_bootstrap_gateway(io)
|
||||
super(InitiatingGateway, self).__init__(io=io, _startcount=1)
|
||||
# XXX we dissallow execution form the other side
|
||||
self._initreceive(requestqueue=False)
|
||||
self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry)
|
||||
self.hook.pyexecnet_gateway_init(gateway=self)
|
||||
self._cleanup.register(self)
|
||||
|
||||
def __repr__(self):
|
||||
""" return string representing gateway type and status. """
|
||||
if hasattr(self, 'remoteaddress'):
|
||||
addr = '[%s]' % (self.remoteaddress,)
|
||||
else:
|
||||
addr = ''
|
||||
try:
|
||||
r = (self._receiverthread.isAlive() and "receiving" or
|
||||
"not receiving")
|
||||
s = "sending" # XXX
|
||||
i = len(self._channelfactory.channels())
|
||||
except AttributeError:
|
||||
r = s = "uninitialized"
|
||||
i = "no"
|
||||
return "<%s%s %s/%s (%s active channels)>" %(
|
||||
self.__class__.__name__, addr, r, s, i)
|
||||
|
||||
|
||||
def _remote_bootstrap_gateway(self, io, extra=''):
|
||||
""" return Gateway with a asynchronously remotely
|
||||
|
@ -41,7 +87,111 @@ class InitiatingGateway(BaseGateway):
|
|||
source = "\n".join(bootstrap)
|
||||
self._trace("sending gateway bootstrap code")
|
||||
#open("/tmp/bootstrap.py", 'w').write(source)
|
||||
io.write('%r\n' % source)
|
||||
repr_source = repr(source) + "\n"
|
||||
io.write(repr_source.encode('ascii'))
|
||||
|
||||
def _rinfo(self, update=False):
|
||||
""" return some sys/env information from remote. """
|
||||
if update or not hasattr(self, '_cache_rinfo'):
|
||||
self._cache_rinfo = RInfo(**self.remote_exec("""
|
||||
import sys, os
|
||||
channel.send(dict(
|
||||
executable = sys.executable,
|
||||
version_info = sys.version_info,
|
||||
platform = sys.platform,
|
||||
cwd = os.getcwd(),
|
||||
pid = os.getpid(),
|
||||
))
|
||||
""").receive())
|
||||
return self._cache_rinfo
|
||||
|
||||
def remote_exec(self, source, stdout=None, stderr=None):
|
||||
""" return channel object and connect it to a remote
|
||||
execution thread where the given 'source' executes
|
||||
and has the sister 'channel' object in its global
|
||||
namespace. The callback functions 'stdout' and
|
||||
'stderr' get called on receival of remote
|
||||
stdout/stderr output strings.
|
||||
"""
|
||||
source = str(py.code.Source(source))
|
||||
channel = self.newchannel()
|
||||
outid = self._newredirectchannelid(stdout)
|
||||
errid = self._newredirectchannelid(stderr)
|
||||
self._send(Message.CHANNEL_OPEN(
|
||||
channel.id, (source, outid, errid)))
|
||||
return channel
|
||||
|
||||
def remote_init_threads(self, num=None):
|
||||
""" start up to 'num' threads for subsequent
|
||||
remote_exec() invocations to allow concurrent
|
||||
execution.
|
||||
"""
|
||||
if hasattr(self, '_remotechannelthread'):
|
||||
raise IOError("remote threads already running")
|
||||
from py.__.thread import pool
|
||||
source = py.code.Source(pool, """
|
||||
execpool = WorkerPool(maxthreads=%r)
|
||||
gw = channel.gateway
|
||||
while 1:
|
||||
task = gw._requestqueue.get()
|
||||
if task is None:
|
||||
gw._stopsend()
|
||||
execpool.shutdown()
|
||||
execpool.join()
|
||||
raise gw._StopExecLoop
|
||||
execpool.dispatch(gw._executetask, task)
|
||||
""" % num)
|
||||
self._remotechannelthread = self.remote_exec(source)
|
||||
|
||||
def _newredirectchannelid(self, callback):
|
||||
if callback is None:
|
||||
return
|
||||
if hasattr(callback, 'write'):
|
||||
callback = callback.write
|
||||
assert callable(callback)
|
||||
chan = self.newchannel()
|
||||
chan.setcallback(callback)
|
||||
return chan.id
|
||||
|
||||
def _remote_redirect(self, stdout=None, stderr=None):
|
||||
""" return a handle representing a redirection of a remote
|
||||
end's stdout to a local file object. with handle.close()
|
||||
the redirection will be reverted.
|
||||
"""
|
||||
clist = []
|
||||
for name, out in ('stdout', stdout), ('stderr', stderr):
|
||||
if out:
|
||||
outchannel = self.newchannel()
|
||||
outchannel.setcallback(getattr(out, 'write', out))
|
||||
channel = self.remote_exec("""
|
||||
import sys
|
||||
outchannel = channel.receive()
|
||||
outchannel.gateway._ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
|
||||
""" % name)
|
||||
channel.send(outchannel)
|
||||
clist.append(channel)
|
||||
for c in clist:
|
||||
c.waitclose()
|
||||
class Handle:
|
||||
def close(_):
|
||||
for name, out in ('stdout', stdout), ('stderr', stderr):
|
||||
if out:
|
||||
c = self.remote_exec("""
|
||||
import sys
|
||||
channel.gateway._ThreadOut(sys, %r).resetdefault()
|
||||
""" % name)
|
||||
c.waitclose()
|
||||
return Handle()
|
||||
|
||||
|
||||
|
||||
class RInfo:
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
def __repr__(self):
|
||||
info = ", ".join(["%s=%s" % item
|
||||
for item in self.__dict__.items()])
|
||||
return "<RInfo %r>" % info
|
||||
|
||||
class PopenCmdGateway(InitiatingGateway):
|
||||
def __init__(self, cmd):
|
||||
|
@ -68,6 +218,7 @@ class PopenGateway(PopenCmdGateway):
|
|||
if not python:
|
||||
python = sys.executable
|
||||
cmd = '%s -u -c "exec input()"' % python
|
||||
cmd = '%s -u -c "import sys ; exec(eval(sys.stdin.readline()))"' % python
|
||||
super(PopenGateway, self).__init__(cmd)
|
||||
|
||||
def _remote_bootstrap_gateway(self, io, extra=''):
|
||||
|
@ -113,7 +264,7 @@ class SocketGateway(InitiatingGateway):
|
|||
host, port = hostport
|
||||
mydir = py.path.local(__file__).dirpath()
|
||||
socketserverbootstrap = py.code.Source(
|
||||
mydir.join('script', 'socketserver.py').read('rU'), """
|
||||
mydir.join('script', 'socketserver.py').read('r'), """
|
||||
import socket
|
||||
sock = bind_and_listen((%r, %r))
|
||||
port = sock.getsockname()
|
||||
|
@ -187,18 +338,18 @@ def stdouterrin_setnull():
|
|||
else:
|
||||
devnull = '/dev/null'
|
||||
# stdin
|
||||
sys.stdin = os.fdopen(os.dup(0), 'rb', 0)
|
||||
sys.stdin = os.fdopen(os.dup(0), 'r', 1)
|
||||
fd = os.open(devnull, os.O_RDONLY)
|
||||
os.dup2(fd, 0)
|
||||
os.close(fd)
|
||||
|
||||
# stdout
|
||||
sys.stdout = os.fdopen(os.dup(1), 'wb', 0)
|
||||
sys.stdout = os.fdopen(os.dup(1), 'w', 1)
|
||||
fd = os.open(devnull, os.O_WRONLY)
|
||||
os.dup2(fd, 1)
|
||||
|
||||
# stderr for win32
|
||||
if os.name == 'nt':
|
||||
sys.stderr = os.fdopen(os.dup(2), 'wb', 0)
|
||||
sys.stderr = os.fdopen(os.dup(2), 'w', 1)
|
||||
os.dup2(fd, 2)
|
||||
os.close(fd)
|
||||
|
|
|
@ -11,7 +11,7 @@ base gateway code
|
|||
# few tests that try to catch when we mess up.
|
||||
|
||||
"""
|
||||
import sys, os, weakref, atexit
|
||||
import sys, os, weakref
|
||||
import threading, traceback, socket, struct
|
||||
try:
|
||||
import queue
|
||||
|
@ -21,20 +21,27 @@ except ImportError:
|
|||
# XXX the following lines should not be here
|
||||
if 'ThreadOut' not in globals():
|
||||
import py
|
||||
from py.code import Source
|
||||
ThreadOut = py._thread.ThreadOut
|
||||
|
||||
if sys.version_info > (3, 0):
|
||||
exec("""def do_exec(co, loc):
|
||||
exec(co, loc)""")
|
||||
unicode = str
|
||||
sysex = Exception
|
||||
else:
|
||||
exec("""def do_exec(co, loc):
|
||||
exec co in loc""")
|
||||
bytes = str
|
||||
sysex = (KeyboardInterrupt, SystemExit)
|
||||
|
||||
import os
|
||||
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
|
||||
def str(*args):
|
||||
raise EnvironmentError(
|
||||
"use unicode or bytes, not cross-python ambigous 'str'")
|
||||
|
||||
default_encoding = "UTF-8"
|
||||
|
||||
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w')
|
||||
|
||||
sysex = (KeyboardInterrupt, SystemExit)
|
||||
|
||||
# ___________________________________________________________________________
|
||||
#
|
||||
|
@ -52,7 +59,7 @@ class SocketIO:
|
|||
sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY
|
||||
except socket.error:
|
||||
e = sys.exc_info()[1]
|
||||
py.builtin.print_("WARNING: Cannot set socket option:", str(e))
|
||||
sys.stderr.write("WARNING: cannot set socketoption")
|
||||
self.readable = self.writeable = True
|
||||
|
||||
def read(self, numbytes):
|
||||
|
@ -66,6 +73,7 @@ class SocketIO:
|
|||
return buf
|
||||
|
||||
def write(self, data):
|
||||
assert isinstance(data, bytes)
|
||||
self.sock.sendall(data)
|
||||
|
||||
def close_read(self):
|
||||
|
@ -85,9 +93,9 @@ class SocketIO:
|
|||
|
||||
class Popen2IO:
|
||||
server_stmt = """
|
||||
import os, sys, StringIO
|
||||
import os, sys, tempfile
|
||||
io = Popen2IO(sys.stdout, sys.stdin)
|
||||
sys.stdout = sys.stderr = StringIO.StringIO()
|
||||
sys.stdout = sys.stderr = tempfile.TemporaryFile()
|
||||
"""
|
||||
error = (IOError, OSError, EOFError)
|
||||
|
||||
|
@ -102,13 +110,15 @@ sys.stdout = sys.stderr = StringIO.StringIO()
|
|||
|
||||
def read(self, numbytes):
|
||||
"""Read exactly 'bytes' bytes from the pipe. """
|
||||
s = self.infile.read(numbytes)
|
||||
if len(s) < numbytes:
|
||||
data = self.infile.read(numbytes)
|
||||
if len(data) < numbytes:
|
||||
raise EOFError
|
||||
return s
|
||||
assert isinstance(data, bytes)
|
||||
return data
|
||||
|
||||
def write(self, data):
|
||||
"""write out all bytes to the pipe. """
|
||||
assert isinstance(data, bytes)
|
||||
self.outfile.write(data)
|
||||
self.outfile.flush()
|
||||
|
||||
|
@ -143,10 +153,11 @@ class Message:
|
|||
# version :-((( There is no sane solution, short of a custom
|
||||
# pure Python marshaller
|
||||
data = self.data
|
||||
if isinstance(data, str):
|
||||
if isinstance(data, bytes):
|
||||
dataformat = 1
|
||||
else:
|
||||
data = repr(self.data) # argh
|
||||
data = data.encode(default_encoding)
|
||||
dataformat = 2
|
||||
header = struct.pack(HDR_FORMAT, self.msgtype, dataformat,
|
||||
self.channelid, len(data))
|
||||
|
@ -160,6 +171,7 @@ class Message:
|
|||
if dataformat == 1:
|
||||
pass
|
||||
elif dataformat == 2:
|
||||
data = data.decode(default_encoding)
|
||||
data = eval(data, {}) # reversed argh
|
||||
else:
|
||||
raise ValueError("bad data format")
|
||||
|
@ -205,7 +217,8 @@ def _setupmessages():
|
|||
|
||||
class CHANNEL_CLOSE_ERROR(Message):
|
||||
def received(self, gateway):
|
||||
remote_error = gateway._channelfactory.RemoteError(self.data)
|
||||
data = self.data.decode(default_encoding)
|
||||
remote_error = gateway._channelfactory.RemoteError(data)
|
||||
gateway._channelfactory._local_close(self.channelid, remote_error)
|
||||
|
||||
class CHANNEL_LAST_MESSAGE(Message):
|
||||
|
@ -237,7 +250,8 @@ class RemoteError(EOFError):
|
|||
|
||||
def warn(self):
|
||||
# XXX do this better
|
||||
print >> sys.stderr, "Warning: unhandled %r" % (self,)
|
||||
sys.stderr.write("Warning: unhandled %r\n" % (self,))
|
||||
|
||||
|
||||
NO_ENDMARKER_WANTED = object()
|
||||
|
||||
|
@ -341,7 +355,8 @@ class Channel(object):
|
|||
# but it's never damaging to send too many CHANNEL_CLOSE messages
|
||||
put = self.gateway._send
|
||||
if error is not None:
|
||||
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
|
||||
put(Message.CHANNEL_CLOSE_ERROR(self.id,
|
||||
error.encode(default_encoding)))
|
||||
else:
|
||||
put(Message.CHANNEL_CLOSE(self.id))
|
||||
if isinstance(error, RemoteError):
|
||||
|
@ -437,7 +452,7 @@ class ChannelFactory(object):
|
|||
self._writelock.release()
|
||||
|
||||
def channels(self):
|
||||
return self._channels.values()
|
||||
return list(self._channels.values())
|
||||
|
||||
#
|
||||
# internal methods, called from the receiver thread
|
||||
|
@ -512,9 +527,9 @@ class ChannelFactory(object):
|
|||
self.finished = True
|
||||
finally:
|
||||
self._writelock.release()
|
||||
for id in self._channels.keys():
|
||||
for id in list(self._channels):
|
||||
self._local_last_message(id)
|
||||
for id in self._callbacks.keys():
|
||||
for id in list(self._callbacks):
|
||||
self._close_callback(id)
|
||||
|
||||
class ChannelFile(object):
|
||||
|
@ -567,30 +582,6 @@ class ChannelFileRead(ChannelFile):
|
|||
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# cleanup machinery (for exiting processes)
|
||||
# ----------------------------------------------------------
|
||||
|
||||
class GatewayCleanup:
|
||||
def __init__(self):
|
||||
self._activegateways = weakref.WeakKeyDictionary()
|
||||
atexit.register(self.cleanup_atexit)
|
||||
|
||||
def register(self, gateway):
|
||||
assert gateway not in self._activegateways
|
||||
self._activegateways[gateway] = True
|
||||
|
||||
def unregister(self, gateway):
|
||||
del self._activegateways[gateway]
|
||||
|
||||
def cleanup_atexit(self):
|
||||
if debug:
|
||||
print >>debug, "="*20 + "cleaning up" + "=" * 20
|
||||
debug.flush()
|
||||
for gw in self._activegateways.keys():
|
||||
gw.exit()
|
||||
#gw.join() # should work as well
|
||||
|
||||
class ExecnetAPI:
|
||||
def pyexecnet_gateway_init(self, gateway):
|
||||
""" signal initialisation of new gateway. """
|
||||
|
@ -605,28 +596,20 @@ class ExecnetAPI:
|
|||
|
||||
def pyexecnet_gwmanage_rsyncfinish(self, source, gateways):
|
||||
""" called after rsyncing a directory to remote gateways takes place. """
|
||||
|
||||
|
||||
|
||||
class BaseGateway(object):
|
||||
hook = ExecnetAPI()
|
||||
exc_info = sys.exc_info
|
||||
|
||||
class _StopExecLoop(Exception): pass
|
||||
_ThreadOut = ThreadOut
|
||||
remoteaddress = ""
|
||||
_requestqueue = None
|
||||
_cleanup = GatewayCleanup()
|
||||
|
||||
def __init__(self, io, _startcount=2):
|
||||
""" initialize core gateway, using the given inputoutput object.
|
||||
"""
|
||||
self._io = io
|
||||
self._channelfactory = ChannelFactory(self, _startcount)
|
||||
self._cleanup.register(self)
|
||||
if _startcount == 1: # only import 'py' on the "client" side
|
||||
import py
|
||||
self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry)
|
||||
else:
|
||||
self.hook = ExecnetAPI()
|
||||
|
||||
def _initreceive(self, requestqueue=False):
|
||||
if requestqueue:
|
||||
|
@ -636,31 +619,13 @@ class BaseGateway(object):
|
|||
self._receiverthread.setDaemon(1)
|
||||
self._receiverthread.start()
|
||||
|
||||
def __repr__(self):
|
||||
""" return string representing gateway type and status. """
|
||||
addr = self.remoteaddress
|
||||
if addr:
|
||||
addr = '[%s]' % (addr,)
|
||||
else:
|
||||
addr = ''
|
||||
try:
|
||||
r = (self._receiverthread.isAlive() and "receiving" or
|
||||
"not receiving")
|
||||
s = "sending" # XXX
|
||||
i = len(self._channelfactory.channels())
|
||||
except AttributeError:
|
||||
r = s = "uninitialized"
|
||||
i = "no"
|
||||
return "<%s%s %s/%s (%s active channels)>" %(
|
||||
self.__class__.__name__, addr, r, s, i)
|
||||
|
||||
def _trace(self, *args):
|
||||
if debug:
|
||||
try:
|
||||
l = "\n".join(args).split(os.linesep)
|
||||
id = getid(self)
|
||||
for x in l:
|
||||
print >>debug, x
|
||||
debug.write(x+"\n")
|
||||
debug.flush()
|
||||
except sysex:
|
||||
raise
|
||||
|
@ -679,7 +644,6 @@ class BaseGateway(object):
|
|||
def _thread_receiver(self):
|
||||
""" thread to read and handle Messages half-sync-half-async. """
|
||||
try:
|
||||
from sys import exc_info
|
||||
while 1:
|
||||
try:
|
||||
msg = Message.readfrom(self._io)
|
||||
|
@ -690,7 +654,7 @@ class BaseGateway(object):
|
|||
except EOFError:
|
||||
break
|
||||
except:
|
||||
self._traceex(exc_info())
|
||||
self._traceex(self.exc_info())
|
||||
break
|
||||
finally:
|
||||
# XXX we need to signal fatal error states to
|
||||
|
@ -705,7 +669,6 @@ class BaseGateway(object):
|
|||
if threading: # might be None during shutdown/finalization
|
||||
self._trace('leaving %r' % threading.currentThread())
|
||||
|
||||
from sys import exc_info
|
||||
def _send(self, msg):
|
||||
if msg is None:
|
||||
self._io.close_write()
|
||||
|
@ -743,7 +706,6 @@ class BaseGateway(object):
|
|||
channel.close("execution disallowed")
|
||||
|
||||
def _servemain(self, joining=True):
|
||||
from sys import exc_info
|
||||
self._initreceive(requestqueue=True)
|
||||
try:
|
||||
while 1:
|
||||
|
@ -762,7 +724,6 @@ class BaseGateway(object):
|
|||
|
||||
def _executetask(self, item):
|
||||
""" execute channel/source items. """
|
||||
from sys import exc_info
|
||||
channel, (source, outid, errid) = item
|
||||
try:
|
||||
loc = { 'channel' : channel, '__name__': '__channelexec__'}
|
||||
|
@ -781,7 +742,7 @@ class BaseGateway(object):
|
|||
channel.close()
|
||||
raise
|
||||
except:
|
||||
excinfo = exc_info()
|
||||
excinfo = self.exc_info()
|
||||
l = traceback.format_exception(*excinfo)
|
||||
errortext = "".join(l)
|
||||
channel.close(errortext)
|
||||
|
@ -789,87 +750,11 @@ class BaseGateway(object):
|
|||
else:
|
||||
channel.close()
|
||||
|
||||
def _newredirectchannelid(self, callback):
|
||||
if callback is None:
|
||||
return
|
||||
if hasattr(callback, 'write'):
|
||||
callback = callback.write
|
||||
assert callable(callback)
|
||||
chan = self.newchannel()
|
||||
chan.setcallback(callback)
|
||||
return chan.id
|
||||
|
||||
def _rinfo(self, update=False):
|
||||
""" return some sys/env information from remote. """
|
||||
if update or not hasattr(self, '_cache_rinfo'):
|
||||
class RInfo:
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
def __repr__(self):
|
||||
info = ", ".join(["%s=%s" % item
|
||||
for item in self.__dict__.items()])
|
||||
return "<RInfo %r>" % info
|
||||
self._cache_rinfo = RInfo(**self.remote_exec("""
|
||||
import sys, os
|
||||
channel.send(dict(
|
||||
executable = sys.executable,
|
||||
version_info = sys.version_info,
|
||||
platform = sys.platform,
|
||||
cwd = os.getcwd(),
|
||||
pid = os.getpid(),
|
||||
))
|
||||
""").receive())
|
||||
return self._cache_rinfo
|
||||
|
||||
# _____________________________________________________________________
|
||||
#
|
||||
# High Level Interface
|
||||
# _____________________________________________________________________
|
||||
#
|
||||
def remote_exec(self, source, stdout=None, stderr=None):
|
||||
""" return channel object and connect it to a remote
|
||||
execution thread where the given 'source' executes
|
||||
and has the sister 'channel' object in its global
|
||||
namespace. The callback functions 'stdout' and
|
||||
'stderr' get called on receival of remote
|
||||
stdout/stderr output strings.
|
||||
"""
|
||||
try:
|
||||
source = str(Source(source))
|
||||
except NameError:
|
||||
try:
|
||||
import py
|
||||
source = str(py.code.Source(source))
|
||||
except ImportError:
|
||||
pass
|
||||
channel = self.newchannel()
|
||||
outid = self._newredirectchannelid(stdout)
|
||||
errid = self._newredirectchannelid(stderr)
|
||||
self._send(Message.CHANNEL_OPEN(
|
||||
channel.id, (source, outid, errid)))
|
||||
return channel
|
||||
|
||||
def remote_init_threads(self, num=None):
|
||||
""" start up to 'num' threads for subsequent
|
||||
remote_exec() invocations to allow concurrent
|
||||
execution.
|
||||
"""
|
||||
if hasattr(self, '_remotechannelthread'):
|
||||
raise IOError("remote threads already running")
|
||||
from py.__.thread import pool
|
||||
source = py.code.Source(pool, """
|
||||
execpool = WorkerPool(maxthreads=%r)
|
||||
gw = channel.gateway
|
||||
while 1:
|
||||
task = gw._requestqueue.get()
|
||||
if task is None:
|
||||
gw._stopsend()
|
||||
execpool.shutdown()
|
||||
execpool.join()
|
||||
raise gw._StopExecLoop
|
||||
execpool.dispatch(gw._executetask, task)
|
||||
""" % num)
|
||||
self._remotechannelthread = self.remote_exec(source)
|
||||
|
||||
def newchannel(self):
|
||||
""" return new channel object. """
|
||||
|
@ -891,37 +776,6 @@ class BaseGateway(object):
|
|||
self._stopsend()
|
||||
self.hook.pyexecnet_gateway_exit(gateway=self)
|
||||
|
||||
def _remote_redirect(self, stdout=None, stderr=None):
|
||||
""" return a handle representing a redirection of a remote
|
||||
end's stdout to a local file object. with handle.close()
|
||||
the redirection will be reverted.
|
||||
"""
|
||||
clist = []
|
||||
for name, out in ('stdout', stdout), ('stderr', stderr):
|
||||
if out:
|
||||
outchannel = self.newchannel()
|
||||
outchannel.setcallback(getattr(out, 'write', out))
|
||||
channel = self.remote_exec("""
|
||||
import sys
|
||||
outchannel = channel.receive()
|
||||
outchannel.gateway._ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
|
||||
""" % name)
|
||||
channel.send(outchannel)
|
||||
clist.append(channel)
|
||||
for c in clist:
|
||||
c.waitclose()
|
||||
class Handle:
|
||||
def close(_):
|
||||
for name, out in ('stdout', stdout), ('stderr', stderr):
|
||||
if out:
|
||||
c = self.remote_exec("""
|
||||
import sys
|
||||
channel.gateway._ThreadOut(sys, %r).resetdefault()
|
||||
""" % name)
|
||||
c.waitclose()
|
||||
return Handle()
|
||||
|
||||
|
||||
def _stopsend(self):
|
||||
self._send(None)
|
||||
|
||||
|
|
|
@ -123,4 +123,4 @@ class HostRSync(py.execnet.RSync):
|
|||
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
|
||||
remotepath = gateway.spec.chdir
|
||||
py.builtin.print_('%s:%s <= %s' %
|
||||
(gateway.remoteaddress, remotepath, path))
|
||||
(gateway.spec, remotepath, path))
|
||||
|
|
|
@ -2,6 +2,10 @@
|
|||
Support for working with multiple channels and gateways
|
||||
"""
|
||||
import py
|
||||
try:
|
||||
import queue
|
||||
except ImportError:
|
||||
import Queue as queue
|
||||
|
||||
NO_ENDMARKER_WANTED = object()
|
||||
|
||||
|
@ -40,7 +44,7 @@ class MultiChannel:
|
|||
try:
|
||||
return self._queue
|
||||
except AttributeError:
|
||||
self._queue = py.std.Queue.Queue()
|
||||
self._queue = queue.Queue()
|
||||
for ch in self._channels:
|
||||
def putreceived(obj, channel=ch):
|
||||
self._queue.put((channel, obj))
|
||||
|
|
|
@ -91,8 +91,7 @@ class RSync(object):
|
|||
|
||||
def _report_send_file(self, gateway, modified_rel_path):
|
||||
if self._verbose:
|
||||
py.builtin.print_('%s <= %s' % (gateway.remoteaddress,
|
||||
modified_rel_path))
|
||||
print("%s <= %s" %(gateway, modified_rel_path))
|
||||
|
||||
def send(self, raises=True):
|
||||
""" Sends a sourcedir to all added targets. Flag indicates
|
||||
|
|
|
@ -2,7 +2,7 @@ from __future__ import generators
|
|||
import os, sys, time, signal
|
||||
import py
|
||||
from py.__.execnet.gateway_base import Message, Channel, ChannelFactory
|
||||
from py.__.execnet.gateway_base import ExecnetAPI
|
||||
from py.__.execnet.gateway_base import ExecnetAPI, queue, Popen2IO
|
||||
|
||||
from py.__.execnet.gateway import startup_modules, getsource
|
||||
pytest_plugins = "pytester"
|
||||
|
@ -19,6 +19,7 @@ class TestExecnetEvents:
|
|||
call = rec.popcall("pyexecnet_gateway_exit")
|
||||
assert call.gateway == gw
|
||||
|
||||
|
||||
def test_getsource_import_modules():
|
||||
for dottedname in startup_modules:
|
||||
yield getsource, dottedname
|
||||
|
@ -41,7 +42,7 @@ def test_stdouterrin_setnull():
|
|||
from py.__.execnet.gateway import stdouterrin_setnull
|
||||
stdouterrin_setnull()
|
||||
import os
|
||||
os.write(1, "hello")
|
||||
os.write(1, "hello".encode('ascii'))
|
||||
if os.name == "nt":
|
||||
os.write(2, "world")
|
||||
os.read(0, 1)
|
||||
|
@ -53,13 +54,14 @@ def test_stdouterrin_setnull():
|
|||
class TestMessage:
|
||||
def test_wire_protocol(self):
|
||||
for cls in Message._types.values():
|
||||
one = py.io.TextIO()
|
||||
cls(42, '23').writeto(one)
|
||||
two = py.io.TextIO(one.getvalue())
|
||||
one = py.io.BytesIO()
|
||||
data = '23'.encode('ascii')
|
||||
cls(42, data).writeto(one)
|
||||
two = py.io.BytesIO(one.getvalue())
|
||||
msg = Message.readfrom(two)
|
||||
assert isinstance(msg, cls)
|
||||
assert msg.channelid == 42
|
||||
assert msg.data == '23'
|
||||
assert msg.data == data
|
||||
assert isinstance(repr(msg), str)
|
||||
# == "<Message.%s channelid=42 '23'>" %(msg.__class__.__name__, )
|
||||
|
||||
|
@ -100,7 +102,7 @@ class BasicRemoteExecution:
|
|||
assert self.gw._receiverthread.isAlive()
|
||||
|
||||
def test_repr_doesnt_crash(self):
|
||||
assert isinstance(repr(self), str)
|
||||
assert isinstance(repr(self.gw), str)
|
||||
|
||||
def test_attribute__name__(self):
|
||||
channel = self.gw.remote_exec("channel.send(__name__)")
|
||||
|
@ -316,8 +318,7 @@ class BasicRemoteExecution:
|
|||
assert l[3] == 999
|
||||
|
||||
def test_channel_endmarker_callback_error(self):
|
||||
from Queue import Queue
|
||||
q = Queue()
|
||||
q = queue.Queue()
|
||||
channel = self.gw.remote_exec(source='''
|
||||
raise ValueError()
|
||||
''')
|
||||
|
@ -446,18 +447,6 @@ class BasicRemoteExecution:
|
|||
res = channel.receive()
|
||||
assert res == 42
|
||||
|
||||
def test_non_reverse_execution(self):
|
||||
gw = self.gw
|
||||
c1 = gw.remote_exec("""
|
||||
c = channel.gateway.remote_exec("pass")
|
||||
try:
|
||||
c.waitclose()
|
||||
except c.RemoteError, e:
|
||||
channel.send(str(e))
|
||||
""")
|
||||
text = c1.receive()
|
||||
assert text.find("execution disallowed") != -1
|
||||
|
||||
def test__rinfo(self):
|
||||
rinfo = self.gw._rinfo()
|
||||
assert rinfo.executable
|
||||
|
@ -486,9 +475,8 @@ class BasicCmdbasedRemoteExecution(BasicRemoteExecution):
|
|||
def test_channel_endmarker_remote_killterm():
|
||||
gw = py.execnet.PopenGateway()
|
||||
try:
|
||||
from Queue import Queue
|
||||
q = Queue()
|
||||
channel = gw.remote_exec(source='''
|
||||
q = queue.Queue()
|
||||
channel = gw.remote_exec('''
|
||||
import os
|
||||
os.kill(os.getpid(), 15)
|
||||
''')
|
||||
|
@ -581,8 +569,7 @@ def test_endmarker_delivery_on_remote_killterm():
|
|||
py.test.skip("no os.kill()")
|
||||
gw = py.execnet.PopenGateway()
|
||||
try:
|
||||
from Queue import Queue
|
||||
q = Queue()
|
||||
q = queue.Queue()
|
||||
channel = gw.remote_exec(source='''
|
||||
import os
|
||||
os.kill(os.getpid(), 15)
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
|
||||
import py
|
||||
from py.__.execnet import gateway_base
|
||||
|
||||
@py.test.mark.multi(ver=["2.4", "2.5", "2.6", "3.1"])
|
||||
def test_io_message(ver, tmpdir):
|
||||
executable = py.path.local.sysfind("python" + ver)
|
||||
if executable is None:
|
||||
py.test.skip("no python%s found" % (ver,))
|
||||
check = tmpdir.join("check.py")
|
||||
check.write(py.code.Source(gateway_base, """
|
||||
try:
|
||||
from io import BytesIO
|
||||
except ImportError:
|
||||
from StringIO import StringIO as BytesIO
|
||||
import tempfile
|
||||
temp_out = BytesIO()
|
||||
temp_in = BytesIO()
|
||||
io = Popen2IO(temp_out, temp_in)
|
||||
for i, msg_cls in Message._types.items():
|
||||
print ("checking %s %s" %(i, msg_cls))
|
||||
for data in "hello", "hello".encode('ascii'):
|
||||
msg1 = msg_cls(i, data)
|
||||
msg1.writeto(io)
|
||||
x = io.outfile.getvalue()
|
||||
io.outfile.truncate(0)
|
||||
io.outfile.seek(0)
|
||||
io.infile.seek(0)
|
||||
io.infile.write(x)
|
||||
io.infile.seek(0)
|
||||
msg2 = Message.readfrom(io)
|
||||
assert msg1.channelid == msg2.channelid, (msg1, msg2)
|
||||
assert msg1.data == msg2.data
|
||||
print ("all passed")
|
||||
"""))
|
||||
#out = py.process.cmdexec("%s %s" %(executable,check))
|
||||
out = executable.sysexec(check)
|
||||
print (out)
|
||||
assert "all passed" in out
|
|
@ -551,11 +551,13 @@ class LocalPath(FSBase):
|
|||
proc = Popen([str(self)] + list(argv), stdout=PIPE, stderr=PIPE)
|
||||
stdout, stderr = proc.communicate()
|
||||
ret = proc.wait()
|
||||
if ret != 0:
|
||||
raise py.process.cmdexec.Error(ret, ret, str(self),
|
||||
stdout, stderr,)
|
||||
if py.builtin._isbytes(stdout):
|
||||
stdout = py.builtin._totext(stdout, sys.getdefaultencoding())
|
||||
if ret != 0:
|
||||
if py.builtin._isbytes(stderr):
|
||||
stderr = py.builtin._totext(stderr, sys.getdefaultencoding())
|
||||
raise py.process.cmdexec.Error(ret, ret, str(self),
|
||||
stdout, stderr,)
|
||||
return stdout
|
||||
|
||||
def sysfind(cls, name, checker=None):
|
||||
|
|
Loading…
Reference in New Issue