293 lines
9.7 KiB
Python
293 lines
9.7 KiB
Python
"""
|
|
managing loading and interacting with pytest plugins.
|
|
"""
|
|
import py
|
|
from py.__.test.plugin import hookspec
|
|
from py.__.test.outcome import Skipped
|
|
|
|
def check_old_use(mod, modname):
|
|
clsname = modname[len('pytest_'):].capitalize() + "Plugin"
|
|
assert not hasattr(mod, clsname), (mod, clsname)
|
|
|
|
class PluginManager(object):
|
|
class Error(Exception):
|
|
"""signals a plugin specific error."""
|
|
def __init__(self, comregistry=None):
|
|
if comregistry is None:
|
|
comregistry = py._com.Registry()
|
|
self.comregistry = comregistry
|
|
self.impname2plugin = {}
|
|
|
|
self.hook = py._com.HookRelay(
|
|
hookspecs=hookspec,
|
|
registry=self.comregistry)
|
|
|
|
def _getpluginname(self, plugin, name):
|
|
if name is None:
|
|
if hasattr(plugin, '__name__'):
|
|
name = plugin.__name__.split(".")[-1]
|
|
else:
|
|
name = id(plugin)
|
|
return name
|
|
|
|
def register(self, plugin, name=None):
|
|
assert not self.isregistered(plugin)
|
|
name = self._getpluginname(plugin, name)
|
|
if name in self.impname2plugin:
|
|
return False
|
|
self.impname2plugin[name] = plugin
|
|
self.hook.pytest_plugin_registered(plugin=plugin)
|
|
self._checkplugin(plugin)
|
|
self.comregistry.register(plugin)
|
|
return True
|
|
|
|
def unregister(self, plugin):
|
|
self.hook.pytest_plugin_unregistered(plugin=plugin)
|
|
self.comregistry.unregister(plugin)
|
|
for name, value in list(self.impname2plugin.items()):
|
|
if value == plugin:
|
|
del self.impname2plugin[name]
|
|
|
|
def isregistered(self, plugin, name=None):
|
|
return self._getpluginname(plugin, name) in self.impname2plugin
|
|
|
|
def getplugins(self):
|
|
return list(self.comregistry)
|
|
|
|
def getplugin(self, importname):
|
|
impname = canonical_importname(importname)
|
|
return self.impname2plugin[impname]
|
|
|
|
# API for bootstrapping
|
|
#
|
|
def _envlist(self, varname):
|
|
val = py.std.os.environ.get(varname, None)
|
|
if val is not None:
|
|
return val.split(',')
|
|
return ()
|
|
|
|
def consider_env(self):
|
|
for spec in self._envlist("PYTEST_PLUGINS"):
|
|
self.import_plugin(spec)
|
|
|
|
def consider_preparse(self, args):
|
|
for opt1,opt2 in zip(args, args[1:]):
|
|
if opt1 == "-p":
|
|
self.import_plugin(opt2)
|
|
|
|
def consider_conftest(self, conftestmodule):
|
|
cls = getattr(conftestmodule, 'ConftestPlugin', None)
|
|
if cls is not None:
|
|
raise ValueError("%r: 'ConftestPlugins' only existed till 1.0.0b1, "
|
|
"were removed in 1.0.0b2" % (cls,))
|
|
if self.register(conftestmodule, name=conftestmodule.__file__):
|
|
self.consider_module(conftestmodule)
|
|
|
|
def consider_module(self, mod):
|
|
attr = getattr(mod, "pytest_plugins", ())
|
|
if attr:
|
|
if not isinstance(attr, (list, tuple)):
|
|
attr = (attr,)
|
|
for spec in attr:
|
|
self.import_plugin(spec)
|
|
|
|
def import_plugin(self, spec):
|
|
assert isinstance(spec, str)
|
|
modname = canonical_importname(spec)
|
|
if modname in self.impname2plugin:
|
|
return
|
|
try:
|
|
mod = importplugin(modname)
|
|
except KeyboardInterrupt:
|
|
raise
|
|
except Skipped:
|
|
e = py.std.sys.exc_info()[1]
|
|
self._warn("could not import plugin %r, reason: %r" %(
|
|
(modname, e.msg)))
|
|
else:
|
|
check_old_use(mod, modname)
|
|
self.register(mod)
|
|
self.consider_module(mod)
|
|
|
|
def _warn(self, msg):
|
|
print ("===WARNING=== %s" % (msg,))
|
|
|
|
def _checkplugin(self, plugin):
|
|
# =====================================================
|
|
# check plugin hooks
|
|
# =====================================================
|
|
methods = collectattr(plugin)
|
|
hooks = collectattr(hookspec)
|
|
stringio = py.io.TextIO()
|
|
def Print(*args):
|
|
if args:
|
|
stringio.write(" ".join(map(str, args)))
|
|
stringio.write("\n")
|
|
|
|
fail = False
|
|
while methods:
|
|
name, method = methods.popitem()
|
|
#print "checking", name
|
|
if isgenerichook(name):
|
|
continue
|
|
if name not in hooks:
|
|
Print("found unknown hook:", name)
|
|
fail = True
|
|
else:
|
|
method_args = getargs(method)
|
|
if '__multicall__' in method_args:
|
|
method_args.remove('__multicall__')
|
|
hook = hooks[name]
|
|
hookargs = getargs(hook)
|
|
for arg in method_args:
|
|
if arg not in hookargs:
|
|
Print("argument %r not available" %(arg, ))
|
|
Print("actual definition: %s" %(formatdef(method)))
|
|
Print("available hook arguments: %s" %
|
|
", ".join(hookargs))
|
|
fail = True
|
|
break
|
|
#if not fail:
|
|
# print "matching hook:", formatdef(method)
|
|
if fail:
|
|
name = getattr(plugin, '__name__', plugin)
|
|
raise self.Error("%s:\n%s" %(name, stringio.getvalue()))
|
|
#
|
|
#
|
|
# API for interacting with registered and instantiated plugin objects
|
|
#
|
|
#
|
|
def listattr(self, attrname, plugins=None, extra=()):
|
|
return self.comregistry.listattr(attrname, plugins=plugins, extra=extra)
|
|
|
|
def notify_exception(self, excinfo=None):
|
|
if excinfo is None:
|
|
excinfo = py.code.ExceptionInfo()
|
|
excrepr = excinfo.getrepr(funcargs=True, showlocals=True)
|
|
return self.hook.pytest_internalerror(excrepr=excrepr)
|
|
|
|
def do_addoption(self, parser):
|
|
mname = "pytest_addoption"
|
|
methods = self.comregistry.listattr(mname, reverse=True)
|
|
mc = py._com.MultiCall(methods, {'parser': parser})
|
|
mc.execute()
|
|
|
|
def pytest_plugin_registered(self, plugin):
|
|
if hasattr(self, '_config'):
|
|
self.call_plugin(plugin, "pytest_addoption",
|
|
{'parser': self._config._parser})
|
|
self.call_plugin(plugin, "pytest_configure",
|
|
{'config': self._config})
|
|
#dic = self.call_plugin(plugin, "pytest_namespace")
|
|
#self._updateext(dic)
|
|
|
|
def call_plugin(self, plugin, methname, kwargs):
|
|
return py._com.MultiCall(
|
|
methods=self.listattr(methname, plugins=[plugin]),
|
|
kwargs=kwargs, firstresult=True).execute()
|
|
|
|
def _updateext(self, dic):
|
|
if dic:
|
|
for name, value in dic.items():
|
|
setattr(py.test, name, value)
|
|
|
|
def do_configure(self, config):
|
|
assert not hasattr(self, '_config')
|
|
config.pluginmanager.register(self)
|
|
self._config = config
|
|
config.hook.pytest_configure(config=self._config)
|
|
for dic in config.hook.pytest_namespace() or []:
|
|
self._updateext(dic)
|
|
|
|
def do_unconfigure(self, config):
|
|
config = self._config
|
|
del self._config
|
|
config.hook.pytest_unconfigure(config=config)
|
|
config.pluginmanager.unregister(self)
|
|
|
|
#
|
|
# XXX old code to automatically load classes
|
|
#
|
|
def canonical_importname(name):
|
|
name = name.lower()
|
|
modprefix = "pytest_"
|
|
if not name.startswith(modprefix):
|
|
name = modprefix + name
|
|
return name
|
|
|
|
def importplugin(importspec):
|
|
try:
|
|
return __import__(importspec)
|
|
except ImportError:
|
|
e = py.std.sys.exc_info()[1]
|
|
if str(e).find(importspec) == -1:
|
|
raise
|
|
try:
|
|
return __import__("py.__.test.plugin.%s" %(importspec), None, None, '__doc__')
|
|
except ImportError:
|
|
e = py.std.sys.exc_info()[1]
|
|
if str(e).find(importspec) == -1:
|
|
raise
|
|
#print "syspath:", py.std.sys.path
|
|
#print "curdir:", py.std.os.getcwd()
|
|
return __import__(importspec) # show the original exception
|
|
|
|
|
|
|
|
def isgenerichook(name):
|
|
return name == "pytest_plugins" or \
|
|
name.startswith("pytest_funcarg__")
|
|
|
|
def getargs(func):
|
|
args = py.std.inspect.getargs(py.code.getrawcode(func))[0]
|
|
startindex = py.std.inspect.ismethod(func) and 1 or 0
|
|
return args[startindex:]
|
|
|
|
def collectattr(obj, prefixes=("pytest_",)):
|
|
methods = {}
|
|
for apiname in dir(obj):
|
|
for prefix in prefixes:
|
|
if apiname.startswith(prefix):
|
|
methods[apiname] = getattr(obj, apiname)
|
|
return methods
|
|
|
|
def formatdef(func):
|
|
return "%s%s" %(
|
|
func.__name__,
|
|
py.std.inspect.formatargspec(*py.std.inspect.getargspec(func))
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
import py.__.test.plugin
|
|
basedir = py.path.local(py.__.test.plugin.__file__).dirpath()
|
|
name2text = {}
|
|
for p in basedir.listdir("pytest_*"):
|
|
if p.ext == ".py" or (
|
|
p.check(dir=1) and p.join("__init__.py").check()):
|
|
impname = p.purebasename
|
|
if impname.find("__") != -1:
|
|
continue
|
|
try:
|
|
plugin = importplugin(impname)
|
|
except (ImportError, py.__.test.outcome.Skipped):
|
|
name2text[impname] = "IMPORT ERROR"
|
|
else:
|
|
doc = plugin.__doc__ or ""
|
|
doc = doc.strip()
|
|
name2text[impname] = doc
|
|
|
|
for name in sorted(name2text.keys()):
|
|
text = name2text[name]
|
|
if name[0] == "_":
|
|
continue
|
|
print ("%-20s %s" % (name, text.split("\n")[0]))
|
|
|
|
#text = py.std.textwrap.wrap(name2text[name],
|
|
# width = 80,
|
|
# initial_indent="%s: " % name,
|
|
# replace_whitespace = False)
|
|
#for line in text:
|
|
# print line
|
|
|
|
|