from __future__ import generators import os, sys, time, signal import py from py.__.execnet import gateway from py.__.conftest import option mypath = py.magic.autopath() from StringIO import StringIO from py.__.execnet.register import startup_modules, getsource TESTTIMEOUT = 10.0 # seconds def test_getsource_import_modules(): for dottedname in startup_modules: yield getsource, dottedname def test_getsource_no_colision(): seen = {} for dottedname in startup_modules: mod = __import__(dottedname, None, None, ['__doc__']) for name, value in vars(mod).items(): if py.std.inspect.isclass(value): if name in seen: olddottedname, oldval = seen[name] if oldval is not value: py.test.fail("duplicate class %r in %s and %s" % (name, dottedname, olddottedname)) seen[name] = (dottedname, value) def test_stdouterrin_setnull(): cap = py.io.StdCaptureFD() from py.__.execnet.register import stdouterrin_setnull stdouterrin_setnull() import os os.write(1, "hello") if os.name == "nt": os.write(2, "world") os.read(0, 1) out, err = cap.reset() assert not out assert not err class TestMessage: def test_wire_protocol(self): for cls in gateway.Message._types.values(): one = StringIO() cls(42, '23').writeto(one) two = StringIO(one.getvalue()) msg = gateway.Message.readfrom(two) assert isinstance(msg, cls) assert msg.channelid == 42 assert msg.data == '23' assert isinstance(repr(msg), str) # == "" %(msg.__class__.__name__, ) class TestPureChannel: def setup_method(self, method): self.fac = gateway.ChannelFactory(None) def test_factory_create(self): chan1 = self.fac.new() assert chan1.id == 1 chan2 = self.fac.new() assert chan2.id == 3 def test_factory_getitem(self): chan1 = self.fac.new() assert self.fac._channels[chan1.id] == chan1 chan2 = self.fac.new() assert self.fac._channels[chan2.id] == chan2 def test_channel_timeouterror(self): channel = self.fac.new() py.test.raises(IOError, channel.waitclose, timeout=0.01) class PopenGatewayTestSetup: def setup_class(cls): cls.gw = py.execnet.PopenGateway() #def teardown_class(cls): # cls.gw.exit() class BasicRemoteExecution: def test_correct_setup(self): for x in 'sender', 'receiver': assert self.gw._pool.getstarted(x) def test_repr_doesnt_crash(self): assert isinstance(repr(self), str) def test_correct_setup_no_py(self): channel = self.gw.remote_exec(""" import sys channel.send(sys.modules.keys()) """) remotemodules = channel.receive() assert 'py' not in remotemodules, ( "py should not be imported on remote side") def test_remote_exec_waitclose(self): channel = self.gw.remote_exec('pass') channel.waitclose(TESTTIMEOUT) def test_remote_exec_waitclose_2(self): channel = self.gw.remote_exec('def gccycle(): pass') channel.waitclose(TESTTIMEOUT) def test_remote_exec_waitclose_noarg(self): channel = self.gw.remote_exec('pass') channel.waitclose() def test_remote_exec_error_after_close(self): channel = self.gw.remote_exec('pass') channel.waitclose(TESTTIMEOUT) py.test.raises(IOError, channel.send, 0) def test_remote_exec_channel_anonymous(self): channel = self.gw.remote_exec(''' obj = channel.receive() channel.send(obj) ''') channel.send(42) result = channel.receive() assert result == 42 def test_channel_close_and_then_receive_error(self): channel = self.gw.remote_exec('raise ValueError') py.test.raises(channel.RemoteError, channel.receive) def test_channel_finish_and_then_EOFError(self): channel = self.gw.remote_exec('channel.send(42)') x = channel.receive() assert x == 42 py.test.raises(EOFError, channel.receive) py.test.raises(EOFError, channel.receive) py.test.raises(EOFError, channel.receive) def test_channel_close_and_then_receive_error_multiple(self): channel = self.gw.remote_exec('channel.send(42) ; raise ValueError') x = channel.receive() assert x == 42 py.test.raises(channel.RemoteError, channel.receive) def test_channel__local_close(self): channel = self.gw._channelfactory.new() self.gw._channelfactory._local_close(channel.id) channel.waitclose(0.1) def test_channel__local_close_error(self): channel = self.gw._channelfactory.new() self.gw._channelfactory._local_close(channel.id, channel.RemoteError("error")) py.test.raises(channel.RemoteError, channel.waitclose, 0.01) def test_channel_error_reporting(self): channel = self.gw.remote_exec('def foo():\n return foobar()\nfoo()\n') try: channel.receive() except channel.RemoteError, e: assert str(e).startswith('Traceback (most recent call last):') assert str(e).find('NameError: global name \'foobar\' ' 'is not defined') > -1 else: py.test.fail('No exception raised') def test_channel_syntax_error(self): # missing colon channel = self.gw.remote_exec('def foo()\n return 1\nfoo()\n') try: channel.receive() except channel.RemoteError, e: assert str(e).startswith('Traceback (most recent call last):') assert str(e).find('SyntaxError') > -1 def test_channel_iter(self): channel = self.gw.remote_exec(""" for x in range(3): channel.send(x) """) l = list(channel) assert l == range(3) def test_channel_passing_over_channel(self): channel = self.gw.remote_exec(''' c = channel.gateway.newchannel() channel.send(c) c.send(42) ''') c = channel.receive() x = c.receive() assert x == 42 # check that the both sides previous channels are really gone channel.waitclose(TESTTIMEOUT) #assert c.id not in self.gw._channelfactory newchan = self.gw.remote_exec(''' assert %d not in channel.gateway._channelfactory._channels ''' % (channel.id)) newchan.waitclose(TESTTIMEOUT) assert channel.id not in self.gw._channelfactory._channels def test_channel_receiver_callback(self): l = [] #channel = self.gw.newchannel(receiver=l.append) channel = self.gw.remote_exec(source=''' channel.send(42) channel.send(13) channel.send(channel.gateway.newchannel()) ''') channel.setcallback(callback=l.append) py.test.raises(IOError, channel.receive) channel.waitclose(TESTTIMEOUT) assert len(l) == 3 assert l[:2] == [42,13] assert isinstance(l[2], channel.__class__) def test_channel_callback_after_receive(self): l = [] channel = self.gw.remote_exec(source=''' channel.send(42) channel.send(13) channel.send(channel.gateway.newchannel()) ''') x = channel.receive() assert x == 42 channel.setcallback(callback=l.append) py.test.raises(IOError, channel.receive) channel.waitclose(TESTTIMEOUT) assert len(l) == 2 assert l[0] == 13 assert isinstance(l[1], channel.__class__) def test_waiting_for_callbacks(self): l = [] def callback(msg): import time; time.sleep(0.2) l.append(msg) channel = self.gw.remote_exec(source=''' channel.send(42) ''') channel.setcallback(callback) channel.waitclose(TESTTIMEOUT) assert l == [42] def test_channel_callback_stays_active(self, earlyfree=True): # with 'earlyfree==True', this tests the "sendonly" channel state. l = [] channel = self.gw.remote_exec(source=''' import thread, time def producer(subchannel): for i in range(5): time.sleep(0.15) subchannel.send(i*100) channel2 = channel.receive() thread.start_new_thread(producer, (channel2,)) del channel2 ''') subchannel = self.gw.newchannel() subchannel.setcallback(l.append) channel.send(subchannel) if earlyfree: subchannel = None counter = 100 while len(l) < 5: if subchannel and subchannel.isclosed(): break counter -= 1 print counter if not counter: py.test.fail("timed out waiting for the answer[%d]" % len(l)) time.sleep(0.04) # busy-wait assert l == [0, 100, 200, 300, 400] return subchannel def test_channel_callback_remote_freed(self): channel = self.test_channel_callback_stays_active(False) channel.waitclose(TESTTIMEOUT) # freed automatically at the end of producer() def test_channel_endmarker_callback(self): l = [] channel = self.gw.remote_exec(source=''' channel.send(42) channel.send(13) channel.send(channel.gateway.newchannel()) ''') channel.setcallback(l.append, 999) py.test.raises(IOError, channel.receive) channel.waitclose(TESTTIMEOUT) assert len(l) == 4 assert l[:2] == [42,13] assert isinstance(l[2], channel.__class__) assert l[3] == 999 def test_remote_redirect_stdout(self): out = py.std.StringIO.StringIO() handle = self.gw._remote_redirect(stdout=out) c = self.gw.remote_exec("print 42") c.waitclose(TESTTIMEOUT) handle.close() s = out.getvalue() assert s.strip() == "42" def test_remote_exec_redirect_multi(self): num = 3 l = [[] for x in range(num)] channels = [self.gw.remote_exec("print %d" % i, stdout=l[i].append) for i in range(num)] for x in channels: x.waitclose(TESTTIMEOUT) for i in range(num): subl = l[i] assert subl s = subl[0] assert s.strip() == str(i) def test_channel_file(self): channel = self.gw.remote_exec(""" f = channel.makefile() print >>f, "hello world" f.close() channel.send(42) """) first = channel.receive() + channel.receive() assert first.strip() == 'hello world' second = channel.receive() assert second == 42 def test_channel_file_write_error(self): channel = self.gw.remote_exec("pass") f = channel.makefile() channel.waitclose(TESTTIMEOUT) py.test.raises(IOError, f.write, 'hello') def test_channel_file_proxyclose(self): channel = self.gw.remote_exec(""" f = channel.makefile(proxyclose=True) print >>f, "hello world" f.close() channel.send(42) """) first = channel.receive() + channel.receive() assert first.strip() == 'hello world' py.test.raises(EOFError, channel.receive) def test_confusion_from_os_write_stdout(self): channel = self.gw.remote_exec(""" import os os.write(1, 'confusion!') channel.send(channel.receive() * 6) channel.send(channel.receive() * 6) """) channel.send(3) res = channel.receive() assert res == 18 channel.send(7) res = channel.receive() assert res == 42 def test_confusion_from_os_write_stderr(self): channel = self.gw.remote_exec(""" import os os.write(2, 'test') channel.send(channel.receive() * 6) channel.send(channel.receive() * 6) """) channel.send(3) res = channel.receive() assert res == 18 channel.send(7) res = channel.receive() assert res == 42 #class TestBlockingIssues: # def test_join_blocked_execution_gateway(self): # gateway = py.execnet.PopenGateway() # channel = gateway.remote_exec(""" # time.sleep(5.0) # """) # def doit(): # gateway.exit() # gateway.join(joinexec=True) # return 17 # # pool = py._thread.WorkerPool() # reply = pool.dispatch(doit) # x = reply.get(timeout=1.0) # assert x == 17 class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): #disabled = True def test_chdir_separation(self): old = py.test.ensuretemp('chdirtest').chdir() try: gw = py.execnet.PopenGateway() finally: waschangedir = old.chdir() c = gw.remote_exec("import os ; channel.send(os.getcwd())") x = c.receive() assert x == str(waschangedir) def test_many_popen(self): num = 4 l = [] for i in range(num): l.append(py.execnet.PopenGateway()) channels = [] for gw in l: channel = gw.remote_exec("""channel.send(42)""") channels.append(channel) ## try: ## while channels: ## channel = channels.pop() ## try: ## ret = channel.receive() ## assert ret == 42 ## finally: ## channel.gateway.exit() ## finally: ## for x in channels: ## x.gateway.exit() while channels: channel = channels.pop() ret = channel.receive() assert ret == 42 def disabled_test_remote_is_killed_while_sending(self): gw = py.execnet.PopenGateway() channel = gw.remote_exec(""" import os import time channel.send(os.getppid()) channel.send(os.getpid()) while 1: channel.send('#'*1000) time.sleep(10) """) parent = channel.receive() remote = channel.receive() assert parent == os.getpid() time.sleep(0.5) os.kill(remote, signal.SIGKILL) time.sleep(1) channel.waitclose(TESTTIMEOUT) py.test.raises(EOFError, channel.receive) #channel.waitclose(TESTTIMEOUT) #channel.send('#') class SocketGatewaySetup: def setup_class(cls): # open a gateway to a fresh child process cls.proxygw = py.execnet.PopenGateway() cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw, ("127.0.0.1", 0) ) ## def teardown_class(cls): ## cls.gw.exit() ## cls.proxygw.exit() class TestSocketGateway(SocketGatewaySetup, BasicRemoteExecution): pass class TestSshGateway(BasicRemoteExecution): def setup_class(cls): if option.sshtarget is None: py.test.skip("no known ssh target, use -S to set one") cls.gw = py.execnet.SshGateway(option.sshtarget) def test_sshaddress(self): assert self.gw.remoteaddress == option.sshtarget def test_failed_connexion(self): gw = py.execnet.SshGateway('nowhere.codespeak.net') try: channel = gw.remote_exec("...") except IOError: pass # connexion failed already else: # connexion did not fail yet py.test.raises(EOFError, channel.receive) # now it did py.test.raises(IOError, gw.remote_exec, "...")