simplify tracing mechanics by simply going through an indirection

--HG--
branch : more_plugin
This commit is contained in:
holger krekel 2015-04-25 18:15:39 +02:00
parent 9c5495832c
commit 1e883f5979
5 changed files with 74 additions and 170 deletions

View File

@ -115,7 +115,8 @@ class PytestPluginManager(PluginManager):
err = py.io.dupfile(err, encoding=encoding)
except Exception:
pass
self.set_tracing(err.write)
self.trace.root.setwriter(err.write)
self.enable_tracing()
def register(self, plugin, name=None, conftest=False):
ret = super(PytestPluginManager, self).register(plugin, name)

View File

@ -60,6 +60,7 @@ def hookimpl_opts(hookwrapper=False, optionalhook=False,
return func
return setattr_hookimpl_opts
class TagTracer:
def __init__(self):
self._tag2proc = {}
@ -106,6 +107,7 @@ class TagTracer:
assert isinstance(tags, tuple)
self._tag2proc[tags] = processor
class TagTracerSub:
def __init__(self, root, tags):
self.root = root
@ -118,25 +120,6 @@ class TagTracerSub:
return self.__class__(self.root, self.tags + (name,))
def add_method_wrapper(cls, wrapper_func):
""" Substitute the function named "wrapperfunc.__name__" at class
"cls" with a function that wraps the call to the original function.
Return an undo function which can be called to reset the class to use
the old method again.
wrapper_func is called with the same arguments as the method
it wraps and its result is used as a wrap_controller for
calling the original function.
"""
name = wrapper_func.__name__
oldcall = getattr(cls, name)
def wrap_exec(*args, **kwargs):
gen = wrapper_func(*args, **kwargs)
return wrapped_call(gen, lambda: oldcall(*args, **kwargs))
setattr(cls, name, wrap_exec)
return lambda: setattr(cls, name, oldcall)
def raise_wrapfail(wrap_controller, msg):
co = wrap_controller.gi_code
raise RuntimeError("wrap_controller at %r %s:%d %s" %
@ -186,6 +169,25 @@ class CallOutcome:
py.builtin._reraise(*ex)
class TracedHookExecution:
def __init__(self, pluginmanager, before, after):
self.pluginmanager = pluginmanager
self.before = before
self.after = after
self.oldcall = pluginmanager._inner_hookexec
assert not isinstance(self.oldcall, TracedHookExecution)
self.pluginmanager._inner_hookexec = self
def __call__(self, hook, methods, kwargs):
self.before(hook, methods, kwargs)
outcome = CallOutcome(lambda: self.oldcall(hook, methods, kwargs))
self.after(outcome, hook, methods, kwargs)
return outcome.get_result()
def undo(self):
self.pluginmanager._inner_hookexec = self.oldcall
class PluginManager(object):
""" Core Pluginmanager class which manages registration
of plugin objects and 1:N hook calling.
@ -209,31 +211,31 @@ class PluginManager(object):
self._plugins = []
self._plugin2hookcallers = {}
self.trace = TagTracer().get("pluginmanage")
self.hook = HookRelay(pm=self)
self.hook = HookRelay(self.trace.root.get("hook"))
self._inner_hookexec = lambda hook, methods, kwargs: \
MultiCall(methods, kwargs, hook.firstresult).execute()
def set_tracing(self, writer):
""" turn on tracing to the given writer method and
return an undo function. """
self.trace.root.setwriter(writer)
# reconfigure HookCalling to perform tracing
assert not hasattr(self, "_wrapping")
self._wrapping = True
def _hookexec(self, hook, methods, kwargs):
return self._inner_hookexec(hook, methods, kwargs)
hooktrace = self.hook.trace
def enable_tracing(self):
""" enable tracing of hook calls and return an undo function. """
hooktrace = self.hook._trace
def _docall(self, methods, kwargs):
def before(hook, methods, kwargs):
hooktrace.root.indent += 1
hooktrace(self.name, kwargs)
box = yield
if box.excinfo is None:
hooktrace("finish", self.name, "-->", box.result)
hooktrace(hook.name, kwargs)
def after(outcome, hook, methods, kwargs):
if outcome.excinfo is None:
hooktrace("finish", hook.name, "-->", outcome.result)
hooktrace.root.indent -= 1
return add_method_wrapper(HookCaller, _docall)
return TracedHookExecution(self, before, after).undo
def make_hook_caller(self, name, plugins):
caller = getattr(self.hook, name)
hc = HookCaller(caller.name, caller._specmodule_or_class)
hc = HookCaller(caller.name, self._hookexec, caller._specmodule_or_class)
for plugin in plugins:
if hasattr(plugin, name):
hc._add_plugin(plugin)
@ -277,7 +279,7 @@ class PluginManager(object):
if name.startswith(self._prefix):
hc = getattr(self.hook, name, None)
if hc is None:
hc = HookCaller(name, module_or_class)
hc = HookCaller(name, self._hookexec, module_or_class)
setattr(self.hook, name, hc)
else:
# plugins registered this hook without knowing the spec
@ -319,7 +321,7 @@ class PluginManager(object):
if hook is None:
if self._excludefunc is not None and self._excludefunc(name):
continue
hook = HookCaller(name)
hook = HookCaller(name, self._hookexec)
setattr(self.hook, name, hook)
elif hook.has_spec():
self._verify_hook(hook, plugin)
@ -362,15 +364,11 @@ class MultiCall:
self.methods = methods
self.kwargs = kwargs
self.kwargs["__multicall__"] = self
self.results = []
self.firstresult = firstresult
def __repr__(self):
status = "%d results, %d meths" % (len(self.results), len(self.methods))
return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
while self.methods:
method = self.methods.pop()
args = [all_kwargs[argname] for argname in varnames(method)]
@ -378,11 +376,18 @@ class MultiCall:
return wrapped_call(method(*args), self.execute)
res = method(*args)
if res is not None:
self.results.append(res)
if self.firstresult:
return res
results.append(res)
if not self.firstresult:
return self.results
return results
def __repr__(self):
status = "%d meths" % (len(self.methods),)
if hasattr(self, "results"):
status = ("%d results, " % len(self.results)) + status
return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
def varnames(func, startindex=None):
@ -426,17 +431,17 @@ def varnames(func, startindex=None):
class HookRelay:
def __init__(self, pm):
self._pm = pm
self.trace = pm.trace.root.get("hook")
def __init__(self, trace):
self._trace = trace
class HookCaller(object):
def __init__(self, name, specmodule_or_class=None):
def __init__(self, name, hook_execute, specmodule_or_class=None):
self.name = name
self._plugins = []
self._wrappers = []
self._nonwrappers = []
self._hookexec = hook_execute
if specmodule_or_class is not None:
self.set_specification(specmodule_or_class)
@ -495,7 +500,12 @@ class HookCaller(object):
def __call__(self, **kwargs):
assert not self.is_historic()
return self._docall(self._nonwrappers + self._wrappers, kwargs)
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
def call_historic(self, proc=None, kwargs=None):
self._call_history.append((kwargs or {}, proc))
# historizing hooks don't return results
self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
def call_extra(self, methods, kwargs):
""" Call the hook with some additional temporarily participating
@ -508,20 +518,13 @@ class HookCaller(object):
finally:
self._nonwrappers, self._wrappers = old
def call_historic(self, proc=None, kwargs=None):
self._call_history.append((kwargs or {}, proc))
self._docall(self._nonwrappers + self._wrappers, kwargs)
def _apply_history(self, method):
if self.is_historic():
for kwargs, proc in self._call_history:
res = self._docall([method], kwargs)
res = self._hookexec(self, [method], kwargs)
if res and proc is not None:
proc(res[0])
def _docall(self, methods, kwargs):
return MultiCall(methods, kwargs, firstresult=self.firstresult).execute()
class PluginValidationError(Exception):
""" plugin failed validation. """

View File

@ -34,13 +34,15 @@ def pytest_cmdline_parse():
pytest.__version__, py.__version__,
".".join(map(str, sys.version_info)),
os.getcwd(), config._origargs))
config.pluginmanager.set_tracing(debugfile.write)
config.trace.root.setwriter(debugfile.write)
undo_tracing = config.pluginmanager.enable_tracing()
sys.stderr.write("writing pytestdebug information to %s\n" % path)
def unset_tracing():
debugfile.close()
sys.stderr.write("wrote pytestdebug information to %s\n" %
debugfile.name)
config.trace.root.setwriter(None)
undo_tracing()
config.add_cleanup(unset_tracing)
def pytest_cmdline_main(config):

