diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index d48037c4b..eb834012d 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -1,7 +1,8 @@ -import sys, os, inspect, socket +import sys, os, inspect, socket, atexit, weakref import py from subprocess import Popen, PIPE -from py.__.execnet.gateway_base import BaseGateway, Popen2IO, SocketIO +from py.__.execnet.gateway_base import BaseGateway, Message, Popen2IO, SocketIO +from py.__.execnet.gateway_base import ExecnetAPI # the list of modules that must be send to the other side # for bootstrapping gateways @@ -12,18 +13,63 @@ startup_modules = [ 'py.__.execnet.gateway_base', ] +debug = False + def getsource(dottedname): mod = __import__(dottedname, None, None, ['__doc__']) return inspect.getsource(mod) + +class GatewayCleanup: + def __init__(self): + self._activegateways = weakref.WeakKeyDictionary() + atexit.register(self.cleanup_atexit) + + def register(self, gateway): + assert gateway not in self._activegateways + self._activegateways[gateway] = True + + def unregister(self, gateway): + del self._activegateways[gateway] + + def cleanup_atexit(self): + if debug: + debug.writeslines(["="*20, "cleaning up", "=" * 20]) + debug.flush() + for gw in self._activegateways.keys(): + gw.exit() + #gw.join() # should work as well + class InitiatingGateway(BaseGateway): """ initialize gateways on both sides of a inputoutput object. """ + _cleanup = GatewayCleanup() + def __init__(self, io): self._remote_bootstrap_gateway(io) super(InitiatingGateway, self).__init__(io=io, _startcount=1) # XXX we dissallow execution form the other side self._initreceive(requestqueue=False) + self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry) self.hook.pyexecnet_gateway_init(gateway=self) + self._cleanup.register(self) + + def __repr__(self): + """ return string representing gateway type and status. """ + if hasattr(self, 'remoteaddress'): + addr = '[%s]' % (self.remoteaddress,) + else: + addr = '' + try: + r = (self._receiverthread.isAlive() and "receiving" or + "not receiving") + s = "sending" # XXX + i = len(self._channelfactory.channels()) + except AttributeError: + r = s = "uninitialized" + i = "no" + return "<%s%s %s/%s (%s active channels)>" %( + self.__class__.__name__, addr, r, s, i) + def _remote_bootstrap_gateway(self, io, extra=''): """ return Gateway with a asynchronously remotely @@ -41,7 +87,111 @@ class InitiatingGateway(BaseGateway): source = "\n".join(bootstrap) self._trace("sending gateway bootstrap code") #open("/tmp/bootstrap.py", 'w').write(source) - io.write('%r\n' % source) + repr_source = repr(source) + "\n" + io.write(repr_source.encode('ascii')) + + def _rinfo(self, update=False): + """ return some sys/env information from remote. """ + if update or not hasattr(self, '_cache_rinfo'): + self._cache_rinfo = RInfo(**self.remote_exec(""" + import sys, os + channel.send(dict( + executable = sys.executable, + version_info = sys.version_info, + platform = sys.platform, + cwd = os.getcwd(), + pid = os.getpid(), + )) + """).receive()) + return self._cache_rinfo + + def remote_exec(self, source, stdout=None, stderr=None): + """ return channel object and connect it to a remote + execution thread where the given 'source' executes + and has the sister 'channel' object in its global + namespace. The callback functions 'stdout' and + 'stderr' get called on receival of remote + stdout/stderr output strings. + """ + source = str(py.code.Source(source)) + channel = self.newchannel() + outid = self._newredirectchannelid(stdout) + errid = self._newredirectchannelid(stderr) + self._send(Message.CHANNEL_OPEN( + channel.id, (source, outid, errid))) + return channel + + def remote_init_threads(self, num=None): + """ start up to 'num' threads for subsequent + remote_exec() invocations to allow concurrent + execution. + """ + if hasattr(self, '_remotechannelthread'): + raise IOError("remote threads already running") + from py.__.thread import pool + source = py.code.Source(pool, """ + execpool = WorkerPool(maxthreads=%r) + gw = channel.gateway + while 1: + task = gw._requestqueue.get() + if task is None: + gw._stopsend() + execpool.shutdown() + execpool.join() + raise gw._StopExecLoop + execpool.dispatch(gw._executetask, task) + """ % num) + self._remotechannelthread = self.remote_exec(source) + + def _newredirectchannelid(self, callback): + if callback is None: + return + if hasattr(callback, 'write'): + callback = callback.write + assert callable(callback) + chan = self.newchannel() + chan.setcallback(callback) + return chan.id + + def _remote_redirect(self, stdout=None, stderr=None): + """ return a handle representing a redirection of a remote + end's stdout to a local file object. with handle.close() + the redirection will be reverted. + """ + clist = [] + for name, out in ('stdout', stdout), ('stderr', stderr): + if out: + outchannel = self.newchannel() + outchannel.setcallback(getattr(out, 'write', out)) + channel = self.remote_exec(""" + import sys + outchannel = channel.receive() + outchannel.gateway._ThreadOut(sys, %r).setdefaultwriter(outchannel.send) + """ % name) + channel.send(outchannel) + clist.append(channel) + for c in clist: + c.waitclose() + class Handle: + def close(_): + for name, out in ('stdout', stdout), ('stderr', stderr): + if out: + c = self.remote_exec(""" + import sys + channel.gateway._ThreadOut(sys, %r).resetdefault() + """ % name) + c.waitclose() + return Handle() + + + +class RInfo: + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + def __repr__(self): + info = ", ".join(["%s=%s" % item + for item in self.__dict__.items()]) + return "" % info class PopenCmdGateway(InitiatingGateway): def __init__(self, cmd): @@ -68,6 +218,7 @@ class PopenGateway(PopenCmdGateway): if not python: python = sys.executable cmd = '%s -u -c "exec input()"' % python + cmd = '%s -u -c "import sys ; exec(eval(sys.stdin.readline()))"' % python super(PopenGateway, self).__init__(cmd) def _remote_bootstrap_gateway(self, io, extra=''): @@ -113,7 +264,7 @@ class SocketGateway(InitiatingGateway): host, port = hostport mydir = py.path.local(__file__).dirpath() socketserverbootstrap = py.code.Source( - mydir.join('script', 'socketserver.py').read('rU'), """ + mydir.join('script', 'socketserver.py').read('r'), """ import socket sock = bind_and_listen((%r, %r)) port = sock.getsockname() @@ -187,18 +338,18 @@ def stdouterrin_setnull(): else: devnull = '/dev/null' # stdin - sys.stdin = os.fdopen(os.dup(0), 'rb', 0) + sys.stdin = os.fdopen(os.dup(0), 'r', 1) fd = os.open(devnull, os.O_RDONLY) os.dup2(fd, 0) os.close(fd) # stdout - sys.stdout = os.fdopen(os.dup(1), 'wb', 0) + sys.stdout = os.fdopen(os.dup(1), 'w', 1) fd = os.open(devnull, os.O_WRONLY) os.dup2(fd, 1) # stderr for win32 if os.name == 'nt': - sys.stderr = os.fdopen(os.dup(2), 'wb', 0) + sys.stderr = os.fdopen(os.dup(2), 'w', 1) os.dup2(fd, 2) os.close(fd) diff --git a/py/execnet/gateway_base.py b/py/execnet/gateway_base.py index c31ff2812..0a1d1a14e 100644 --- a/py/execnet/gateway_base.py +++ b/py/execnet/gateway_base.py @@ -11,7 +11,7 @@ base gateway code # few tests that try to catch when we mess up. """ -import sys, os, weakref, atexit +import sys, os, weakref import threading, traceback, socket, struct try: import queue @@ -21,20 +21,27 @@ except ImportError: # XXX the following lines should not be here if 'ThreadOut' not in globals(): import py - from py.code import Source ThreadOut = py._thread.ThreadOut if sys.version_info > (3, 0): exec("""def do_exec(co, loc): exec(co, loc)""") + unicode = str + sysex = Exception else: exec("""def do_exec(co, loc): exec co in loc""") + bytes = str + sysex = (KeyboardInterrupt, SystemExit) -import os -debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa') +def str(*args): + raise EnvironmentError( + "use unicode or bytes, not cross-python ambigous 'str'") + +default_encoding = "UTF-8" + +debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w') -sysex = (KeyboardInterrupt, SystemExit) # ___________________________________________________________________________ # @@ -52,7 +59,7 @@ class SocketIO: sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY except socket.error: e = sys.exc_info()[1] - py.builtin.print_("WARNING: Cannot set socket option:", str(e)) + sys.stderr.write("WARNING: cannot set socketoption") self.readable = self.writeable = True def read(self, numbytes): @@ -66,6 +73,7 @@ class SocketIO: return buf def write(self, data): + assert isinstance(data, bytes) self.sock.sendall(data) def close_read(self): @@ -85,9 +93,9 @@ class SocketIO: class Popen2IO: server_stmt = """ -import os, sys, StringIO +import os, sys, tempfile io = Popen2IO(sys.stdout, sys.stdin) -sys.stdout = sys.stderr = StringIO.StringIO() +sys.stdout = sys.stderr = tempfile.TemporaryFile() """ error = (IOError, OSError, EOFError) @@ -102,13 +110,15 @@ sys.stdout = sys.stderr = StringIO.StringIO() def read(self, numbytes): """Read exactly 'bytes' bytes from the pipe. """ - s = self.infile.read(numbytes) - if len(s) < numbytes: + data = self.infile.read(numbytes) + if len(data) < numbytes: raise EOFError - return s + assert isinstance(data, bytes) + return data def write(self, data): """write out all bytes to the pipe. """ + assert isinstance(data, bytes) self.outfile.write(data) self.outfile.flush() @@ -143,10 +153,11 @@ class Message: # version :-((( There is no sane solution, short of a custom # pure Python marshaller data = self.data - if isinstance(data, str): + if isinstance(data, bytes): dataformat = 1 else: data = repr(self.data) # argh + data = data.encode(default_encoding) dataformat = 2 header = struct.pack(HDR_FORMAT, self.msgtype, dataformat, self.channelid, len(data)) @@ -160,6 +171,7 @@ class Message: if dataformat == 1: pass elif dataformat == 2: + data = data.decode(default_encoding) data = eval(data, {}) # reversed argh else: raise ValueError("bad data format") @@ -205,7 +217,8 @@ def _setupmessages(): class CHANNEL_CLOSE_ERROR(Message): def received(self, gateway): - remote_error = gateway._channelfactory.RemoteError(self.data) + data = self.data.decode(default_encoding) + remote_error = gateway._channelfactory.RemoteError(data) gateway._channelfactory._local_close(self.channelid, remote_error) class CHANNEL_LAST_MESSAGE(Message): @@ -237,7 +250,8 @@ class RemoteError(EOFError): def warn(self): # XXX do this better - print >> sys.stderr, "Warning: unhandled %r" % (self,) + sys.stderr.write("Warning: unhandled %r\n" % (self,)) + NO_ENDMARKER_WANTED = object() @@ -341,7 +355,8 @@ class Channel(object): # but it's never damaging to send too many CHANNEL_CLOSE messages put = self.gateway._send if error is not None: - put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error))) + put(Message.CHANNEL_CLOSE_ERROR(self.id, + error.encode(default_encoding))) else: put(Message.CHANNEL_CLOSE(self.id)) if isinstance(error, RemoteError): @@ -437,7 +452,7 @@ class ChannelFactory(object): self._writelock.release() def channels(self): - return self._channels.values() + return list(self._channels.values()) # # internal methods, called from the receiver thread @@ -512,9 +527,9 @@ class ChannelFactory(object): self.finished = True finally: self._writelock.release() - for id in self._channels.keys(): + for id in list(self._channels): self._local_last_message(id) - for id in self._callbacks.keys(): + for id in list(self._callbacks): self._close_callback(id) class ChannelFile(object): @@ -567,30 +582,6 @@ class ChannelFileRead(ChannelFile): -# ---------------------------------------------------------- -# cleanup machinery (for exiting processes) -# ---------------------------------------------------------- - -class GatewayCleanup: - def __init__(self): - self._activegateways = weakref.WeakKeyDictionary() - atexit.register(self.cleanup_atexit) - - def register(self, gateway): - assert gateway not in self._activegateways - self._activegateways[gateway] = True - - def unregister(self, gateway): - del self._activegateways[gateway] - - def cleanup_atexit(self): - if debug: - print >>debug, "="*20 + "cleaning up" + "=" * 20 - debug.flush() - for gw in self._activegateways.keys(): - gw.exit() - #gw.join() # should work as well - class ExecnetAPI: def pyexecnet_gateway_init(self, gateway): """ signal initialisation of new gateway. """ @@ -605,28 +596,20 @@ class ExecnetAPI: def pyexecnet_gwmanage_rsyncfinish(self, source, gateways): """ called after rsyncing a directory to remote gateways takes place. """ - - class BaseGateway(object): + hook = ExecnetAPI() + exc_info = sys.exc_info class _StopExecLoop(Exception): pass _ThreadOut = ThreadOut - remoteaddress = "" _requestqueue = None - _cleanup = GatewayCleanup() def __init__(self, io, _startcount=2): """ initialize core gateway, using the given inputoutput object. """ self._io = io self._channelfactory = ChannelFactory(self, _startcount) - self._cleanup.register(self) - if _startcount == 1: # only import 'py' on the "client" side - import py - self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry) - else: - self.hook = ExecnetAPI() def _initreceive(self, requestqueue=False): if requestqueue: @@ -636,31 +619,13 @@ class BaseGateway(object): self._receiverthread.setDaemon(1) self._receiverthread.start() - def __repr__(self): - """ return string representing gateway type and status. """ - addr = self.remoteaddress - if addr: - addr = '[%s]' % (addr,) - else: - addr = '' - try: - r = (self._receiverthread.isAlive() and "receiving" or - "not receiving") - s = "sending" # XXX - i = len(self._channelfactory.channels()) - except AttributeError: - r = s = "uninitialized" - i = "no" - return "<%s%s %s/%s (%s active channels)>" %( - self.__class__.__name__, addr, r, s, i) - def _trace(self, *args): if debug: try: l = "\n".join(args).split(os.linesep) id = getid(self) for x in l: - print >>debug, x + debug.write(x+"\n") debug.flush() except sysex: raise @@ -679,7 +644,6 @@ class BaseGateway(object): def _thread_receiver(self): """ thread to read and handle Messages half-sync-half-async. """ try: - from sys import exc_info while 1: try: msg = Message.readfrom(self._io) @@ -690,7 +654,7 @@ class BaseGateway(object): except EOFError: break except: - self._traceex(exc_info()) + self._traceex(self.exc_info()) break finally: # XXX we need to signal fatal error states to @@ -705,7 +669,6 @@ class BaseGateway(object): if threading: # might be None during shutdown/finalization self._trace('leaving %r' % threading.currentThread()) - from sys import exc_info def _send(self, msg): if msg is None: self._io.close_write() @@ -743,7 +706,6 @@ class BaseGateway(object): channel.close("execution disallowed") def _servemain(self, joining=True): - from sys import exc_info self._initreceive(requestqueue=True) try: while 1: @@ -762,7 +724,6 @@ class BaseGateway(object): def _executetask(self, item): """ execute channel/source items. """ - from sys import exc_info channel, (source, outid, errid) = item try: loc = { 'channel' : channel, '__name__': '__channelexec__'} @@ -781,7 +742,7 @@ class BaseGateway(object): channel.close() raise except: - excinfo = exc_info() + excinfo = self.exc_info() l = traceback.format_exception(*excinfo) errortext = "".join(l) channel.close(errortext) @@ -789,87 +750,11 @@ class BaseGateway(object): else: channel.close() - def _newredirectchannelid(self, callback): - if callback is None: - return - if hasattr(callback, 'write'): - callback = callback.write - assert callable(callback) - chan = self.newchannel() - chan.setcallback(callback) - return chan.id - - def _rinfo(self, update=False): - """ return some sys/env information from remote. """ - if update or not hasattr(self, '_cache_rinfo'): - class RInfo: - def __init__(self, **kwargs): - self.__dict__.update(kwargs) - def __repr__(self): - info = ", ".join(["%s=%s" % item - for item in self.__dict__.items()]) - return "" % info - self._cache_rinfo = RInfo(**self.remote_exec(""" - import sys, os - channel.send(dict( - executable = sys.executable, - version_info = sys.version_info, - platform = sys.platform, - cwd = os.getcwd(), - pid = os.getpid(), - )) - """).receive()) - return self._cache_rinfo - # _____________________________________________________________________ # # High Level Interface # _____________________________________________________________________ # - def remote_exec(self, source, stdout=None, stderr=None): - """ return channel object and connect it to a remote - execution thread where the given 'source' executes - and has the sister 'channel' object in its global - namespace. The callback functions 'stdout' and - 'stderr' get called on receival of remote - stdout/stderr output strings. - """ - try: - source = str(Source(source)) - except NameError: - try: - import py - source = str(py.code.Source(source)) - except ImportError: - pass - channel = self.newchannel() - outid = self._newredirectchannelid(stdout) - errid = self._newredirectchannelid(stderr) - self._send(Message.CHANNEL_OPEN( - channel.id, (source, outid, errid))) - return channel - - def remote_init_threads(self, num=None): - """ start up to 'num' threads for subsequent - remote_exec() invocations to allow concurrent - execution. - """ - if hasattr(self, '_remotechannelthread'): - raise IOError("remote threads already running") - from py.__.thread import pool - source = py.code.Source(pool, """ - execpool = WorkerPool(maxthreads=%r) - gw = channel.gateway - while 1: - task = gw._requestqueue.get() - if task is None: - gw._stopsend() - execpool.shutdown() - execpool.join() - raise gw._StopExecLoop - execpool.dispatch(gw._executetask, task) - """ % num) - self._remotechannelthread = self.remote_exec(source) def newchannel(self): """ return new channel object. """ @@ -891,37 +776,6 @@ class BaseGateway(object): self._stopsend() self.hook.pyexecnet_gateway_exit(gateway=self) - def _remote_redirect(self, stdout=None, stderr=None): - """ return a handle representing a redirection of a remote - end's stdout to a local file object. with handle.close() - the redirection will be reverted. - """ - clist = [] - for name, out in ('stdout', stdout), ('stderr', stderr): - if out: - outchannel = self.newchannel() - outchannel.setcallback(getattr(out, 'write', out)) - channel = self.remote_exec(""" - import sys - outchannel = channel.receive() - outchannel.gateway._ThreadOut(sys, %r).setdefaultwriter(outchannel.send) - """ % name) - channel.send(outchannel) - clist.append(channel) - for c in clist: - c.waitclose() - class Handle: - def close(_): - for name, out in ('stdout', stdout), ('stderr', stderr): - if out: - c = self.remote_exec(""" - import sys - channel.gateway._ThreadOut(sys, %r).resetdefault() - """ % name) - c.waitclose() - return Handle() - - def _stopsend(self): self._send(None) diff --git a/py/execnet/gwmanage.py b/py/execnet/gwmanage.py index 58c0f15a8..4d1bbe1d9 100644 --- a/py/execnet/gwmanage.py +++ b/py/execnet/gwmanage.py @@ -123,4 +123,4 @@ class HostRSync(py.execnet.RSync): path = os.path.basename(self._sourcedir) + "/" + modified_rel_path remotepath = gateway.spec.chdir py.builtin.print_('%s:%s <= %s' % - (gateway.remoteaddress, remotepath, path)) + (gateway.spec, remotepath, path)) diff --git a/py/execnet/multi.py b/py/execnet/multi.py index e82da6b05..e1a9857cb 100644 --- a/py/execnet/multi.py +++ b/py/execnet/multi.py @@ -2,6 +2,10 @@ Support for working with multiple channels and gateways """ import py +try: + import queue +except ImportError: + import Queue as queue NO_ENDMARKER_WANTED = object() @@ -40,7 +44,7 @@ class MultiChannel: try: return self._queue except AttributeError: - self._queue = py.std.Queue.Queue() + self._queue = queue.Queue() for ch in self._channels: def putreceived(obj, channel=ch): self._queue.put((channel, obj)) diff --git a/py/execnet/rsync.py b/py/execnet/rsync.py index 07b58df36..33be68272 100644 --- a/py/execnet/rsync.py +++ b/py/execnet/rsync.py @@ -91,8 +91,7 @@ class RSync(object): def _report_send_file(self, gateway, modified_rel_path): if self._verbose: - py.builtin.print_('%s <= %s' % (gateway.remoteaddress, - modified_rel_path)) + print("%s <= %s" %(gateway, modified_rel_path)) def send(self, raises=True): """ Sends a sourcedir to all added targets. Flag indicates diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index 53a5423c4..25f87a532 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -2,7 +2,7 @@ from __future__ import generators import os, sys, time, signal import py from py.__.execnet.gateway_base import Message, Channel, ChannelFactory -from py.__.execnet.gateway_base import ExecnetAPI +from py.__.execnet.gateway_base import ExecnetAPI, queue, Popen2IO from py.__.execnet.gateway import startup_modules, getsource pytest_plugins = "pytester" @@ -19,6 +19,7 @@ class TestExecnetEvents: call = rec.popcall("pyexecnet_gateway_exit") assert call.gateway == gw + def test_getsource_import_modules(): for dottedname in startup_modules: yield getsource, dottedname @@ -41,7 +42,7 @@ def test_stdouterrin_setnull(): from py.__.execnet.gateway import stdouterrin_setnull stdouterrin_setnull() import os - os.write(1, "hello") + os.write(1, "hello".encode('ascii')) if os.name == "nt": os.write(2, "world") os.read(0, 1) @@ -53,13 +54,14 @@ def test_stdouterrin_setnull(): class TestMessage: def test_wire_protocol(self): for cls in Message._types.values(): - one = py.io.TextIO() - cls(42, '23').writeto(one) - two = py.io.TextIO(one.getvalue()) + one = py.io.BytesIO() + data = '23'.encode('ascii') + cls(42, data).writeto(one) + two = py.io.BytesIO(one.getvalue()) msg = Message.readfrom(two) assert isinstance(msg, cls) assert msg.channelid == 42 - assert msg.data == '23' + assert msg.data == data assert isinstance(repr(msg), str) # == "" %(msg.__class__.__name__, ) @@ -100,7 +102,7 @@ class BasicRemoteExecution: assert self.gw._receiverthread.isAlive() def test_repr_doesnt_crash(self): - assert isinstance(repr(self), str) + assert isinstance(repr(self.gw), str) def test_attribute__name__(self): channel = self.gw.remote_exec("channel.send(__name__)") @@ -316,8 +318,7 @@ class BasicRemoteExecution: assert l[3] == 999 def test_channel_endmarker_callback_error(self): - from Queue import Queue - q = Queue() + q = queue.Queue() channel = self.gw.remote_exec(source=''' raise ValueError() ''') @@ -446,18 +447,6 @@ class BasicRemoteExecution: res = channel.receive() assert res == 42 - def test_non_reverse_execution(self): - gw = self.gw - c1 = gw.remote_exec(""" - c = channel.gateway.remote_exec("pass") - try: - c.waitclose() - except c.RemoteError, e: - channel.send(str(e)) - """) - text = c1.receive() - assert text.find("execution disallowed") != -1 - def test__rinfo(self): rinfo = self.gw._rinfo() assert rinfo.executable @@ -486,9 +475,8 @@ class BasicCmdbasedRemoteExecution(BasicRemoteExecution): def test_channel_endmarker_remote_killterm(): gw = py.execnet.PopenGateway() try: - from Queue import Queue - q = Queue() - channel = gw.remote_exec(source=''' + q = queue.Queue() + channel = gw.remote_exec(''' import os os.kill(os.getpid(), 15) ''') @@ -581,8 +569,7 @@ def test_endmarker_delivery_on_remote_killterm(): py.test.skip("no os.kill()") gw = py.execnet.PopenGateway() try: - from Queue import Queue - q = Queue() + q = queue.Queue() channel = gw.remote_exec(source=''' import os os.kill(os.getpid(), 15) diff --git a/py/execnet/testing/test_message.py b/py/execnet/testing/test_message.py new file mode 100644 index 000000000..6b07a1f69 --- /dev/null +++ b/py/execnet/testing/test_message.py @@ -0,0 +1,39 @@ + +import py +from py.__.execnet import gateway_base + +@py.test.mark.multi(ver=["2.4", "2.5", "2.6", "3.1"]) +def test_io_message(ver, tmpdir): + executable = py.path.local.sysfind("python" + ver) + if executable is None: + py.test.skip("no python%s found" % (ver,)) + check = tmpdir.join("check.py") + check.write(py.code.Source(gateway_base, """ + try: + from io import BytesIO + except ImportError: + from StringIO import StringIO as BytesIO + import tempfile + temp_out = BytesIO() + temp_in = BytesIO() + io = Popen2IO(temp_out, temp_in) + for i, msg_cls in Message._types.items(): + print ("checking %s %s" %(i, msg_cls)) + for data in "hello", "hello".encode('ascii'): + msg1 = msg_cls(i, data) + msg1.writeto(io) + x = io.outfile.getvalue() + io.outfile.truncate(0) + io.outfile.seek(0) + io.infile.seek(0) + io.infile.write(x) + io.infile.seek(0) + msg2 = Message.readfrom(io) + assert msg1.channelid == msg2.channelid, (msg1, msg2) + assert msg1.data == msg2.data + print ("all passed") + """)) + #out = py.process.cmdexec("%s %s" %(executable,check)) + out = executable.sysexec(check) + print (out) + assert "all passed" in out diff --git a/py/path/local.py b/py/path/local.py index d72652e32..10def4b9f 100644 --- a/py/path/local.py +++ b/py/path/local.py @@ -551,11 +551,13 @@ class LocalPath(FSBase): proc = Popen([str(self)] + list(argv), stdout=PIPE, stderr=PIPE) stdout, stderr = proc.communicate() ret = proc.wait() - if ret != 0: - raise py.process.cmdexec.Error(ret, ret, str(self), - stdout, stderr,) if py.builtin._isbytes(stdout): stdout = py.builtin._totext(stdout, sys.getdefaultencoding()) + if ret != 0: + if py.builtin._isbytes(stderr): + stderr = py.builtin._totext(stderr, sys.getdefaultencoding()) + raise py.process.cmdexec.Error(ret, ret, str(self), + stdout, stderr,) return stdout def sysfind(cls, name, checker=None):