diff --git a/py/execnet/channel.py b/py/execnet/channel.py index 04e4b1224..088fa97e2 100644 --- a/py/execnet/channel.py +++ b/py/execnet/channel.py @@ -103,13 +103,16 @@ class Channel(object): return self._closed def makefile(self, mode='w', proxyclose=False): - """ return a file-like object. Only supported mode right - now is 'w' for binary writes. If you want to have - a subsequent file.close() mean to close the channel - as well, then pass proxyclose=True. + """ return a file-like object. + mode: 'w' for binary writes, 'r' for binary reads + proxyclose: set to true if you want to have a + subsequent file.close() automatically close the channel. """ - assert mode == 'w', "mode %r not availabe" %(mode,) - return ChannelFile(channel=self, proxyclose=proxyclose) + if mode == "w": + return ChannelFileWrite(channel=self, proxyclose=proxyclose) + elif mode == "r": + return ChannelFileRead(channel=self, proxyclose=proxyclose) + raise ValueError("mode %r not availabe" %(mode,)) def close(self, error=None): """ close down this channel on both sides. """ @@ -299,18 +302,11 @@ class ChannelFactory(object): for id in self._callbacks.keys(): self._close_callback(id) - -class ChannelFile: +class ChannelFile(object): def __init__(self, channel, proxyclose=True): self.channel = channel self._proxyclose = proxyclose - def write(self, out): - self.channel.send(out) - - def flush(self): - pass - def close(self): if self._proxyclose: self.channel.close() @@ -319,3 +315,38 @@ class ChannelFile: state = self.channel.isclosed() and 'closed' or 'open' return '' %(self.channel.id, state) +class ChannelFileWrite(ChannelFile): + def write(self, out): + self.channel.send(out) + + def flush(self): + pass + +class ChannelFileRead(ChannelFile): + def __init__(self, channel, proxyclose=True): + super(ChannelFileRead, self).__init__(channel, proxyclose) + self._buffer = "" + + def read(self, n): + while len(self._buffer) < n: + try: + self._buffer += self.channel.receive() + except EOFError: + self.close() + break + ret = self._buffer[:n] + self._buffer = self._buffer[n:] + return ret + + def readline(self): + i = self._buffer.find("\n") + if i != -1: + return self.read(i+1) + line = self.read(len(self._buffer)+1) + while line and line[-1] != "\n": + c = self.read(1) + if not c: + break + line += c + return line + diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index bec53771e..d5948b596 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -74,6 +74,11 @@ class TestPureChannel: channel = self.fac.new() py.test.raises(IOError, channel.waitclose, timeout=0.01) + def test_channel_makefile_incompatmode(self): + channel = self.fac.new() + py.test.raises(ValueError, 'channel.makefile("rw")') + + class PopenGatewayTestSetup: def setup_class(cls): cls.gw = py.execnet.PopenGateway() @@ -291,6 +296,19 @@ class BasicRemoteExecution: assert isinstance(l[2], channel.__class__) assert l[3] == 999 + def test_channel_endmarker_callback_error(self): + from Queue import Queue + q = Queue() + channel = self.gw.remote_exec(source=''' + raise ValueError() + ''') + channel.setcallback(q.put, endmarker=999) + val = q.get(TESTTIMEOUT) + assert val == 999 + err = channel._getremoteerror() + assert err + assert str(err).find("ValueError") != -1 + def test_remote_redirect_stdout(self): out = py.std.StringIO.StringIO() handle = self.gw._remote_redirect(stdout=out) @@ -315,7 +333,7 @@ class BasicRemoteExecution: s = subl[0] assert s.strip() == str(i) - def test_channel_file(self): + def test_channel_file_write(self): channel = self.gw.remote_exec(""" f = channel.makefile() print >>f, "hello world" @@ -344,6 +362,43 @@ class BasicRemoteExecution: assert first.strip() == 'hello world' py.test.raises(EOFError, channel.receive) + def test_channel_file_read(self): + channel = self.gw.remote_exec(""" + f = channel.makefile(mode='r') + s = f.read(2) + channel.send(s) + s = f.read(5) + channel.send(s) + """) + channel.send("xyabcde") + s1 = channel.receive() + s2 = channel.receive() + assert s1 == "xy" + assert s2 == "abcde" + + def test_channel_file_read_empty(self): + channel = self.gw.remote_exec("pass") + f = channel.makefile(mode="r") + s = f.read(3) + assert s == "" + s = f.read(5) + assert s == "" + + def test_channel_file_readline_remote(self): + channel = self.gw.remote_exec(""" + channel.send('123\\n45') + """) + channel.waitclose(TESTTIMEOUT) + f = channel.makefile(mode="r") + s = f.readline() + assert s == "123\n" + s = f.readline() + assert s == "45" + + def test_channel_makefile_incompatmode(self): + channel = self.gw.newchannel() + py.test.raises(ValueError, 'channel.makefile("rw")') + def test_confusion_from_os_write_stdout(self): channel = self.gw.remote_exec(""" import os @@ -383,7 +438,26 @@ class BasicRemoteExecution: """) text = c1.receive() assert text.find("execution disallowed") != -1 - + + +def test_channel_endmarker_remote_killterm(): + gw = py.execnet.PopenGateway() + try: + from Queue import Queue + q = Queue() + channel = gw.remote_exec(source=''' + import os + os.kill(os.getpid(), 15) + ''') + channel.setcallback(q.put, endmarker=999) + val = q.get(TESTTIMEOUT) + assert val == 999 + err = channel._getremoteerror() + finally: + gw.exit() + py.test.skip("provide information on causes/signals " + "of dying remote gateways") + #class TestBlockingIssues: # def test_join_blocked_execution_gateway(self): # gateway = py.execnet.PopenGateway() @@ -437,29 +511,44 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): ret = channel.receive() assert ret == 42 - def disabled_test_remote_is_killed_while_sending(self): + def test_waitclose_on_remote_killed(self): + py.test.skip("fix needed: dying remote process does not cause waitclose() to fail") + if not hasattr(py.std.os, 'kill'): + py.test.skip("no os.kill") gw = py.execnet.PopenGateway() channel = gw.remote_exec(""" import os import time - channel.send(os.getppid()) channel.send(os.getpid()) while 1: - channel.send('#'*1000) - time.sleep(10) + channel.send("#" * 100) """) - parent = channel.receive() - remote = channel.receive() - assert parent == os.getpid() - time.sleep(0.5) - os.kill(remote, signal.SIGKILL) - time.sleep(1) - channel.waitclose(TESTTIMEOUT) + remotepid = channel.receive() + os.kill(remotepid, 9) + py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)") + py.test.raises(EOFError, channel.send, None) py.test.raises(EOFError, channel.receive) - #channel.waitclose(TESTTIMEOUT) - #channel.send('#') - - + +def test_endmarker_delivery_on_remote_killterm(): + if not hasattr(py.std.os, 'kill'): + py.test.skip("no os.kill()") + gw = py.execnet.PopenGateway() + try: + from Queue import Queue + q = Queue() + channel = gw.remote_exec(source=''' + import os + os.kill(os.getpid(), 15) + ''') + channel.setcallback(q.put, endmarker=999) + val = q.get(TESTTIMEOUT) + assert val == 999 + err = channel._getremoteerror() + finally: + gw.exit() + py.test.skip("provide information on causes/signals " + "of dying remote gateways") + class SocketGatewaySetup: def setup_class(cls): @@ -516,4 +605,7 @@ def test_threads_twice(): py.test.raises(IOError, gw.remote_init_threads, 3) gw.exit() - + +def test_nodebug(): + from py.__.execnet import gateway + assert not gateway.debug