adapt pytest to using pluggy (current master)

--HG--
branch : pluggy1
This commit is contained in:
holger krekel 2015-04-29 16:40:51 +02:00
parent 9aec5cd52d
commit 5ee7ee0850
10 changed files with 334 additions and 1649 deletions

View File

@ -9,7 +9,7 @@ import py
# DON't import pytest here because it causes import cycle troubles # DON't import pytest here because it causes import cycle troubles
import sys, os import sys, os
from _pytest import hookspec # the extension point definitions from _pytest import hookspec # the extension point definitions
from _pytest.core import PluginManager, hookimpl_opts, varnames from pluggy import PluginManager, hookimpl_opts, varnames
# pytest startup # pytest startup
# #

View File

@ -1,590 +0,0 @@
"""
PluginManager, basic initialization and tracing.
"""
import sys
from inspect import isfunction, ismethod, isclass, formatargspec, getargspec
import py
py3 = sys.version_info > (3,0)
def hookspec_opts(firstresult=False, historic=False):
""" returns a decorator which will define a function as a hook specfication.
If firstresult is True the 1:N hook call (N being the number of registered
hook implementation functions) will stop at I<=N when the I'th function
returns a non-None result.
If historic is True calls to a hook will be memorized and replayed
on later registered plugins.
"""
def setattr_hookspec_opts(func):
if historic and firstresult:
raise ValueError("cannot have a historic firstresult hook")
if firstresult:
func.firstresult = firstresult
if historic:
func.historic = historic
return func
return setattr_hookspec_opts
def hookimpl_opts(hookwrapper=False, optionalhook=False,
tryfirst=False, trylast=False):
""" Return a decorator which marks a function as a hook implementation.
If optionalhook is True a missing matching hook specification will not result
in an error (by default it is an error if no matching spec is found).
If tryfirst is True this hook implementation will run as early as possible
in the chain of N hook implementations for a specfication.
If trylast is True this hook implementation will run as late as possible
in the chain of N hook implementations.
If hookwrapper is True the hook implementations needs to execute exactly
one "yield". The code before the yield is run early before any non-hookwrapper
function is run. The code after the yield is run after all non-hookwrapper
function have run. The yield receives an ``CallOutcome`` object representing
the exception or result outcome of the inner calls (including other hookwrapper
calls).
"""
def setattr_hookimpl_opts(func):
if hookwrapper:
func.hookwrapper = True
if optionalhook:
func.optionalhook = True
if tryfirst:
func.tryfirst = True
if trylast:
func.trylast = True
return func
return setattr_hookimpl_opts
class TagTracer:
def __init__(self):
self._tag2proc = {}
self.writer = None
self.indent = 0
def get(self, name):
return TagTracerSub(self, (name,))
def format_message(self, tags, args):
if isinstance(args[-1], dict):
extra = args[-1]
args = args[:-1]
else:
extra = {}
content = " ".join(map(str, args))
indent = " " * self.indent
lines = [
"%s%s [%s]\n" %(indent, content, ":".join(tags))
]
for name, value in extra.items():
lines.append("%s %s: %s\n" % (indent, name, value))
return lines
def processmessage(self, tags, args):
if self.writer is not None and args:
lines = self.format_message(tags, args)
self.writer(''.join(lines))
try:
self._tag2proc[tags](tags, args)
except KeyError:
pass
def setwriter(self, writer):
self.writer = writer
def setprocessor(self, tags, processor):
if isinstance(tags, str):
tags = tuple(tags.split(":"))
else:
assert isinstance(tags, tuple)
self._tag2proc[tags] = processor
class TagTracerSub:
def __init__(self, root, tags):
self.root = root
self.tags = tags
def __call__(self, *args):
self.root.processmessage(self.tags, args)
def setmyprocessor(self, processor):
self.root.setprocessor(self.tags, processor)
def get(self, name):
return self.__class__(self.root, self.tags + (name,))
def raise_wrapfail(wrap_controller, msg):
co = wrap_controller.gi_code
raise RuntimeError("wrap_controller at %r %s:%d %s" %
(co.co_name, co.co_filename, co.co_firstlineno, msg))
def wrapped_call(wrap_controller, func):
""" Wrap calling to a function with a generator which needs to yield
exactly once. The yield point will trigger calling the wrapped function
and return its CallOutcome to the yield point. The generator then needs
to finish (raise StopIteration) in order for the wrapped call to complete.
"""
try:
next(wrap_controller) # first yield
except StopIteration:
raise_wrapfail(wrap_controller, "did not yield")
call_outcome = CallOutcome(func)
try:
wrap_controller.send(call_outcome)
raise_wrapfail(wrap_controller, "has second yield")
except StopIteration:
pass
return call_outcome.get_result()
class CallOutcome:
""" Outcome of a function call, either an exception or a proper result.
Calling the ``get_result`` method will return the result or reraise
the exception raised when the function was called. """
excinfo = None
def __init__(self, func):
try:
self.result = func()
except BaseException:
self.excinfo = sys.exc_info()
def force_result(self, result):
self.result = result
self.excinfo = None
def get_result(self):
if self.excinfo is None:
return self.result
else:
ex = self.excinfo
if py3:
raise ex[1].with_traceback(ex[2])
py.builtin._reraise(*ex)
class TracedHookExecution:
def __init__(self, pluginmanager, before, after):
self.pluginmanager = pluginmanager
self.before = before
self.after = after
self.oldcall = pluginmanager._inner_hookexec
assert not isinstance(self.oldcall, TracedHookExecution)
self.pluginmanager._inner_hookexec = self
def __call__(self, hook, methods, kwargs):
self.before(hook, methods, kwargs)
outcome = CallOutcome(lambda: self.oldcall(hook, methods, kwargs))
self.after(outcome, hook, methods, kwargs)
return outcome.get_result()
def undo(self):
self.pluginmanager._inner_hookexec = self.oldcall
class PluginManager(object):
""" Core Pluginmanager class which manages registration
of plugin objects and 1:N hook calling.
You can register new hooks by calling ``addhooks(module_or_class)``.
You can register plugin objects (which contain hooks) by calling
``register(plugin)``. The Pluginmanager is initialized with a
prefix that is searched for in the names of the dict of registered
plugin objects. An optional excludefunc allows to blacklist names which
are not considered as hooks despite a matching prefix.
For debugging purposes you can call ``enable_tracing()``
which will subsequently send debug information to the trace helper.
"""
def __init__(self, prefix, excludefunc=None):
self._prefix = prefix
self._excludefunc = excludefunc
self._name2plugin = {}
self._plugin2hookcallers = {}
self._plugin_distinfo = []
self.trace = TagTracer().get("pluginmanage")
self.hook = HookRelay(self.trace.root.get("hook"))
self._inner_hookexec = lambda hook, methods, kwargs: \
MultiCall(methods, kwargs, hook.firstresult).execute()
def _hookexec(self, hook, methods, kwargs):
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
return self._inner_hookexec(hook, methods, kwargs)
def enable_tracing(self):
""" enable tracing of hook calls and return an undo function. """
hooktrace = self.hook._trace
def before(hook, methods, kwargs):
hooktrace.root.indent += 1
hooktrace(hook.name, kwargs)
def after(outcome, hook, methods, kwargs):
if outcome.excinfo is None:
hooktrace("finish", hook.name, "-->", outcome.result)
hooktrace.root.indent -= 1
return TracedHookExecution(self, before, after).undo
def subset_hook_caller(self, name, remove_plugins):
""" Return a new HookCaller instance for the named method
which manages calls to all registered plugins except the
ones from remove_plugins. """
orig = getattr(self.hook, name)
plugins_to_remove = [plugin for plugin in remove_plugins
if hasattr(plugin, name)]
if plugins_to_remove:
hc = HookCaller(orig.name, orig._hookexec, orig._specmodule_or_class)
for plugin in orig._plugins:
if plugin not in plugins_to_remove:
hc._add_plugin(plugin)
# we also keep track of this hook caller so it
# gets properly removed on plugin unregistration
self._plugin2hookcallers.setdefault(plugin, []).append(hc)
return hc
return orig
def register(self, plugin, name=None):
""" Register a plugin and return its canonical name or None if the name
is blocked from registering. Raise a ValueError if the plugin is already
registered. """
plugin_name = name or self.get_canonical_name(plugin)
if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
if self._name2plugin.get(plugin_name, -1) is None:
return # blocked plugin, return None to indicate no registration
raise ValueError("Plugin already registered: %s=%s\n%s" %(
plugin_name, plugin, self._name2plugin))
self._name2plugin[plugin_name] = plugin
# register prefix-matching hook specs of the plugin
self._plugin2hookcallers[plugin] = hookcallers = []
for name in dir(plugin):
if name.startswith(self._prefix):
hook = getattr(self.hook, name, None)
if hook is None:
if self._excludefunc is not None and self._excludefunc(name):
continue
hook = HookCaller(name, self._hookexec)
setattr(self.hook, name, hook)
elif hook.has_spec():
self._verify_hook(hook, plugin)
hook._maybe_apply_history(getattr(plugin, name))
hookcallers.append(hook)
hook._add_plugin(plugin)
return plugin_name
def unregister(self, plugin=None, name=None):
""" unregister a plugin object and all its contained hook implementations
from internal data structures. """
if name is None:
assert plugin is not None, "one of name or plugin needs to be specified"
name = self.get_name(plugin)
if plugin is None:
plugin = self.get_plugin(name)
# if self._name2plugin[name] == None registration was blocked: ignore
if self._name2plugin.get(name):
del self._name2plugin[name]
for hookcaller in self._plugin2hookcallers.pop(plugin, []):
hookcaller._remove_plugin(plugin)
return plugin
def set_blocked(self, name):
""" block registrations of the given name, unregister if already registered. """
self.unregister(name=name)
self._name2plugin[name] = None
def addhooks(self, module_or_class):
""" add new hook definitions from the given module_or_class using
the prefix/excludefunc with which the PluginManager was initialized. """
names = []
for name in dir(module_or_class):
if name.startswith(self._prefix):
hc = getattr(self.hook, name, None)
if hc is None:
hc = HookCaller(name, self._hookexec, module_or_class)
setattr(self.hook, name, hc)
else:
# plugins registered this hook without knowing the spec
hc.set_specification(module_or_class)
for plugin in hc._plugins:
self._verify_hook(hc, plugin)
names.append(name)
if not names:
raise ValueError("did not find new %r hooks in %r"
%(self._prefix, module_or_class))
def get_plugins(self):
""" return the set of registered plugins. """
return set(self._plugin2hookcallers)
def is_registered(self, plugin):
""" Return True if the plugin is already registered. """
return plugin in self._plugin2hookcallers
def get_canonical_name(self, plugin):
""" Return canonical name for a plugin object. Note that a plugin
may be registered under a different name which was specified
by the caller of register(plugin, name). To obtain the name
of an registered plugin use ``get_name(plugin)`` instead."""
return getattr(plugin, "__name__", None) or str(id(plugin))
def get_plugin(self, name):
""" Return a plugin or None for the given name. """
return self._name2plugin.get(name)
def get_name(self, plugin):
""" Return name for registered plugin or None if not registered. """
for name, val in self._name2plugin.items():
if plugin == val:
return name
def _verify_hook(self, hook, plugin):
method = getattr(plugin, hook.name)
pluginname = self.get_name(plugin)
if hook.is_historic() and hasattr(method, "hookwrapper"):
raise PluginValidationError(
"Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" %(
pluginname, hook.name))
for arg in varnames(method):
if arg not in hook.argnames:
raise PluginValidationError(
"Plugin %r\nhook %r\nargument %r not available\n"
"plugin definition: %s\n"
"available hookargs: %s" %(
pluginname, hook.name, arg, formatdef(method),
", ".join(hook.argnames)))
def check_pending(self):
""" Verify that all hooks which have not been verified against
a hook specification are optional, otherwise raise PluginValidationError"""
for name in self.hook.__dict__:
if name.startswith(self._prefix):
hook = getattr(self.hook, name)
if not hook.has_spec():
for plugin in hook._plugins:
method = getattr(plugin, hook.name)
if not getattr(method, "optionalhook", False):
raise PluginValidationError(
"unknown hook %r in plugin %r" %(name, plugin))
def load_setuptools_entrypoints(self, entrypoint_name):
""" Load modules from querying the specified setuptools entrypoint name.
Return the number of loaded plugins. """
from pkg_resources import iter_entry_points, DistributionNotFound
for ep in iter_entry_points(entrypoint_name):
# is the plugin registered or blocked?
if self.get_plugin(ep.name) or ep.name in self._name2plugin:
continue
try:
plugin = ep.load()
except DistributionNotFound:
continue
self.register(plugin, name=ep.name)
self._plugin_distinfo.append((ep.dist, plugin))
return len(self._plugin_distinfo)
class MultiCall:
""" execute a call into multiple python functions/methods. """
# XXX note that the __multicall__ argument is supported only
# for pytest compatibility reasons. It was never officially
# supported there and is explicitely deprecated since 2.8
# so we can remove it soon, allowing to avoid the below recursion
# in execute() and simplify/speed up the execute loop.
def __init__(self, methods, kwargs, firstresult=False):
self.methods = methods
self.kwargs = kwargs
self.kwargs["__multicall__"] = self
self.firstresult = firstresult
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.firstresult
while self.methods:
method = self.methods.pop()
args = [all_kwargs[argname] for argname in varnames(method)]
if hasattr(method, "hookwrapper"):
return wrapped_call(method(*args), self.execute)
res = method(*args)
if res is not None:
if firstresult:
return res
results.append(res)
if not firstresult:
return results
def __repr__(self):
status = "%d meths" % (len(self.methods),)
if hasattr(self, "results"):
status = ("%d results, " % len(self.results)) + status
return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
def varnames(func, startindex=None):
""" return argument name tuple for a function, method, class or callable.
In case of a class, its "__init__" method is considered.
For methods the "self" parameter is not included unless you are passing
an unbound method with Python3 (which has no supports for unbound methods)
"""
cache = getattr(func, "__dict__", {})
try:
return cache["_varnames"]
except KeyError:
pass
if isclass(func):
try:
func = func.__init__
except AttributeError:
return ()
startindex = 1
else:
if not isfunction(func) and not ismethod(func):
func = getattr(func, '__call__', func)
if startindex is None:
startindex = int(ismethod(func))
rawcode = py.code.getrawcode(func)
try:
x = rawcode.co_varnames[startindex:rawcode.co_argcount]
except AttributeError:
x = ()
else:
defaults = func.__defaults__
if defaults:
x = x[:-len(defaults)]
try:
cache["_varnames"] = x
except TypeError:
pass
return x
class HookRelay:
def __init__(self, trace):
self._trace = trace
class HookCaller(object):
def __init__(self, name, hook_execute, specmodule_or_class=None):
self.name = name
self._plugins = []
self._wrappers = []
self._nonwrappers = []
self._hookexec = hook_execute
if specmodule_or_class is not None:
self.set_specification(specmodule_or_class)
def has_spec(self):
return hasattr(self, "_specmodule_or_class")
def set_specification(self, specmodule_or_class):
assert not self.has_spec()
self._specmodule_or_class = specmodule_or_class
specfunc = getattr(specmodule_or_class, self.name)
argnames = varnames(specfunc, startindex=isclass(specmodule_or_class))
assert "self" not in argnames # sanity check
self.argnames = ["__multicall__"] + list(argnames)
self.firstresult = getattr(specfunc, 'firstresult', False)
if hasattr(specfunc, "historic"):
self._call_history = []
def is_historic(self):
return hasattr(self, "_call_history")
def _remove_plugin(self, plugin):
self._plugins.remove(plugin)
meth = getattr(plugin, self.name)
try:
self._nonwrappers.remove(meth)
except ValueError:
self._wrappers.remove(meth)
def _add_plugin(self, plugin):
self._plugins.append(plugin)
self._add_method(getattr(plugin, self.name))
def _add_method(self, meth):
if hasattr(meth, 'hookwrapper'):
methods = self._wrappers
else:
methods = self._nonwrappers
if hasattr(meth, 'trylast'):
methods.insert(0, meth)
elif hasattr(meth, 'tryfirst'):
methods.append(meth)
else:
# find last non-tryfirst method
i = len(methods) - 1
while i >= 0 and hasattr(methods[i], "tryfirst"):
i -= 1
methods.insert(i + 1, meth)
def __repr__(self):
return "<HookCaller %r>" %(self.name,)
def __call__(self, **kwargs):
assert not self.is_historic()
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
def call_historic(self, proc=None, kwargs=None):
self._call_history.append((kwargs or {}, proc))
# historizing hooks don't return results
self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
def call_extra(self, methods, kwargs):
""" Call the hook with some additional temporarily participating
methods using the specified kwargs as call parameters. """
old = list(self._nonwrappers), list(self._wrappers)
for method in methods:
self._add_method(method)
try:
return self(**kwargs)
finally:
self._nonwrappers, self._wrappers = old
def _maybe_apply_history(self, method):
if self.is_historic():
for kwargs, proc in self._call_history:
res = self._hookexec(self, [method], kwargs)
if res and proc is not None:
proc(res[0])
class PluginValidationError(Exception):
""" plugin failed validation. """
def formatdef(func):
return "%s%s" % (
func.__name__,
formatargspec(*getargspec(func))
)

