test_ok2/py/test/rsession/master.py

77 lines
2.5 KiB
Python

"""
Node code for Master.
"""
import py
from py.__.test.rsession.outcome import ReprOutcome
from py.__.test.rsession import report
class MasterNode(object):
    """Master-side handle for one remote node reached through ``channel``.

    Items passed to ``send`` are queued in ``self.pending``.  Every outcome
    arriving on the channel is matched with the oldest queued item and,
    unless that item is already recorded in ``done_dict``, turned into a
    report for ``reporter``.
    """

    def __init__(self, channel, reporter, done_dict):
        """Store the collaborators and register the result callback.

        channel   -- object providing ``send(data)`` and ``setcallback(func)``
        reporter  -- callable that receives report events
        done_dict -- mapping shared with the caller; an item is added to it
                     (value ``True``) once its outcome has been reported
        """
        self.channel = channel
        self.reporter = reporter
        # The queue has to exist before the callback is registered: a
        # channel may invoke the callback as soon as it is set, and the
        # callback reads ``self.pending``.
        self.pending = []

        def callback(outcome):
            # ``send`` inserts at index 0, so pop() returns the oldest item.
            item = self.pending.pop()
            if item not in done_dict:
                self.receive_result(outcome, item)
                done_dict[item] = True

        channel.setcallback(callback)

    def receive_result(self, outcomestring, item):
        """Wrap ``outcomestring`` in a ReprOutcome and report it for ``item``."""
        repr_outcome = ReprOutcome(outcomestring)
        # send finish report
        self.reporter(report.ReceivedItemOutcome(
            self.channel, item, repr_outcome))

    def send(self, item):
        """Send ``item`` to the remote side, or the stop marker.

        Passing ``StopIteration`` itself sends the value 42 instead of an
        item (presumably the remote side's shutdown signal -- confirm
        against the slave code).  Any other item is queued as pending, its
        collector trail is sent over the channel and a ``SendItem`` report
        is emitted.
        """
        if item is StopIteration:
            self.channel.send(42)
        else:
            self.pending.insert(0, item)
            self.channel.send(item._get_collector_trail())
            # send start report
            self.reporter(report.SendItem(self.channel, item))
def itemgen(colitems, reporter, keyword, reporterror):
    """Lazily yield everything produced by ``_tryiter`` of each collector.

    Each element of ``colitems`` is asked for its items via
    ``_tryiter(reporterror=..., keyword=keyword)``.  The callable handed
    over as ``reporterror`` takes one argument and forwards it as
    ``reporterror(reporter, argument)``.
    """
    def forward_error(info):
        return reporterror(reporter, info)

    for collector in colitems:
        found = collector._tryiter(reporterror=forward_error, keyword=keyword)
        for item in found:
            yield item
def randomgen(items, done_dict):
    """ Generator, which randomly gets all the tests from items as long
    as they're not in done_dict

    ``items`` is a mapping whose keys are the candidates; it is modified
    in place: a key that is picked and found in ``done_dict`` is deleted
    from ``items``.  A picked key that is not done is yielded and stays in
    ``items``, so it can be yielded again until the caller records it in
    ``done_dict``.  The generator finishes once ``items`` is empty.
    """
    import random
    while items:
        # Snapshot the keys into a list: works whether keys() would give a
        # list or a view, and lets ``items`` be modified safely below.
        item = random.choice(list(items))
        if item in done_dict:
            del items[item]
        else:
            yield item
def dispatch_loop(masternodes, itemgenerator, shouldstop,
                  waiter=lambda: py.std.time.sleep(0.1),
                  max_tasks_per_node=None):
    """Feed items from ``itemgenerator`` to the nodes until it runs dry.

    masternodes        -- nodes offering ``pending`` (sized) and ``send(item)``
    itemgenerator      -- iterator with a ``next()`` method producing items
    shouldstop         -- callable checked after every fetched item
    waiter             -- callable invoked after each complete pass over
                          the nodes
    max_tasks_per_node -- a node only gets a new item while it has fewer
                          pending items than this; a false value means the
                          ``dist_taskspernode`` config value is used

    Returns a dict mapping every fetched item to ``True`` once a
    StopIteration is raised inside a pass (normally by the exhausted
    generator).  If ``shouldstop()`` returns true, ``StopIteration`` is
    sent to every node and ``None`` is returned instead.
    """
    limit = max_tasks_per_node or py.test.config.getvalue("dist_taskspernode")
    fetched = {}
    while True:
        try:
            for node in masternodes:
                if len(node.pending) >= limit:
                    # node is saturated, try the next one
                    continue
                item = itemgenerator.next()
                fetched[item] = True
                if not shouldstop():
                    node.send(item)
                    continue
                for peer in masternodes:
                    peer.send(StopIteration)  # magic connector
                return None
        except StopIteration:
            return fetched
        waiter()