[svn r57546] * channels now also provide makefile(mode) with mode = 'r'

for providing file-like read/readline/close methods.
* added and refined crash and finalization tests

--HG--
branch : trunk
This commit is contained in:
hpk 2008-08-21 14:04:43 +02:00
parent 9b81b15b74
commit 3702ca2c71
2 changed files with 155 additions and 32 deletions

View File

@ -103,13 +103,16 @@ class Channel(object):
return self._closed
def makefile(self, mode='w', proxyclose=False):
""" return a file-like object. Only supported mode right
now is 'w' for binary writes. If you want to have
a subsequent file.close() mean to close the channel
as well, then pass proxyclose=True.
""" return a file-like object.
mode: 'w' for binary writes, 'r' for binary reads
proxyclose: set to true if you want to have a
subsequent file.close() automatically close the channel.
"""
assert mode == 'w', "mode %r not availabe" %(mode,)
return ChannelFile(channel=self, proxyclose=proxyclose)
if mode == "w":
return ChannelFileWrite(channel=self, proxyclose=proxyclose)
elif mode == "r":
return ChannelFileRead(channel=self, proxyclose=proxyclose)
raise ValueError("mode %r not availabe" %(mode,))
def close(self, error=None):
""" close down this channel on both sides. """
@ -299,18 +302,11 @@ class ChannelFactory(object):
for id in self._callbacks.keys():
self._close_callback(id)
class ChannelFile:
class ChannelFile(object):
def __init__(self, channel, proxyclose=True):
self.channel = channel
self._proxyclose = proxyclose
def write(self, out):
self.channel.send(out)
def flush(self):
pass
def close(self):
if self._proxyclose:
self.channel.close()
@ -319,3 +315,38 @@ class ChannelFile:
state = self.channel.isclosed() and 'closed' or 'open'
return '<ChannelFile %d %s>' %(self.channel.id, state)
class ChannelFileWrite(ChannelFile):
def write(self, out):
self.channel.send(out)
def flush(self):
pass
class ChannelFileRead(ChannelFile):
def __init__(self, channel, proxyclose=True):
super(ChannelFileRead, self).__init__(channel, proxyclose)
self._buffer = ""
def read(self, n):
while len(self._buffer) < n:
try:
self._buffer += self.channel.receive()
except EOFError:
self.close()
break
ret = self._buffer[:n]
self._buffer = self._buffer[n:]
return ret
def readline(self):
i = self._buffer.find("\n")
if i != -1:
return self.read(i+1)
line = self.read(len(self._buffer)+1)
while line and line[-1] != "\n":
c = self.read(1)
if not c:
break
line += c
return line

View File

@ -74,6 +74,11 @@ class TestPureChannel:
channel = self.fac.new()
py.test.raises(IOError, channel.waitclose, timeout=0.01)
def test_channel_makefile_incompatmode(self):
channel = self.fac.new()
py.test.raises(ValueError, 'channel.makefile("rw")')
class PopenGatewayTestSetup:
def setup_class(cls):
cls.gw = py.execnet.PopenGateway()
@ -291,6 +296,19 @@ class BasicRemoteExecution:
assert isinstance(l[2], channel.__class__)
assert l[3] == 999
def test_channel_endmarker_callback_error(self):
from Queue import Queue
q = Queue()
channel = self.gw.remote_exec(source='''
raise ValueError()
''')
channel.setcallback(q.put, endmarker=999)
val = q.get(TESTTIMEOUT)
assert val == 999
err = channel._getremoteerror()
assert err
assert str(err).find("ValueError") != -1
def test_remote_redirect_stdout(self):
out = py.std.StringIO.StringIO()
handle = self.gw._remote_redirect(stdout=out)
@ -315,7 +333,7 @@ class BasicRemoteExecution:
s = subl[0]
assert s.strip() == str(i)
def test_channel_file(self):
def test_channel_file_write(self):
channel = self.gw.remote_exec("""
f = channel.makefile()
print >>f, "hello world"
@ -344,6 +362,43 @@ class BasicRemoteExecution:
assert first.strip() == 'hello world'
py.test.raises(EOFError, channel.receive)
def test_channel_file_read(self):
channel = self.gw.remote_exec("""
f = channel.makefile(mode='r')
s = f.read(2)
channel.send(s)
s = f.read(5)
channel.send(s)
""")
channel.send("xyabcde")
s1 = channel.receive()
s2 = channel.receive()
assert s1 == "xy"
assert s2 == "abcde"
def test_channel_file_read_empty(self):
channel = self.gw.remote_exec("pass")
f = channel.makefile(mode="r")
s = f.read(3)
assert s == ""
s = f.read(5)
assert s == ""
def test_channel_file_readline_remote(self):
channel = self.gw.remote_exec("""
channel.send('123\\n45')
""")
channel.waitclose(TESTTIMEOUT)
f = channel.makefile(mode="r")
s = f.readline()
assert s == "123\n"
s = f.readline()
assert s == "45"
def test_channel_makefile_incompatmode(self):
channel = self.gw.newchannel()
py.test.raises(ValueError, 'channel.makefile("rw")')
def test_confusion_from_os_write_stdout(self):
channel = self.gw.remote_exec("""
import os
@ -384,6 +439,25 @@ class BasicRemoteExecution:
text = c1.receive()
assert text.find("execution disallowed") != -1
def test_channel_endmarker_remote_killterm():
gw = py.execnet.PopenGateway()
try:
from Queue import Queue
q = Queue()
channel = gw.remote_exec(source='''
import os
os.kill(os.getpid(), 15)
''')
channel.setcallback(q.put, endmarker=999)
val = q.get(TESTTIMEOUT)
assert val == 999
err = channel._getremoteerror()
finally:
gw.exit()
py.test.skip("provide information on causes/signals "
"of dying remote gateways")
#class TestBlockingIssues:
# def test_join_blocked_execution_gateway(self):
# gateway = py.execnet.PopenGateway()
@ -437,28 +511,43 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
ret = channel.receive()
assert ret == 42
def disabled_test_remote_is_killed_while_sending(self):
def test_waitclose_on_remote_killed(self):
py.test.skip("fix needed: dying remote process does not cause waitclose() to fail")
if not hasattr(py.std.os, 'kill'):
py.test.skip("no os.kill")
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import os
import time
channel.send(os.getppid())
channel.send(os.getpid())
while 1:
channel.send('#'*1000)
time.sleep(10)
channel.send("#" * 100)
""")
parent = channel.receive()
remote = channel.receive()
assert parent == os.getpid()
time.sleep(0.5)
os.kill(remote, signal.SIGKILL)
time.sleep(1)
channel.waitclose(TESTTIMEOUT)
remotepid = channel.receive()
os.kill(remotepid, 9)
py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
py.test.raises(EOFError, channel.send, None)
py.test.raises(EOFError, channel.receive)
#channel.waitclose(TESTTIMEOUT)
#channel.send('#')
def test_endmarker_delivery_on_remote_killterm():
if not hasattr(py.std.os, 'kill'):
py.test.skip("no os.kill()")
gw = py.execnet.PopenGateway()
try:
from Queue import Queue
q = Queue()
channel = gw.remote_exec(source='''
import os
os.kill(os.getpid(), 15)
''')
channel.setcallback(q.put, endmarker=999)
val = q.get(TESTTIMEOUT)
assert val == 999
err = channel._getremoteerror()
finally:
gw.exit()
py.test.skip("provide information on causes/signals "
"of dying remote gateways")
class SocketGatewaySetup:
@ -517,3 +606,6 @@ def test_threads_twice():
gw.exit()
def test_nodebug():
from py.__.execnet import gateway
assert not gateway.debug