176 lines
6.2 KiB
Python
176 lines
6.2 KiB
Python
import py
|
|
|
|
def getfuncargnames(function):
    """ return the names of the arguments ``function`` needs filled in.

    Arguments that carry a default value are left out, and so is the
    first argument (``self``) when ``function`` is a method object.
    ``function`` may be a plain function or a method.
    """
    import inspect
    # Older interpreters spell the attributes func_code/func_defaults,
    # newer ones only provide __code__/__defaults__.  Method objects
    # forward the lookup to the function they wrap.
    try:
        code = function.func_code
        defaults = function.func_defaults
    except AttributeError:
        code = function.__code__
        defaults = function.__defaults__
    argnames = inspect.getargs(code)[0]
    # method objects expose im_self (old name) or __self__ (new name);
    # their first positional argument is supplied by the binding.
    if hasattr(function, 'im_self') or hasattr(function, '__self__'):
        startindex = 1
    else:
        startindex = 0
    numdefaults = len(defaults or ())
    if numdefaults:
        # defaults always belong to the trailing arguments
        return argnames[startindex:-numdefaults]
    return argnames[startindex:]
|
|
|
|
def fillfuncargs(function):
    """ compute and store the funcargs that ``function`` is still missing.

    ``function`` is passed to FuncargRequest as its ``pyfuncitem``; the
    request writes the values into the item, nothing is returned.
    """
    FuncargRequest(pyfuncitem=function)._fillfuncargs()
|
|
|
|
|
|
# unique marker for "no param was passed"; None itself is stored as a param
_notexists = object()

class CallSpec:
    """ the funcargs, id and optional param describing one function call. """
    def __init__(self, funcargs, id, param):
        self.id = id
        self.funcargs = funcargs
        if param is _notexists:
            # the instance deliberately gets no ``param`` attribute
            return
        self.param = param
|
|
|
|
class Metafunc:
    """ records the calls that shall be generated for one test function.

    Each ``addcall()`` appends a CallSpec to ``_calls``.
    """
    def __init__(self, function, config=None, cls=None, module=None):
        self.config = config
        self.module = module
        self.function = function
        self.funcargnames = getfuncargnames(function)
        self.cls = cls
        self._calls = []
        # ids handed out so far, consulted to reject duplicates
        self._ids = py.builtin.set()

    def addcall(self, funcargs=None, id=None, param=_notexists):
        """ register one more call of the function.

        ``funcargs`` is None or a dict.  ``id`` defaults to the number
        of calls registered so far and is always stored as a string.
        ``param`` is passed on to the CallSpec unchanged.

        Raises ValueError if the (stringified) id was used before; note
        that an explicitly passed id can collide with a later
        automatically numbered one.
        """
        assert funcargs is None or isinstance(funcargs, dict)
        if id is None:
            id = len(self._calls)
        id = str(id)
        if id in self._ids:
            raise ValueError("duplicate id %r" % id)
        self._ids.add(id)
        self._calls.append(CallSpec(funcargs, id, param))
|
|
|
|
class FunctionCollector(py.test.collect.Collector):
    """ collector producing one Function item per registered call spec. """
    def __init__(self, name, parent, calls):
        super(FunctionCollector, self).__init__(name, parent)
        self.calls = calls
        # the underlying python object is looked up on the parent's object
        self.obj = getattr(self.parent.obj, name)

    def collect(self):
        """ return a list of Function items named ``<name>[<callspec id>]``. """
        return [
            self.parent.Function(
                name="%s[%s]" %(self.name, callspec.id), parent=self,
                callspec=callspec, callobj=self.obj)
            for callspec in self.calls
        ]

    def reportinfo(self):
        """ return ``(fspath, lineno, name)`` for the collected object.

        The file/line lookup is done once and kept in ``_fslineno``.
        Cached and uncached calls return the same 3-tuple shape.
        """
        try:
            fspath, lineno = self._fslineno
        except AttributeError:
            fspath, lineno = py.code.getfslineno(self.obj)
            self._fslineno = fspath, lineno
        return fspath, lineno, self.name
