- refactor wrapped call support to also accomodate

pytest.mark.hookwrapper
- introduce a CallOutcome class to hold the result/excinfo status of
  calling a function.
- rename add_method_controller to add_method_wrapper
This commit is contained in:
holger krekel 2014-10-08 11:27:14 +02:00
parent b6e619413f
commit c3d1986101
3 changed files with 90 additions and 99 deletions

View File

@ -10,6 +10,8 @@ import py
assert py.__version__.split(".")[:2] >= ['1', '4'], ("installation problem: " assert py.__version__.split(".")[:2] >= ['1', '4'], ("installation problem: "
"%s is too old, remove or upgrade 'py'" % (py.__version__)) "%s is too old, remove or upgrade 'py'" % (py.__version__))
py3 = sys.version_info > (3,0)
class TagTracer: class TagTracer:
def __init__(self): def __init__(self):
self._tag2proc = {} self._tag2proc = {}
@ -68,42 +70,62 @@ class TagTracerSub:
return self.__class__(self.root, self.tags + (name,)) return self.__class__(self.root, self.tags + (name,))
def add_method_controller(cls, func): def add_method_wrapper(cls, wrapper_func):
""" Use func as the method controler for the method found """ Substitute the function named "wrapperfunc.__name__" at class
at the class named func.__name__. "cls" with a function that wraps the call to the original function.
Return an undo function which can be called to reset the class to use
the old method again.
A method controler is invoked with the same arguments wrapper_func is called with the same arguments as the method
as the function it substitutes and is required to yield once it wraps and its result is used as a wrap_controller for
which will trigger calling the controlled method. calling the original function.
If it yields a second value, the value will be returned
as the result of the invocation. Errors in the controlled function
are re-raised to the controller during the first yield.
""" """
name = func.__name__ name = wrapper_func.__name__
oldcall = getattr(cls, name) oldcall = getattr(cls, name)
def wrap_exec(*args, **kwargs): def wrap_exec(*args, **kwargs):
gen = func(*args, **kwargs) gen = wrapper_func(*args, **kwargs)
next(gen) # first yield return wrapped_call(gen, lambda: oldcall(*args, **kwargs))
try:
res = oldcall(*args, **kwargs)
except Exception:
excinfo = sys.exc_info()
try:
# reraise exception to controller
res = gen.throw(*excinfo)
except StopIteration:
py.builtin._reraise(*excinfo)
else:
try:
res = gen.send(res)
except StopIteration:
pass
return res
setattr(cls, name, wrap_exec) setattr(cls, name, wrap_exec)
return lambda: setattr(cls, name, oldcall) return lambda: setattr(cls, name, oldcall)
def wrapped_call(wrap_controller, func):
""" Wrap calling to a function with a generator. The first yield
will trigger calling the function and receive an according CallOutcome
object representing an exception or a result.
"""
next(wrap_controller) # first yield
call_outcome = CallOutcome(func)
try:
wrap_controller.send(call_outcome)
co = wrap_controller.gi_frame.f_code
raise RuntimeError("wrap_controller for %r %s:%d has second yield" %
(co.co_name, co.co_filename, co.co_firstlineno))
except StopIteration:
pass
if call_outcome.excinfo is None:
return call_outcome.result
else:
ex = call_outcome.excinfo
if py3:
raise ex[1].with_traceback(ex[2])
py.builtin._reraise(*ex)
class CallOutcome:
excinfo = None
def __init__(self, func):
try:
self.result = func()
except Exception:
self.excinfo = sys.exc_info()
def force_result(self, result):
self.result = result
self.excinfo = None
class PluginManager(object): class PluginManager(object):
def __init__(self, hookspecs=None, prefix="pytest_"): def __init__(self, hookspecs=None, prefix="pytest_"):
self._name2plugin = {} self._name2plugin = {}
@ -125,15 +147,12 @@ class PluginManager(object):
trace = self.hookrelay.trace trace = self.hookrelay.trace
trace.root.indent += 1 trace.root.indent += 1
trace(self.name, kwargs) trace(self.name, kwargs)
res = None box = yield
try: if box.excinfo is None:
res = yield trace("finish", self.name, "-->", box.result)
finally: trace.root.indent -= 1
if res:
trace("finish", self.name, "-->", res)
trace.root.indent -= 1
undo = add_method_controller(HookCaller, _docall) undo = add_method_wrapper(HookCaller, _docall)
self.add_shutdown(undo) self.add_shutdown(undo)
def do_configure(self, config): def do_configure(self, config):
@ -356,39 +375,19 @@ class MultiCall:
return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs) return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
def execute(self): def execute(self):
next_finalizers = [] all_kwargs = self.kwargs
try: while self.methods:
all_kwargs = self.kwargs method = self.methods.pop()
while self.methods: args = [all_kwargs[argname] for argname in varnames(method)]
method = self.methods.pop() if hasattr(method, "hookwrapper"):
args = [all_kwargs[argname] for argname in varnames(method)] return wrapped_call(method(*args), self.execute)
if hasattr(method, "hookwrapper"): res = method(*args)
it = method(*args) if res is not None:
next = getattr(it, "next", None) self.results.append(res)
if next is None: if self.firstresult:
next = getattr(it, "__next__", None) return res
if next is None: if not self.firstresult:
raise self.WrongHookWrapper(method, return self.results
"wrapper does not contain a yield")
res = next()
next_finalizers.append((method, next))
else:
res = method(*args)
if res is not None:
self.results.append(res)
if self.firstresult:
return res
if not self.firstresult:
return self.results
finally:
for method, fin in reversed(next_finalizers):
try:
fin()
except StopIteration:
pass
else:
raise self.WrongHookWrapper(method,
"wrapper contain more than one yield")
def varnames(func, startindex=None): def varnames(func, startindex=None):

