From 5cf27098cfcfa3cd12c9fb82800dad642a85b6eb Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sun, 6 Sep 2009 13:38:21 +0200 Subject: [PATCH] execnet cleanup/refinements: avoid creating a shell for each subprocess * introduce HostNotFound, raised for Socket and SshGateways * factored out basic tests, cleaned up existing tests * removed sshgateway identity argument which was deprecated in 1.0 --HG-- branch : trunk --- py/__init__.py | 1 + py/execnet/gateway.py | 51 +++---- py/execnet/testing/test_basics.py | 215 ++++++++++++++++++++++++++ py/execnet/testing/test_gateway.py | 233 +++-------------------------- 4 files changed, 260 insertions(+), 240 deletions(-) create mode 100644 py/execnet/testing/test_basics.py diff --git a/py/__init__.py b/py/__init__.py index 40835e1ce..f1c403a32 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -165,6 +165,7 @@ initpkg(__name__, 'execnet.SocketGateway' : ('./execnet/gateway.py', 'SocketGateway'), 'execnet.PopenGateway' : ('./execnet/gateway.py', 'PopenGateway'), 'execnet.SshGateway' : ('./execnet/gateway.py', 'SshGateway'), + 'execnet.HostNotFound' : ('./execnet/gateway.py', 'HostNotFound'), 'execnet.XSpec' : ('./execnet/xspec.py', 'XSpec'), 'execnet.makegateway' : ('./execnet/xspec.py', 'makegateway'), 'execnet.MultiGateway' : ('./execnet/multi.py', 'MultiGateway'), diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index 762cf274d..3b016df70 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -195,11 +195,8 @@ channel.send(dict( """ class PopenCmdGateway(InitiatingGateway): - def __init__(self, cmd): - # on win close_fds=True does not work, not sure it'd needed - #p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True) - self._popen = p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE) - self._cmd = cmd + def __init__(self, args): + self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE) io = Popen2IO(p.stdin, p.stdout) super(PopenCmdGateway, self).__init__(io=io) @@ -207,6 +204,7 @@ class PopenCmdGateway(InitiatingGateway): super(PopenCmdGateway, self).exit() self._popen.poll() +popen_bootstrapline = "import sys ; exec(eval(sys.stdin.readline()))" class PopenGateway(PopenCmdGateway): """ This Gateway provides interaction with a newly started python subprocess. @@ -217,9 +215,8 @@ class PopenGateway(PopenCmdGateway): """ if not python: python = sys.executable - cmd = ('%s -u -c "import sys ; ' - 'exec(eval(sys.stdin.readline()))"' % python) - super(PopenGateway, self).__init__(cmd) + args = [str(python), '-c', popen_bootstrapline] + super(PopenGateway, self).__init__(args) def _remote_bootstrap_gateway(self, io, extra=''): # have the subprocess use the same PYTHONPATH and py lib @@ -250,7 +247,10 @@ class SocketGateway(InitiatingGateway): self.port = port = int(port) self.remoteaddress = '%s:%d' % (self.host, self.port) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) + try: + sock.connect((host, port)) + except socket.gaierror: + raise HostNotFound(str(sys.exc_info()[1])) io = SocketIO(sock) super(SocketGateway, self).__init__(io=io) @@ -280,36 +280,29 @@ class SocketGateway(InitiatingGateway): return py.execnet.SocketGateway(host, realport) new_remote = classmethod(new_remote) +class HostNotFound(Exception): + pass class SshGateway(PopenCmdGateway): """ This Gateway provides interaction with a remote Python process, established via the 'ssh' command line binary. The remote side needs to have a Python interpreter executable. """ - def __init__(self, sshaddress, remotepython=None, - identity=None, ssh_config=None): + + def __init__(self, sshaddress, remotepython=None, ssh_config=None): """ instantiate a remote ssh process with the given 'sshaddress' and remotepython version. you may specify an ssh_config file. - DEPRECATED: you may specify an 'identity' filepath. """ self.remoteaddress = sshaddress if remotepython is None: remotepython = "python" - remotecmd = '%s -u -c "exec input()"' % (remotepython,) - cmdline = [sshaddress, remotecmd] - # XXX Unix style quoting - for i in range(len(cmdline)): - cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'" - cmd = 'ssh -C' - if identity is not None: - py.log._apiwarn("1.0", "pass in 'ssh_config' file instead of identity") - cmd += ' -i %s' % (identity,) + args = ['ssh', '-C' ] if ssh_config is not None: - cmd += ' -F %s' % (ssh_config) - cmdline.insert(0, cmd) - cmd = ' '.join(cmdline) - super(SshGateway, self).__init__(cmd) + args.extend(['-F', str(ssh_config)]) + remotecmd = '%s -c "%s"' %(remotepython, popen_bootstrapline) + args.extend([sshaddress, remotecmd]) + super(SshGateway, self).__init__(args) def _remote_bootstrap_gateway(self, io, s=""): extra = "\n".join([ @@ -317,8 +310,12 @@ class SshGateway(PopenCmdGateway): "stdouterrin_setnull()", s, ]) - super(SshGateway, self)._remote_bootstrap_gateway(io, extra) - + try: + super(SshGateway, self)._remote_bootstrap_gateway(io, extra) + except EOFError: + ret = self._popen.wait() + if ret == 255: + raise HostNotFound(self.remoteaddress) def stdouterrin_setnull(): """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null. diff --git a/py/execnet/testing/test_basics.py b/py/execnet/testing/test_basics.py new file mode 100644 index 000000000..a87165bed --- /dev/null +++ b/py/execnet/testing/test_basics.py @@ -0,0 +1,215 @@ + +import py +import sys, os, subprocess, inspect +from py.__.execnet import gateway_base, gateway +from py.__.execnet.gateway_base import Message, Channel, ChannelFactory + +def test_subprocess_interaction(anypython): + line = gateway.popen_bootstrapline + compile(line, 'xyz', 'exec') + args = [str(anypython), '-c', line] + popen = subprocess.Popen(args, bufsize=0, stderr=subprocess.STDOUT, + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + def send(line): + popen.stdin.write(line.encode('ascii')) + if sys.version_info > (3,0): # 3k still buffers + popen.stdin.flush() + def receive(): + return popen.stdout.readline().decode('ascii') + + try: + source = py.code.Source(read_write_loop, "read_write_loop()") + repr_source = repr(str(source)) + "\n" + sendline = repr_source + send(sendline) + s = receive() + assert s == "ok\n" + send("hello\n") + s = receive() + assert s == "received: hello\n" + send("world\n") + s = receive() + assert s == "received: world\n" + finally: + popen.stdin.close() + popen.stdout.close() + popen.wait() + +def read_write_loop(): + import os, sys + sys.stdout.write("ok\n") + sys.stdout.flush() + while 1: + try: + line = sys.stdin.readline() + sys.stdout.write("received: %s" % line) + sys.stdout.flush() + except (IOError, EOFError): + break + +def pytest_generate_tests(metafunc): + if 'anypython' in metafunc.funcargnames: + for name in 'python3.1', 'python2.4', 'python2.5', 'python2.6': + metafunc.addcall(id=name, param=name) + +def pytest_funcarg__anypython(request): + name = request.param + executable = py.path.local.sysfind(name) + if executable is None: + py.test.skip("no %s found" % (name,)) + return executable + +def test_io_message(anypython, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(gateway_base, """ + try: + from io import BytesIO + except ImportError: + from StringIO import StringIO as BytesIO + import tempfile + temp_out = BytesIO() + temp_in = BytesIO() + io = Popen2IO(temp_out, temp_in) + for i, msg_cls in Message._types.items(): + print ("checking %s %s" %(i, msg_cls)) + for data in "hello", "hello".encode('ascii'): + msg1 = msg_cls(i, data) + msg1.writeto(io) + x = io.outfile.getvalue() + io.outfile.truncate(0) + io.outfile.seek(0) + io.infile.seek(0) + io.infile.write(x) + io.infile.seek(0) + msg2 = Message.readfrom(io) + assert msg1.channelid == msg2.channelid, (msg1, msg2) + assert msg1.data == msg2.data + print ("all passed") + """)) + #out = py.process.cmdexec("%s %s" %(executable,check)) + out = anypython.sysexec(check) + print (out) + assert "all passed" in out + +def test_popen_io(anypython, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(gateway_base, """ + do_exec(Popen2IO.server_stmt, globals()) + io.write("hello".encode('ascii')) + s = io.read(1) + assert s == "x".encode('ascii') + """)) + from subprocess import Popen, PIPE + args = [str(anypython), str(check)] + proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) + proc.stdin.write("x".encode('ascii')) + stdout, stderr = proc.communicate() + print (stderr) + ret = proc.wait() + assert "hello".encode('ascii') in stdout + + +def test_rinfo_source(anypython, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(""" + class Channel: + def send(self, data): + assert eval(repr(data), {}) == data + channel = Channel() + """, gateway.rinfo_source, """ + print ('all passed') + """)) + out = anypython.sysexec(check) + print (out) + assert "all passed" in out + +def test_geterrortext(anypython, tmpdir): + check = tmpdir.join("check.py") + check.write(py.code.Source(gateway_base, """ + class Arg: + pass + errortext = geterrortext((Arg, "1", 4)) + assert "Arg" in errortext + import sys + try: + raise ValueError("17") + except ValueError: + excinfo = sys.exc_info() + s = geterrortext(excinfo) + assert "17" in s + print ("all passed") + """)) + out = anypython.sysexec(check) + print (out) + assert "all passed" in out + +def test_getsource_import_modules(): + for dottedname in gateway.startup_modules: + yield gateway.getsource, dottedname + +def test_getsource_no_colision(): + seen = {} + for dottedname in gateway.startup_modules: + mod = __import__(dottedname, None, None, ['__doc__']) + for name, value in vars(mod).items(): + if py.std.inspect.isclass(value): + if name in seen: + olddottedname, oldval = seen[name] + if oldval is not value: + py.test.fail("duplicate class %r in %s and %s" % + (name, dottedname, olddottedname)) + seen[name] = (dottedname, value) + +def test_stdouterrin_setnull(): + cap = py.io.StdCaptureFD() + from py.__.execnet.gateway import stdouterrin_setnull + stdouterrin_setnull() + import os + os.write(1, "hello".encode('ascii')) + if os.name == "nt": + os.write(2, "world") + os.read(0, 1) + out, err = cap.reset() + assert not out + assert not err + + +class TestMessage: + def test_wire_protocol(self): + for cls in Message._types.values(): + one = py.io.BytesIO() + data = '23'.encode('ascii') + cls(42, data).writeto(one) + two = py.io.BytesIO(one.getvalue()) + msg = Message.readfrom(two) + assert isinstance(msg, cls) + assert msg.channelid == 42 + assert msg.data == data + assert isinstance(repr(msg), str) + # == "" %(msg.__class__.__name__, ) + +class TestPureChannel: + def setup_method(self, method): + self.fac = ChannelFactory(None) + + def test_factory_create(self): + chan1 = self.fac.new() + assert chan1.id == 1 + chan2 = self.fac.new() + assert chan2.id == 3 + + def test_factory_getitem(self): + chan1 = self.fac.new() + assert self.fac._channels[chan1.id] == chan1 + chan2 = self.fac.new() + assert self.fac._channels[chan2.id] == chan2 + + def test_channel_timeouterror(self): + channel = self.fac.new() + py.test.raises(IOError, channel.waitclose, timeout=0.01) + + def test_channel_makefile_incompatmode(self): + channel = self.fac.new() + py.test.raises(ValueError, 'channel.makefile("rw")') + + diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index 36039c607..3cc44af76 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -1,191 +1,13 @@ from __future__ import generators -import os, sys, time, signal +import os, sys, time import py -from py.__.execnet.gateway_base import Message, Channel, ChannelFactory -from py.__.execnet.gateway_base import ExecnetAPI, queue, Popen2IO from py.__.execnet import gateway_base, gateway +queue = py.builtin._tryimport('queue', 'Queue') -from py.__.execnet.gateway import startup_modules, getsource pytest_plugins = "pytester" TESTTIMEOUT = 10.0 # seconds -def pytest_generate_tests(metafunc): - if 'pythonpath' in metafunc.funcargnames: - for name in 'python2.4', 'python2.5', 'python2.6', 'python3.1': - metafunc.addcall(id=name, param=name) - -def pytest_funcarg__pythonpath(request): - name = request.param - executable = py.path.local.sysfind(name) - if executable is None: - py.test.skip("no %s found" % (name,)) - return executable - -def test_io_message(pythonpath, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - try: - from io import BytesIO - except ImportError: - from StringIO import StringIO as BytesIO - import tempfile - temp_out = BytesIO() - temp_in = BytesIO() - io = Popen2IO(temp_out, temp_in) - for i, msg_cls in Message._types.items(): - print ("checking %s %s" %(i, msg_cls)) - for data in "hello", "hello".encode('ascii'): - msg1 = msg_cls(i, data) - msg1.writeto(io) - x = io.outfile.getvalue() - io.outfile.truncate(0) - io.outfile.seek(0) - io.infile.seek(0) - io.infile.write(x) - io.infile.seek(0) - msg2 = Message.readfrom(io) - assert msg1.channelid == msg2.channelid, (msg1, msg2) - assert msg1.data == msg2.data - print ("all passed") - """)) - #out = py.process.cmdexec("%s %s" %(executable,check)) - out = pythonpath.sysexec(check) - print (out) - assert "all passed" in out - -def test_popen_io(pythonpath, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - do_exec(Popen2IO.server_stmt, globals()) - io.write("hello".encode('ascii')) - s = io.read(1) - assert s == "x".encode('ascii') - """)) - from subprocess import Popen, PIPE - args = [str(pythonpath), str(check)] - proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) - proc.stdin.write("x".encode('ascii')) - stdout, stderr = proc.communicate() - print (stderr) - ret = proc.wait() - assert "hello".encode('ascii') in stdout - -def test_rinfo_source(pythonpath, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(""" - class Channel: - def send(self, data): - assert eval(repr(data), {}) == data - channel = Channel() - """, gateway.rinfo_source, """ - print ('all passed') - """)) - out = pythonpath.sysexec(check) - print (out) - assert "all passed" in out - -def test_geterrortext(pythonpath, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - class Arg: - pass - errortext = geterrortext((Arg, "1", 4)) - assert "Arg" in errortext - import sys - try: - raise ValueError("17") - except ValueError: - excinfo = sys.exc_info() - s = geterrortext(excinfo) - assert "17" in s - print ("all passed") - """)) - out = pythonpath.sysexec(check) - print (out) - assert "all passed" in out - -class TestExecnetEvents: - def test_popengateway(self, _pytest): - rec = _pytest.gethookrecorder(ExecnetAPI) - gw = py.execnet.PopenGateway() - call = rec.popcall("pyexecnet_gateway_init") - assert call.gateway == gw - gw.exit() - call = rec.popcall("pyexecnet_gateway_exit") - assert call.gateway == gw - - -def test_getsource_import_modules(): - for dottedname in startup_modules: - yield getsource, dottedname - -def test_getsource_no_colision(): - seen = {} - for dottedname in startup_modules: - mod = __import__(dottedname, None, None, ['__doc__']) - for name, value in vars(mod).items(): - if py.std.inspect.isclass(value): - if name in seen: - olddottedname, oldval = seen[name] - if oldval is not value: - py.test.fail("duplicate class %r in %s and %s" % - (name, dottedname, olddottedname)) - seen[name] = (dottedname, value) - -def test_stdouterrin_setnull(): - cap = py.io.StdCaptureFD() - from py.__.execnet.gateway import stdouterrin_setnull - stdouterrin_setnull() - import os - os.write(1, "hello".encode('ascii')) - if os.name == "nt": - os.write(2, "world") - os.read(0, 1) - out, err = cap.reset() - assert not out - assert not err - - -class TestMessage: - def test_wire_protocol(self): - for cls in Message._types.values(): - one = py.io.BytesIO() - data = '23'.encode('ascii') - cls(42, data).writeto(one) - two = py.io.BytesIO(one.getvalue()) - msg = Message.readfrom(two) - assert isinstance(msg, cls) - assert msg.channelid == 42 - assert msg.data == data - assert isinstance(repr(msg), str) - # == "" %(msg.__class__.__name__, ) - -class TestPureChannel: - def setup_method(self, method): - self.fac = ChannelFactory(None) - - def test_factory_create(self): - chan1 = self.fac.new() - assert chan1.id == 1 - chan2 = self.fac.new() - assert chan2.id == 3 - - def test_factory_getitem(self): - chan1 = self.fac.new() - assert self.fac._channels[chan1.id] == chan1 - chan2 = self.fac.new() - assert self.fac._channels[chan2.id] == chan2 - - def test_channel_timeouterror(self): - channel = self.fac.new() - py.test.raises(IOError, channel.waitclose, timeout=0.01) - - def test_channel_makefile_incompatmode(self): - channel = self.fac.new() - py.test.raises(ValueError, 'channel.makefile("rw")') - - class PopenGatewayTestSetup: def setup_class(cls): cls.gw = py.execnet.PopenGateway() @@ -648,6 +470,7 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): py.test.raises(EOFError, channel.send, None) py.test.raises(EOFError, channel.receive) +@py.test.mark.xfail def test_endmarker_delivery_on_remote_killterm(): if not hasattr(py.std.os, 'kill'): py.test.skip("no os.kill()") @@ -664,8 +487,8 @@ def test_endmarker_delivery_on_remote_killterm(): err = channel._getremoteerror() finally: gw.exit() - py.test.skip("provide information on causes/signals " - "of dying remote gateways") + assert "killed" in str(err) + assert "15" in str(err) class SocketGatewaySetup: @@ -675,6 +498,10 @@ class SocketGatewaySetup: cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw, ("127.0.0.1", 0) ) + def test_host_not_found(self): + py.test.raises(py.execnet.HostNotFound, + 'py.execnet.SocketGateway("qowieuqowe", 9000)' + ) ## def teardown_class(cls): ## cls.gw.exit() @@ -695,27 +522,16 @@ class TestSshGateway(BasicRemoteExecution): "Host alias123\n" " HostName %s\n" % self.sshhost) gw = py.execnet.SshGateway("alias123", ssh_config=ssh_config) - assert gw._cmd.find("-F") != -1 - assert gw._cmd.find(str(ssh_config)) != -1 pid = gw.remote_exec("import os ; channel.send(os.getpid())").receive() gw.exit() def test_sshaddress(self): assert self.gw.remoteaddress == self.sshhost - @py.test.mark.xfail # XXX ssh-gateway error handling def test_connexion_failes_on_non_existing_hosts(self): - py.test.raises(IOError, + py.test.raises(py.execnet.HostNotFound, "py.execnet.SshGateway('nowhere.codespeak.net')") - @py.test.mark.xfail # "XXX ssh-gateway error handling" - def test_deprecated_identity(self): - py.test.deprecated_call( - py.test.raises, IOError, - py.execnet.SshGateway, - 'nowhere.codespeak.net', identity='qwe') - - def test_threads(): gw = py.execnet.PopenGateway() gw.remote_init_threads(3) @@ -734,26 +550,17 @@ def test_threads_twice(): gw.remote_init_threads(3) py.test.raises(IOError, gw.remote_init_threads, 3) gw.exit() - + +class TestExecnetEvents: + def test_popengateway(self, _pytest): + rec = _pytest.gethookrecorder(gateway_base.ExecnetAPI) + gw = py.execnet.PopenGateway() + call = rec.popcall("pyexecnet_gateway_init") + assert call.gateway == gw + gw.exit() + call = rec.popcall("pyexecnet_gateway_exit") + assert call.gateway == gw def test_nodebug(): from py.__.execnet import gateway_base assert not gateway_base.debug - -def test_channel_endmarker_remote_killterm(): - gw = py.execnet.PopenGateway() - try: - q = queue.Queue() - channel = gw.remote_exec(''' - import os - os.kill(os.getpid(), 15) - ''') - channel.setcallback(q.put, endmarker=999) - val = q.get(TESTTIMEOUT) - assert val == 999 - err = channel._getremoteerror() - finally: - gw.exit() - py.test.skip("provide information on causes/signals " - "of dying remote gateways") -