2007-01-24 22:24:01 +08:00
|
|
|
"""
|
|
|
|
Node code for Master.
|
|
|
|
"""
|
|
|
|
import py
|
2007-08-27 17:02:50 +08:00
|
|
|
from py.__.test.outcome import ReprOutcome
|
|
|
|
from py.__.test import repevent
|
|
|
|
from py.__.test.outcome import Skipped
|
2007-09-20 23:17:22 +08:00
|
|
|
from py.builtin import GeneratorExit
|
2007-01-24 22:24:01 +08:00
|
|
|
|
|
|
|
class MasterNode(object):
|
2007-02-05 07:12:12 +08:00
|
|
|
def __init__(self, channel, reporter):
|
2007-01-24 22:24:01 +08:00
|
|
|
self.channel = channel
|
|
|
|
self.reporter = reporter
|
|
|
|
self.pending = []
|
2007-02-05 07:12:12 +08:00
|
|
|
channel.setcallback(self._callback)
|
|
|
|
|
|
|
|
def _callback(self, outcome):
|
|
|
|
item = self.pending.pop()
|
|
|
|
self.receive_result(outcome, item)
|
2007-01-24 22:24:01 +08:00
|
|
|
|
|
|
|
def receive_result(self, outcomestring, item):
|
|
|
|
repr_outcome = ReprOutcome(outcomestring)
|
|
|
|
# send finish report
|
2007-02-05 07:34:23 +08:00
|
|
|
self.reporter(repevent.ReceivedItemOutcome(
|
2007-01-24 22:24:01 +08:00
|
|
|
self.channel, item, repr_outcome))
|
|
|
|
|
|
|
|
def send(self, item):
|
2007-02-08 02:25:01 +08:00
|
|
|
try:
|
|
|
|
if item is StopIteration:
|
|
|
|
self.channel.send(42)
|
|
|
|
else:
|
|
|
|
self.pending.insert(0, item)
|
2007-01-26 19:49:59 +08:00
|
|
|
#itemspec = item.listnames()[1:]
|
2007-02-08 02:25:01 +08:00
|
|
|
self.channel.send(item._get_collector_trail())
|
|
|
|
# send start report
|
|
|
|
self.reporter(repevent.SendItem(self.channel, item))
|
|
|
|
except IOError:
|
2007-02-08 02:36:53 +08:00
|
|
|
print "Sending error, channel IOError"
|
2007-02-08 02:44:49 +08:00
|
|
|
print self.channel._getremoteerror()
|
2007-02-08 02:36:53 +08:00
|
|
|
# XXX: this should go as soon as we'll have proper detection
|
|
|
|
# of hanging nodes and such
|
|
|
|
raise
|
2007-01-24 22:24:01 +08:00
|
|
|
|
|
|
|
def dispatch_loop(masternodes, itemgenerator, shouldstop,
|
|
|
|
waiter = lambda: py.std.time.sleep(0.1),
|
|
|
|
max_tasks_per_node=None):
|
|
|
|
if not max_tasks_per_node:
|
2007-01-25 00:46:46 +08:00
|
|
|
max_tasks_per_node = py.test.config.getvalue("dist_taskspernode")
|
2007-01-24 22:24:01 +08:00
|
|
|
all_tests = {}
|
|
|
|
while 1:
|
|
|
|
try:
|
|
|
|
for node in masternodes:
|
|
|
|
if len(node.pending) < max_tasks_per_node:
|
|
|
|
item = itemgenerator.next()
|
|
|
|
all_tests[item] = True
|
|
|
|
if shouldstop():
|
|
|
|
for _node in masternodes:
|
|
|
|
_node.send(StopIteration) # magic connector
|
|
|
|
return None
|
|
|
|
node.send(item)
|
|
|
|
except StopIteration:
|
|
|
|
break
|
|
|
|
waiter()
|
|
|
|
return all_tests
|
|
|
|
|