From e30aeed876fb936e379a8b10eef2cbc8fca8f08f Mon Sep 17 00:00:00 2001 From: holger krekel Date: Wed, 2 Sep 2009 18:56:43 +0200 Subject: [PATCH] * more tests and fixes for cross-python compatibility * use byte-buffer files if available for io * shift receivelock to gateway object * kill dead code --HG-- branch : trunk --- py/execnet/gateway.py | 38 +++++---- py/execnet/gateway_base.py | 86 +++++++++----------- py/execnet/testing/test_gateway.py | 121 ++++++++++++++++++++++++----- py/execnet/testing/test_message.py | 39 ---------- 4 files changed, 159 insertions(+), 125 deletions(-) delete mode 100644 py/execnet/testing/test_message.py diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index eb834012d..dab324528 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -35,7 +35,7 @@ class GatewayCleanup: if debug: debug.writeslines(["="*20, "cleaning up", "=" * 20]) debug.flush() - for gw in self._activegateways.keys(): + for gw in list(self._activegateways): gw.exit() #gw.join() # should work as well @@ -70,6 +70,12 @@ class InitiatingGateway(BaseGateway): return "<%s%s %s/%s (%s active channels)>" %( self.__class__.__name__, addr, r, s, i) + def exit(self): + """ Try to stop all exec and IO activity. """ + self._cleanup.unregister(self) + self._stopexec() + self._stopsend() + self.hook.pyexecnet_gateway_exit(gateway=self) def _remote_bootstrap_gateway(self, io, extra=''): """ return Gateway with a asynchronously remotely @@ -93,16 +99,8 @@ class InitiatingGateway(BaseGateway): def _rinfo(self, update=False): """ return some sys/env information from remote. """ if update or not hasattr(self, '_cache_rinfo'): - self._cache_rinfo = RInfo(**self.remote_exec(""" - import sys, os - channel.send(dict( - executable = sys.executable, - version_info = sys.version_info, - platform = sys.platform, - cwd = os.getcwd(), - pid = os.getpid(), - )) - """).receive()) + ch = self.remote_exec(rinfo_source) + self._cache_rinfo = RInfo(**ch.receive()) return self._cache_rinfo def remote_exec(self, source, stdout=None, stderr=None): @@ -193,14 +191,24 @@ class RInfo: for item in self.__dict__.items()]) return "" % info +rinfo_source = """ +import sys, os +channel.send(dict( + executable = sys.executable, + version_info = tuple([sys.version_info[i] for i in range(5)]), + platform = sys.platform, + cwd = os.getcwd(), + pid = os.getpid(), +)) +""" + class PopenCmdGateway(InitiatingGateway): def __init__(self, cmd): # on win close_fds=True does not work, not sure it'd needed #p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True) self._popen = p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE) - infile, outfile = p.stdin, p.stdout self._cmd = cmd - io = Popen2IO(infile, outfile) + io = Popen2IO(p.stdin, p.stdout) super(PopenCmdGateway, self).__init__(io=io) def exit(self): @@ -217,8 +225,8 @@ class PopenGateway(PopenCmdGateway): """ if not python: python = sys.executable - cmd = '%s -u -c "exec input()"' % python - cmd = '%s -u -c "import sys ; exec(eval(sys.stdin.readline()))"' % python + cmd = ('%s -u -c "import sys ; ' + 'exec(eval(sys.stdin.readline()))"' % python) super(PopenGateway, self).__init__(cmd) def _remote_bootstrap_gateway(self, io, extra=''): diff --git a/py/execnet/gateway_base.py b/py/execnet/gateway_base.py index 20d71e0db..229f34006 100644 --- a/py/execnet/gateway_base.py +++ b/py/execnet/gateway_base.py @@ -27,18 +27,18 @@ if sys.version_info > (3, 0): exec("""def do_exec(co, loc): exec(co, loc)""") unicode = str - sysex = Exception else: exec("""def do_exec(co, loc): exec co in loc""") bytes = str - sysex = (KeyboardInterrupt, SystemExit) + def str(*args): raise EnvironmentError( "use unicode or bytes, not cross-python ambigous 'str'") default_encoding = "UTF-8" +sysex = (KeyboardInterrupt, SystemExit) debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w') @@ -94,13 +94,15 @@ class SocketIO: class Popen2IO: server_stmt = """ import os, sys, tempfile +#io = Popen2IO(os.fdopen(1, 'wb'), os.fdopen(0, 'rb')) io = Popen2IO(sys.stdout, sys.stdin) -sys.stdout = sys.stderr = tempfile.TemporaryFile() +sys.stdout = tempfile.TemporaryFile() +sys.stdin = tempfile.TemporaryFile() """ error = (IOError, OSError, EOFError) - def __init__(self, infile, outfile): - self.outfile, self.infile = infile, outfile + def __init__(self, outfile, infile): + self.outfile, self.infile = outfile, infile if sys.platform == "win32": import msvcrt msvcrt.setmode(infile.fileno(), os.O_BINARY) @@ -110,17 +112,22 @@ sys.stdout = sys.stderr = tempfile.TemporaryFile() def read(self, numbytes): """Read exactly 'bytes' bytes from the pipe. """ - data = self.infile.read(numbytes) + infile = self.infile + if hasattr(infile, 'buffer'): + infile = infile.buffer + data = infile.read(numbytes) if len(data) < numbytes: raise EOFError - assert isinstance(data, bytes) return data def write(self, data): """write out all bytes to the pipe. """ assert isinstance(data, bytes) - self.outfile.write(data) - self.outfile.flush() + outfile = self.outfile + if hasattr(outfile, 'buffer'): + outfile = outfile.buffer + outfile.write(data) + outfile.flush() def close_read(self): if self.readable: @@ -169,7 +176,7 @@ class Message: senderid, stringlen) = struct.unpack(HDR_FORMAT, header) data = io.read(stringlen) if dataformat == 1: - pass + pass elif dataformat == 2: data = data.decode(default_encoding) data = eval(data, {}) # reversed argh @@ -179,9 +186,6 @@ class Message: return msg readfrom = classmethod(readfrom) - def post_sent(self, gateway, excinfo=None): - pass - def __repr__(self): r = repr(self.data) if len(r) > 50: @@ -193,8 +197,6 @@ class Message: def _setupmessages(): - # XXX use metaclass for registering - class CHANNEL_OPEN(Message): def received(self, gateway): channel = gateway._channelfactory.new(self.channelid) @@ -275,7 +277,7 @@ class Channel(object): # after having cleared the queue we register # the callback only if the channel is not closed already. _callbacks = self.gateway._channelfactory._callbacks - _receivelock = self.gateway._channelfactory._receivelock + _receivelock = self.gateway._receivelock _receivelock.acquire() try: if self._items is None: @@ -426,6 +428,7 @@ class Channel(object): return self.receive() except EOFError: raise StopIteration + __next__ = next ENDMARKER = object() @@ -436,7 +439,6 @@ class ChannelFactory(object): self._channels = weakref.WeakValueDictionary() self._callbacks = {} self._writelock = threading.Lock() - self._receivelock = threading.RLock() self.gateway = gateway self.count = startcount self.finished = False @@ -596,6 +598,7 @@ class BaseGateway(object): """ self._io = io self._channelfactory = ChannelFactory(self, _startcount) + self._receivelock = threading.RLock() def _initreceive(self, requestqueue=False): if requestqueue: @@ -605,18 +608,15 @@ class BaseGateway(object): self._receiverthread.setDaemon(1) self._receiverthread.start() - def _trace(self, *args): + def _trace(self, msg): if debug: try: - l = "\n".join(args).split(os.linesep) - id = getid(self) - for x in l: - debug.write(x+"\n") + debug.write(unicode(msg) + "\n") debug.flush() except sysex: raise except: - traceback.print_exc() + sys.stderr.write("exception during tracing\n") def _traceex(self, excinfo): try: @@ -629,12 +629,13 @@ class BaseGateway(object): def _thread_receiver(self): """ thread to read and handle Messages half-sync-half-async. """ + self._trace("starting to receive") try: while 1: try: msg = Message.readfrom(self._io) self._trace("received <- %r" % msg) - _receivelock = self._channelfactory._receivelock + _receivelock = self._receivelock _receivelock.acquire() try: msg.received(self) @@ -669,11 +670,16 @@ class BaseGateway(object): except: excinfo = self.exc_info() self._traceex(excinfo) - msg.post_sent(self, excinfo) else: - msg.post_sent(self) self._trace('sent -> %r' % msg) + def _stopsend(self): + self._send(None) + + def _stopexec(self): + if self._requestqueue is not None: + self._requestqueue.put(None) + def _local_redirect_thread_output(self, outid, errid): l = [] for name, id in ('stdout', outid), ('stderr', errid): @@ -719,14 +725,14 @@ class BaseGateway(object): try: loc = { 'channel' : channel, '__name__': '__channelexec__'} #open("task.py", 'w').write(source) - self._trace("execution starts:", repr(source)[:50]) + self._trace("execution starts: %s" % repr(source)[:50]) close = self._local_redirect_thread_output(outid, errid) try: co = compile(source+'\n', '', 'exec') do_exec(co, loc) finally: close() - self._trace("execution finished:", repr(source)[:50]) + self._trace("execution finished") except sysex: pass except self._StopExecLoop: @@ -734,10 +740,10 @@ class BaseGateway(object): raise except: excinfo = self.exc_info() + self._trace("got exception %s" % excinfo[1]) l = traceback.format_exception(*excinfo) errortext = "".join(l) channel.close(errortext) - self._trace(errortext) else: channel.close() @@ -760,25 +766,3 @@ class BaseGateway(object): self._trace("joining receiver thread") self._receiverthread.join() - def exit(self): - """ Try to stop all exec and IO activity. """ - self._cleanup.unregister(self) - self._stopexec() - self._stopsend() - self.hook.pyexecnet_gateway_exit(gateway=self) - - def _stopsend(self): - self._send(None) - - def _stopexec(self): - if self._requestqueue is not None: - self._requestqueue.put(None) - -def getid(gw, cache={}): - name = gw.__class__.__name__ - try: - return cache.setdefault(name, {})[id(gw)] - except KeyError: - cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name])) - return x - diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index 25f87a532..9b84a1dd9 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -3,12 +3,88 @@ import os, sys, time, signal import py from py.__.execnet.gateway_base import Message, Channel, ChannelFactory from py.__.execnet.gateway_base import ExecnetAPI, queue, Popen2IO +from py.__.execnet import gateway_base, gateway from py.__.execnet.gateway import startup_modules, getsource pytest_plugins = "pytester" TESTTIMEOUT = 10.0 # seconds +def pytest_generate_tests(metafunc): + if 'pythonpath' in metafunc.funcargnames: + for name in 'python2.4', 'python2.5', 'python2.6', 'python3.1': + metafunc.addcall(id=name, param=name) + +def pytest_funcarg__pythonpath(request): + name = request.param + executable = py.path.local.sysfind(name) + if executable is None: + py.test.skip("no %s found" % (name,)) + return executable + +def test_io_message(pythonpath, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(gateway_base, """ + try: + from io import BytesIO + except ImportError: + from StringIO import StringIO as BytesIO + import tempfile + temp_out = BytesIO() + temp_in = BytesIO() + io = Popen2IO(temp_out, temp_in) + for i, msg_cls in Message._types.items(): + print ("checking %s %s" %(i, msg_cls)) + for data in "hello", "hello".encode('ascii'): + msg1 = msg_cls(i, data) + msg1.writeto(io) + x = io.outfile.getvalue() + io.outfile.truncate(0) + io.outfile.seek(0) + io.infile.seek(0) + io.infile.write(x) + io.infile.seek(0) + msg2 = Message.readfrom(io) + assert msg1.channelid == msg2.channelid, (msg1, msg2) + assert msg1.data == msg2.data + print ("all passed") + """)) + #out = py.process.cmdexec("%s %s" %(executable,check)) + out = pythonpath.sysexec(check) + print (out) + assert "all passed" in out + +def test_popen_io(pythonpath, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(gateway_base, """ + do_exec(Popen2IO.server_stmt, globals()) + io.write("hello".encode('ascii')) + s = io.read(1) + assert s == "x".encode('ascii') + """)) + from subprocess import Popen, PIPE + args = [str(pythonpath), str(check)] + proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) + proc.stdin.write("x".encode('ascii')) + stdout, stderr = proc.communicate() + print (stderr) + ret = proc.wait() + assert "hello".encode('ascii') in stdout + +def test_rinfo_source(pythonpath, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(""" + class Channel: + def send(self, data): + assert eval(repr(data), {}) == data + channel = Channel() + """, gateway.rinfo_source, """ + print ('all passed') + """)) + out = pythonpath.sysexec(check) + print (out) + assert "all passed" in out + class TestExecnetEvents: def test_popengateway(self, _pytest): rec = _pytest.gethookrecorder(ExecnetAPI) @@ -112,7 +188,7 @@ class BasicRemoteExecution: def test_correct_setup_no_py(self): channel = self.gw.remote_exec(""" import sys - channel.send(sys.modules.keys()) + channel.send(list(sys.modules)) """) remotemodules = channel.receive() assert 'py' not in remotemodules, ( @@ -201,7 +277,7 @@ class BasicRemoteExecution: channel.send(x) """) l = list(channel) - assert l == range(3) + assert l == [0, 1, 2] def test_channel_passing_over_channel(self): channel = self.gw.remote_exec(''' @@ -272,7 +348,11 @@ class BasicRemoteExecution: # with 'earlyfree==True', this tests the "sendonly" channel state. l = [] channel = self.gw.remote_exec(source=''' - import thread, time + try: + import thread + except ImportError: + import _thread as thread + import time def producer(subchannel): for i in range(5): time.sleep(0.15) @@ -472,23 +552,6 @@ class BasicCmdbasedRemoteExecution(BasicRemoteExecution): def test_cmdattr(self): assert hasattr(self.gw, '_cmd') -def test_channel_endmarker_remote_killterm(): - gw = py.execnet.PopenGateway() - try: - q = queue.Queue() - channel = gw.remote_exec(''' - import os - os.kill(os.getpid(), 15) - ''') - channel.setcallback(q.put, endmarker=999) - val = q.get(TESTTIMEOUT) - assert val == 999 - err = channel._getremoteerror() - finally: - gw.exit() - py.test.skip("provide information on causes/signals " - "of dying remote gateways") - #class TestBlockingIssues: # def test_join_blocked_execution_gateway(self): # gateway = py.execnet.PopenGateway() @@ -656,3 +719,21 @@ def test_threads_twice(): def test_nodebug(): from py.__.execnet import gateway_base assert not gateway_base.debug + +def test_channel_endmarker_remote_killterm(): + gw = py.execnet.PopenGateway() + try: + q = queue.Queue() + channel = gw.remote_exec(''' + import os + os.kill(os.getpid(), 15) + ''') + channel.setcallback(q.put, endmarker=999) + val = q.get(TESTTIMEOUT) + assert val == 999 + err = channel._getremoteerror() + finally: + gw.exit() + py.test.skip("provide information on causes/signals " + "of dying remote gateways") + diff --git a/py/execnet/testing/test_message.py b/py/execnet/testing/test_message.py deleted file mode 100644 index 6b07a1f69..000000000 --- a/py/execnet/testing/test_message.py +++ /dev/null @@ -1,39 +0,0 @@ - -import py -from py.__.execnet import gateway_base - -@py.test.mark.multi(ver=["2.4", "2.5", "2.6", "3.1"]) -def test_io_message(ver, tmpdir): - executable = py.path.local.sysfind("python" + ver) - if executable is None: - py.test.skip("no python%s found" % (ver,)) - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - try: - from io import BytesIO - except ImportError: - from StringIO import StringIO as BytesIO - import tempfile - temp_out = BytesIO() - temp_in = BytesIO() - io = Popen2IO(temp_out, temp_in) - for i, msg_cls in Message._types.items(): - print ("checking %s %s" %(i, msg_cls)) - for data in "hello", "hello".encode('ascii'): - msg1 = msg_cls(i, data) - msg1.writeto(io) - x = io.outfile.getvalue() - io.outfile.truncate(0) - io.outfile.seek(0) - io.infile.seek(0) - io.infile.write(x) - io.infile.seek(0) - msg2 = Message.readfrom(io) - assert msg1.channelid == msg2.channelid, (msg1, msg2) - assert msg1.data == msg2.data - print ("all passed") - """)) - #out = py.process.cmdexec("%s %s" %(executable,check)) - out = executable.sysexec(check) - print (out) - assert "all passed" in out