merge pytest default
This commit is contained in:
commit
3d6ad054c0
|
@ -98,7 +98,7 @@ class PytestPluginManager(PluginManager):
|
|||
err = py.io.dupfile(err, encoding=encoding)
|
||||
except Exception:
|
||||
pass
|
||||
self.trace.root.setwriter(err.write)
|
||||
self.set_tracing(err.write)
|
||||
|
||||
def pytest_configure(self, config):
|
||||
config.addinivalue_line("markers",
|
||||
|
@ -682,11 +682,8 @@ class Config(object):
|
|||
setattr(self.option, opt.dest, opt.default)
|
||||
|
||||
def _getmatchingplugins(self, fspath):
|
||||
allconftests = self._conftest._conftestpath2mod.values()
|
||||
plugins = [x for x in self.pluginmanager.getplugins()
|
||||
if x not in allconftests]
|
||||
plugins += self._conftest.getconftestmodules(fspath)
|
||||
return plugins
|
||||
return self.pluginmanager._plugins + \
|
||||
self._conftest.getconftestmodules(fspath)
|
||||
|
||||
def pytest_load_initial_conftests(self, early_config):
|
||||
self._conftest.setinitial(early_config.known_args_namespace)
|
||||
|
|
227
_pytest/core.py
227
_pytest/core.py
|
@ -67,16 +67,74 @@ class TagTracerSub:
|
|||
def get(self, name):
|
||||
return self.__class__(self.root, self.tags + (name,))
|
||||
|
||||
|
||||
def add_method_controller(cls, func):
|
||||
""" Use func as the method controler for the method found
|
||||
at the class named func.__name__.
|
||||
|
||||
A method controler is invoked with the same arguments
|
||||
as the function it substitutes and is required to yield once
|
||||
which will trigger calling the controlled method.
|
||||
If it yields a second value, the value will be returned
|
||||
as the result of the invocation. Errors in the controlled function
|
||||
are re-raised to the controller during the first yield.
|
||||
"""
|
||||
name = func.__name__
|
||||
oldcall = getattr(cls, name)
|
||||
def wrap_exec(*args, **kwargs):
|
||||
gen = func(*args, **kwargs)
|
||||
next(gen) # first yield
|
||||
try:
|
||||
res = oldcall(*args, **kwargs)
|
||||
except Exception:
|
||||
excinfo = sys.exc_info()
|
||||
try:
|
||||
# reraise exception to controller
|
||||
res = gen.throw(*excinfo)
|
||||
except StopIteration:
|
||||
py.builtin._reraise(*excinfo)
|
||||
else:
|
||||
try:
|
||||
res = gen.send(res)
|
||||
except StopIteration:
|
||||
pass
|
||||
return res
|
||||
|
||||
setattr(cls, name, wrap_exec)
|
||||
return lambda: setattr(cls, name, oldcall)
|
||||
|
||||
|
||||
class PluginManager(object):
|
||||
def __init__(self, hookspecs=None):
|
||||
def __init__(self, hookspecs=None, prefix="pytest_"):
|
||||
self._name2plugin = {}
|
||||
self._listattrcache = {}
|
||||
self._plugins = []
|
||||
self._conftestplugins = []
|
||||
self._warnings = []
|
||||
self.trace = TagTracer().get("pluginmanage")
|
||||
self._plugin_distinfo = []
|
||||
self._shutdown = []
|
||||
self.hook = HookRelay(hookspecs or [], pm=self)
|
||||
self.hook = HookRelay(hookspecs or [], pm=self, prefix=prefix)
|
||||
|
||||
def set_tracing(self, writer):
|
||||
self.trace.root.setwriter(writer)
|
||||
# reconfigure HookCalling to perform tracing
|
||||
assert not hasattr(self, "_wrapping")
|
||||
self._wrapping = True
|
||||
|
||||
def _docall(self, methods, kwargs):
|
||||
trace = self.hookrelay.trace
|
||||
trace.root.indent += 1
|
||||
trace(self.name, kwargs)
|
||||
res = None
|
||||
try:
|
||||
res = yield
|
||||
finally:
|
||||
if res:
|
||||
trace("finish", self.name, "-->", res)
|
||||
trace.root.indent -= 1
|
||||
|
||||
undo = add_method_controller(HookCaller, _docall)
|
||||
self.add_shutdown(undo)
|
||||
|
||||
def do_configure(self, config):
|
||||
# backward compatibility
|
||||
|
@ -86,7 +144,7 @@ class PluginManager(object):
|
|||
assert not hasattr(self, "_registercallback")
|
||||
self._registercallback = callback
|
||||
|
||||
def register(self, plugin, name=None, prepend=False):
|
||||
def register(self, plugin, name=None, prepend=False, conftest=False):
|
||||
if self._name2plugin.get(name, None) == -1:
|
||||
return
|
||||
name = name or getattr(plugin, '__name__', str(id(plugin)))
|
||||
|
@ -94,20 +152,27 @@ class PluginManager(object):
|
|||
raise ValueError("Plugin already registered: %s=%s\n%s" %(
|
||||
name, plugin, self._name2plugin))
|
||||
#self.trace("registering", name, plugin)
|
||||
self._name2plugin[name] = plugin
|
||||
reg = getattr(self, "_registercallback", None)
|
||||
if reg is not None:
|
||||
reg(plugin, name)
|
||||
if not prepend:
|
||||
self._plugins.append(plugin)
|
||||
reg(plugin, name) # may call addhooks
|
||||
self.hook._scan_plugin(plugin)
|
||||
self._name2plugin[name] = plugin
|
||||
if conftest:
|
||||
self._conftestplugins.append(plugin)
|
||||
else:
|
||||
self._plugins.insert(0, plugin)
|
||||
if not prepend:
|
||||
self._plugins.append(plugin)
|
||||
else:
|
||||
self._plugins.insert(0, plugin)
|
||||
return True
|
||||
|
||||
def unregister(self, plugin=None, name=None):
|
||||
if plugin is None:
|
||||
plugin = self.getplugin(name=name)
|
||||
self._plugins.remove(plugin)
|
||||
try:
|
||||
self._plugins.remove(plugin)
|
||||
except KeyError:
|
||||
self._conftestplugins.remove(plugin)
|
||||
for name, value in list(self._name2plugin.items()):
|
||||
if value == plugin:
|
||||
del self._name2plugin[name]
|
||||
|
@ -119,9 +184,8 @@ class PluginManager(object):
|
|||
while self._shutdown:
|
||||
func = self._shutdown.pop()
|
||||
func()
|
||||
self._plugins = []
|
||||
self._plugins = self._conftestplugins = []
|
||||
self._name2plugin.clear()
|
||||
self._listattrcache.clear()
|
||||
|
||||
def isregistered(self, plugin, name=None):
|
||||
if self.getplugin(name) is not None:
|
||||
|
@ -134,7 +198,7 @@ class PluginManager(object):
|
|||
self.hook._addhooks(spec, prefix=prefix)
|
||||
|
||||
def getplugins(self):
|
||||
return list(self._plugins)
|
||||
return self._plugins + self._conftestplugins
|
||||
|
||||
def skipifmissing(self, name):
|
||||
if not self.hasplugin(name):
|
||||
|
@ -198,7 +262,8 @@ class PluginManager(object):
|
|||
self.import_plugin(arg)
|
||||
|
||||
def consider_conftest(self, conftestmodule):
|
||||
if self.register(conftestmodule, name=conftestmodule.__file__):
|
||||
if self.register(conftestmodule, name=conftestmodule.__file__,
|
||||
conftest=True):
|
||||
self.consider_module(conftestmodule)
|
||||
|
||||
def consider_module(self, mod):
|
||||
|
@ -233,12 +298,7 @@ class PluginManager(object):
|
|||
|
||||
def listattr(self, attrname, plugins=None):
|
||||
if plugins is None:
|
||||
plugins = self._plugins
|
||||
key = (attrname,) + tuple(plugins)
|
||||
try:
|
||||
return list(self._listattrcache[key])
|
||||
except KeyError:
|
||||
pass
|
||||
plugins = self._plugins + self._conftestplugins
|
||||
l = []
|
||||
last = []
|
||||
wrappers = []
|
||||
|
@ -257,7 +317,6 @@ class PluginManager(object):
|
|||
l.append(meth)
|
||||
l.extend(last)
|
||||
l.extend(wrappers)
|
||||
self._listattrcache[key] = list(l)
|
||||
return l
|
||||
|
||||
def call_plugin(self, plugin, methname, kwargs):
|
||||
|
@ -288,6 +347,7 @@ class MultiCall:
|
|||
def __init__(self, methods, kwargs, firstresult=False):
|
||||
self.methods = list(methods)
|
||||
self.kwargs = kwargs
|
||||
self.kwargs["__multicall__"] = self
|
||||
self.results = []
|
||||
self.firstresult = firstresult
|
||||
|
||||
|
@ -298,11 +358,12 @@ class MultiCall:
|
|||
def execute(self):
|
||||
next_finalizers = []
|
||||
try:
|
||||
all_kwargs = self.kwargs
|
||||
while self.methods:
|
||||
method = self.methods.pop()
|
||||
kwargs = self.getkwargs(method)
|
||||
args = [all_kwargs[argname] for argname in varnames(method)]
|
||||
if hasattr(method, "hookwrapper"):
|
||||
it = method(**kwargs)
|
||||
it = method(*args)
|
||||
next = getattr(it, "next", None)
|
||||
if next is None:
|
||||
next = getattr(it, "__next__", None)
|
||||
|
@ -312,7 +373,7 @@ class MultiCall:
|
|||
res = next()
|
||||
next_finalizers.append((method, next))
|
||||
else:
|
||||
res = method(**kwargs)
|
||||
res = method(*args)
|
||||
if res is not None:
|
||||
self.results.append(res)
|
||||
if self.firstresult:
|
||||
|
@ -330,17 +391,7 @@ class MultiCall:
|
|||
"wrapper contain more than one yield")
|
||||
|
||||
|
||||
def getkwargs(self, method):
|
||||
kwargs = {}
|
||||
for argname in varnames(method):
|
||||
try:
|
||||
kwargs[argname] = self.kwargs[argname]
|
||||
except KeyError:
|
||||
if argname == "__multicall__":
|
||||
kwargs[argname] = self
|
||||
return kwargs
|
||||
|
||||
def varnames(func):
|
||||
def varnames(func, startindex=None):
|
||||
""" return argument name tuple for a function, method, class or callable.
|
||||
|
||||
In case of a class, its "__init__" method is considered.
|
||||
|
@ -357,74 +408,130 @@ def varnames(func):
|
|||
func = func.__init__
|
||||
except AttributeError:
|
||||
return ()
|
||||
ismethod = True
|
||||
startindex = 1
|
||||
else:
|
||||
if not inspect.isfunction(func) and not inspect.ismethod(func):
|
||||
func = getattr(func, '__call__', func)
|
||||
ismethod = inspect.ismethod(func)
|
||||
if startindex is None:
|
||||
startindex = int(inspect.ismethod(func))
|
||||
|
||||
rawcode = py.code.getrawcode(func)
|
||||
try:
|
||||
x = rawcode.co_varnames[ismethod:rawcode.co_argcount]
|
||||
x = rawcode.co_varnames[startindex:rawcode.co_argcount]
|
||||
except AttributeError:
|
||||
x = ()
|
||||
else:
|
||||
defaults = func.__defaults__
|
||||
if defaults:
|
||||
x = x[:-len(defaults)]
|
||||
try:
|
||||
cache["_varnames"] = x
|
||||
except TypeError:
|
||||
pass
|
||||
return x
|
||||
|
||||
|
||||
class HookRelay:
|
||||
def __init__(self, hookspecs, pm, prefix="pytest_"):
|
||||
if not isinstance(hookspecs, list):
|
||||
hookspecs = [hookspecs]
|
||||
self._hookspecs = []
|
||||
self._pm = pm
|
||||
self.trace = pm.trace.root.get("hook")
|
||||
self.prefix = prefix
|
||||
for hookspec in hookspecs:
|
||||
self._addhooks(hookspec, prefix)
|
||||
|
||||
def _addhooks(self, hookspecs, prefix):
|
||||
self._hookspecs.append(hookspecs)
|
||||
def _addhooks(self, hookspec, prefix):
|
||||
added = False
|
||||
for name, method in vars(hookspecs).items():
|
||||
isclass = int(inspect.isclass(hookspec))
|
||||
for name, method in vars(hookspec).items():
|
||||
if name.startswith(prefix):
|
||||
firstresult = getattr(method, 'firstresult', False)
|
||||
hc = HookCaller(self, name, firstresult=firstresult)
|
||||
hc = HookCaller(self, name, firstresult=firstresult,
|
||||
argnames=varnames(method, startindex=isclass))
|
||||
setattr(self, name, hc)
|
||||
added = True
|
||||
#print ("setting new hook", name)
|
||||
if not added:
|
||||
raise ValueError("did not find new %r hooks in %r" %(
|
||||
prefix, hookspecs,))
|
||||
prefix, hookspec,))
|
||||
|
||||
def _getcaller(self, name, plugins):
|
||||
caller = getattr(self, name)
|
||||
methods = self._pm.listattr(name, plugins=plugins)
|
||||
if methods:
|
||||
return caller.new_cached_caller(methods)
|
||||
return caller
|
||||
|
||||
def _scan_plugin(self, plugin):
|
||||
def fail(msg, *args):
|
||||
name = getattr(plugin, '__name__', plugin)
|
||||
raise PluginValidationError("plugin %r\n%s" %(name, msg % args))
|
||||
|
||||
for name in dir(plugin):
|
||||
if not name.startswith(self.prefix):
|
||||
continue
|
||||
hook = getattr(self, name, None)
|
||||
method = getattr(plugin, name)
|
||||
if hook is None:
|
||||
is_optional = getattr(method, 'optionalhook', False)
|
||||
if not isgenerichook(name) and not is_optional:
|
||||
fail("found unknown hook: %r", name)
|
||||
continue
|
||||
for arg in varnames(method):
|
||||
if arg not in hook.argnames:
|
||||
fail("argument %r not available\n"
|
||||
"actual definition: %s\n"
|
||||
"available hookargs: %s",
|
||||
arg, formatdef(method),
|
||||
", ".join(hook.argnames))
|
||||
getattr(self, name).clear_method_cache()
|
||||
|
||||
|
||||
class HookCaller:
|
||||
def __init__(self, hookrelay, name, firstresult):
|
||||
def __init__(self, hookrelay, name, firstresult, argnames, methods=None):
|
||||
self.hookrelay = hookrelay
|
||||
self.name = name
|
||||
self.firstresult = firstresult
|
||||
self.trace = self.hookrelay.trace
|
||||
self.methods = methods
|
||||
self.argnames = ["__multicall__"]
|
||||
self.argnames.extend(argnames)
|
||||
assert "self" not in argnames # prevent oversights
|
||||
|
||||
def new_cached_caller(self, methods):
|
||||
return HookCaller(self.hookrelay, self.name, self.firstresult,
|
||||
argnames=self.argnames, methods=methods)
|
||||
|
||||
def __repr__(self):
|
||||
return "<HookCaller %r>" %(self.name,)
|
||||
|
||||
def clear_method_cache(self):
|
||||
self.methods = None
|
||||
|
||||
def __call__(self, **kwargs):
|
||||
methods = self.hookrelay._pm.listattr(self.name)
|
||||
methods = self.methods
|
||||
if methods is None:
|
||||
self.methods = methods = self.hookrelay._pm.listattr(self.name)
|
||||
return self._docall(methods, kwargs)
|
||||
|
||||
def pcall(self, plugins, **kwargs):
|
||||
methods = self.hookrelay._pm.listattr(self.name, plugins=plugins)
|
||||
return self._docall(methods, kwargs)
|
||||
def callextra(self, methods, **kwargs):
|
||||
return self._docall(self.methods + methods, kwargs)
|
||||
|
||||
def _docall(self, methods, kwargs):
|
||||
self.trace(self.name, kwargs)
|
||||
self.trace.root.indent += 1
|
||||
mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
|
||||
try:
|
||||
res = mc.execute()
|
||||
if res:
|
||||
self.trace("finish", self.name, "-->", res)
|
||||
finally:
|
||||
self.trace.root.indent -= 1
|
||||
return res
|
||||
return MultiCall(methods, kwargs,
|
||||
firstresult=self.firstresult).execute()
|
||||
|
||||
|
||||
class PluginValidationError(Exception):
|
||||
""" plugin failed validation. """
|
||||
|
||||
def isgenerichook(name):
|
||||
return name == "pytest_plugins" or \
|
||||
name.startswith("pytest_funcarg__")
|
||||
|
||||
def formatdef(func):
|
||||
return "%s%s" % (
|
||||
func.__name__,
|
||||
inspect.formatargspec(*inspect.getargspec(func))
|
||||
)
|
||||
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
""" version info, help messages, tracing configuration. """
|
||||
import py
|
||||
import pytest
|
||||
import os, inspect, sys
|
||||
from _pytest.core import varnames
|
||||
import os, sys
|
||||
|
||||
def pytest_addoption(parser):
|
||||
group = parser.getgroup('debugconfig')
|
||||
|
@ -32,7 +31,7 @@ def pytest_cmdline_parse(__multicall__):
|
|||
f.write("versions pytest-%s, py-%s, python-%s\ncwd=%s\nargs=%s\n\n" %(
|
||||
pytest.__version__, py.__version__, ".".join(map(str, sys.version_info)),
|
||||
os.getcwd(), config._origargs))
|
||||
config.trace.root.setwriter(f.write)
|
||||
config.pluginmanager.set_tracing(f.write)
|
||||
sys.stderr.write("writing pytestdebug information to %s\n" % path)
|
||||
return config
|
||||
|
||||
|
@ -127,70 +126,3 @@ def pytest_report_header(config):
|
|||
return lines
|
||||
|
||||
|
||||
# =====================================================
|
||||
# validate plugin syntax and hooks
|
||||
# =====================================================
|
||||
|
||||
def pytest_plugin_registered(manager, plugin):
|
||||
methods = collectattr(plugin)
|
||||
hooks = {}
|
||||
for hookspec in manager.hook._hookspecs:
|
||||
hooks.update(collectattr(hookspec))
|
||||
|
||||
stringio = py.io.TextIO()
|
||||
def Print(*args):
|
||||
if args:
|
||||
stringio.write(" ".join(map(str, args)))
|
||||
stringio.write("\n")
|
||||
|
||||
fail = False
|
||||
while methods:
|
||||
name, method = methods.popitem()
|
||||
#print "checking", name
|
||||
if isgenerichook(name):
|
||||
continue
|
||||
if name not in hooks:
|
||||
if not getattr(method, 'optionalhook', False):
|
||||
Print("found unknown hook:", name)
|
||||
fail = True
|
||||
else:
|
||||
#print "checking", method
|
||||
method_args = list(varnames(method))
|
||||
if '__multicall__' in method_args:
|
||||
method_args.remove('__multicall__')
|
||||
hook = hooks[name]
|
||||
hookargs = varnames(hook)
|
||||
for arg in method_args:
|
||||
if arg not in hookargs:
|
||||
Print("argument %r not available" %(arg, ))
|
||||
Print("actual definition: %s" %(formatdef(method)))
|
||||
Print("available hook arguments: %s" %
|
||||
", ".join(hookargs))
|
||||
fail = True
|
||||
break
|
||||
#if not fail:
|
||||
# print "matching hook:", formatdef(method)
|
||||
if fail:
|
||||
name = getattr(plugin, '__name__', plugin)
|
||||
raise PluginValidationError("%s:\n%s" % (name, stringio.getvalue()))
|
||||
|
||||
class PluginValidationError(Exception):
|
||||
""" plugin failed validation. """
|
||||
|
||||
def isgenerichook(name):
|
||||
return name == "pytest_plugins" or \
|
||||
name.startswith("pytest_funcarg__")
|
||||
|
||||
def collectattr(obj):
|
||||
methods = {}
|
||||
for apiname in dir(obj):
|
||||
if apiname.startswith("pytest_"):
|
||||
methods[apiname] = getattr(obj, apiname)
|
||||
return methods
|
||||
|
||||
def formatdef(func):
|
||||
return "%s%s" % (
|
||||
func.__name__,
|
||||
inspect.formatargspec(*inspect.getargspec(func))
|
||||
)
|
||||
|
||||
|
|
|
@ -142,7 +142,7 @@ def pytest_generate_tests(metafunc):
|
|||
# -------------------------------------------------------------------------
|
||||
# generic runtest related hooks
|
||||
# -------------------------------------------------------------------------
|
||||
def pytest_itemstart(item, node=None):
|
||||
def pytest_itemstart(item, node):
|
||||
""" (deprecated, use pytest_runtest_logstart). """
|
||||
|
||||
def pytest_runtest_protocol(item, nextitem):
|
||||
|
|
|
@ -153,19 +153,17 @@ def pytest_ignore_collect(path, config):
|
|||
ignore_paths.extend([py.path.local(x) for x in excludeopt])
|
||||
return path in ignore_paths
|
||||
|
||||
class HookProxy(object):
|
||||
class FSHookProxy(object):
|
||||
def __init__(self, fspath, config):
|
||||
self.fspath = fspath
|
||||
self.config = config
|
||||
|
||||
def __getattr__(self, name):
|
||||
config = object.__getattribute__(self, "config")
|
||||
hookmethod = getattr(config.hook, name)
|
||||
plugins = self.config._getmatchingplugins(self.fspath)
|
||||
x = self.config.hook._getcaller(name, plugins)
|
||||
self.__dict__[name] = x
|
||||
return x
|
||||
|
||||
def call_matching_hooks(**kwargs):
|
||||
plugins = self.config._getmatchingplugins(self.fspath)
|
||||
return hookmethod.pcall(plugins, **kwargs)
|
||||
return call_matching_hooks
|
||||
|
||||
def compatproperty(name):
|
||||
def fget(self):
|
||||
|
@ -520,6 +518,7 @@ class Session(FSCollector):
|
|||
self.trace = config.trace.root.get("collection")
|
||||
self._norecursepatterns = config.getini("norecursedirs")
|
||||
self.startdir = py.path.local()
|
||||
self._fs2hookproxy = {}
|
||||
|
||||
def pytest_collectstart(self):
|
||||
if self.shouldstop:
|
||||
|
@ -538,7 +537,11 @@ class Session(FSCollector):
|
|||
return path in self._initialpaths
|
||||
|
||||
def gethookproxy(self, fspath):
|
||||
return HookProxy(fspath, self.config)
|
||||
try:
|
||||
return self._fs2hookproxy[fspath]
|
||||
except KeyError:
|
||||
self._fs2hookproxy[fspath] = x = FSHookProxy(fspath, self.config)
|
||||
return x
|
||||
|
||||
def perform_collect(self, args=None, genitems=True):
|
||||
hook = self.config.hook
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
""" (disabled by default) support for testing pytest and pytest plugins. """
|
||||
import inspect
|
||||
import sys
|
||||
import os
|
||||
import codecs
|
||||
|
@ -12,7 +11,7 @@ import subprocess
|
|||
import py
|
||||
import pytest
|
||||
from py.builtin import print_
|
||||
from _pytest.core import HookRelay
|
||||
from _pytest.core import HookCaller, add_method_controller
|
||||
|
||||
from _pytest.main import Session, EXIT_OK
|
||||
|
||||
|
@ -38,24 +37,10 @@ def pytest_configure(config):
|
|||
_pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
|
||||
_pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")
|
||||
|
||||
def pytest_funcarg___pytest(request):
|
||||
return PytestArg(request)
|
||||
|
||||
class PytestArg:
|
||||
def __init__(self, request):
|
||||
self.request = request
|
||||
|
||||
def gethookrecorder(self, hook):
|
||||
hookrecorder = HookRecorder(hook._pm)
|
||||
hookrecorder.start_recording(hook._hookspecs)
|
||||
self.request.addfinalizer(hookrecorder.finish_recording)
|
||||
return hookrecorder
|
||||
|
||||
class ParsedCall:
|
||||
def __init__(self, name, locals):
|
||||
assert '_name' not in locals
|
||||
self.__dict__.update(locals)
|
||||
self.__dict__.pop('self')
|
||||
def __init__(self, name, kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
self._name = name
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -63,68 +48,27 @@ class ParsedCall:
|
|||
del d['_name']
|
||||
return "<ParsedCall %r(**%r)>" %(self._name, d)
|
||||
|
||||
|
||||
class HookRecorder:
|
||||
def __init__(self, pluginmanager):
|
||||
self._pluginmanager = pluginmanager
|
||||
self.calls = []
|
||||
self._recorders = {}
|
||||
|
||||
def start_recording(self, hookspecs):
|
||||
if not isinstance(hookspecs, (list, tuple)):
|
||||
hookspecs = [hookspecs]
|
||||
for hookspec in hookspecs:
|
||||
assert hookspec not in self._recorders
|
||||
class RecordCalls:
|
||||
_recorder = self
|
||||
for name, method in vars(hookspec).items():
|
||||
if name[0] != "_":
|
||||
setattr(RecordCalls, name, self._makecallparser(method))
|
||||
recorder = RecordCalls()
|
||||
self._recorders[hookspec] = recorder
|
||||
self._pluginmanager.register(recorder)
|
||||
self.hook = HookRelay(hookspecs, pm=self._pluginmanager,
|
||||
prefix="pytest_")
|
||||
def _docall(hookcaller, methods, kwargs):
|
||||
self.calls.append(ParsedCall(hookcaller.name, kwargs))
|
||||
yield
|
||||
self._undo_wrapping = add_method_controller(HookCaller, _docall)
|
||||
pluginmanager.add_shutdown(self._undo_wrapping)
|
||||
|
||||
def finish_recording(self):
|
||||
for recorder in self._recorders.values():
|
||||
if self._pluginmanager.isregistered(recorder):
|
||||
self._pluginmanager.unregister(recorder)
|
||||
self._recorders.clear()
|
||||
|
||||
def _makecallparser(self, method):
|
||||
name = method.__name__
|
||||
args, varargs, varkw, default = inspect.getargspec(method)
|
||||
if not args or args[0] != "self":
|
||||
args.insert(0, 'self')
|
||||
fspec = inspect.formatargspec(args, varargs, varkw, default)
|
||||
# we use exec because we want to have early type
|
||||
# errors on wrong input arguments, using
|
||||
# *args/**kwargs delays this and gives errors
|
||||
# elsewhere
|
||||
exec (py.code.compile("""
|
||||
def %(name)s%(fspec)s:
|
||||
self._recorder.calls.append(
|
||||
ParsedCall(%(name)r, locals()))
|
||||
""" % locals()))
|
||||
return locals()[name]
|
||||
self._undo_wrapping()
|
||||
|
||||
def getcalls(self, names):
|
||||
if isinstance(names, str):
|
||||
names = names.split()
|
||||
for name in names:
|
||||
for cls in self._recorders:
|
||||
if name in vars(cls):
|
||||
break
|
||||
else:
|
||||
raise ValueError("callname %r not found in %r" %(
|
||||
name, self._recorders.keys()))
|
||||
l = []
|
||||
for call in self.calls:
|
||||
if call._name in names:
|
||||
l.append(call)
|
||||
return l
|
||||
return [call for call in self.calls if call._name in names]
|
||||
|
||||
def contains(self, entries):
|
||||
def assert_contains(self, entries):
|
||||
__tracebackhide__ = True
|
||||
i = 0
|
||||
entries = list(entries)
|
||||
|
@ -160,6 +104,69 @@ class HookRecorder:
|
|||
assert len(l) == 1, (name, l)
|
||||
return l[0]
|
||||
|
||||
# functionality for test reports
|
||||
|
||||
def getreports(self,
|
||||
names="pytest_runtest_logreport pytest_collectreport"):
|
||||
return [x.report for x in self.getcalls(names)]
|
||||
|
||||
def matchreport(self, inamepart="",
|
||||
names="pytest_runtest_logreport pytest_collectreport", when=None):
|
||||
""" return a testreport whose dotted import path matches """
|
||||
l = []
|
||||
for rep in self.getreports(names=names):
|
||||
try:
|
||||
if not when and rep.when != "call" and rep.passed:
|
||||
# setup/teardown passing reports - let's ignore those
|
||||
continue
|
||||
except AttributeError:
|
||||
pass
|
||||
if when and getattr(rep, 'when', None) != when:
|
||||
continue
|
||||
if not inamepart or inamepart in rep.nodeid.split("::"):
|
||||
l.append(rep)
|
||||
if not l:
|
||||
raise ValueError("could not find test report matching %r: "
|
||||
"no test reports at all!" % (inamepart,))
|
||||
if len(l) > 1:
|
||||
raise ValueError(
|
||||
"found 2 or more testreports matching %r: %s" %(inamepart, l))
|
||||
return l[0]
|
||||
|
||||
def getfailures(self,
|
||||
names='pytest_runtest_logreport pytest_collectreport'):
|
||||
return [rep for rep in self.getreports(names) if rep.failed]
|
||||
|
||||
def getfailedcollections(self):
|
||||
return self.getfailures('pytest_collectreport')
|
||||
|
||||
def listoutcomes(self):
|
||||
passed = []
|
||||
skipped = []
|
||||
failed = []
|
||||
for rep in self.getreports(
|
||||
"pytest_collectreport pytest_runtest_logreport"):
|
||||
if rep.passed:
|
||||
if getattr(rep, "when", None) == "call":
|
||||
passed.append(rep)
|
||||
elif rep.skipped:
|
||||
skipped.append(rep)
|
||||
elif rep.failed:
|
||||
failed.append(rep)
|
||||
return passed, skipped, failed
|
||||
|
||||
def countoutcomes(self):
|
||||
return [len(x) for x in self.listoutcomes()]
|
||||
|
||||
def assertoutcome(self, passed=0, skipped=0, failed=0):
|
||||
realpassed, realskipped, realfailed = self.listoutcomes()
|
||||
assert passed == len(realpassed)
|
||||
assert skipped == len(realskipped)
|
||||
assert failed == len(realfailed)
|
||||
|
||||
def clear(self):
|
||||
self.calls[:] = []
|
||||
|
||||
|
||||
def pytest_funcarg__linecomp(request):
|
||||
return LineComp()
|
||||
|
@ -195,7 +202,6 @@ class TmpTestdir:
|
|||
def __init__(self, request):
|
||||
self.request = request
|
||||
self.Config = request.config.__class__
|
||||
self._pytest = request.getfuncargvalue("_pytest")
|
||||
# XXX remove duplication with tmpdir plugin
|
||||
basetmp = request.config._tmpdirhandler.ensuretemp("testdir")
|
||||
name = request.function.__name__
|
||||
|
@ -226,15 +232,10 @@ class TmpTestdir:
|
|||
if fn and fn.startswith(str(self.tmpdir)):
|
||||
del sys.modules[name]
|
||||
|
||||
def getreportrecorder(self, obj):
|
||||
if hasattr(obj, 'config'):
|
||||
obj = obj.config
|
||||
if hasattr(obj, 'hook'):
|
||||
obj = obj.hook
|
||||
assert hasattr(obj, '_hookspecs'), obj
|
||||
reprec = ReportRecorder(obj)
|
||||
reprec.hookrecorder = self._pytest.gethookrecorder(obj)
|
||||
reprec.hook = reprec.hookrecorder.hook
|
||||
def make_hook_recorder(self, pluginmanager):
|
||||
assert not hasattr(pluginmanager, "reprec")
|
||||
pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
|
||||
self.request.addfinalizer(reprec.finish_recording)
|
||||
return reprec
|
||||
|
||||
def chdir(self):
|
||||
|
@ -353,26 +354,23 @@ class TmpTestdir:
|
|||
def inline_genitems(self, *args):
|
||||
return self.inprocess_run(list(args) + ['--collectonly'])
|
||||
|
||||
def inline_run(self, *args):
|
||||
items, rec = self.inprocess_run(args)
|
||||
return rec
|
||||
def inprocess_run(self, args, plugins=()):
|
||||
rec = self.inline_run(*args, plugins=plugins)
|
||||
items = [x.item for x in rec.getcalls("pytest_itemcollected")]
|
||||
return items, rec
|
||||
|
||||
def inprocess_run(self, args, plugins=None):
|
||||
def inline_run(self, *args, **kwargs):
|
||||
rec = []
|
||||
items = []
|
||||
class Collect:
|
||||
def pytest_configure(x, config):
|
||||
rec.append(self.getreportrecorder(config))
|
||||
def pytest_itemcollected(self, item):
|
||||
items.append(item)
|
||||
if not plugins:
|
||||
plugins = []
|
||||
rec.append(self.make_hook_recorder(config.pluginmanager))
|
||||
plugins = kwargs.get("plugins") or []
|
||||
plugins.append(Collect())
|
||||
ret = pytest.main(list(args), plugins=plugins)
|
||||
assert len(rec) == 1
|
||||
reprec = rec[0]
|
||||
reprec.ret = ret
|
||||
assert len(rec) == 1
|
||||
return items, reprec
|
||||
return reprec
|
||||
|
||||
def parseconfig(self, *args):
|
||||
args = [str(x) for x in args]
|
||||
|
@ -547,86 +545,6 @@ def getdecoded(out):
|
|||
return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
|
||||
py.io.saferepr(out),)
|
||||
|
||||
class ReportRecorder(object):
|
||||
def __init__(self, hook):
|
||||
self.hook = hook
|
||||
self.pluginmanager = hook._pm
|
||||
self.pluginmanager.register(self)
|
||||
|
||||
def getcall(self, name):
|
||||
return self.hookrecorder.getcall(name)
|
||||
|
||||
def popcall(self, name):
|
||||
return self.hookrecorder.popcall(name)
|
||||
|
||||
def getcalls(self, names):
|
||||
""" return list of ParsedCall instances matching the given eventname. """
|
||||
return self.hookrecorder.getcalls(names)
|
||||
|
||||
# functionality for test reports
|
||||
|
||||
def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
|
||||
return [x.report for x in self.getcalls(names)]
|
||||
|
||||
def matchreport(self, inamepart="",
|
||||
names="pytest_runtest_logreport pytest_collectreport", when=None):
|
||||
""" return a testreport whose dotted import path matches """
|
||||
l = []
|
||||
for rep in self.getreports(names=names):
|
||||
try:
|
||||
if not when and rep.when != "call" and rep.passed:
|
||||
# setup/teardown passing reports - let's ignore those
|
||||
continue
|
||||
except AttributeError:
|
||||
pass
|
||||
if when and getattr(rep, 'when', None) != when:
|
||||
continue
|
||||
if not inamepart or inamepart in rep.nodeid.split("::"):
|
||||
l.append(rep)
|
||||
if not l:
|
||||
raise ValueError("could not find test report matching %r: no test reports at all!" %
|
||||
(inamepart,))
|
||||
if len(l) > 1:
|
||||
raise ValueError("found more than one testreport matching %r: %s" %(
|
||||
inamepart, l))
|
||||
return l[0]
|
||||
|
||||
def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'):
|
||||
return [rep for rep in self.getreports(names) if rep.failed]
|
||||
|
||||
def getfailedcollections(self):
|
||||
return self.getfailures('pytest_collectreport')
|
||||
|
||||
def listoutcomes(self):
|
||||
passed = []
|
||||
skipped = []
|
||||
failed = []
|
||||
for rep in self.getreports(
|
||||
"pytest_collectreport pytest_runtest_logreport"):
|
||||
if rep.passed:
|
||||
if getattr(rep, "when", None) == "call":
|
||||
passed.append(rep)
|
||||
elif rep.skipped:
|
||||
skipped.append(rep)
|
||||
elif rep.failed:
|
||||
failed.append(rep)
|
||||
return passed, skipped, failed
|
||||
|
||||
def countoutcomes(self):
|
||||
return [len(x) for x in self.listoutcomes()]
|
||||
|
||||
def assertoutcome(self, passed=0, skipped=0, failed=0):
|
||||
realpassed, realskipped, realfailed = self.listoutcomes()
|
||||
assert passed == len(realpassed)
|
||||
assert skipped == len(realskipped)
|
||||
assert failed == len(realfailed)
|
||||
|
||||
def clear(self):
|
||||
self.hookrecorder.calls[:] = []
|
||||
|
||||
def unregister(self):
|
||||
self.pluginmanager.unregister(self)
|
||||
self.hookrecorder.finish_recording()
|
||||
|
||||
class LineComp:
|
||||
def __init__(self):
|
||||
|
|
|
@ -353,15 +353,17 @@ class PyCollector(PyobjMixin, pytest.Collector):
|
|||
fixtureinfo = fm.getfixtureinfo(self, funcobj, cls)
|
||||
metafunc = Metafunc(funcobj, fixtureinfo, self.config,
|
||||
cls=cls, module=module)
|
||||
gentesthook = self.config.hook.pytest_generate_tests
|
||||
extra = [module]
|
||||
if cls is not None:
|
||||
extra.append(cls())
|
||||
plugins = self.getplugins() + extra
|
||||
gentesthook.pcall(plugins, metafunc=metafunc)
|
||||
try:
|
||||
methods = [module.pytest_generate_tests]
|
||||
except AttributeError:
|
||||
methods = []
|
||||
if hasattr(cls, "pytest_generate_tests"):
|
||||
methods.append(cls().pytest_generate_tests)
|
||||
self.ihook.pytest_generate_tests.callextra(methods, metafunc=metafunc)
|
||||
|
||||
Function = self._getcustomclass("Function")
|
||||
if not metafunc._calls:
|
||||
yield Function(name, parent=self)
|
||||
yield Function(name, parent=self, fixtureinfo=fixtureinfo)
|
||||
else:
|
||||
# add funcargs() as fixturedefs to fixtureinfo.arg2fixturedefs
|
||||
add_funcarg_pseudo_fixture_def(self, metafunc, fm)
|
||||
|
@ -370,6 +372,7 @@ class PyCollector(PyobjMixin, pytest.Collector):
|
|||
subname = "%s[%s]" %(name, callspec.id)
|
||||
yield Function(name=subname, parent=self,
|
||||
callspec=callspec, callobj=funcobj,
|
||||
fixtureinfo=fixtureinfo,
|
||||
keywords={callspec.id:True})
|
||||
|
||||
def add_funcarg_pseudo_fixture_def(collector, metafunc, fixturemanager):
|
||||
|
@ -1065,28 +1068,27 @@ class Function(FunctionMixin, pytest.Item, FuncargnamesCompatAttr):
|
|||
"""
|
||||
_genid = None
|
||||
def __init__(self, name, parent, args=None, config=None,
|
||||
callspec=None, callobj=NOTSET, keywords=None, session=None):
|
||||
callspec=None, callobj=NOTSET, keywords=None, session=None,
|
||||
fixtureinfo=None):
|
||||
super(Function, self).__init__(name, parent, config=config,
|
||||
session=session)
|
||||
self._args = args
|
||||
if callobj is not NOTSET:
|
||||
self.obj = callobj
|
||||
|
||||
for name, val in (py.builtin._getfuncdict(self.obj) or {}).items():
|
||||
self.keywords[name] = val
|
||||
self.keywords.update(self.obj.__dict__)
|
||||
if callspec:
|
||||
for name, val in callspec.keywords.items():
|
||||
self.keywords[name] = val
|
||||
if keywords:
|
||||
for name, val in keywords.items():
|
||||
self.keywords[name] = val
|
||||
|
||||
isyield = self._isyieldedfunction()
|
||||
self._fixtureinfo = fi = self.session._fixturemanager.getfixtureinfo(
|
||||
self.parent, self.obj, self.cls, funcargs=not isyield)
|
||||
self.fixturenames = fi.names_closure
|
||||
if callspec is not None:
|
||||
self.callspec = callspec
|
||||
self.keywords.update(callspec.keywords)
|
||||
if keywords:
|
||||
self.keywords.update(keywords)
|
||||
|
||||
if fixtureinfo is None:
|
||||
fixtureinfo = self.session._fixturemanager.getfixtureinfo(
|
||||
self.parent, self.obj, self.cls,
|
||||
funcargs=not self._isyieldedfunction())
|
||||
self._fixtureinfo = fixtureinfo
|
||||
self.fixturenames = fixtureinfo.names_closure
|
||||
self._initrequest()
|
||||
|
||||
def _initrequest(self):
|
||||
|
@ -1571,15 +1573,8 @@ class FixtureManager:
|
|||
self._nodeid_and_autousenames = [("", self.config.getini("usefixtures"))]
|
||||
session.config.pluginmanager.register(self, "funcmanage")
|
||||
|
||||
self._nodename2fixtureinfo = {}
|
||||
|
||||
def getfixtureinfo(self, node, func, cls, funcargs=True):
|
||||
# node is the "collection node" for "func"
|
||||
key = (node, func)
|
||||
try:
|
||||
return self._nodename2fixtureinfo[key]
|
||||
except KeyError:
|
||||
pass
|
||||
if funcargs and not hasattr(node, "nofuncargs"):
|
||||
if cls is not None:
|
||||
startindex = 1
|
||||
|
@ -1595,10 +1590,7 @@ class FixtureManager:
|
|||
fm = node.session._fixturemanager
|
||||
names_closure, arg2fixturedefs = fm.getfixtureclosure(initialnames,
|
||||
node)
|
||||
fixtureinfo = FuncFixtureInfo(argnames, names_closure,
|
||||
arg2fixturedefs)
|
||||
self._nodename2fixtureinfo[key] = fixtureinfo
|
||||
return fixtureinfo
|
||||
return FuncFixtureInfo(argnames, names_closure, arg2fixturedefs)
|
||||
|
||||
### XXX this hook should be called for historic events like pytest_configure
|
||||
### so that we don't have to do the below pytest_configure hook
|
||||
|
|
|
@ -9,4 +9,4 @@ if __name__ == '__main__':
|
|||
p = pstats.Stats("prof")
|
||||
p.strip_dirs()
|
||||
p.sort_stats('cumulative')
|
||||
print(p.print_stats(250))
|
||||
print(p.print_stats(500))
|
||||
|
|
|
@ -334,9 +334,9 @@ class TestSession:
|
|||
assert item.name == "test_func"
|
||||
newid = item.nodeid
|
||||
assert newid == id
|
||||
py.std.pprint.pprint(hookrec.hookrecorder.calls)
|
||||
py.std.pprint.pprint(hookrec.calls)
|
||||
topdir = testdir.tmpdir # noqa
|
||||
hookrec.hookrecorder.contains([
|
||||
hookrec.assert_contains([
|
||||
("pytest_collectstart", "collector.fspath == topdir"),
|
||||
("pytest_make_collect_report", "collector.fspath == topdir"),
|
||||
("pytest_collectstart", "collector.fspath == p"),
|
||||
|
@ -381,9 +381,9 @@ class TestSession:
|
|||
id = p.basename
|
||||
|
||||
items, hookrec = testdir.inline_genitems(id)
|
||||
py.std.pprint.pprint(hookrec.hookrecorder.calls)
|
||||
py.std.pprint.pprint(hookrec.calls)
|
||||
assert len(items) == 2
|
||||
hookrec.hookrecorder.contains([
|
||||
hookrec.assert_contains([
|
||||
("pytest_collectstart",
|
||||
"collector.fspath == collector.session.fspath"),
|
||||
("pytest_collectstart",
|
||||
|
@ -404,8 +404,8 @@ class TestSession:
|
|||
|
||||
items, hookrec = testdir.inline_genitems()
|
||||
assert len(items) == 1
|
||||
py.std.pprint.pprint(hookrec.hookrecorder.calls)
|
||||
hookrec.hookrecorder.contains([
|
||||
py.std.pprint.pprint(hookrec.calls)
|
||||
hookrec.assert_contains([
|
||||
("pytest_collectstart", "collector.fspath == test_aaa"),
|
||||
("pytest_pycollect_makeitem", "name == 'test_func'"),
|
||||
("pytest_collectreport",
|
||||
|
@ -425,8 +425,8 @@ class TestSession:
|
|||
|
||||
items, hookrec = testdir.inline_genitems(id)
|
||||
assert len(items) == 2
|
||||
py.std.pprint.pprint(hookrec.hookrecorder.calls)
|
||||
hookrec.hookrecorder.contains([
|
||||
py.std.pprint.pprint(hookrec.calls)
|
||||
hookrec.assert_contains([
|
||||
("pytest_collectstart", "collector.fspath == test_aaa"),
|
||||
("pytest_pycollect_makeitem", "name == 'test_func'"),
|
||||
("pytest_collectreport", "report.nodeid == 'aaa/test_aaa.py'"),
|
||||
|
|
|
@ -149,7 +149,7 @@ class TestBootstrapping:
|
|||
mod.pytest_plugins = "pytest_a"
|
||||
aplugin = testdir.makepyfile(pytest_a="#")
|
||||
pluginmanager = get_plugin_manager()
|
||||
reprec = testdir.getreportrecorder(pluginmanager)
|
||||
reprec = testdir.make_hook_recorder(pluginmanager)
|
||||
#syspath.prepend(aplugin.dirpath())
|
||||
py.std.sys.path.insert(0, str(aplugin.dirpath()))
|
||||
pluginmanager.consider_module(mod)
|
||||
|
@ -274,7 +274,7 @@ class TestBootstrapping:
|
|||
saveindent.append(pm.trace.root.indent)
|
||||
raise ValueError(42)
|
||||
l = []
|
||||
pm.trace.root.setwriter(l.append)
|
||||
pm.set_tracing(l.append)
|
||||
indent = pm.trace.root.indent
|
||||
p = api1()
|
||||
pm.register(p)
|
||||
|
@ -405,11 +405,7 @@ class TestPytestPluginInteractions:
|
|||
pluginmanager.register(p3)
|
||||
methods = pluginmanager.listattr('m')
|
||||
assert methods == [p2.m, p3.m, p1.m]
|
||||
# listattr keeps a cache and deleting
|
||||
# a function attribute requires clearing it
|
||||
pluginmanager._listattrcache.clear()
|
||||
del P1.m.__dict__['tryfirst']
|
||||
|
||||
pytest.mark.trylast(getattr(P2.m, 'im_func', P2.m))
|
||||
methods = pluginmanager.listattr('m')
|
||||
assert methods == [p2.m, p1.m, p3.m]
|
||||
|
@ -436,6 +432,11 @@ def test_varnames():
|
|||
assert varnames(A().f) == ('y',)
|
||||
assert varnames(B()) == ('z',)
|
||||
|
||||
def test_varnames_default():
|
||||
def f(x, y=3):
|
||||
pass
|
||||
assert varnames(f) == ("x",)
|
||||
|
||||
def test_varnames_class():
|
||||
class C:
|
||||
def __init__(self, x):
|
||||
|
@ -494,12 +495,10 @@ class TestMultiCall:
|
|||
return x + z
|
||||
reslist = MultiCall([f], dict(x=23, y=24)).execute()
|
||||
assert reslist == [24]
|
||||
reslist = MultiCall([f], dict(x=23, z=2)).execute()
|
||||
assert reslist == [25]
|
||||
|
||||
def test_tags_call_error(self):
|
||||
multicall = MultiCall([lambda x: x], {})
|
||||
pytest.raises(TypeError, multicall.execute)
|
||||
pytest.raises(KeyError, multicall.execute)
|
||||
|
||||
def test_call_subexecute(self):
|
||||
def m(__multicall__):
|
||||
|
@ -630,6 +629,18 @@ class TestHookRelay:
|
|||
assert l == [4]
|
||||
assert not hasattr(mcm, 'world')
|
||||
|
||||
def test_argmismatch(self):
|
||||
class Api:
|
||||
def hello(self, arg):
|
||||
"api hook 1"
|
||||
pm = PluginManager(Api, prefix="he")
|
||||
class Plugin:
|
||||
def hello(self, argwrong):
|
||||
return arg + 1
|
||||
with pytest.raises(PluginValidationError) as exc:
|
||||
pm.register(Plugin())
|
||||
assert "argwrong" in str(exc.value)
|
||||
|
||||
def test_only_kwargs(self):
|
||||
pm = PluginManager()
|
||||
class Api:
|
||||
|
@ -754,3 +765,96 @@ def test_importplugin_issue375(testdir):
|
|||
assert "qwe" not in str(excinfo.value)
|
||||
assert "aaaa" in str(excinfo.value)
|
||||
|
||||
class TestWrapMethod:
|
||||
def test_basic_happypath(self):
|
||||
class A:
|
||||
def f(self):
|
||||
return "A.f"
|
||||
|
||||
l = []
|
||||
def f(self):
|
||||
l.append(1)
|
||||
yield
|
||||
l.append(2)
|
||||
undo = add_method_controller(A, f)
|
||||
|
||||
assert A().f() == "A.f"
|
||||
assert l == [1,2]
|
||||
undo()
|
||||
l[:] = []
|
||||
assert A().f() == "A.f"
|
||||
assert l == []
|
||||
|
||||
def test_method_raises(self):
|
||||
class A:
|
||||
def error(self, val):
|
||||
raise ValueError(val)
|
||||
|
||||
l = []
|
||||
def error(self, val):
|
||||
l.append(val)
|
||||
try:
|
||||
yield
|
||||
except ValueError:
|
||||
l.append(None)
|
||||
raise
|
||||
|
||||
|
||||
undo = add_method_controller(A, error)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
A().error(42)
|
||||
assert l == [42, None]
|
||||
undo()
|
||||
l[:] = []
|
||||
with pytest.raises(ValueError):
|
||||
A().error(42)
|
||||
assert l == []
|
||||
|
||||
def test_controller_swallows_method_raises(self):
|
||||
class A:
|
||||
def error(self, val):
|
||||
raise ValueError(val)
|
||||
|
||||
def error(self, val):
|
||||
try:
|
||||
yield
|
||||
except ValueError:
|
||||
yield 2
|
||||
|
||||
add_method_controller(A, error)
|
||||
assert A().error(42) == 2
|
||||
|
||||
def test_reraise_on_controller_StopIteration(self):
|
||||
class A:
|
||||
def error(self, val):
|
||||
raise ValueError(val)
|
||||
|
||||
def error(self, val):
|
||||
try:
|
||||
yield
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
add_method_controller(A, error)
|
||||
with pytest.raises(ValueError):
|
||||
A().error(42)
|
||||
|
||||
@pytest.mark.xfail(reason="if needed later")
|
||||
def test_modify_call_args(self):
|
||||
class A:
|
||||
def error(self, val1, val2):
|
||||
raise ValueError(val1+val2)
|
||||
|
||||
l = []
|
||||
def error(self):
|
||||
try:
|
||||
yield (1,), {'val2': 2}
|
||||
except ValueError as ex:
|
||||
assert ex.args == (3,)
|
||||
l.append(1)
|
||||
|
||||
add_method_controller(A, error)
|
||||
with pytest.raises(ValueError):
|
||||
A().error()
|
||||
assert l == [1]
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import py, pytest
|
||||
from _pytest.helpconfig import collectattr
|
||||
import pytest
|
||||
|
||||
def test_version(testdir, pytestconfig):
|
||||
result = testdir.runpytest("--version")
|
||||
|
@ -25,18 +24,6 @@ def test_help(testdir):
|
|||
*to see*fixtures*py.test --fixtures*
|
||||
""")
|
||||
|
||||
def test_collectattr():
|
||||
class A:
|
||||
def pytest_hello(self):
|
||||
pass
|
||||
class B(A):
|
||||
def pytest_world(self):
|
||||
pass
|
||||
methods = py.builtin.sorted(collectattr(B))
|
||||
assert list(methods) == ['pytest_hello', 'pytest_world']
|
||||
methods = py.builtin.sorted(collectattr(B()))
|
||||
assert list(methods) == ['pytest_hello', 'pytest_world']
|
||||
|
||||
def test_hookvalidation_unknown(testdir):
|
||||
testdir.makeconftest("""
|
||||
def pytest_hello(xyz):
|
||||
|
|
|
@ -3,9 +3,9 @@ import os
|
|||
from _pytest.pytester import HookRecorder
|
||||
from _pytest.core import PluginManager
|
||||
|
||||
def test_reportrecorder(testdir):
|
||||
def test_make_hook_recorder(testdir):
|
||||
item = testdir.getitem("def test_func(): pass")
|
||||
recorder = testdir.getreportrecorder(item.config)
|
||||
recorder = testdir.make_hook_recorder(item.config.pluginmanager)
|
||||
assert not recorder.getfailures()
|
||||
|
||||
pytest.xfail("internal reportrecorder tests need refactoring")
|
||||
|
@ -71,47 +71,37 @@ def test_testdir_runs_with_plugin(testdir):
|
|||
"*1 passed*"
|
||||
])
|
||||
|
||||
def test_hookrecorder_basic():
|
||||
rec = HookRecorder(PluginManager())
|
||||
class ApiClass:
|
||||
|
||||
def make_holder():
|
||||
class apiclass:
|
||||
def pytest_xyz(self, arg):
|
||||
"x"
|
||||
rec.start_recording(ApiClass)
|
||||
rec.hook.pytest_xyz(arg=123)
|
||||
def pytest_xyz_noarg(self):
|
||||
"x"
|
||||
|
||||
apimod = type(os)('api')
|
||||
def pytest_xyz(arg):
|
||||
"x"
|
||||
def pytest_xyz_noarg():
|
||||
"x"
|
||||
apimod.pytest_xyz = pytest_xyz
|
||||
apimod.pytest_xyz_noarg = pytest_xyz_noarg
|
||||
return apiclass, apimod
|
||||
|
||||
|
||||
@pytest.mark.parametrize("holder", make_holder())
|
||||
def test_hookrecorder_basic(holder):
|
||||
pm = PluginManager()
|
||||
pm.hook._addhooks(holder, "pytest_")
|
||||
rec = HookRecorder(pm)
|
||||
pm.hook.pytest_xyz(arg=123)
|
||||
call = rec.popcall("pytest_xyz")
|
||||
assert call.arg == 123
|
||||
assert call._name == "pytest_xyz"
|
||||
pytest.raises(pytest.fail.Exception, "rec.popcall('abc')")
|
||||
|
||||
def test_hookrecorder_basic_no_args_hook():
|
||||
rec = HookRecorder(PluginManager())
|
||||
apimod = type(os)('api')
|
||||
def pytest_xyz():
|
||||
"x"
|
||||
apimod.pytest_xyz = pytest_xyz
|
||||
rec.start_recording(apimod)
|
||||
rec.hook.pytest_xyz()
|
||||
call = rec.popcall("pytest_xyz")
|
||||
assert call._name == "pytest_xyz"
|
||||
|
||||
def test_functional(testdir, linecomp):
|
||||
reprec = testdir.inline_runsource("""
|
||||
import pytest
|
||||
from _pytest.core import HookRelay, PluginManager
|
||||
pytest_plugins="pytester"
|
||||
def test_func(_pytest):
|
||||
class ApiClass:
|
||||
def pytest_xyz(self, arg): "x"
|
||||
hook = HookRelay([ApiClass], PluginManager())
|
||||
rec = _pytest.gethookrecorder(hook)
|
||||
class Plugin:
|
||||
def pytest_xyz(self, arg):
|
||||
return arg + 1
|
||||
rec._pluginmanager.register(Plugin())
|
||||
res = rec.hook.pytest_xyz(arg=41)
|
||||
assert res == [42]
|
||||
""")
|
||||
reprec.assertoutcome(passed=1)
|
||||
pm.hook.pytest_xyz_noarg()
|
||||
call = rec.popcall("pytest_xyz_noarg")
|
||||
assert call._name == "pytest_xyz_noarg"
|
||||
|
||||
|
||||
def test_makepyfile_unicode(testdir):
|
||||
|
|
Loading…
Reference in New Issue