* fix various remaining 3k issues until test_gateway.py passes with python3 py/bin/py.test
* we now wait on gateway initialization until we got a byte back after we sent the bootstrap --HG-- branch : trunk
This commit is contained in:
parent
6c3e961bc5
commit
c7f11745cd
|
@ -71,7 +71,10 @@ class InitiatingGateway(BaseGateway):
|
|||
|
||||
def exit(self):
|
||||
""" Try to stop all exec and IO activity. """
|
||||
self._cleanup.unregister(self)
|
||||
try:
|
||||
self._cleanup.unregister(self)
|
||||
except KeyError:
|
||||
return # we assume it's already happened
|
||||
self._stopexec()
|
||||
self._stopsend()
|
||||
self.hook.pyexecnet_gateway_exit(gateway=self)
|
||||
|
@ -87,6 +90,7 @@ class InitiatingGateway(BaseGateway):
|
|||
bootstrap = [extra]
|
||||
bootstrap += [getsource(x) for x in startup_modules]
|
||||
bootstrap += [io.server_stmt,
|
||||
"io.write('1'.encode('ascii'))",
|
||||
"BaseGateway(io=io, _startcount=2)._servemain()",
|
||||
]
|
||||
source = "\n".join(bootstrap)
|
||||
|
@ -94,6 +98,8 @@ class InitiatingGateway(BaseGateway):
|
|||
#open("/tmp/bootstrap.py", 'w').write(source)
|
||||
repr_source = repr(source) + "\n"
|
||||
io.write(repr_source.encode('ascii'))
|
||||
s = io.read(1)
|
||||
assert s == "1".encode('ascii')
|
||||
|
||||
def _rinfo(self, update=False):
|
||||
""" return some sys/env information from remote. """
|
||||
|
@ -242,11 +248,11 @@ class SocketGateway(InitiatingGateway):
|
|||
"""
|
||||
self.host = host = str(host)
|
||||
self.port = port = int(port)
|
||||
self.remoteaddress = '%s:%d' % (self.host, self.port)
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect((host, port))
|
||||
io = SocketIO(sock)
|
||||
super(SocketGateway, self).__init__(io=io)
|
||||
self.remoteaddress = '%s:%d' % (self.host, self.port)
|
||||
|
||||
def new_remote(cls, gateway, hostport=None):
|
||||
""" return a new (connected) socket gateway, instatiated
|
||||
|
|
|
@ -59,7 +59,7 @@ class SocketIO:
|
|||
|
||||
def read(self, numbytes):
|
||||
"Read exactly 'bytes' bytes from the socket."
|
||||
buf = ""
|
||||
buf = bytes()
|
||||
while len(buf) < numbytes:
|
||||
t = self.sock.recv(numbytes - len(buf))
|
||||
if not t:
|
||||
|
@ -91,8 +91,8 @@ class Popen2IO:
|
|||
import os, sys, tempfile
|
||||
#io = Popen2IO(os.fdopen(1, 'wb'), os.fdopen(0, 'rb'))
|
||||
io = Popen2IO(sys.stdout, sys.stdin)
|
||||
sys.stdout = tempfile.TemporaryFile()
|
||||
sys.stdin = tempfile.TemporaryFile()
|
||||
sys.stdout = tempfile.TemporaryFile('w')
|
||||
sys.stdin = tempfile.TemporaryFile('r')
|
||||
"""
|
||||
error = (IOError, OSError, EOFError)
|
||||
|
||||
|
@ -143,6 +143,8 @@ sys.stdin = tempfile.TemporaryFile()
|
|||
HDR_FORMAT = "!hhii"
|
||||
HDR_SIZE = struct.calcsize(HDR_FORMAT)
|
||||
|
||||
is3k = sys.version_info >= (3,0)
|
||||
|
||||
class Message:
|
||||
""" encapsulates Messages and their wire protocol. """
|
||||
_types = {}
|
||||
|
@ -152,15 +154,17 @@ class Message:
|
|||
|
||||
def writeto(self, io):
|
||||
# XXX marshal.dumps doesn't work for exchanging data across Python
|
||||
# version :-((( There is no sane solution, short of a custom
|
||||
# pure Python marshaller
|
||||
# version :-((( XXX check this statement wrt python2.4 through 3.1
|
||||
data = self.data
|
||||
if isinstance(data, bytes):
|
||||
dataformat = 1
|
||||
dataformat = 1 + int(is3k)
|
||||
else:
|
||||
data = repr(self.data) # argh
|
||||
if isinstance(data, unicode):
|
||||
dataformat = 3
|
||||
else:
|
||||
data = repr(self.data) # argh
|
||||
dataformat = 4
|
||||
data = data.encode(default_encoding)
|
||||
dataformat = 2
|
||||
header = struct.pack(HDR_FORMAT, self.msgtype, dataformat,
|
||||
self.channelid, len(data))
|
||||
io.write(header + data)
|
||||
|
@ -171,14 +175,21 @@ class Message:
|
|||
senderid, stringlen) = struct.unpack(HDR_FORMAT, header)
|
||||
data = io.read(stringlen)
|
||||
if dataformat == 1:
|
||||
pass
|
||||
if is3k:
|
||||
# remote was python2-str, we are 3k-text
|
||||
data = data.decode(default_encoding)
|
||||
elif dataformat == 2:
|
||||
data = data.decode(default_encoding)
|
||||
data = eval(data, {}) # reversed argh
|
||||
# remote was python3-bytes
|
||||
pass
|
||||
else:
|
||||
raise ValueError("bad data format")
|
||||
msg = cls._types[msgtype](senderid, data)
|
||||
return msg
|
||||
data = data.decode(default_encoding)
|
||||
if dataformat == 3:
|
||||
pass
|
||||
elif dataformat == 4:
|
||||
data = eval(data, {}) # reversed argh
|
||||
else:
|
||||
raise ValueError("bad data format")
|
||||
return cls._types[msgtype](senderid, data)
|
||||
readfrom = classmethod(readfrom)
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -214,8 +225,7 @@ def _setupmessages():
|
|||
|
||||
class CHANNEL_CLOSE_ERROR(Message):
|
||||
def received(self, gateway):
|
||||
data = self.data.decode(default_encoding)
|
||||
remote_error = gateway._channelfactory.RemoteError(data)
|
||||
remote_error = gateway._channelfactory.RemoteError(self.data)
|
||||
gateway._channelfactory._local_close(self.channelid, remote_error)
|
||||
|
||||
class CHANNEL_LAST_MESSAGE(Message):
|
||||
|
@ -367,8 +377,7 @@ class Channel(object):
|
|||
# but it's never damaging to send too many CHANNEL_CLOSE messages
|
||||
put = self.gateway._send
|
||||
if error is not None:
|
||||
put(Message.CHANNEL_CLOSE_ERROR(self.id,
|
||||
error.encode(default_encoding)))
|
||||
put(Message.CHANNEL_CLOSE_ERROR(self.id, error))
|
||||
else:
|
||||
put(Message.CHANNEL_CLOSE(self.id))
|
||||
if isinstance(error, RemoteError):
|
||||
|
|
|
@ -20,7 +20,7 @@ class RSync(object):
|
|||
def __init__(self, sourcedir, callback=None, verbose=True):
|
||||
self._sourcedir = str(sourcedir)
|
||||
self._verbose = verbose
|
||||
assert callback is None or callable(callback)
|
||||
assert callback is None or py.builtin.callable(callback)
|
||||
self._callback = callback
|
||||
self._channels = {}
|
||||
self._receivequeue = Queue()
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
#
|
||||
|
||||
progname = 'socket_readline_exec_server-1.2'
|
||||
debug = 0
|
||||
|
||||
import sys, socket, os
|
||||
try:
|
||||
|
@ -15,8 +14,10 @@ try:
|
|||
except ImportError:
|
||||
fcntl = None
|
||||
|
||||
debug = 0
|
||||
|
||||
if debug: # and not os.isatty(sys.stdin.fileno()):
|
||||
f = open('/tmp/execnet-socket-pyout.log', 'a', 0)
|
||||
f = open('/tmp/execnet-socket-pyout.log', 'w')
|
||||
old = sys.stdout, sys.stderr
|
||||
sys.stdout = sys.stderr = f
|
||||
#import py
|
||||
|
@ -36,7 +37,7 @@ def exec_from_one_connection(serversock):
|
|||
print_(progname, 'Entering Accept loop', serversock.getsockname())
|
||||
clientsock,address = serversock.accept()
|
||||
print_(progname, 'got new connection from %s %s' % address)
|
||||
clientfile = clientsock.makefile('r+',0)
|
||||
clientfile = clientsock.makefile('rb')
|
||||
print_("reading line")
|
||||
# rstrip so that we can use \r\n for telnet testing
|
||||
source = clientfile.readline().rstrip()
|
||||
|
|
|
@ -233,9 +233,9 @@ class BasicRemoteExecution:
|
|||
|
||||
def test_remote_exec_channel_anonymous(self):
|
||||
channel = self.gw.remote_exec('''
|
||||
obj = channel.receive()
|
||||
channel.send(obj)
|
||||
''')
|
||||
obj = channel.receive()
|
||||
channel.send(obj)
|
||||
''')
|
||||
channel.send(42)
|
||||
result = channel.receive()
|
||||
assert result == 42
|
||||
|
@ -458,11 +458,11 @@ class BasicRemoteExecution:
|
|||
def test_channel_file_write(self):
|
||||
channel = self.gw.remote_exec("""
|
||||
f = channel.makefile()
|
||||
print >>f, "hello world"
|
||||
f.write("hello world\\n")
|
||||
f.close()
|
||||
channel.send(42)
|
||||
""")
|
||||
first = channel.receive() + channel.receive()
|
||||
first = channel.receive()
|
||||
assert first.strip() == 'hello world'
|
||||
second = channel.receive()
|
||||
assert second == 42
|
||||
|
@ -476,11 +476,11 @@ class BasicRemoteExecution:
|
|||
def test_channel_file_proxyclose(self):
|
||||
channel = self.gw.remote_exec("""
|
||||
f = channel.makefile(proxyclose=True)
|
||||
print >>f, "hello world"
|
||||
f.write("hello world")
|
||||
f.close()
|
||||
channel.send(42)
|
||||
""")
|
||||
first = channel.receive() + channel.receive()
|
||||
first = channel.receive()
|
||||
assert first.strip() == 'hello world'
|
||||
py.test.raises(EOFError, channel.receive)
|
||||
|
||||
|
@ -524,7 +524,7 @@ class BasicRemoteExecution:
|
|||
def test_confusion_from_os_write_stdout(self):
|
||||
channel = self.gw.remote_exec("""
|
||||
import os
|
||||
os.write(1, 'confusion!')
|
||||
os.write(1, 'confusion!'.encode('ascii'))
|
||||
channel.send(channel.receive() * 6)
|
||||
channel.send(channel.receive() * 6)
|
||||
""")
|
||||
|
@ -538,7 +538,7 @@ class BasicRemoteExecution:
|
|||
def test_confusion_from_os_write_stderr(self):
|
||||
channel = self.gw.remote_exec("""
|
||||
import os
|
||||
os.write(2, 'test')
|
||||
os.write(2, 'test'.encode('ascii'))
|
||||
channel.send(channel.receive() * 6)
|
||||
channel.send(channel.receive() * 6)
|
||||
""")
|
||||
|
@ -592,14 +592,13 @@ class BasicCmdbasedRemoteExecution(BasicRemoteExecution):
|
|||
|
||||
class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
|
||||
def test_rinfo_popen(self):
|
||||
#rinfo = py.execnet.PopenGateway()._rinfo()
|
||||
rinfo = self.gw._rinfo()
|
||||
assert rinfo.executable == py.std.sys.executable
|
||||
assert rinfo.cwd == py.std.os.getcwd()
|
||||
assert rinfo.version_info == py.std.sys.version_info
|
||||
|
||||
def test_chdir_separation(self):
|
||||
old = py.test.ensuretemp('chdirtest').chdir()
|
||||
def test_chdir_separation(self, tmpdir):
|
||||
old = tmpdir.chdir()
|
||||
try:
|
||||
gw = py.execnet.PopenGateway()
|
||||
finally:
|
||||
|
|
Loading…
Reference in New Issue