View File

@ -11,7 +11,7 @@ import subprocess
import py
import pytest
from py.builtin import print_
from _pytest.core import HookCaller, add_method_wrapper
from _pytest.core import HookCaller, TracedHookExecution
from _pytest.main import Session, EXIT_OK
@ -79,12 +79,12 @@ class HookRecorder:
self._pluginmanager = pluginmanager
self.calls = []
def _docall(hookcaller, methods, kwargs):
self.calls.append(ParsedCall(hookcaller.name, kwargs))
yield
self._undo_wrapping = add_method_wrapper(HookCaller, _docall)
#if hasattr(pluginmanager, "config"):
# pluginmanager.add_shutdown(self._undo_wrapping)
def before(hook, method, kwargs):
self.calls.append(ParsedCall(hook.name, kwargs))
def after(outcome, hook, method, kwargs):
pass
executor = TracedHookExecution(pluginmanager, before, after)
self._undo_wrapping = executor.undo
def finish_recording(self):
self._undo_wrapping()

View File

@ -426,7 +426,8 @@ class TestPytestPluginInteractions:
saveindent.append(pytestpm.trace.root.indent)
raise ValueError()
l = []
undo = pytestpm.set_tracing(l.append)
pytestpm.trace.root.setwriter(l.append)
undo = pytestpm.enable_tracing()
try:
indent = pytestpm.trace.root.indent
p = api1()
@ -788,109 +789,6 @@ def test_importplugin_issue375(testdir, pytestpm):
assert "qwe" not in str(excinfo.value)
assert "aaaa" in str(excinfo.value)
class TestWrapMethod:
def test_basic_hapmypath(self):
class A:
def f(self):
return "A.f"
l = []
def f(self):
l.append(1)
box = yield
assert box.result == "A.f"
l.append(2)
undo = add_method_wrapper(A, f)
assert A().f() == "A.f"
assert l == [1,2]
undo()
l[:] = []
assert A().f() == "A.f"
assert l == []
def test_no_yield(self):
class A:
def method(self):
return
def method(self):
if 0:
yield
add_method_wrapper(A, method)
with pytest.raises(RuntimeError) as excinfo:
A().method()
assert "method" in str(excinfo.value)
assert "did not yield" in str(excinfo.value)
def test_method_raises(self):
class A:
def error(self, val):
raise ValueError(val)
l = []
def error(self, val):
l.append(val)
yield
l.append(None)
undo = add_method_wrapper(A, error)
with pytest.raises(ValueError):
A().error(42)
assert l == [42, None]
undo()
l[:] = []
with pytest.raises(ValueError):
A().error(42)
assert l == []
def test_controller_swallows_method_raises(self):
class A:
def error(self, val):
raise ValueError(val)
def error(self, val):
box = yield
box.force_result(2)
add_method_wrapper(A, error)
assert A().error(42) == 2
def test_reraise_on_controller_StopIteration(self):
class A:
def error(self, val):
raise ValueError(val)
def error(self, val):
try:
yield
except ValueError:
pass
add_method_wrapper(A, error)
with pytest.raises(ValueError):
A().error(42)
@pytest.mark.xfail(reason="if needed later")
def test_modify_call_args(self):
class A:
def error(self, val1, val2):
raise ValueError(val1+val2)
l = []
def error(self):
box = yield (1,), {'val2': 2}
assert box.excinfo[1].args == (3,)
l.append(1)
add_method_wrapper(A, error)
with pytest.raises(ValueError):
A().error()
assert l == [1]
### to be shifted to own test file
from _pytest.config import PytestPluginManager