View File

@ -4,6 +4,7 @@ import sys
import pkgutil import pkgutil
import py import py
import pluggy
import _pytest import _pytest
@ -69,7 +70,7 @@ def pytest_cmdline_main(config):
genscript = config.getvalue("genscript") genscript = config.getvalue("genscript")
if genscript: if genscript:
tw = py.io.TerminalWriter() tw = py.io.TerminalWriter()
deps = ['py', '_pytest', 'pytest'] deps = ['py', '_pytest', 'pytest', 'pluggy']
if sys.version_info < (2,7): if sys.version_info < (2,7):
deps.append("argparse") deps.append("argparse")
tw.line("generated script will run on python2.6-python3.3++") tw.line("generated script will run on python2.6-python3.3++")

View File

@ -1,6 +1,6 @@
""" hook specifications for pytest plugins, invoked from main.py and builtin plugins. """ """ hook specifications for pytest plugins, invoked from main.py and builtin plugins. """
from _pytest.core import hookspec_opts from pluggy import hookspec_opts
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Initialization hooks called for every plugin # Initialization hooks called for every plugin

View File

@ -13,7 +13,7 @@ import subprocess
import py import py
import pytest import pytest
from py.builtin import print_ from py.builtin import print_
from _pytest.core import TracedHookExecution from pluggy import _TracedHookExecution
from _pytest.main import Session, EXIT_OK from _pytest.main import Session, EXIT_OK
@ -198,7 +198,7 @@ class HookRecorder:
self.calls.append(ParsedCall(hook.name, kwargs)) self.calls.append(ParsedCall(hook.name, kwargs))
def after(outcome, hook, method, kwargs): def after(outcome, hook, method, kwargs):
pass pass
executor = TracedHookExecution(pluginmanager, before, after) executor = _TracedHookExecution(pluginmanager, before, after)
self._undo_wrapping = executor.undo self._undo_wrapping = executor.undo
def finish_recording(self): def finish_recording(self):

