discover funcarg factories independently from request/Function items

This commit is contained in:
holger krekel 2012-07-20 14:16:28 +02:00
parent 4e4b507472
commit e14459d45c
6 changed files with 188 additions and 95 deletions

View File

@ -285,13 +285,15 @@ class Node(object):
def _repr_failure_py(self, excinfo, style=None):
LE = self.session.funcargmanager.FuncargLookupError
if excinfo.errisinstance(LE):
request = excinfo.value.request
fspath, lineno, msg = request._pyfuncitem.reportinfo()
lines, _ = inspect.getsourcelines(request.function)
for i, line in enumerate(lines):
if line.strip().startswith('def'):
return FuncargLookupErrorRepr(fspath, lineno, lines[:i+1],
str(excinfo.value.msg))
function = excinfo.value.function
if function is not None:
fspath, lineno = getfslineno(function)
lines, _ = inspect.getsourcelines(function)
for i, line in enumerate(lines):
if line.strip().startswith('def'):
return FuncargLookupErrorRepr(fspath,
lineno, lines[:i+1],
str(excinfo.value.msg))
if self.config.option.fulltrace:
style="long"
else:
@ -406,8 +408,8 @@ class Item(Node):
class FuncargLookupError(LookupError):
""" could not find a factory. """
def __init__(self, request, msg):
self.request = request
def __init__(self, function, msg):
self.function = function
self.msg = msg
class FuncargManager:
@ -417,60 +419,68 @@ class FuncargManager:
def __init__(self, session):
self.session = session
self.config = session.config
self.node2name2factory = {}
self.arg2facspec = {}
session.config.pluginmanager.register(self, "funcmanage")
self._holderobjseen = set()
def _discoverfactories(self, request, argname):
node = request._pyfuncitem
name2factory = self.node2name2factory.setdefault(node, {})
if argname not in name2factory:
name2factory[argname] = self.config.pluginmanager.listattr(
plugins=request._plugins,
attrname=self._argprefix + str(argname)
)
#else: we are called recursively
if not name2factory[argname]:
self._raiselookupfailed(request, argname)
def _getfuncarg(self, request, argname):
node = request._pyfuncitem
### XXX this hook should be called for historic events like pytest_configure
### so that we don't have to do the below pytest_collection hook
def pytest_plugin_registered(self, plugin):
#print "plugin_registered", plugin
nodeid = ""
try:
factorylist = self.node2name2factory[node][argname]
except KeyError:
# XXX at collection time this funcarg was not know to be a
# requirement, would be better if it would be known
self._discoverfactories(request, argname)
factorylist = self.node2name2factory[node][argname]
if not factorylist:
self._raiselookupfailed(request, argname)
funcargfactory = factorylist.pop()
oldarg = request._currentarg
mp = monkeypatch()
mp.setattr(request, '_currentarg', argname)
try:
param = node.callspec.getparam(argname)
except (AttributeError, ValueError):
p = py.path.local(plugin.__file__)
except AttributeError:
pass
else:
mp.setattr(request, 'param', param, raising=False)
try:
return funcargfactory(request=request)
finally:
mp.undo()
if p.basename.startswith("conftest.py"):
nodeid = p.dirpath().relto(self.session.fspath)
self._parsefactories(plugin, nodeid)
def _raiselookupfailed(self, request, argname):
@pytest.mark.tryfirst
def pytest_collection(self, session):
plugins = session.config.pluginmanager.getplugins()
for plugin in plugins:
self.pytest_plugin_registered(plugin)
def _parsefactories(self, holderobj, nodeid):
if holderobj in self._holderobjseen:
return
#print "parsefactories", holderobj
self._holderobjseen.add(holderobj)
for name in dir(holderobj):
#print "check", holderobj, name
if name.startswith(self._argprefix):
fname = name[len(self._argprefix):]
faclist = self.arg2facspec.setdefault(fname, [])
obj = getattr(holderobj, name)
faclist.append((nodeid, obj))
def getfactorylist(self, argname, nodeid, function):
try:
factorydef = self.arg2facspec[argname]
except KeyError:
self._raiselookupfailed(argname, function, nodeid)
return self._matchfactories(factorydef, nodeid)
def _matchfactories(self, factorydef, nodeid):
l = []
for baseid, factory in factorydef:
#print "check", basepath, nodeid
if nodeid.startswith(baseid):
l.append(factory)
return l
def _raiselookupfailed(self, argname, function, nodeid):
available = []
for plugin in request._plugins:
for name in vars(plugin):
if name.startswith(self._argprefix):
name = name[len(self._argprefix):]
if name not in available:
available.append(name)
fspath, lineno, msg = request._pyfuncitem.reportinfo()
for name, facdef in self.arg2facspec.items():
faclist = self._matchfactories(facdef, nodeid)
if faclist:
available.append(name)
msg = "LookupError: no factory found for argument %r" % (argname,)
msg += "\n available funcargs: %s" %(", ".join(available),)
msg += "\n use 'py.test --funcargs [testpath]' for help on them."
raise FuncargLookupError(request, msg)
raise FuncargLookupError(function, msg)
class NoMatch(Exception):
@ -715,6 +725,13 @@ class Session(FSCollector):
to cache on a per-session level.
"""
def getfslineno(obj):
# xxx let decorators etc specify a sane ordering
if hasattr(obj, 'place_as'):
obj = obj.place_as
fslineno = py.code.getfslineno(obj)
assert isinstance(fslineno[1], int), obj
return fslineno
class FuncargLookupErrorRepr(TerminalRepr):

View File

@ -388,10 +388,12 @@ class TmpTestdir:
return config
def getitem(self, source, funcname="test_func"):
for item in self.getitems(source):
items = self.getitems(source)
for item in items:
if item.name == funcname:
return item
assert 0, "%r item not found in module:\n%s" %(funcname, source)
assert 0, "%r item not found in module:\n%s\nitems: %s" %(
funcname, source, items)
def getitems(self, source):
modcol = self.getmodulecol(source)

View File

@ -3,6 +3,8 @@ import py
import inspect
import sys
import pytest
from _pytest.main import getfslineno
from _pytest.monkeypatch import monkeypatch
import _pytest
cutdir = py.path.local(_pytest.__file__).dirpath()
@ -192,18 +194,7 @@ class PyobjMixin(PyobjContext):
return s.replace(".[", "[")
def _getfslineno(self):
try:
return self._fslineno
except AttributeError:
pass
obj = self.obj
# xxx let decorators etc specify a sane ordering
if hasattr(obj, 'place_as'):
obj = obj.place_as
self._fslineno = py.code.getfslineno(obj)
assert isinstance(self._fslineno[1], int), obj
return self._fslineno
return getfslineno(self.obj)
def reportinfo(self):
# XXX caching?
@ -213,12 +204,10 @@ class PyobjMixin(PyobjContext):
fspath = sys.modules[obj.__module__].__file__
if fspath.endswith(".pyc"):
fspath = fspath[:-1]
#assert 0
#fn = inspect.getsourcefile(obj) or inspect.getfile(obj)
lineno = obj.compat_co_firstlineno
modpath = obj.__module__
else:
fspath, lineno = self._getfslineno()
fspath, lineno = getfslineno(obj)
modpath = self.getmodpath()
assert isinstance(lineno, int)
return fspath, lineno, modpath
@ -306,6 +295,10 @@ class Module(pytest.File, PyCollector):
def _getobj(self):
return self._memoizedcall('_obj', self._importtestmodule)
def collect(self):
self.session.funcargmanager._parsefactories(self.obj, self.nodeid)
return super(Module, self).collect()
def _importtestmodule(self):
# we assume we are only called once per module
try:
@ -370,7 +363,12 @@ class Class(PyCollector):
class Instance(PyCollector):
def _getobj(self):
return self.parent.obj()
obj = self.parent.obj()
return obj
def collect(self):
self.session.funcargmanager._parsefactories(self.obj, self.nodeid)
return super(Instance, self).collect()
def newinstance(self):
self.obj = self._getobj()
@ -809,7 +807,7 @@ class Function(FunctionMixin, pytest.Item):
else:
self.funcargs = {}
self._request = req = FuncargRequest(self)
req._discoverfactories()
#req._discoverfactories()
if callobj is not _dummy:
self.obj = callobj
startindex = int(self.cls is not None)
@ -885,20 +883,28 @@ class FuncargRequest:
self.funcargmanager = pyfuncitem.session.funcargmanager
self._currentarg = None
self.funcargnames = getfuncargnames(self.function)
self.parentid = pyfuncitem.parent.nodeid
def _discoverfactories(self):
for argname in self.funcargnames:
if argname not in self._funcargs:
self.funcargmanager._discoverfactories(self, argname)
self._getfaclist(argname)
@cached_property
def _plugins(self):
extra = [obj for obj in (self.module, self.instance) if obj]
return self._pyfuncitem.getplugins() + extra
def _getfaclist(self, argname):
faclist = self._name2factory.get(argname, None)
if faclist is None:
faclist = self.funcargmanager.getfactorylist(argname,
self.parentid,
self.function)
self._name2factory[argname] = faclist
elif not faclist:
self.funcargmanager._raiselookupfailed(argname, self.function,
self.parentid)
return faclist
def raiseerror(self, msg):
""" raise a FuncargLookupError with the given message. """
raise self.funcargmanager.FuncargLookupError(self, msg)
raise self.funcargmanager.FuncargLookupError(self.function, msg)
@property
def function(self):
@ -1001,9 +1007,23 @@ class FuncargRequest:
return self._funcargs[argname]
except KeyError:
pass
val = self.funcargmanager._getfuncarg(self, argname)
self._funcargs[argname] = val
return val
factorylist = self._getfaclist(argname)
funcargfactory = factorylist.pop()
node = self._pyfuncitem
oldarg = self._currentarg
mp = monkeypatch()
mp.setattr(self, '_currentarg', argname)
try:
param = node.callspec.getparam(argname)
except (AttributeError, ValueError):
pass
else:
mp.setattr(self, 'param', param, raising=False)
try:
self._funcargs[argname] = val = funcargfactory(request=self)
return val
finally:
mp.undo()
def _getscopeitem(self, scope):
if scope == "function":

View File

@ -647,15 +647,14 @@ class TestRequest:
def pytest_funcarg__something(request):
return 1
""")
item = testdir.getitem("""
item = testdir.makepyfile("""
def pytest_funcarg__something(request):
return request.getfuncargvalue("something") + 1
def test_func(something):
assert something == 2
""")
req = funcargs.FuncargRequest(item)
val = req.getfuncargvalue("something")
assert val == 2
reprec = testdir.inline_run()
reprec.assertoutcome(passed=1)
def test_getfuncargvalue(self, testdir):
item = testdir.getitem("""
@ -1296,7 +1295,9 @@ def test_funcarg_non_pycollectobj(testdir): # rough jstests usage
class MyClass:
pass
""")
clscol = modcol.collect()[0]
# this hook finds funcarg factories
rep = modcol.ihook.pytest_make_collect_report(collector=modcol)
clscol = rep.result[0]
clscol.obj = lambda arg1: None
clscol.funcargs = {}
funcargs.fillfuncargs(clscol)
@ -1310,7 +1311,7 @@ def test_funcarg_lookup_error(testdir):
""")
result = testdir.runpytest()
result.stdout.fnmatch_lines([
"*ERROR*collecting*test_funcarg_lookup_error.py*",
"*ERROR*test_lookup_error*",
"*def test_lookup_error(unknown):*",
"*LookupError: no factory found*unknown*",
"*available funcargs*",
@ -1633,3 +1634,49 @@ class TestResourceIntegrationFunctional:
"*test_function*basic*PASSED",
"*test_function*advanced*FAILED",
])
### XXX shift to test_session.py
class TestFuncargManager:
def pytest_funcarg__testdir(self, request):
testdir = request.getfuncargvalue("testdir")
testdir.makeconftest("""
def pytest_funcarg__hello(request):
return "conftest"
def pytest_funcarg__fm(request):
return request.funcargmanager
def pytest_funcarg__item(request):
return request._pyfuncitem
""")
return testdir
def test_parsefactories_conftest(self, testdir):
testdir.makepyfile("""
def test_hello(item, fm):
for name in ("fm", "hello", "item"):
faclist = fm.getfactorylist(name, item.nodeid, item.obj)
assert len(faclist) == 1
fac = faclist[0]
assert fac.__name__ == "pytest_funcarg__" + name
""")
reprec = testdir.inline_run("-s")
reprec.assertoutcome(passed=1)
def test_parsefactories_conftest_and_module_and_class(self, testdir):
testdir.makepyfile("""
def pytest_funcarg__hello(request):
return "module"
class TestClass:
def pytest_funcarg__hello(self, request):
return "class"
def test_hello(self, item, fm):
faclist = fm.getfactorylist("hello", item.nodeid, item.obj)
print faclist
assert len(faclist) == 3
assert faclist[0](item._request) == "conftest"
assert faclist[1](item._request) == "module"
assert faclist[2](item._request) == "class"
""")
reprec = testdir.inline_run("-s")
reprec.assertoutcome(passed=1)