|
|
|
|
|
|
class FuncargRequest:
    """ request for function arguments of one test function item.

    Providers are looked up as ``pytest_funcarg__<argname>`` attributes
    on the plugins returned by the plugin manager, on the test module
    and, for bound test methods, on the instance.
    """
    _argprefix = "pytest_funcarg__"
    # name of the argument whose provider is currently being called
    _argname = None

    class Error(LookupError):
        """ error on performing funcarg request. """

    def __init__(self, pyfuncitem):
        self._pyfuncitem = pyfuncitem
        self.function = pyfuncitem.obj
        self.module = pyfuncitem.getparent(py.test.collect.Module).obj
        self.cls = getattr(self.function, 'im_class', None)
        self.instance = getattr(self.function, 'im_self', None)
        self.config = pyfuncitem.config
        self.fspath = pyfuncitem.fspath
        if hasattr(pyfuncitem, '_requestparam'):
            self.param = pyfuncitem._requestparam
        # copy before extending so the list handed out by the plugin
        # manager is never modified by this request
        self._plugins = list(self.config.pluginmanager.getplugins())
        self._plugins.append(self.module)
        if self.instance is not None:
            self._plugins.append(self.instance)
        self._funcargs = self._pyfuncitem.funcargs.copy()
        # argname -> list of provider callables not yet called
        self._provider = {}

    def _fillfuncargs(self):
        """ store a value for every funcarg the item does not have yet. """
        argnames = getfuncargnames(self.function)
        if argnames:
            assert not self._pyfuncitem._args, "yielded functions cannot have funcargs"
        for argname in argnames:
            if argname not in self._pyfuncitem.funcargs:
                self._pyfuncitem.funcargs[argname] = self.getfuncargvalue(argname)

    def cached_setup(self, setup, teardown=None, scope="module", extrakey=None):
        """ return the result of ``setup()``, calling it only on a cache miss.

        The argument currently being provided, the item belonging to
        ``scope`` ("function" or "module") and ``extrakey`` form the
        cache key.  If ``teardown`` is given, ``teardown(result)`` is
        registered as a finalizer for ``scope`` when the value is
        created; the cached value is dropped when that finalizer runs.

        Raises ValueError for an unknown ``scope``.
        """
        if not hasattr(self.config, '_setupcache'):
            self.config._setupcache = {} # XXX weakref?
        cache = self.config._setupcache
        # the argument name keeps providers of different funcargs from
        # receiving each other's value when they use the same scope
        cachekey = (self._argname, self._getscopeitem(scope), extrakey)
        try:
            val = cache[cachekey]
        except KeyError:
            val = setup()
            cache[cachekey] = val
            if teardown is not None:
                def finalizer():
                    # forget the value first so it cannot be handed
                    # out again once it has been torn down
                    cache.pop(cachekey, None)
                    teardown(val)
                self.addfinalizer(finalizer, scope=scope)
        return val

    def call_next_provider(self):
        """ call the next remaining provider of the current argument.

        Raises ``self.Error`` if no provider is left.
        """
        providers = self._provider[self._argname]
        if not providers:
            raise self.Error("no provider methods left for %r" %(self._argname,))
        next_provider = providers.pop()
        return next_provider(request=self)

    def getfuncargvalue(self, argname):
        """ return the value for ``argname``, computing it on first use.

        Raises LookupError if no provider for ``argname`` exists.
        """
        try:
            return self._funcargs[argname]
        except KeyError:
            pass
        assert argname not in self._provider
        self._provider[argname] = self.config.pluginmanager.listattr(
            plugins=self._plugins,
            attrname=self._argprefix + str(argname)
        )
        # during call_next_provider() we keep state about the current
        # argument on the request object - we may go for instantiating
        # request objects per each funcargname if necessary
        oldname = self._argname
        self._argname = argname
        try:
            try:
                self._funcargs[argname] = res = self.call_next_provider()
            except self.Error:
                self._raiselookupfailed(argname)
        finally:
            # always hand the previous argument name back, also when it
            # was None or when the lookup failed
            self._argname = oldname
        return res

    def _getscopeitem(self, scope):
        """ return the collection item that ``scope`` refers to. """
        if scope == "function":
            return self._pyfuncitem
        elif scope == "module":
            return self._pyfuncitem.getparent(py.test.collect.Module)
        raise ValueError("unknown finalization scope %r" %(scope,))

    def addfinalizer(self, finalizer, scope="function"):
        """ register ``finalizer`` with the setup state for the item of ``scope``. """
        colitem = self._getscopeitem(scope)
        self.config._setupstate.addfinalizer(finalizer=finalizer, colitem=colitem)

    def __repr__(self):
        return "<FuncargRequest for %r>" %(self._pyfuncitem,)

    def _raiselookupfailed(self, argname):
        """ raise LookupError naming ``argname`` and the available funcargs. """
        available = []
        prefixlen = len(self._argprefix)
        for plugin in self._plugins:
            for name in vars(plugin):
                if name.startswith(self._argprefix):
                    name = name[prefixlen:]
                    if name not in available:
                        available.append(name)
        fspath, lineno, msg = self._pyfuncitem.reportinfo()
        line = "%s:%s" %(fspath, lineno)
        msg = "funcargument %r not found for: %s" %(argname, line)
        msg += "\n available funcargs: %s" %(", ".join(available),)
        raise LookupError(msg)
|
|
|
|
|
|
|