merge pytest default

This commit is contained in:
holger krekel 2014-10-07 18:11:15 +02:00
commit b6e619413f
12 changed files with 446 additions and 416 deletions

View File

@ -98,7 +98,7 @@ class PytestPluginManager(PluginManager):
err = py.io.dupfile(err, encoding=encoding)
except Exception:
pass
self.trace.root.setwriter(err.write)
self.set_tracing(err.write)
def pytest_configure(self, config):
config.addinivalue_line("markers",
@ -682,11 +682,8 @@ class Config(object):
setattr(self.option, opt.dest, opt.default)
def _getmatchingplugins(self, fspath):
allconftests = self._conftest._conftestpath2mod.values()
plugins = [x for x in self.pluginmanager.getplugins()
if x not in allconftests]
plugins += self._conftest.getconftestmodules(fspath)
return plugins
return self.pluginmanager._plugins + \
self._conftest.getconftestmodules(fspath)
def pytest_load_initial_conftests(self, early_config):
self._conftest.setinitial(early_config.known_args_namespace)

View File

@ -67,16 +67,74 @@ class TagTracerSub:
def get(self, name):
return self.__class__(self.root, self.tags + (name,))
def add_method_controller(cls, func):
""" Use func as the method controler for the method found
at the class named func.__name__.
A method controler is invoked with the same arguments
as the function it substitutes and is required to yield once
which will trigger calling the controlled method.
If it yields a second value, the value will be returned
as the result of the invocation. Errors in the controlled function
are re-raised to the controller during the first yield.
"""
name = func.__name__
oldcall = getattr(cls, name)
def wrap_exec(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen) # first yield
try:
res = oldcall(*args, **kwargs)
except Exception:
excinfo = sys.exc_info()
try:
# reraise exception to controller
res = gen.throw(*excinfo)
except StopIteration:
py.builtin._reraise(*excinfo)
else:
try:
res = gen.send(res)
except StopIteration:
pass
return res
setattr(cls, name, wrap_exec)
return lambda: setattr(cls, name, oldcall)
class PluginManager(object):
def __init__(self, hookspecs=None):
def __init__(self, hookspecs=None, prefix="pytest_"):
self._name2plugin = {}
self._listattrcache = {}
self._plugins = []
self._conftestplugins = []
self._warnings = []
self.trace = TagTracer().get("pluginmanage")
self._plugin_distinfo = []
self._shutdown = []
self.hook = HookRelay(hookspecs or [], pm=self)
self.hook = HookRelay(hookspecs or [], pm=self, prefix=prefix)
def set_tracing(self, writer):
self.trace.root.setwriter(writer)
# reconfigure HookCalling to perform tracing
assert not hasattr(self, "_wrapping")
self._wrapping = True
def _docall(self, methods, kwargs):
trace = self.hookrelay.trace
trace.root.indent += 1
trace(self.name, kwargs)
res = None
try:
res = yield
finally:
if res:
trace("finish", self.name, "-->", res)
trace.root.indent -= 1
undo = add_method_controller(HookCaller, _docall)
self.add_shutdown(undo)
def do_configure(self, config):
# backward compatibility
@ -86,7 +144,7 @@ class PluginManager(object):
assert not hasattr(self, "_registercallback")
self._registercallback = callback
def register(self, plugin, name=None, prepend=False):
def register(self, plugin, name=None, prepend=False, conftest=False):
if self._name2plugin.get(name, None) == -1:
return
name = name or getattr(plugin, '__name__', str(id(plugin)))
@ -94,20 +152,27 @@ class PluginManager(object):
raise ValueError("Plugin already registered: %s=%s\n%s" %(
name, plugin, self._name2plugin))
#self.trace("registering", name, plugin)
self._name2plugin[name] = plugin
reg = getattr(self, "_registercallback", None)
if reg is not None:
reg(plugin, name)
if not prepend:
self._plugins.append(plugin)
reg(plugin, name) # may call addhooks
self.hook._scan_plugin(plugin)
self._name2plugin[name] = plugin
if conftest:
self._conftestplugins.append(plugin)
else:
self._plugins.insert(0, plugin)
if not prepend:
self._plugins.append(plugin)
else:
self._plugins.insert(0, plugin)
return True
def unregister(self, plugin=None, name=None):
if plugin is None:
plugin = self.getplugin(name=name)
self._plugins.remove(plugin)
try:
self._plugins.remove(plugin)
except KeyError:
self._conftestplugins.remove(plugin)
for name, value in list(self._name2plugin.items()):
if value == plugin:
del self._name2plugin[name]
@ -119,9 +184,8 @@ class PluginManager(object):
while self._shutdown:
func = self._shutdown.pop()
func()
self._plugins = []
self._plugins = self._conftestplugins = []
self._name2plugin.clear()
self._listattrcache.clear()
def isregistered(self, plugin, name=None):
if self.getplugin(name) is not None:
@ -134,7 +198,7 @@ class PluginManager(object):
self.hook._addhooks(spec, prefix=prefix)
def getplugins(self):
return list(self._plugins)
return self._plugins + self._conftestplugins
def skipifmissing(self, name):
if not self.hasplugin(name):
@ -198,7 +262,8 @@ class PluginManager(object):
self.import_plugin(arg)
def consider_conftest(self, conftestmodule):
if self.register(conftestmodule, name=conftestmodule.__file__):
if self.register(conftestmodule, name=conftestmodule.__file__,
conftest=True):
self.consider_module(conftestmodule)
def consider_module(self, mod):
@ -233,12 +298,7 @@ class PluginManager(object):
def listattr(self, attrname, plugins=None):
if plugins is None:
plugins = self._plugins
key = (attrname,) + tuple(plugins)
try:
return list(self._listattrcache[key])
except KeyError:
pass
plugins = self._plugins + self._conftestplugins
l = []
last = []
wrappers = []
@ -257,7 +317,6 @@ class PluginManager(object):
l.append(meth)
l.extend(last)
l.extend(wrappers)
self._listattrcache[key] = list(l)
return l
def call_plugin(self, plugin, methname, kwargs):
@ -288,6 +347,7 @@ class MultiCall:
def __init__(self, methods, kwargs, firstresult=False):
self.methods = list(methods)
self.kwargs = kwargs
self.kwargs["__multicall__"] = self
self.results = []
self.firstresult = firstresult
@ -298,11 +358,12 @@ class MultiCall:
def execute(self):
next_finalizers = []
try:
all_kwargs = self.kwargs
while self.methods:
method = self.methods.pop()
kwargs = self.getkwargs(method)
args = [all_kwargs[argname] for argname in varnames(method)]
if hasattr(method, "hookwrapper"):
it = method(**kwargs)
it = method(*args)
next = getattr(it, "next", None)
if next is None:
next = getattr(it, "__next__", None)
@ -312,7 +373,7 @@ class MultiCall:
res = next()
next_finalizers.append((method, next))
else:
res = method(**kwargs)
res = method(*args)
if res is not None:
self.results.append(res)
if self.firstresult:
@ -330,17 +391,7 @@ class MultiCall:
"wrapper contain more than one yield")
def getkwargs(self, method):
kwargs = {}
for argname in varnames(method):
try:
kwargs[argname] = self.kwargs[argname]
except KeyError:
if argname == "__multicall__":
kwargs[argname] = self
return kwargs
def varnames(func):
def varnames(func, startindex=None):
""" return argument name tuple for a function, method, class or callable.
In case of a class, its "__init__" method is considered.
@ -357,74 +408,130 @@ def varnames(func):
func = func.__init__
except AttributeError:
return ()
ismethod = True
startindex = 1
else:
if not inspect.isfunction(func) and not inspect.ismethod(func):
func = getattr(func, '__call__', func)
ismethod = inspect.ismethod(func)
if startindex is None:
startindex = int(inspect.ismethod(func))
rawcode = py.code.getrawcode(func)
try:
x = rawcode.co_varnames[ismethod:rawcode.co_argcount]
x = rawcode.co_varnames[startindex:rawcode.co_argcount]
except AttributeError:
x = ()
else:
defaults = func.__defaults__
if defaults:
x = x[:-len(defaults)]
try:
cache["_varnames"] = x
except TypeError:
pass
return x
class HookRelay:
def __init__(self, hookspecs, pm, prefix="pytest_"):
if not isinstance(hookspecs, list):
hookspecs = [hookspecs]
self._hookspecs = []
self._pm = pm
self.trace = pm.trace.root.get("hook")
self.prefix = prefix
for hookspec in hookspecs:
self._addhooks(hookspec, prefix)
def _addhooks(self, hookspecs, prefix):
self._hookspecs.append(hookspecs)
def _addhooks(self, hookspec, prefix):
added = False
for name, method in vars(hookspecs).items():
isclass = int(inspect.isclass(hookspec))
for name, method in vars(hookspec).items():
if name.startswith(prefix):
firstresult = getattr(method, 'firstresult', False)
hc = HookCaller(self, name, firstresult=firstresult)
hc = HookCaller(self, name, firstresult=firstresult,
argnames=varnames(method, startindex=isclass))
setattr(self, name, hc)
added = True
#print ("setting new hook", name)
if not added:
raise ValueError("did not find new %r hooks in %r" %(
prefix, hookspecs,))
prefix, hookspec,))
def _getcaller(self, name, plugins):
caller = getattr(self, name)
methods = self._pm.listattr(name, plugins=plugins)
if methods:
return caller.new_cached_caller(methods)
return caller
def _scan_plugin(self, plugin):
def fail(msg, *args):
name = getattr(plugin, '__name__', plugin)
raise PluginValidationError("plugin %r\n%s" %(name, msg % args))
for name in dir(plugin):
if not name.startswith(self.prefix):
continue
hook = getattr(self, name, None)
method = getattr(plugin, name)
if hook is None:
is_optional = getattr(method, 'optionalhook', False)
if not isgenerichook(name) and not is_optional:
fail("found unknown hook: %r", name)
continue
for arg in varnames(method):
if arg not in hook.argnames:
fail("argument %r not available\n"
"actual definition: %s\n"
"available hookargs: %s",
arg, formatdef(method),
", ".join(hook.argnames))
getattr(self, name).clear_method_cache()
class HookCaller:
def __init__(self, hookrelay, name, firstresult):
def __init__(self, hookrelay, name, firstresult, argnames, methods=None):
self.hookrelay = hookrelay
self.name = name
self.firstresult = firstresult
self.trace = self.hookrelay.trace
self.methods = methods
self.argnames = ["__multicall__"]
self.argnames.extend(argnames)
assert "self" not in argnames # prevent oversights
def new_cached_caller(self, methods):
return HookCaller(self.hookrelay, self.name, self.firstresult,
argnames=self.argnames, methods=methods)
def __repr__(self):
return "<HookCaller %r>" %(self.name,)
def clear_method_cache(self):
self.methods = None
def __call__(self, **kwargs):
methods = self.hookrelay._pm.listattr(self.name)
methods = self.methods
if methods is None:
self.methods = methods = self.hookrelay._pm.listattr(self.name)
return self._docall(methods, kwargs)
def pcall(self, plugins, **kwargs):
methods = self.hookrelay._pm.listattr(self.name, plugins=plugins)
return self._docall(methods, kwargs)
def callextra(self, methods, **kwargs):
return self._docall(self.methods + methods, kwargs)
def _docall(self, methods, kwargs):
self.trace(self.name, kwargs)
self.trace.root.indent += 1
mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
try:
res = mc.execute()
if res:
self.trace("finish", self.name, "-->", res)
finally:
self.trace.root.indent -= 1
return res
return MultiCall(methods, kwargs,
firstresult=self.firstresult).execute()
class PluginValidationError(Exception):
""" plugin failed validation. """
def isgenerichook(name):
return name == "pytest_plugins" or \
name.startswith("pytest_funcarg__")
def formatdef(func):
return "%s%s" % (
func.__name__,
inspect.formatargspec(*inspect.getargspec(func))
)

View File

@ -1,8 +1,7 @@
""" version info, help messages, tracing configuration. """
import py
import pytest
import os, inspect, sys
from _pytest.core import varnames
import os, sys
def pytest_addoption(parser):
group = parser.getgroup('debugconfig')
@ -32,7 +31,7 @@ def pytest_cmdline_parse(__multicall__):
f.write("versions pytest-%s, py-%s, python-%s\ncwd=%s\nargs=%s\n\n" %(
pytest.__version__, py.__version__, ".".join(map(str, sys.version_info)),
os.getcwd(), config._origargs))
config.trace.root.setwriter(f.write)
config.pluginmanager.set_tracing(f.write)
sys.stderr.write("writing pytestdebug information to %s\n" % path)
return config
@ -127,70 +126,3 @@ def pytest_report_header(config):
return lines
# =====================================================
# validate plugin syntax and hooks
# =====================================================
def pytest_plugin_registered(manager, plugin):
methods = collectattr(plugin)
hooks = {}
for hookspec in manager.hook._hookspecs:
hooks.update(collectattr(hookspec))
stringio = py.io.TextIO()
def Print(*args):
if args:
stringio.write(" ".join(map(str, args)))
stringio.write("\n")
fail = False
while methods:
name, method = methods.popitem()
#print "checking", name
if isgenerichook(name):
continue
if name not in hooks:
if not getattr(method, 'optionalhook', False):
Print("found unknown hook:", name)
fail = True
else:
#print "checking", method
method_args = list(varnames(method))
if '__multicall__' in method_args:
method_args.remove('__multicall__')
hook = hooks[name]
hookargs = varnames(hook)
for arg in method_args:
if arg not in hookargs:
Print("argument %r not available" %(arg, ))
Print("actual definition: %s" %(formatdef(method)))
Print("available hook arguments: %s" %
", ".join(hookargs))
fail = True
break
#if not fail:
# print "matching hook:", formatdef(method)
if fail:
name = getattr(plugin, '__name__', plugin)
raise PluginValidationError("%s:\n%s" % (name, stringio.getvalue()))
class PluginValidationError(Exception):
""" plugin failed validation. """
def isgenerichook(name):
return name == "pytest_plugins" or \
name.startswith("pytest_funcarg__")
def collectattr(obj):
methods = {}
for apiname in dir(obj):
if apiname.startswith("pytest_"):
methods[apiname] = getattr(obj, apiname)
return methods
def formatdef(func):
return "%s%s" % (
func.__name__,
inspect.formatargspec(*inspect.getargspec(func))
)

View File

@ -142,7 +142,7 @@ def pytest_generate_tests(metafunc):
# -------------------------------------------------------------------------
# generic runtest related hooks
# -------------------------------------------------------------------------
def pytest_itemstart(item, node=None):
def pytest_itemstart(item, node):
""" (deprecated, use pytest_runtest_logstart). """
def pytest_runtest_protocol(item, nextitem):

View File

@ -153,19 +153,17 @@ def pytest_ignore_collect(path, config):
ignore_paths.extend([py.path.local(x) for x in excludeopt])
return path in ignore_paths
class HookProxy(object):
class FSHookProxy(object):
def __init__(self, fspath, config):
self.fspath = fspath
self.config = config
def __getattr__(self, name):
config = object.__getattribute__(self, "config")
hookmethod = getattr(config.hook, name)
plugins = self.config._getmatchingplugins(self.fspath)
x = self.config.hook._getcaller(name, plugins)
self.__dict__[name] = x
return x
def call_matching_hooks(**kwargs):
plugins = self.config._getmatchingplugins(self.fspath)
return hookmethod.pcall(plugins, **kwargs)
return call_matching_hooks
def compatproperty(name):
def fget(self):
@ -520,6 +518,7 @@ class Session(FSCollector):
self.trace = config.trace.root.get("collection")
self._norecursepatterns = config.getini("norecursedirs")
self.startdir = py.path.local()
self._fs2hookproxy = {}
def pytest_collectstart(self):
if self.shouldstop:
@ -538,7 +537,11 @@ class Session(FSCollector):
return path in self._initialpaths
def gethookproxy(self, fspath):
return HookProxy(fspath, self.config)
try:
return self._fs2hookproxy[fspath]
except KeyError:
self._fs2hookproxy[fspath] = x = FSHookProxy(fspath, self.config)
return x
def perform_collect(self, args=None, genitems=True):
hook = self.config.hook

View File

@ -1,5 +1,4 @@
""" (disabled by default) support for testing pytest and pytest plugins. """
import inspect
import sys
import os
import codecs
@ -12,7 +11,7 @@ import subprocess
import py
import pytest
from py.builtin import print_
from _pytest.core import HookRelay
from _pytest.core import HookCaller, add_method_controller
from _pytest.main import Session, EXIT_OK
@ -38,24 +37,10 @@ def pytest_configure(config):
_pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
_pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")
def pytest_funcarg___pytest(request):
return PytestArg(request)
class PytestArg:
def __init__(self, request):
self.request = request
def gethookrecorder(self, hook):
hookrecorder = HookRecorder(hook._pm)
hookrecorder.start_recording(hook._hookspecs)
self.request.addfinalizer(hookrecorder.finish_recording)
return hookrecorder
class ParsedCall:
def __init__(self, name, locals):
assert '_name' not in locals
self.__dict__.update(locals)
self.__dict__.pop('self')
def __init__(self, name, kwargs):
self.__dict__.update(kwargs)
self._name = name
def __repr__(self):
@ -63,68 +48,27 @@ class ParsedCall:
del d['_name']
return "<ParsedCall %r(**%r)>" %(self._name, d)
class HookRecorder:
def __init__(self, pluginmanager):
self._pluginmanager = pluginmanager
self.calls = []
self._recorders = {}
def start_recording(self, hookspecs):
if not isinstance(hookspecs, (list, tuple)):
hookspecs = [hookspecs]
for hookspec in hookspecs:
assert hookspec not in self._recorders
class RecordCalls:
_recorder = self
for name, method in vars(hookspec).items():
if name[0] != "_":
setattr(RecordCalls, name, self._makecallparser(method))
recorder = RecordCalls()
self._recorders[hookspec] = recorder
self._pluginmanager.register(recorder)
self.hook = HookRelay(hookspecs, pm=self._pluginmanager,
prefix="pytest_")
def _docall(hookcaller, methods, kwargs):
self.calls.append(ParsedCall(hookcaller.name, kwargs))
yield
self._undo_wrapping = add_method_controller(HookCaller, _docall)
pluginmanager.add_shutdown(self._undo_wrapping)
def finish_recording(self):
for recorder in self._recorders.values():
if self._pluginmanager.isregistered(recorder):
self._pluginmanager.unregister(recorder)
self._recorders.clear()
def _makecallparser(self, method):
name = method.__name__
args, varargs, varkw, default = inspect.getargspec(method)
if not args or args[0] != "self":
args.insert(0, 'self')
fspec = inspect.formatargspec(args, varargs, varkw, default)
# we use exec because we want to have early type
# errors on wrong input arguments, using
# *args/**kwargs delays this and gives errors
# elsewhere
exec (py.code.compile("""
def %(name)s%(fspec)s:
self._recorder.calls.append(
ParsedCall(%(name)r, locals()))
""" % locals()))
return locals()[name]
self._undo_wrapping()
def getcalls(self, names):
if isinstance(names, str):
names = names.split()
for name in names:
for cls in self._recorders:
if name in vars(cls):
break
else:
raise ValueError("callname %r not found in %r" %(
name, self._recorders.keys()))
l = []
for call in self.calls:
if call._name in names:
l.append(call)
return l
return [call for call in self.calls if call._name in names]
def contains(self, entries):
def assert_contains(self, entries):
__tracebackhide__ = True
i = 0
entries = list(entries)
@ -160,6 +104,69 @@ class HookRecorder:
assert len(l) == 1, (name, l)
return l[0]
# functionality for test reports
def getreports(self,
names="pytest_runtest_logreport pytest_collectreport"):
return [x.report for x in self.getcalls(names)]
def matchreport(self, inamepart="",
names="pytest_runtest_logreport pytest_collectreport", when=None):
""" return a testreport whose dotted import path matches """
l = []
for rep in self.getreports(names=names):
try:
if not when and rep.when != "call" and rep.passed:
# setup/teardown passing reports - let's ignore those
continue
except AttributeError:
pass
if when and getattr(rep, 'when', None) != when:
continue
if not inamepart or inamepart in rep.nodeid.split("::"):
l.append(rep)
if not l:
raise ValueError("could not find test report matching %r: "
"no test reports at all!" % (inamepart,))
if len(l) > 1:
raise ValueError(
"found 2 or more testreports matching %r: %s" %(inamepart, l))
return l[0]
def getfailures(self,
names='pytest_runtest_logreport pytest_collectreport'):
return [rep for rep in self.getreports(names) if rep.failed]
def getfailedcollections(self):
return self.getfailures('pytest_collectreport')
def listoutcomes(self):
passed = []
skipped = []
failed = []
for rep in self.getreports(
"pytest_collectreport pytest_runtest_logreport"):
if rep.passed:
if getattr(rep, "when", None) == "call":
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
elif rep.failed:
failed.append(rep)
return passed, skipped, failed
def countoutcomes(self):
return [len(x) for x in self.listoutcomes()]
def assertoutcome(self, passed=0, skipped=0, failed=0):
realpassed, realskipped, realfailed = self.listoutcomes()
assert passed == len(realpassed)
assert skipped == len(realskipped)
assert failed == len(realfailed)
def clear(self):
self.calls[:] = []
def pytest_funcarg__linecomp(request):
return LineComp()
@ -195,7 +202,6 @@ class TmpTestdir:
def __init__(self, request):
self.request = request
self.Config = request.config.__class__
self._pytest = request.getfuncargvalue("_pytest")
# XXX remove duplication with tmpdir plugin
basetmp = request.config._tmpdirhandler.ensuretemp("testdir")
name = request.function.__name__
@ -226,15 +232,10 @@ class TmpTestdir:
if fn and fn.startswith(str(self.tmpdir)):
del sys.modules[name]
def getreportrecorder(self, obj):
if hasattr(obj, 'config'):
obj = obj.config
if hasattr(obj, 'hook'):
obj = obj.hook
assert hasattr(obj, '_hookspecs'), obj
reprec = ReportRecorder(obj)
reprec.hookrecorder = self._pytest.gethookrecorder(obj)
reprec.hook = reprec.hookrecorder.hook
def make_hook_recorder(self, pluginmanager):
assert not hasattr(pluginmanager, "reprec")
pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
self.request.addfinalizer(reprec.finish_recording)
return reprec
def chdir(self):
@ -353,26 +354,23 @@ class TmpTestdir:
def inline_genitems(self, *args):
return self.inprocess_run(list(args) + ['--collectonly'])
def inline_run(self, *args):
items, rec = self.inprocess_run(args)
return rec
def inprocess_run(self, args, plugins=()):
rec = self.inline_run(*args, plugins=plugins)
items = [x.item for x in rec.getcalls("pytest_itemcollected")]
return items, rec
def inprocess_run(self, args, plugins=None):
def inline_run(self, *args, **kwargs):
rec = []
items = []
class Collect:
def pytest_configure(x, config):
rec.append(self.getreportrecorder(config))
def pytest_itemcollected(self, item):
items.append(item)
if not plugins:
plugins = []
rec.append(self.make_hook_recorder(config.pluginmanager))
plugins = kwargs.get("plugins") or []
plugins.append(Collect())
ret = pytest.main(list(args), plugins=plugins)
assert len(rec) == 1
reprec = rec[0]
reprec.ret = ret
assert len(rec) == 1
return items, reprec
return reprec
def parseconfig(self, *args):
args = [str(x) for x in args]
@ -547,86 +545,6 @@ def getdecoded(out):
return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
py.io.saferepr(out),)
class ReportRecorder(object):
def __init__(self, hook):
self.hook = hook
self.pluginmanager = hook._pm
self.pluginmanager.register(self)
def getcall(self, name):
return self.hookrecorder.getcall(name)
def popcall(self, name):
return self.hookrecorder.popcall(name)
def getcalls(self, names):
""" return list of ParsedCall instances matching the given eventname. """
return self.hookrecorder.getcalls(names)
# functionality for test reports
def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
return [x.report for x in self.getcalls(names)]
def matchreport(self, inamepart="",
names="pytest_runtest_logreport pytest_collectreport", when=None):
""" return a testreport whose dotted import path matches """
l = []
for rep in self.getreports(names=names):
try:
if not when and rep.when != "call" and rep.passed:
# setup/teardown passing reports - let's ignore those
continue
except AttributeError:
pass
if when and getattr(rep, 'when', None) != when:
continue
if not inamepart or inamepart in rep.nodeid.split("::"):
l.append(rep)
if not l:
raise ValueError("could not find test report matching %r: no test reports at all!" %
(inamepart,))
if len(l) > 1:
raise ValueError("found more than one testreport matching %r: %s" %(
inamepart, l))
return l[0]
def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'):
return [rep for rep in self.getreports(names) if rep.failed]
def getfailedcollections(self):
return self.getfailures('pytest_collectreport')
def listoutcomes(self):
passed = []
skipped = []
failed = []
for rep in self.getreports(
"pytest_collectreport pytest_runtest_logreport"):
if rep.passed:
if getattr(rep, "when", None) == "call":
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
elif rep.failed:
failed.append(rep)
return passed, skipped, failed
def countoutcomes(self):
return [len(x) for x in self.listoutcomes()]
def assertoutcome(self, passed=0, skipped=0, failed=0):
realpassed, realskipped, realfailed = self.listoutcomes()
assert passed == len(realpassed)
assert skipped == len(realskipped)
assert failed == len(realfailed)
def clear(self):
self.hookrecorder.calls[:] = []
def unregister(self):
self.pluginmanager.unregister(self)
self.hookrecorder.finish_recording()
class LineComp:
def __init__(self):

View File

@ -353,15 +353,17 @@ class PyCollector(PyobjMixin, pytest.Collector):
fixtureinfo = fm.getfixtureinfo(self, funcobj, cls)
metafunc = Metafunc(funcobj, fixtureinfo, self.config,
cls=cls, module=module)
gentesthook = self.config.hook.pytest_generate_tests
extra = [module]
if cls is not None:
extra.append(cls())
plugins = self.getplugins() + extra
gentesthook.pcall(plugins, metafunc=metafunc)
try:
methods = [module.pytest_generate_tests]
except AttributeError:
methods = []
if hasattr(cls, "pytest_generate_tests"):
methods.append(cls().pytest_generate_tests)
self.ihook.pytest_generate_tests.callextra(methods, metafunc=metafunc)
Function = self._getcustomclass("Function")
if not metafunc._calls:
yield Function(name, parent=self)
yield Function(name, parent=self, fixtureinfo=fixtureinfo)
else:
# add funcargs() as fixturedefs to fixtureinfo.arg2fixturedefs
add_funcarg_pseudo_fixture_def(self, metafunc, fm)
@ -370,6 +372,7 @@ class PyCollector(PyobjMixin, pytest.Collector):
subname = "%s[%s]" %(name, callspec.id)
yield Function(name=subname, parent=self,
callspec=callspec, callobj=funcobj,
fixtureinfo=fixtureinfo,
keywords={callspec.id:True})
def add_funcarg_pseudo_fixture_def(collector, metafunc, fixturemanager):
@ -1065,28 +1068,27 @@ class Function(FunctionMixin, pytest.Item, FuncargnamesCompatAttr):
"""
_genid = None
def __init__(self, name, parent, args=None, config=None,
callspec=None, callobj=NOTSET, keywords=None, session=None):
callspec=None, callobj=NOTSET, keywords=None, session=None,
fixtureinfo=None):
super(Function, self).__init__(name, parent, config=config,
session=session)
self._args = args
if callobj is not NOTSET:
self.obj = callobj
for name, val in (py.builtin._getfuncdict(self.obj) or {}).items():
self.keywords[name] = val
self.keywords.update(self.obj.__dict__)
if callspec:
for name, val in callspec.keywords.items():
self.keywords[name] = val
if keywords:
for name, val in keywords.items():
self.keywords[name] = val
isyield = self._isyieldedfunction()
self._fixtureinfo = fi = self.session._fixturemanager.getfixtureinfo(
self.parent, self.obj, self.cls, funcargs=not isyield)
self.fixturenames = fi.names_closure
if callspec is not None:
self.callspec = callspec
self.keywords.update(callspec.keywords)
if keywords:
self.keywords.update(keywords)
if fixtureinfo is None:
fixtureinfo = self.session._fixturemanager.getfixtureinfo(
self.parent, self.obj, self.cls,
funcargs=not self._isyieldedfunction())
self._fixtureinfo = fixtureinfo
self.fixturenames = fixtureinfo.names_closure
self._initrequest()
def _initrequest(self):
@ -1571,15 +1573,8 @@ class FixtureManager:
self._nodeid_and_autousenames = [("", self.config.getini("usefixtures"))]
session.config.pluginmanager.register(self, "funcmanage")
self._nodename2fixtureinfo = {}
def getfixtureinfo(self, node, func, cls, funcargs=True):
# node is the "collection node" for "func"
key = (node, func)
try:
return self._nodename2fixtureinfo[key]
except KeyError:
pass
if funcargs and not hasattr(node, "nofuncargs"):
if cls is not None:
startindex = 1
@ -1595,10 +1590,7 @@ class FixtureManager:
fm = node.session._fixturemanager
names_closure, arg2fixturedefs = fm.getfixtureclosure(initialnames,
node)
fixtureinfo = FuncFixtureInfo(argnames, names_closure,
arg2fixturedefs)
self._nodename2fixtureinfo[key] = fixtureinfo
return fixtureinfo
return FuncFixtureInfo(argnames, names_closure, arg2fixturedefs)
### XXX this hook should be called for historic events like pytest_configure
### so that we don't have to do the below pytest_configure hook