View File

@ -8,7 +8,11 @@ from _pytest.mark import MarkDecorator, MarkerError
from py._code.code import TerminalRepr from py._code.code import TerminalRepr
import _pytest import _pytest
cutdir = py.path.local(_pytest.__file__).dirpath() import pluggy
cutdir2 = py.path.local(_pytest.__file__).dirpath()
cutdir1 = py.path.local(pluggy.__file__.rstrip("oc"))
NoneType = type(None) NoneType = type(None)
NOTSET = object() NOTSET = object()
@ -18,6 +22,11 @@ callable = py.builtin.callable
# used to work around a python2 exception info leak # used to work around a python2 exception info leak
exc_clear = getattr(sys, 'exc_clear', lambda: None) exc_clear = getattr(sys, 'exc_clear', lambda: None)
def filter_traceback(entry):
return entry.path != cutdir1 and not entry.path.relto(cutdir2)
def getfslineno(obj): def getfslineno(obj):
# xxx let decorators etc specify a sane ordering # xxx let decorators etc specify a sane ordering
while hasattr(obj, "__wrapped__"): while hasattr(obj, "__wrapped__"):
@ -604,7 +613,11 @@ class FunctionMixin(PyobjMixin):
if ntraceback == traceback: if ntraceback == traceback:
ntraceback = ntraceback.cut(path=path) ntraceback = ntraceback.cut(path=path)
if ntraceback == traceback: if ntraceback == traceback:
ntraceback = ntraceback.cut(excludepath=cutdir) #ntraceback = ntraceback.cut(excludepath=cutdir2)
ntraceback = ntraceback.filter(filter_traceback)
if not ntraceback:
ntraceback = traceback
excinfo.traceback = ntraceback.filter() excinfo.traceback = ntraceback.filter()
# issue364: mark all but first and last frames to # issue364: mark all but first and last frames to
# only show a single-line message for each frame # only show a single-line message for each frame

