""" master code and test dispatching for making 1-n master -> slave connection and test it locally. """ import time, threading import py, sys if sys.platform == 'win32': py.test.skip("rsession is unsupported on Windows.") from py.__.test.rsession.master import dispatch_loop, MasterNode from py.__.test.rsession.slave import setup_slave from py.__.test.outcome import ReprOutcome, SerializableOutcome from py.__.test import repevent from py.__.test.rsession.hostmanage import HostInfo def setup_module(mod): # bind an empty config mod.tmpdir = tmpdir = py.test.ensuretemp(mod.__name__) # to avoid rsyncing config = py.test.config._reparse([tmpdir]) config.option.dist_taskspernode = 10 mod.rootcol = config._getcollector(tmpdir) class DummyGateway(object): def __init__(self): self.host = HostInfo("localhost") class DummyChannel(object): def __init__(self): self.sent = [] self.gateway = DummyGateway() def setcallback(self, func): self.callback = func def send(self, item): assert py.std.marshal.dumps(item) self.sent.append(item) class NonWorkingChannel(object): def setcallback(self, func): pass def send(self, item): raise IOError def _getremoteerror(self): return "blah" class Item(py.test.collect.Item): def _get_collector_trail(self): return (self.name,) def test_masternode(): try: raise ValueError() except ValueError: excinfo = py.code.ExceptionInfo() ch = DummyChannel() reportlist = [] mnode = MasterNode(ch, reportlist.append) mnode.send(Item("ok")) mnode.send(Item("notok")) ch.callback(SerializableOutcome().make_repr()) ch.callback(SerializableOutcome(excinfo=excinfo).make_repr()) assert len(reportlist) == 4 received = [i for i in reportlist if isinstance(i, repevent.ReceivedItemOutcome)] assert received[0].outcome.passed assert not received[1].outcome.passed def test_masternode_nonworking_channel(): ch = NonWorkingChannel() reportlist = [] mnode = MasterNode(ch, reportlist.append) cap = py.io.StdCaptureFD() py.test.raises(IOError, 'mnode.send(Item("ok"))') out, err = cap.reset() assert out.find("blah") != -1 def test_sending_two_noes(): # XXX fijal: this test previously tested that the second # item result would not get send. why? did i miss # something? # ch = DummyChannel() reportlist = [] mnode = MasterNode(ch, reportlist.append) mnode.send(Item("ok")) mnode.send(Item("ok")) ch.callback(SerializableOutcome().make_repr()) ch.callback(SerializableOutcome().make_repr()) assert len(reportlist) == 4 def test_outcome_repr(): out = ReprOutcome(SerializableOutcome(skipped="xxx").make_repr()) s = repr(out) assert s.lower().find("skip") != -1 class DummyMasterNode(object): def __init__(self): self.pending = [] def send(self, data): self.pending.append(data) def test_dispatch_loop(): masternodes = [DummyMasterNode(), DummyMasterNode()] itemgenerator = iter(range(100)) shouldstop = lambda : False def waiter(): for node in masternodes: node.pending.pop() dispatch_loop(masternodes, itemgenerator, shouldstop, waiter=waiter) class TestSlave: def setup_class(cls): cls.tmpdir = tmpdir = py.test.ensuretemp(cls.__name__) cls.pkgpath = pkgpath = tmpdir.join("slavetestpkg") pkgpath.ensure("__init__.py") pkgpath.join("test_something.py").write(py.code.Source(""" def funcpass(): pass def funcfail(): raise AssertionError("hello world") """)) cls.config = py.test.config._reparse([tmpdir]) assert cls.config.topdir == tmpdir cls.rootcol = cls.config._getcollector(tmpdir) def _gettrail(self, *names): item = self.rootcol._getitembynames(names) return self.config.get_collector_trail(item) def test_slave_running(self): py.test.skip("XXX test broken, needs refactoring") def simple_report(event): if not isinstance(event, repevent.ReceivedItemOutcome): return item = event.item if item.code.name == 'funcpass': assert event.outcome.passed else: assert not event.outcome.passed def open_gw(): gw = py.execnet.PopenGateway() host = HostInfo("localhost") host.gw_remotepath = '' host.gw = gw #gw.host.gw = gw config = py.test.config._reparse([tmpdir]) channel = setup_slave(host, config) mn = MasterNode(channel, simple_report) return mn master_nodes = [open_gw(), open_gw(), open_gw()] funcpass_item = self.xxx funcfail_item = rootcol._getitembynames(funcfail_spec) itemgenerator = iter([funcfail_item] + [funcpass_item] * 5 + [funcfail_item] * 5) shouldstop = lambda : False dispatch_loop(master_nodes, itemgenerator, shouldstop) def test_slave_running_interrupted(): py.test.skip("XXX test broken, needs refactoring") #def simple_report(event): # if not isinstance(event, repevent.ReceivedItemOutcome): # return # item = event.item # if item.code.name == 'funcpass': # assert event.outcome.passed # else: # assert not event.outcome.passed reports = [] def open_gw(): gw = py.execnet.PopenGateway() gw.host = HostInfo("localhost") gw.host.gw = gw config = py.test.config._reparse([tmpdir]) channel = setup_slave(gw.host, config) mn = MasterNode(channel, reports.append, {}) return mn, gw, channel mn, gw, channel = open_gw() rootcol = py.test.collect.Directory(pkgdir) funchang_item = rootcol._getitembynames(funchang_spec) mn.send(funchang_item) mn.send(StopIteration) # XXX: We have to wait here a bit to make sure that it really did happen channel.waitclose(2)