View File

@ -9,4 +9,4 @@ if __name__ == '__main__':
p = pstats.Stats("prof")
p.strip_dirs()
p.sort_stats('cumulative')
print(p.print_stats(250))
print(p.print_stats(500))

View File

@ -334,9 +334,9 @@ class TestSession:
assert item.name == "test_func"
newid = item.nodeid
assert newid == id
py.std.pprint.pprint(hookrec.hookrecorder.calls)
py.std.pprint.pprint(hookrec.calls)
topdir = testdir.tmpdir # noqa
hookrec.hookrecorder.contains([
hookrec.assert_contains([
("pytest_collectstart", "collector.fspath == topdir"),
("pytest_make_collect_report", "collector.fspath == topdir"),
("pytest_collectstart", "collector.fspath == p"),
@ -381,9 +381,9 @@ class TestSession:
id = p.basename
items, hookrec = testdir.inline_genitems(id)
py.std.pprint.pprint(hookrec.hookrecorder.calls)
py.std.pprint.pprint(hookrec.calls)
assert len(items) == 2
hookrec.hookrecorder.contains([
hookrec.assert_contains([
("pytest_collectstart",
"collector.fspath == collector.session.fspath"),
("pytest_collectstart",
@ -404,8 +404,8 @@ class TestSession:
items, hookrec = testdir.inline_genitems()
assert len(items) == 1
py.std.pprint.pprint(hookrec.hookrecorder.calls)
hookrec.hookrecorder.contains([
py.std.pprint.pprint(hookrec.calls)
hookrec.assert_contains([
("pytest_collectstart", "collector.fspath == test_aaa"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport",
@ -425,8 +425,8 @@ class TestSession:
items, hookrec = testdir.inline_genitems(id)
assert len(items) == 2
py.std.pprint.pprint(hookrec.hookrecorder.calls)
hookrec.hookrecorder.contains([
py.std.pprint.pprint(hookrec.calls)
hookrec.assert_contains([
("pytest_collectstart", "collector.fspath == test_aaa"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport", "report.nodeid == 'aaa/test_aaa.py'"),

View File

@ -149,7 +149,7 @@ class TestBootstrapping:
mod.pytest_plugins = "pytest_a"
aplugin = testdir.makepyfile(pytest_a="#")
pluginmanager = get_plugin_manager()
reprec = testdir.getreportrecorder(pluginmanager)
reprec = testdir.make_hook_recorder(pluginmanager)
#syspath.prepend(aplugin.dirpath())
py.std.sys.path.insert(0, str(aplugin.dirpath()))
pluginmanager.consider_module(mod)
@ -274,7 +274,7 @@ class TestBootstrapping:
saveindent.append(pm.trace.root.indent)
raise ValueError(42)
l = []
pm.trace.root.setwriter(l.append)
pm.set_tracing(l.append)
indent = pm.trace.root.indent
p = api1()
pm.register(p)
@ -405,11 +405,7 @@ class TestPytestPluginInteractions:
pluginmanager.register(p3)
methods = pluginmanager.listattr('m')
assert methods == [p2.m, p3.m, p1.m]
# listattr keeps a cache and deleting
# a function attribute requires clearing it
pluginmanager._listattrcache.clear()
del P1.m.__dict__['tryfirst']
pytest.mark.trylast(getattr(P2.m, 'im_func', P2.m))
methods = pluginmanager.listattr('m')
assert methods == [p2.m, p1.m, p3.m]
@ -436,6 +432,11 @@ def test_varnames():
assert varnames(A().f) == ('y',)
assert varnames(B()) == ('z',)
def test_varnames_default():
def f(x, y=3):
pass
assert varnames(f) == ("x",)
def test_varnames_class():
class C:
def __init__(self, x):
@ -494,12 +495,10 @@ class TestMultiCall:
return x + z
reslist = MultiCall([f], dict(x=23, y=24)).execute()
assert reslist == [24]
reslist = MultiCall([f], dict(x=23, z=2)).execute()
assert reslist == [25]
def test_tags_call_error(self):
multicall = MultiCall([lambda x: x], {})
pytest.raises(TypeError, multicall.execute)
pytest.raises(KeyError, multicall.execute)
def test_call_subexecute(self):
def m(__multicall__):
@ -630,6 +629,18 @@ class TestHookRelay:
assert l == [4]
assert not hasattr(mcm, 'world')
def test_argmismatch(self):
class Api:
def hello(self, arg):
"api hook 1"
pm = PluginManager(Api, prefix="he")
class Plugin:
def hello(self, argwrong):
return arg + 1
with pytest.raises(PluginValidationError) as exc:
pm.register(Plugin())
assert "argwrong" in str(exc.value)
def test_only_kwargs(self):
pm = PluginManager()
class Api:
@ -754,3 +765,96 @@ def test_importplugin_issue375(testdir):
assert "qwe" not in str(excinfo.value)
assert "aaaa" in str(excinfo.value)
class TestWrapMethod:
def test_basic_happypath(self):
class A:
def f(self):
return "A.f"
l = []
def f(self):
l.append(1)
yield
l.append(2)
undo = add_method_controller(A, f)
assert A().f() == "A.f"
assert l == [1,2]
undo()
l[:] = []
assert A().f() == "A.f"
assert l == []
def test_method_raises(self):
class A:
def error(self, val):
raise ValueError(val)
l = []
def error(self, val):
l.append(val)
try:
yield
except ValueError:
l.append(None)
raise
undo = add_method_controller(A, error)
with pytest.raises(ValueError):
A().error(42)
assert l == [42, None]
undo()
l[:] = []
with pytest.raises(ValueError):
A().error(42)
assert l == []
def test_controller_swallows_method_raises(self):
class A:
def error(self, val):
raise ValueError(val)
def error(self, val):
try:
yield
except ValueError:
yield 2
add_method_controller(A, error)
assert A().error(42) == 2
def test_reraise_on_controller_StopIteration(self):
class A:
def error(self, val):
raise ValueError(val)
def error(self, val):
try:
yield
except ValueError:
pass
add_method_controller(A, error)
with pytest.raises(ValueError):
A().error(42)
@pytest.mark.xfail(reason="if needed later")
def test_modify_call_args(self):
class A:
def error(self, val1, val2):
raise ValueError(val1+val2)
l = []
def error(self):
try:
yield (1,), {'val2': 2}
except ValueError as ex:
assert ex.args == (3,)
l.append(1)
add_method_controller(A, error)
with pytest.raises(ValueError):
A().error()
assert l == [1]

View File

@ -1,5 +1,4 @@
import py, pytest
from _pytest.helpconfig import collectattr
import pytest
def test_version(testdir, pytestconfig):
result = testdir.runpytest("--version")
@ -25,18 +24,6 @@ def test_help(testdir):
*to see*fixtures*py.test --fixtures*
""")
def test_collectattr():
class A:
def pytest_hello(self):
pass
class B(A):
def pytest_world(self):
pass
methods = py.builtin.sorted(collectattr(B))
assert list(methods) == ['pytest_hello', 'pytest_world']
methods = py.builtin.sorted(collectattr(B()))
assert list(methods) == ['pytest_hello', 'pytest_world']
def test_hookvalidation_unknown(testdir):
testdir.makeconftest("""
def pytest_hello(xyz):

View File

@ -3,9 +3,9 @@ import os
from _pytest.pytester import HookRecorder
from _pytest.core import PluginManager
def test_reportrecorder(testdir):
def test_make_hook_recorder(testdir):
item = testdir.getitem("def test_func(): pass")
recorder = testdir.getreportrecorder(item.config)
recorder = testdir.make_hook_recorder(item.config.pluginmanager)
assert not recorder.getfailures()
pytest.xfail("internal reportrecorder tests need refactoring")
@ -71,47 +71,37 @@ def test_testdir_runs_with_plugin(testdir):
"*1 passed*"
])
def test_hookrecorder_basic():
rec = HookRecorder(PluginManager())
class ApiClass:
def make_holder():
class apiclass:
def pytest_xyz(self, arg):
"x"
rec.start_recording(ApiClass)
rec.hook.pytest_xyz(arg=123)
def pytest_xyz_noarg(self):
"x"
apimod = type(os)('api')
def pytest_xyz(arg):
"x"
def pytest_xyz_noarg():
"x"
apimod.pytest_xyz = pytest_xyz
apimod.pytest_xyz_noarg = pytest_xyz_noarg
return apiclass, apimod
@pytest.mark.parametrize("holder", make_holder())
def test_hookrecorder_basic(holder):
pm = PluginManager()
pm.hook._addhooks(holder, "pytest_")
rec = HookRecorder(pm)
pm.hook.pytest_xyz(arg=123)
call = rec.popcall("pytest_xyz")
assert call.arg == 123
assert call._name == "pytest_xyz"
pytest.raises(pytest.fail.Exception, "rec.popcall('abc')")
def test_hookrecorder_basic_no_args_hook():
rec = HookRecorder(PluginManager())
apimod = type(os)('api')
def pytest_xyz():
"x"
apimod.pytest_xyz = pytest_xyz
rec.start_recording(apimod)
rec.hook.pytest_xyz()
call = rec.popcall("pytest_xyz")
assert call._name == "pytest_xyz"
def test_functional(testdir, linecomp):
reprec = testdir.inline_runsource("""
import pytest
from _pytest.core import HookRelay, PluginManager
pytest_plugins="pytester"
def test_func(_pytest):
class ApiClass:
def pytest_xyz(self, arg): "x"
hook = HookRelay([ApiClass], PluginManager())
rec = _pytest.gethookrecorder(hook)
class Plugin:
def pytest_xyz(self, arg):
return arg + 1
rec._pluginmanager.register(Plugin())
res = rec.hook.pytest_xyz(arg=41)
assert res == [42]
""")
reprec.assertoutcome(passed=1)
pm.hook.pytest_xyz_noarg()
call = rec.popcall("pytest_xyz_noarg")
assert call._name == "pytest_xyz_noarg"
def test_makepyfile_unicode(testdir):