View File

@ -12,7 +12,7 @@ if __name__ == '__main__': # if run as a script or by 'python -m pytest'
# else we are imported # else we are imported
from _pytest.config import main, UsageError, _preloadplugins, cmdline from _pytest.config import main, UsageError, _preloadplugins, cmdline
from _pytest.core import hookspec_opts, hookimpl_opts from pluggy import hookspec_opts, hookimpl_opts
from _pytest import __version__ from _pytest import __version__
_preloadplugins() # to populate pytest.* namespace so help(pytest) works _preloadplugins() # to populate pytest.* namespace so help(pytest) works

View File

@ -48,7 +48,7 @@ def has_environment_marker_support():
def main(): def main():
install_requires = ['py>=1.4.27.dev2'] install_requires = ['py>=1.4.27.dev2', 'pluggy>=0.1.0,<0.2.0']
extras_require = {} extras_require = {}
if has_environment_marker_support(): if has_environment_marker_support():
extras_require[':python_version=="2.6" or python_version=="3.0" or python_version=="3.1"'] = ['argparse'] extras_require[':python_version=="2.6" or python_version=="3.0" or python_version=="3.1"'] = ['argparse']

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,311 @@
import pytest
import py
import os
from _pytest.config import get_config, PytestPluginManager
@pytest.fixture
def pytestpm():
return PytestPluginManager()
class TestPytestPluginInteractions:
def test_addhooks_conftestplugin(self, testdir):
testdir.makepyfile(newhooks="""
def pytest_myhook(xyz):
"new hook"
""")
conf = testdir.makeconftest("""
import sys ; sys.path.insert(0, '.')
import newhooks
def pytest_addhooks(pluginmanager):
pluginmanager.addhooks(newhooks)
def pytest_myhook(xyz):
return xyz + 1
""")
config = get_config()
pm = config.pluginmanager
pm.hook.pytest_addhooks.call_historic(
kwargs=dict(pluginmanager=config.pluginmanager))
config.pluginmanager._importconftest(conf)
#print(config.pluginmanager.get_plugins())
res = config.hook.pytest_myhook(xyz=10)
assert res == [11]
def test_addhooks_nohooks(self, testdir):
testdir.makeconftest("""
import sys
def pytest_addhooks(pluginmanager):
pluginmanager.addhooks(sys)
""")
res = testdir.runpytest()
assert res.ret != 0
res.stderr.fnmatch_lines([
"*did not find*sys*"
])
def test_namespace_early_from_import(self, testdir):
p = testdir.makepyfile("""
from pytest import Item
from pytest import Item as Item2
assert Item is Item2
""")
result = testdir.runpython(p)
assert result.ret == 0
def test_do_ext_namespace(self, testdir):
testdir.makeconftest("""
def pytest_namespace():
return {'hello': 'world'}
""")
p = testdir.makepyfile("""
from pytest import hello
import pytest
def test_hello():
assert hello == "world"
assert 'hello' in pytest.__all__
""")
reprec = testdir.inline_run(p)
reprec.assertoutcome(passed=1)
def test_do_option_postinitialize(self, testdir):
config = testdir.parseconfigure()
assert not hasattr(config.option, 'test123')
p = testdir.makepyfile("""
def pytest_addoption(parser):
parser.addoption('--test123', action="store_true",
default=True)
""")
config.pluginmanager._importconftest(p)
assert config.option.test123
def test_configure(self, testdir):
config = testdir.parseconfig()
l = []
class A:
def pytest_configure(self, config):
l.append(self)
config.pluginmanager.register(A())
assert len(l) == 0
config._do_configure()
assert len(l) == 1
config.pluginmanager.register(A()) # leads to a configured() plugin
assert len(l) == 2
assert l[0] != l[1]
config._ensure_unconfigure()
config.pluginmanager.register(A())
assert len(l) == 2
def test_hook_tracing(self):
pytestpm = get_config().pluginmanager # fully initialized with plugins
saveindent = []
class api1:
def pytest_plugin_registered(self):
saveindent.append(pytestpm.trace.root.indent)
class api2:
def pytest_plugin_registered(self):
saveindent.append(pytestpm.trace.root.indent)
raise ValueError()
l = []
pytestpm.trace.root.setwriter(l.append)
undo = pytestpm.enable_tracing()
try:
indent = pytestpm.trace.root.indent
p = api1()
pytestpm.register(p)
assert pytestpm.trace.root.indent == indent
assert len(l) >= 2
assert 'pytest_plugin_registered' in l[0]
assert 'finish' in l[1]
l[:] = []
with pytest.raises(ValueError):
pytestpm.register(api2())
assert pytestpm.trace.root.indent == indent
assert saveindent[0] > indent
finally:
undo()
def test_warn_on_deprecated_multicall(self, pytestpm):
class Plugin:
def pytest_configure(self, __multicall__):
pass
before = list(pytestpm._warnings)
pytestpm.register(Plugin())
assert len(pytestpm._warnings) == len(before) + 1
assert "deprecated" in pytestpm._warnings[-1]["message"]
def test_namespace_has_default_and_env_plugins(testdir):
p = testdir.makepyfile("""
import pytest
pytest.mark
""")
result = testdir.runpython(p)
assert result.ret == 0
def test_default_markers(testdir):
result = testdir.runpytest("--markers")
result.stdout.fnmatch_lines([
"*tryfirst*first*",
"*trylast*last*",
])
def test_importplugin_issue375(testdir, pytestpm):
testdir.syspathinsert(testdir.tmpdir)
testdir.makepyfile(qwe="import aaaa")
with pytest.raises(ImportError) as excinfo:
pytestpm.import_plugin("qwe")
assert "qwe" not in str(excinfo.value)
assert "aaaa" in str(excinfo.value)
class TestPytestPluginManager:
def test_register_imported_modules(self):
pm = PytestPluginManager()
mod = py.std.types.ModuleType("x.y.pytest_hello")
pm.register(mod)
assert pm.is_registered(mod)
l = pm.get_plugins()
assert mod in l
pytest.raises(ValueError, "pm.register(mod)")
pytest.raises(ValueError, lambda: pm.register(mod))
#assert not pm.is_registered(mod2)
assert pm.get_plugins() == l
def test_canonical_import(self, monkeypatch):
mod = py.std.types.ModuleType("pytest_xyz")
monkeypatch.setitem(py.std.sys.modules, 'pytest_xyz', mod)
pm = PytestPluginManager()
pm.import_plugin('pytest_xyz')
assert pm.get_plugin('pytest_xyz') == mod
assert pm.is_registered(mod)
def test_consider_module(self, testdir, pytestpm):
testdir.syspathinsert()
testdir.makepyfile(pytest_p1="#")
testdir.makepyfile(pytest_p2="#")
mod = py.std.types.ModuleType("temp")
mod.pytest_plugins = ["pytest_p1", "pytest_p2"]
pytestpm.consider_module(mod)
assert pytestpm.get_plugin("pytest_p1").__name__ == "pytest_p1"
assert pytestpm.get_plugin("pytest_p2").__name__ == "pytest_p2"
def test_consider_module_import_module(self, testdir):
pytestpm = get_config().pluginmanager
mod = py.std.types.ModuleType("x")
mod.pytest_plugins = "pytest_a"
aplugin = testdir.makepyfile(pytest_a="#")
reprec = testdir.make_hook_recorder(pytestpm)
#syspath.prepend(aplugin.dirpath())
py.std.sys.path.insert(0, str(aplugin.dirpath()))
pytestpm.consider_module(mod)
call = reprec.getcall(pytestpm.hook.pytest_plugin_registered.name)
assert call.plugin.__name__ == "pytest_a"
# check that it is not registered twice
pytestpm.consider_module(mod)
l = reprec.getcalls("pytest_plugin_registered")
assert len(l) == 1
def test_consider_env_fails_to_import(self, monkeypatch, pytestpm):
monkeypatch.setenv('PYTEST_PLUGINS', 'nonexisting', prepend=",")
with pytest.raises(ImportError):
pytestpm.consider_env()
def test_plugin_skip(self, testdir, monkeypatch):
p = testdir.makepyfile(skipping1="""
import pytest
pytest.skip("hello")
""")
p.copy(p.dirpath("skipping2.py"))
monkeypatch.setenv("PYTEST_PLUGINS", "skipping2")
result = testdir.runpytest("-rw", "-p", "skipping1", syspathinsert=True)
assert result.ret == 0
result.stdout.fnmatch_lines([
"WI1*skipped plugin*skipping1*hello*",
"WI1*skipped plugin*skipping2*hello*",
])
def test_consider_env_plugin_instantiation(self, testdir, monkeypatch, pytestpm):
testdir.syspathinsert()
testdir.makepyfile(xy123="#")
monkeypatch.setitem(os.environ, 'PYTEST_PLUGINS', 'xy123')
l1 = len(pytestpm.get_plugins())
pytestpm.consider_env()
l2 = len(pytestpm.get_plugins())
assert l2 == l1 + 1
assert pytestpm.get_plugin('xy123')
pytestpm.consider_env()
l3 = len(pytestpm.get_plugins())
assert l2 == l3
def test_pluginmanager_ENV_startup(self, testdir, monkeypatch):
testdir.makepyfile(pytest_x500="#")
p = testdir.makepyfile("""
import pytest
def test_hello(pytestconfig):
plugin = pytestconfig.pluginmanager.get_plugin('pytest_x500')
assert plugin is not None
""")
monkeypatch.setenv('PYTEST_PLUGINS', 'pytest_x500', prepend=",")
result = testdir.runpytest(p, syspathinsert=True)
assert result.ret == 0
result.stdout.fnmatch_lines(["*1 passed*"])
def test_import_plugin_importname(self, testdir, pytestpm):
pytest.raises(ImportError, 'pytestpm.import_plugin("qweqwex.y")')
pytest.raises(ImportError, 'pytestpm.import_plugin("pytest_qweqwx.y")')
testdir.syspathinsert()
pluginname = "pytest_hello"
testdir.makepyfile(**{pluginname: ""})
pytestpm.import_plugin("pytest_hello")
len1 = len(pytestpm.get_plugins())
pytestpm.import_plugin("pytest_hello")
len2 = len(pytestpm.get_plugins())
assert len1 == len2
plugin1 = pytestpm.get_plugin("pytest_hello")
assert plugin1.__name__.endswith('pytest_hello')
plugin2 = pytestpm.get_plugin("pytest_hello")
assert plugin2 is plugin1
def test_import_plugin_dotted_name(self, testdir, pytestpm):
pytest.raises(ImportError, 'pytestpm.import_plugin("qweqwex.y")')
pytest.raises(ImportError, 'pytestpm.import_plugin("pytest_qweqwex.y")')
testdir.syspathinsert()
testdir.mkpydir("pkg").join("plug.py").write("x=3")
pluginname = "pkg.plug"
pytestpm.import_plugin(pluginname)
mod = pytestpm.get_plugin("pkg.plug")
assert mod.x == 3
def test_consider_conftest_deps(self, testdir, pytestpm):
mod = testdir.makepyfile("pytest_plugins='xyz'").pyimport()
with pytest.raises(ImportError):
pytestpm.consider_conftest(mod)
class TestPytestPluginManagerBootstrapming:
def test_preparse_args(self, pytestpm):
pytest.raises(ImportError, lambda:
pytestpm.consider_preparse(["xyz", "-p", "hello123"]))
def test_plugin_prevent_register(self, pytestpm):
pytestpm.consider_preparse(["xyz", "-p", "no:abc"])
l1 = pytestpm.get_plugins()
pytestpm.register(42, name="abc")
l2 = pytestpm.get_plugins()
assert len(l2) == len(l1)
assert 42 not in l2
def test_plugin_prevent_register_unregistered_alredy_registered(self, pytestpm):
pytestpm.register(42, name="abc")
l1 = pytestpm.get_plugins()
assert 42 in l1
pytestpm.consider_preparse(["xyz", "-p", "no:abc"])
l2 = pytestpm.get_plugins()
assert 42 not in l2