View File

@ -11,7 +11,7 @@ import subprocess
import py import py
import pytest import pytest
from py.builtin import print_ from py.builtin import print_
from _pytest.core import HookCaller, add_method_controller from _pytest.core import HookCaller, add_method_wrapper
from _pytest.main import Session, EXIT_OK from _pytest.main import Session, EXIT_OK
@ -57,7 +57,7 @@ class HookRecorder:
def _docall(hookcaller, methods, kwargs): def _docall(hookcaller, methods, kwargs):
self.calls.append(ParsedCall(hookcaller.name, kwargs)) self.calls.append(ParsedCall(hookcaller.name, kwargs))
yield yield
self._undo_wrapping = add_method_controller(HookCaller, _docall) self._undo_wrapping = add_method_wrapper(HookCaller, _docall)
pluginmanager.add_shutdown(self._undo_wrapping) pluginmanager.add_shutdown(self._undo_wrapping)
def finish_recording(self): def finish_recording(self):

View File

@ -280,8 +280,9 @@ class TestBootstrapping:
pm.register(p) pm.register(p)
assert pm.trace.root.indent == indent assert pm.trace.root.indent == indent
assert len(l) == 1 assert len(l) == 2
assert 'pytest_plugin_registered' in l[0] assert 'pytest_plugin_registered' in l[0]
assert 'finish' in l[1]
pytest.raises(ValueError, lambda: pm.register(api1())) pytest.raises(ValueError, lambda: pm.register(api1()))
assert pm.trace.root.indent == indent assert pm.trace.root.indent == indent
assert saveindent[0] > indent assert saveindent[0] > indent
@ -555,7 +556,7 @@ class TestMultiCall:
l.append("m2 finish") l.append("m2 finish")
m2.hookwrapper = True m2.hookwrapper = True
res = MultiCall([m2, m1], {}).execute() res = MultiCall([m2, m1], {}).execute()
assert res == [1, 2] assert res == []
assert l == ["m1 init", "m2 init", "m2 finish", "m1 finish"] assert l == ["m1 init", "m2 init", "m2 finish", "m1 finish"]
def test_listattr_hookwrapper_ordering(self): def test_listattr_hookwrapper_ordering(self):
@ -593,10 +594,8 @@ class TestMultiCall:
m1.hookwrapper = True m1.hookwrapper = True
mc = MultiCall([m1], {}) mc = MultiCall([m1], {})
with pytest.raises(mc.WrongHookWrapper) as ex: with pytest.raises(TypeError):
mc.execute() mc.execute()
assert ex.value.func == m1
assert ex.value.message
def test_hookwrapper_too_many_yield(self): def test_hookwrapper_too_many_yield(self):
def m1(): def m1():
@ -605,10 +604,10 @@ class TestMultiCall:
m1.hookwrapper = True m1.hookwrapper = True
mc = MultiCall([m1], {}) mc = MultiCall([m1], {})
with pytest.raises(mc.WrongHookWrapper) as ex: with pytest.raises(RuntimeError) as ex:
mc.execute() mc.execute()
assert ex.value.func == m1 assert "m1" in str(ex.value)
assert ex.value.message assert "test_core.py:" in str(ex.value)
class TestHookRelay: class TestHookRelay:
@ -774,9 +773,10 @@ class TestWrapMethod:
l = [] l = []
def f(self): def f(self):
l.append(1) l.append(1)
yield box = yield
assert box.result == "A.f"
l.append(2) l.append(2)
undo = add_method_controller(A, f) undo = add_method_wrapper(A, f)
assert A().f() == "A.f" assert A().f() == "A.f"
assert l == [1,2] assert l == [1,2]
@ -793,14 +793,10 @@ class TestWrapMethod:
l = [] l = []
def error(self, val): def error(self, val):
l.append(val) l.append(val)
try: yield
yield l.append(None)
except ValueError:
l.append(None)
raise
undo = add_method_wrapper(A, error)
undo = add_method_controller(A, error)
with pytest.raises(ValueError): with pytest.raises(ValueError):
A().error(42) A().error(42)
@ -817,12 +813,10 @@ class TestWrapMethod:
raise ValueError(val) raise ValueError(val)
def error(self, val): def error(self, val):
try: box = yield
yield box.force_result(2)
except ValueError:
yield 2
add_method_controller(A, error) add_method_wrapper(A, error)
assert A().error(42) == 2 assert A().error(42) == 2
def test_reraise_on_controller_StopIteration(self): def test_reraise_on_controller_StopIteration(self):
@ -836,7 +830,7 @@ class TestWrapMethod:
except ValueError: except ValueError:
pass pass
add_method_controller(A, error) add_method_wrapper(A, error)
with pytest.raises(ValueError): with pytest.raises(ValueError):
A().error(42) A().error(42)
@ -848,13 +842,11 @@ class TestWrapMethod:
l = [] l = []
def error(self): def error(self):
try: box = yield (1,), {'val2': 2}
yield (1,), {'val2': 2} assert box.excinfo[1].args == (3,)
except ValueError as ex: l.append(1)
assert ex.args == (3,)
l.append(1)
add_method_controller(A, error) add_method_wrapper(A, error)
with pytest.raises(ValueError): with pytest.raises(ValueError):
A().error() A().error()
assert l == [1] assert l == [1]