[svn r63547] renaming all event consumers to use the "__" convention that

is also used for funcargs.

--HG--
branch : trunk
This commit is contained in:
hpk 2009-04-02 19:58:51 +02:00
parent 575935a4aa
commit 069ab6ff5f
14 changed files with 77 additions and 78 deletions

View File

@ -1,9 +1,8 @@
py lib Copyright holders, 2003-2008
py lib Copyright holders, 2003-2009
=======================================
Except when otherwise stated (look for LICENSE files or information at
the beginning of each file) the files in the 'py' directory are
copyrighted by one or more of the following people and organizations:
The files in the 'py' directory are copyrighted by one or more
of the following people and organizations:
Holger Krekel, holger at merlinux eu
Guido Wesdorp, johnny at johnnydebris net

View File

@ -9,7 +9,7 @@ registering a plugin
::
>>> class MyPlugin:
... def pyevent_plugin_registered(self, plugin):
... def pyevent__plugin_registered(self, plugin):
... print "registering", plugin.__class__.__name__
...
>>> import py
@ -149,7 +149,7 @@ class PyPlugins:
def notify(self, eventname, *args, **kwargs):
#print "notifying", eventname, args, kwargs
MultiCall(self.listattr("pyevent_" + eventname),
MultiCall(self.listattr("pyevent__" + eventname),
*args, **kwargs).execute()
#print "calling anonymous hooks", args, kwargs
MultiCall(self.listattr("pyevent"),

View File

@ -100,9 +100,9 @@ class TestPyPlugins:
plugins = PyPlugins()
l = []
class MyApi:
def pyevent_plugin_registered(self, plugin):
def pyevent__plugin_registered(self, plugin):
l.append(plugin)
def pyevent_plugin_unregistered(self, plugin):
def pyevent__plugin_unregistered(self, plugin):
l.remove(plugin)
myapi = MyApi()
plugins.register(myapi)
@ -178,7 +178,7 @@ class TestPyPlugins:
plugins = PyPlugins()
l = []
class api1:
def pyevent_hello(self):
def pyevent__hello(self):
l.append("hellospecific")
class api2:
def pyevent(self, name, *args):
@ -201,7 +201,7 @@ class TestPyPlugins:
excinfo = py.test.raises(ImportError, "plugins.consider_module(mod)")
mod.pylib = "os"
class Events(list):
def pyevent_importingmodule(self, mod):
def pyevent__importingmodule(self, mod):
self.append(mod)
l = Events()
plugins.register(l)
@ -226,17 +226,17 @@ def test_subprocess_env(testdir, monkeypatch):
old.chdir()
class TestPyPluginsEvents:
def test_pyevent_named_dispatch(self):
def test_pyevent__named_dispatch(self):
plugins = PyPlugins()
l = []
class A:
def pyevent_name(self, x):
def pyevent__name(self, x):
l.append(x)
plugins.register(A())
plugins.notify("name", 13)
assert l == [13]
def test_pyevent_anonymous_dispatch(self):
def test_pyevent__anonymous_dispatch(self):
plugins = PyPlugins()
l = []
class A:

View File

@ -9,7 +9,7 @@ class TestWarningPlugin:
self.bus.register(self)
self.warnings = []
def pyevent_WARNING(self, warning):
def pyevent__WARNING(self, warning):
self.warnings.append(warning)
def test_event_generation(self):

View File

@ -18,7 +18,7 @@ class WarningPlugin(object):
self.bus = bus
bus.register(self)
def pyevent_WARNING(self, warning):
def pyevent__WARNING(self, warning):
# forward to python warning system
py.std.warnings.warn_explicit(warning, category=Warning,
filename=str(warning.path),

View File

@ -25,27 +25,27 @@ class LoopState(object):
self.shuttingdown = False
self.testsfailed = False
def pyevent_itemtestreport(self, event):
def pyevent__itemtestreport(self, event):
if event.colitem in self.dsession.item2nodes:
self.dsession.removeitem(event.colitem, event.node)
if event.failed:
self.testsfailed = True
def pyevent_collectionreport(self, event):
def pyevent__collectionreport(self, event):
if event.passed:
self.colitems.extend(event.result)
def pyevent_testnodeready(self, node):
def pyevent__testnodeready(self, node):
self.dsession.addnode(node)
def pyevent_testnodedown(self, node, error=None):
def pyevent__testnodedown(self, node, error=None):
pending = self.dsession.removenode(node)
if pending:
crashitem = pending[0]
self.dsession.handle_crashitem(crashitem, node)
self.colitems.extend(pending[1:])
def pyevent_rescheduleitems(self, event):
def pyevent__rescheduleitems(self, event):
self.colitems.extend(event.items)
self.dowork = False # avoid busywait

View File

@ -139,10 +139,10 @@ def slave_runsession(channel, config, fullwidth, hasmarkup):
session.shouldclose = channel.isclosed
class Failures(list):
def pyevent_itemtestreport(self, ev):
def pyevent__itemtestreport(self, ev):
if ev.failed:
self.append(ev)
pyevent_collectionreport = pyevent_itemtestreport
pyevent__collectionreport = pyevent__itemtestreport
failreports = Failures()
session.bus.register(failreports)

View File

@ -11,21 +11,21 @@ class ExecnetcleanupPlugin:
if self._debug:
print "[execnetcleanup %0x] %s %s" %(id(self), msg, args)
def pyevent_gateway_init(self, gateway):
def pyevent__gateway_init(self, gateway):
self.trace("init", gateway)
if self._gateways is not None:
self._gateways.append(gateway)
def pyevent_gateway_exit(self, gateway):
def pyevent__gateway_exit(self, gateway):
self.trace("exit", gateway)
if self._gateways is not None:
self._gateways.remove(gateway)
def pyevent_testrunstart(self, event):
def pyevent__testrunstart(self, event):
self.trace("testrunstart", event)
self._gateways = []
def pyevent_testrunfinish(self, event):
def pyevent__testrunfinish(self, event):
self.trace("testrunfinish", event)
l = []
for gw in self._gateways:

View File

@ -76,7 +76,7 @@ class PluginTester(Support):
if fail:
py.test.fail("Plugin API error")
def collectattr(obj, prefixes=("pytest_", "pyevent_")):
def collectattr(obj, prefixes=("pytest_", "pyevent__")):
methods = {}
for apiname in vars(obj):
for prefix in prefixes:
@ -141,63 +141,63 @@ class PytestPluginHooks:
def pyevent(self, eventname, *args, **kwargs):
""" called for each testing event. """
def pyevent_gateway_init(self, gateway):
def pyevent__gateway_init(self, gateway):
""" called after a gateway has been initialized. """
def pyevent_gateway_exit(self, gateway):
def pyevent__gateway_exit(self, gateway):
""" called when gateway is being exited. """
def pyevent_gwmanage_rsyncstart(self, source, gateways):
def pyevent__gwmanage_rsyncstart(self, source, gateways):
""" called before rsyncing a directory to remote gateways takes place. """
def pyevent_gwmanage_rsyncfinish(self, source, gateways):
def pyevent__gwmanage_rsyncfinish(self, source, gateways):
""" called after rsyncing a directory to remote gateways takes place. """
def pyevent_trace(self, category, msg):
def pyevent__trace(self, category, msg):
""" called for tracing events. """
def pyevent_internalerror(self, event):
def pyevent__internalerror(self, event):
""" called for internal errors. """
def pyevent_itemstart(self, item, node):
def pyevent__itemstart(self, item, node):
""" test item gets collected. """
def pyevent_itemtestreport(self, event):
def pyevent__itemtestreport(self, event):
""" test has been run. """
def pyevent_deselected(self, event):
def pyevent__deselected(self, event):
""" item has been dselected. """
def pyevent_collectionstart(self, event):
def pyevent__collectionstart(self, event):
""" collector starts collecting. """
def pyevent_collectionreport(self, event):
def pyevent__collectionreport(self, event):
""" collector finished collecting. """
def pyevent_testrunstart(self, event):
def pyevent__testrunstart(self, event):
""" whole test run starts. """
def pyevent_testrunfinish(self, event):
def pyevent__testrunfinish(self, event):
""" whole test run finishes. """
def pyevent_gwmanage_newgateway(self, gateway):
def pyevent__gwmanage_newgateway(self, gateway):
""" execnet gateway manager has instantiated a gateway.
The gateway will have an 'id' attribute that is unique
within the gateway manager context.
"""
def pyevent_testnodeready(self, node):
def pyevent__testnodeready(self, node):
""" Node is ready to operate. """
def pyevent_testnodedown(self, node, error):
def pyevent__testnodedown(self, node, error):
""" Node is down. """
def pyevent_rescheduleitems(self, event):
def pyevent__rescheduleitems(self, event):
""" Items from a node that went down. """
def pyevent_looponfailinfo(self, event):
def pyevent__looponfailinfo(self, event):
""" info for repeating failing tests. """
def pyevent_plugin_registered(self, plugin):
def pyevent__plugin_registered(self, plugin):
""" a new py lib plugin got registered. """

View File

@ -171,14 +171,14 @@ class ResultDB(object):
shortrepr, longrepr = getoutcomecodes(event)
self.write_log_entry(event, shortrepr, gpath, longrepr)
def pyevent_itemtestreport(self, event):
def pyevent__itemtestreport(self, event):
self.log_outcome(event)
def pyevent_collectionreport(self, event):
def pyevent__collectionreport(self, event):
if not event.passed:
self.log_outcome(event)
def pyevent_internalerror(self, event):
def pyevent__internalerror(self, event):
path = event.repr.reprcrash.path # fishing :(
self.write_log_entry(event, '!', path, str(event.repr))

View File

@ -82,11 +82,11 @@ class TerminalReporter:
else:
return "???", dict(red=True)
def pyevent_internalerror(self, event):
def pyevent__internalerror(self, event):
for line in str(event.repr).split("\n"):
self.write_line("InternalException: " + line)
def pyevent_gwmanage_newgateway(self, gateway, rinfo):
def pyevent__gwmanage_newgateway(self, gateway, rinfo):
#self.write_line("%s instantiated gateway from spec %r" %(gateway.id, gateway.spec._spec))
d = {}
d['version'] = repr_pythonversion(rinfo.version_info)
@ -105,18 +105,18 @@ class TerminalReporter:
self.write_line(infoline)
self.gateway2info[gateway] = infoline
def pyevent_gwmanage_rsyncstart(self, source, gateways):
def pyevent__gwmanage_rsyncstart(self, source, gateways):
targets = ", ".join([gw.id for gw in gateways])
msg = "rsyncstart: %s -> %s" %(source, targets)
if not self.config.option.verbose:
msg += " # use --verbose to see rsync progress"
self.write_line(msg)
def pyevent_gwmanage_rsyncfinish(self, source, gateways):
def pyevent__gwmanage_rsyncfinish(self, source, gateways):
targets = ", ".join([gw.id for gw in gateways])
self.write_line("rsyncfinish: %s -> %s" %(source, targets))
def pyevent_plugin_registered(self, plugin):
def pyevent__plugin_registered(self, plugin):
if self.config.option.traceconfig:
msg = "PLUGIN registered: %s" %(plugin,)
# XXX this event may happen during setup/teardown time
@ -124,19 +124,19 @@ class TerminalReporter:
# which garbles our output if we use self.write_line
self.write_line(msg)
def pyevent_testnodeready(self, node):
def pyevent__testnodeready(self, node):
self.write_line("%s txnode ready to receive tests" %(node.gateway.id,))
def pyevent_testnodedown(self, node, error):
def pyevent__testnodedown(self, node, error):
if error:
self.write_line("%s node down, error: %s" %(node.gateway.id, error))
def pyevent_trace(self, category, msg):
def pyevent__trace(self, category, msg):
if self.config.option.debug or \
self.config.option.traceconfig and category.find("config") != -1:
self.write_line("[%s] %s" %(category, msg))
def pyevent_itemstart(self, item, node=None):
def pyevent__itemstart(self, item, node=None):
if self.config.option.debug:
info = item.repr_metainfo()
line = info.verboseline(basedir=self.curdir) + " "
@ -154,14 +154,14 @@ class TerminalReporter:
#self.write_fspath_result(fspath, "")
self.write_ensure_prefix(line, "")
def pyevent_rescheduleitems(self, event):
def pyevent__rescheduleitems(self, event):
if self.config.option.debug:
self.write_sep("!", "RESCHEDULING %s " %(event.items,))
def pyevent_deselected(self, event):
def pyevent__deselected(self, event):
self.stats.setdefault('deselected', []).append(event)
def pyevent_itemtestreport(self, event):
def pyevent__itemtestreport(self, event):
fspath = event.colitem.fspath
cat, letter, word = self.getcategoryletterword(event)
if isinstance(word, tuple):
@ -184,7 +184,7 @@ class TerminalReporter:
self._tw.write(" " + line)
self.currentfspath = -2
def pyevent_collectionreport(self, event):
def pyevent__collectionreport(self, event):
if not event.passed:
if event.failed:
self.stats.setdefault("failed", []).append(event)
@ -194,7 +194,7 @@ class TerminalReporter:
self.stats.setdefault("skipped", []).append(event)
self.write_fspath_result(event.colitem.fspath, "S")
def pyevent_testrunstart(self, event):
def pyevent__testrunstart(self, event):
self.write_sep("=", "test session starts", bold=True)
self._sessionstarttime = py.std.time.time()
@ -219,7 +219,7 @@ class TerminalReporter:
for i, testarg in py.builtin.enumerate(self.config.args):
self.write_line("test object %d: %s" %(i+1, testarg))
def pyevent_testrunfinish(self, event):
def pyevent__testrunfinish(self, event):
self._tw.line("")
if event.exitstatus in (0, 1, 2):
self.summary_failures()
@ -232,7 +232,7 @@ class TerminalReporter:
self.summary_deselected()
self.summary_stats()
def pyevent_looponfailinfo(self, event):
def pyevent__looponfailinfo(self, event):
if event.failreports:
self.write_sep("#", "LOOPONFAILING", red=True)
for report in event.failreports:
@ -309,20 +309,20 @@ class CollectonlyReporter:
def outindent(self, line):
self.out.line(self.indent + str(line))
def pyevent_collectionstart(self, event):
def pyevent__collectionstart(self, event):
self.outindent(event.collector)
self.indent += self.INDENT
def pyevent_itemstart(self, item, node=None):
def pyevent__itemstart(self, item, node=None):
self.outindent(item)
def pyevent_collectionreport(self, event):
def pyevent__collectionreport(self, event):
if not event.passed:
self.outindent("!!! %s !!!" % event.longrepr.reprcrash.message)
self._failed.append(event)
self.indent = self.indent[:-len(self.INDENT)]
def pyevent_testrunfinish(self, event):
def pyevent__testrunfinish(self, event):
if self._failed:
self.out.sep("!", "collection failures")
for event in self._failed:
@ -438,7 +438,7 @@ class TestTerminal:
modcol = testdir.getmodulecol("def test_one(): pass")
rep = TerminalReporter(modcol.config, file=linecomp.stringio)
excinfo = py.test.raises(ValueError, "raise ValueError('hello')")
rep.pyevent_internalerror(event.InternalException(excinfo))
rep.pyevent__internalerror(event.InternalException(excinfo))
linecomp.assert_contains_lines([
"InternalException: >*raise ValueError*"
])
@ -462,16 +462,16 @@ class TestTerminal:
platform = "xyz"
cwd = "qwe"
rep.pyevent_gwmanage_newgateway(gw1, rinfo)
rep.pyevent__gwmanage_newgateway(gw1, rinfo)
linecomp.assert_contains_lines([
"X1*popen*xyz*2.5*"
])
rep.pyevent_gwmanage_rsyncstart(source="hello", gateways=[gw1, gw2])
rep.pyevent__gwmanage_rsyncstart(source="hello", gateways=[gw1, gw2])
linecomp.assert_contains_lines([
"rsyncstart: hello -> X1, X2"
])
rep.pyevent_gwmanage_rsyncfinish(source="hello", gateways=[gw1, gw2])
rep.pyevent__gwmanage_rsyncfinish(source="hello", gateways=[gw1, gw2])
linecomp.assert_contains_lines([
"rsyncfinish: hello -> X1, X2"
])
@ -496,7 +496,7 @@ class TestTerminal:
""")
rep = TerminalReporter(modcol.config, file=linecomp.stringio)
reports = [basic_run_report(x) for x in modcol.collect()]
rep.pyevent_looponfailinfo(event.LooponfailingInfo(reports, [modcol.config.topdir]))
rep.pyevent__looponfailinfo(event.LooponfailingInfo(reports, [modcol.config.topdir]))
linecomp.assert_contains_lines([
"*test_looponfailreport.py:2: assert 0",
"*test_looponfailreport.py:4: ValueError*",

View File

@ -82,7 +82,7 @@ class PytestPlugins(object):
mc = py._com.MultiCall(methods, parser=parser)
mc.execute()
def pyevent_plugin_registered(self, plugin):
def pyevent__plugin_registered(self, plugin):
if hasattr(self, '_config'):
self.pyplugins.call_plugin(plugin, "pytest_addoption", parser=self._config._parser)
self.pyplugins.call_plugin(plugin, "pytest_configure", config=self._config)

View File

@ -81,12 +81,12 @@ class Session(object):
""" setup any neccessary resources ahead of the test run. """
self.bus.notify("testrunstart", event.TestrunStart())
def pyevent_itemtestreport(self, rep):
def pyevent__itemtestreport(self, rep):
if rep.failed:
self._testsfailed = True
if self.config.option.exitfirst:
self.shouldstop = True
pyevent_collectionreport = pyevent_itemtestreport
pyevent__collectionreport = pyevent__itemtestreport
def sessionfinishes(self, exitstatus=0, excinfo=None):
""" teardown any resources after a test run. """

View File

@ -177,7 +177,7 @@ class TestPytestPluginInteractions:
class A:
def pytest_configure(self, config):
l.append(self)
def pyevent_hello(self, obj):
def pyevent__hello(self, obj):
events.append(obj)
config.bus.register(A())