import py def getfuncargnames(function): argnames = py.std.inspect.getargs(function.func_code)[0] startindex = hasattr(function, 'im_self') and 1 or 0 numdefaults = len(function.func_defaults or ()) if numdefaults: return argnames[startindex:-numdefaults] return argnames[startindex:] def fillfuncargs(function): """ fill missing funcargs. """ request = FuncargRequest(pyfuncitem=function) request._fillfuncargs() _notexists = object() class CallSpec: def __init__(self, funcargs, id, param): self.funcargs = funcargs self.id = id if param is not _notexists: self.param = param class Metafunc: def __init__(self, function, config=None, cls=None, module=None): self.config = config self.module = module self.function = function self.funcargnames = getfuncargnames(function) self.cls = cls self.module = module self._calls = [] self._ids = py.builtin.set() def addcall(self, funcargs=None, id=None, param=_notexists): assert funcargs is None or isinstance(funcargs, dict) if id is None: id = len(self._calls) id = str(id) if id in self._ids: raise ValueError("duplicate id %r" % id) self._ids.add(id) self._calls.append(CallSpec(funcargs, id, param)) class FunctionCollector(py.test.collect.Collector): def __init__(self, name, parent, calls): super(FunctionCollector, self).__init__(name, parent) self.calls = calls self.obj = getattr(self.parent.obj, name) def collect(self): l = [] for callspec in self.calls: name = "%s[%s]" %(self.name, callspec.id) function = self.parent.Function(name=name, parent=self, callspec=callspec, callobj=self.obj) l.append(function) return l def reportinfo(self): try: return self._fslineno, self.name except AttributeError: pass fspath, lineno = py.code.getfslineno(self.obj) self._fslineno = fspath, lineno return fspath, lineno, self.name class FuncargRequest: _argprefix = "pytest_funcarg__" _argname = None class Error(LookupError): """ error on performing funcarg request. """ def __init__(self, pyfuncitem): self._pyfuncitem = pyfuncitem self.function = pyfuncitem.obj self.module = pyfuncitem.getparent(py.test.collect.Module).obj self.cls = getattr(self.function, 'im_class', None) self.instance = getattr(self.function, 'im_self', None) self.config = pyfuncitem.config self.fspath = pyfuncitem.fspath if hasattr(pyfuncitem, '_requestparam'): self.param = pyfuncitem._requestparam self._plugins = self.config.pluginmanager.getplugins() self._plugins.append(self.module) if self.instance is not None: self._plugins.append(self.instance) self._funcargs = self._pyfuncitem.funcargs.copy() self._provider = {} def _fillfuncargs(self): argnames = getfuncargnames(self.function) if argnames: assert not getattr(self._pyfuncitem, '_args', None), "yielded functions cannot have funcargs" for argname in argnames: if argname not in self._pyfuncitem.funcargs: self._pyfuncitem.funcargs[argname] = self.getfuncargvalue(argname) def cached_setup(self, setup, teardown=None, scope="module", extrakey=None): if not hasattr(self.config, '_setupcache'): self.config._setupcache = {} # XXX weakref? cachekey = (self._getscopeitem(scope), extrakey) cache = self.config._setupcache try: val = cache[cachekey] except KeyError: val = setup() cache[cachekey] = val if teardown is not None: self.addfinalizer(lambda: teardown(val), scope=scope) return val def call_next_provider(self): if not self._provider[self._argname]: raise self.Error("no provider methods left for %r" % self._argname) next_provider = self._provider[self._argname].pop() return next_provider(request=self) def getfuncargvalue(self, argname): try: return self._funcargs[argname] except KeyError: pass assert argname not in self._provider self._provider[argname] = self.config.pluginmanager.listattr( plugins=self._plugins, attrname=self._argprefix + str(argname) ) # during call_next_provider() we keep state about the current # argument on the request object - we may go for instantiating # request objects per each funcargname if neccessary oldname = self._argname self._argname = argname try: self._funcargs[argname] = res = self.call_next_provider() except self.Error: self._raiselookupfailed(argname) if oldname: self._argname = oldname return res def _getscopeitem(self, scope): if scope == "function": return self._pyfuncitem elif scope == "module": return self._pyfuncitem.getparent(py.test.collect.Module) elif scope == "session": return None raise ValueError("unknown finalization scope %r" %(scope,)) def addfinalizer(self, finalizer, scope="function"): colitem = self._getscopeitem(scope) self.config._setupstate.addfinalizer(finalizer=finalizer, colitem=colitem) def __repr__(self): return "" %(self._pyfuncitem) def _raiselookupfailed(self, argname): available = [] for plugin in self._plugins: for name in vars(plugin): if name.startswith(self._argprefix): name = name[len(self._argprefix):] if name not in available: available.append(name) fspath, lineno, msg = self._pyfuncitem.reportinfo() line = "%s:%s" %(fspath, lineno) msg = "funcargument %r not found for: %s" %(argname, line) msg += "\n available funcargs: %s" %(", ".join(available),) raise LookupError(msg)