[svn r57594] * adding tracing to dsession and master/slave communication
(enable with --tracedir) * factor slave loop into a class * add comment to pickling --HG-- branch : trunk
This commit is contained in:
parent
a6f1e3d82f
commit
354feff9a6
|
@ -114,12 +114,18 @@ class Node(object):
|
|||
return (self.name, self.parent)
|
||||
def __setstate__(self, (name, parent)):
|
||||
newnode = parent.join(name)
|
||||
assert newnode is not None, (self, name, parent)
|
||||
if newnode is None:
|
||||
raise AssertionError(self, name, parent, parent.__dict__)
|
||||
self.__dict__.update(newnode.__dict__)
|
||||
#self.__init__(name=name, parent=parent)
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s %r>" %(self.__class__.__name__, getattr(self, 'name', None))
|
||||
if self._config.option.debug:
|
||||
return "<%s %r %0x>" %(self.__class__.__name__,
|
||||
getattr(self, 'name', None), id(self))
|
||||
else:
|
||||
return "<%s %r>" %(self.__class__.__name__,
|
||||
getattr(self, 'name', None))
|
||||
|
||||
# methods for ordering nodes
|
||||
|
||||
|
|
|
@ -241,7 +241,8 @@ class Tracer(object):
|
|||
self.flush = flush
|
||||
|
||||
def __call__(self, *args):
|
||||
print >>self.file, " ".join(map(str, args))
|
||||
time = round(py.std.time.time(), 3)
|
||||
print >>self.file, time, " ".join(map(str, args))
|
||||
if self.flush:
|
||||
self.file.flush()
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@ from py.__.test.session import Session
|
|||
from py.__.test.runner import OutcomeRepr
|
||||
from py.__.test import outcome
|
||||
|
||||
|
||||
import Queue
|
||||
|
||||
class LoopState(object):
|
||||
|
@ -34,7 +33,7 @@ class DSession(Session):
|
|||
Session drives the collection and running of tests
|
||||
and generates test events for reporters.
|
||||
"""
|
||||
MAXITEMSPERHOST = 10
|
||||
MAXITEMSPERHOST = 15
|
||||
|
||||
def __init__(self, config):
|
||||
super(DSession, self).__init__(config=config)
|
||||
|
@ -43,6 +42,7 @@ class DSession(Session):
|
|||
self.host2pending = {}
|
||||
self.item2host = {}
|
||||
self._testsfailed = False
|
||||
self.trace = config.maketrace("dsession.log")
|
||||
|
||||
def fixoptions(self):
|
||||
""" check, fix and determine conflicting options. """
|
||||
|
@ -173,6 +173,7 @@ class DSession(Session):
|
|||
pending = self.host2pending.pop(host)
|
||||
for item in pending:
|
||||
del self.item2host[item]
|
||||
self.trace("removehost %s, pending=%r" %(host.hostid, pending))
|
||||
return pending
|
||||
|
||||
def triggertesting(self, colitems):
|
||||
|
@ -195,6 +196,7 @@ class DSession(Session):
|
|||
if room > 0:
|
||||
sending = tosend[:room]
|
||||
host.node.sendlist(sending)
|
||||
self.trace("sent to host %s: %r" %(host.hostid, sending))
|
||||
for item in sending:
|
||||
#assert item not in self.item2host, (
|
||||
# "sending same item %r to multiple hosts "
|
||||
|
@ -210,8 +212,11 @@ class DSession(Session):
|
|||
self.queue.put(event.RescheduleItems(tosend))
|
||||
|
||||
def removeitem(self, item):
|
||||
if item not in self.item2host:
|
||||
raise AssertionError(item, self.item2host)
|
||||
host = self.item2host.pop(item)
|
||||
self.host2pending[host].remove(item)
|
||||
self.trace("removed %r, host=%r" %(item,host.hostid))
|
||||
|
||||
def handle_crashitem(self, item, host):
|
||||
longrepr = "%r CRASHED THE HOST %r" %(item, host)
|
||||
|
@ -228,3 +233,15 @@ class DSession(Session):
|
|||
""" teardown any resources after a test run. """
|
||||
for host in self.host2pending:
|
||||
host.gw.exit()
|
||||
|
||||
|
||||
# debugging function
|
||||
def dump_picklestate(item):
|
||||
l = []
|
||||
while 1:
|
||||
state = item.__getstate__()
|
||||
l.append(state)
|
||||
item = state[-1]
|
||||
if len(state) != 2:
|
||||
break
|
||||
return l
|
||||
|
|
|
@ -33,7 +33,7 @@ class Host(object):
|
|||
|
||||
def _getuniqueid(self, hostname):
|
||||
l = self._hostname2list.setdefault(hostname, [])
|
||||
hostid = hostname + "[%d]" % len(l)
|
||||
hostid = hostname + "-%d" % len(l)
|
||||
l.append(hostid)
|
||||
return hostid
|
||||
|
||||
|
|
|
@ -78,7 +78,8 @@ def install_slave(host, config):
|
|||
channel = PickleChannel(channel)
|
||||
from py.__.test.dsession import masterslave
|
||||
config = masterslave.receive_and_send_pickled_config(channel)
|
||||
masterslave.setup_at_slave_side(channel, config)
|
||||
slavenode = masterslave.SlaveNode(channel, config)
|
||||
slavenode.run()
|
||||
""")
|
||||
channel = PickleChannel(channel)
|
||||
remote_topdir = host.gw_remotepath
|
||||
|
@ -89,28 +90,53 @@ def install_slave(host, config):
|
|||
channel.send(host)
|
||||
return channel
|
||||
|
||||
def setup_at_slave_side(channel, config):
|
||||
# our current dir is the topdir
|
||||
# XXX what about neccessary PYTHONPATHs?
|
||||
class SlaveNode(object):
|
||||
def __init__(self, channel, config):
|
||||
self.channel = channel
|
||||
self.config = config
|
||||
import os
|
||||
if hasattr(os, 'nice'):
|
||||
nice_level = config.getvalue('dist_nicelevel')
|
||||
os.nice(nice_level)
|
||||
|
||||
def __repr__(self):
|
||||
host = getattr(self, 'host', '<uninitialized>')
|
||||
return "<%s host=%s>" %(self.__class__.__name__, host.hostid)
|
||||
|
||||
def run(self):
|
||||
from py.__.test.dsession.hostmanage import makehostup
|
||||
host = channel.receive()
|
||||
channel = self.channel
|
||||
self.host = host = channel.receive()
|
||||
channel.send(makehostup(host))
|
||||
self.trace = self.config.maketrace(host.hostid)
|
||||
self.trace("initialized")
|
||||
|
||||
try:
|
||||
while 1:
|
||||
task = channel.receive()
|
||||
self.trace("received", task)
|
||||
|
||||
if task is None: # shutdown
|
||||
channel.send(None)
|
||||
self.trace("shutting down, send None to", channel)
|
||||
break
|
||||
if isinstance(task, list):
|
||||
for item in task:
|
||||
runtest(channel, item)
|
||||
self.runtest(item)
|
||||
else:
|
||||
runtest(channel, task)
|
||||
self.runtest(task)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
rep = event.InternalException()
|
||||
self.trace("sending back internal exception report, breaking loop")
|
||||
channel.send(rep)
|
||||
raise
|
||||
else:
|
||||
self.trace("normal shutdown")
|
||||
|
||||
def runtest(channel, item):
|
||||
def runtest(self, item):
|
||||
runner = item._getrunner()
|
||||
testrep = runner(item)
|
||||
channel.send(testrep)
|
||||
self.channel.send(testrep)
|
||||
self.trace("sent back testreport", testrep)
|
||||
|
|
|
@ -23,6 +23,9 @@ class MyPickler(Pickler):
|
|||
""" Pickler with a custom memoize()
|
||||
to take care of unique ID creation.
|
||||
See the usage in ImmutablePickler
|
||||
XXX we could probably extend Pickler
|
||||
and Unpickler classes to directly
|
||||
update the other'S memos.
|
||||
"""
|
||||
def __init__(self, file, protocol, uneven):
|
||||
Pickler.__init__(self, file, protocol)
|
||||
|
@ -82,9 +85,8 @@ class ImmutablePickler:
|
|||
return res
|
||||
|
||||
def _updatepicklememo(self):
|
||||
self._picklememo.update(dict(
|
||||
[(id(obj), (int(x), obj))
|
||||
for x, obj in self._unpicklememo.items()]))
|
||||
for x, obj in self._unpicklememo.items():
|
||||
self._picklememo[id(obj)] = (int(x), obj)
|
||||
|
||||
def _updateunpicklememo(self):
|
||||
for key,obj in self._picklememo.values():
|
||||
|
|
Loading…
Reference in New Issue