View File

@ -17,16 +17,16 @@ class SessionTests:
passed, skipped, failed = reprec.listoutcomes()
assert len(skipped) == 0
assert len(passed) == 1
assert len(failed) == 2
assert len(failed) == 3
end = lambda x: x.nodeid.split("::")[-1]
assert end(failed[0]) == "test_one_one"
assert end(failed[1]) == "test_other"
itemstarted = reprec.getcalls("pytest_itemcollected")
assert len(itemstarted) == 3
assert len(itemstarted) == 4
# XXX check for failing funcarg setup
colreports = reprec.getcalls("pytest_collectreport")
assert len(colreports) == 4
assert colreports[1].report.failed
#colreports = reprec.getcalls("pytest_collectreport")
#assert len(colreports) == 4
#assert colreports[1].report.failed
def test_nested_import_error(self, testdir):
tfile = testdir.makepyfile("""
@ -225,3 +225,4 @@ def test_exclude(testdir):
result = testdir.runpytest("--ignore=hello", "--ignore=hello2")
assert result.ret == 0
result.stdout.fnmatch_lines(["*1 passed*"])

View File

@ -4,12 +4,18 @@ import os
from _pytest.tmpdir import pytest_funcarg__tmpdir, TempdirHandler
def test_funcarg(testdir):
item = testdir.getitem("""
testdir.makepyfile("""
def pytest_generate_tests(metafunc):
metafunc.addcall(id='a')
metafunc.addcall(id='b')
def test_func(tmpdir): pass
""", 'test_func[a]')
""")
reprec = testdir.inline_run()
calls = reprec.getcalls("pytest_runtest_setup")
item = calls[0].item
# pytest_unconfigure has deleted the TempdirHandler already
config = item.config
config._tmpdirhandler = TempdirHandler(config)
p = pytest_funcarg__tmpdir(item)
assert p.check()
bn = p.basename.strip("0123456789")