From 5ee7ee0850e73382cde2e431fa081ebc3ce1cf22 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Wed, 29 Apr 2015 16:40:51 +0200 Subject: [PATCH] adapt pytest to using pluggy (current master) --HG-- branch : pluggy1 --- _pytest/config.py | 2 +- _pytest/core.py | 590 ------------------ _pytest/genscript.py | 3 +- _pytest/hookspec.py | 2 +- _pytest/pytester.py | 4 +- _pytest/python.py | 17 +- pytest.py | 2 +- setup.py | 2 +- testing/test_core.py | 1050 --------------------------------- testing/test_pluginmanager.py | 311 ++++++++++ 10 files changed, 334 insertions(+), 1649 deletions(-) delete mode 100644 _pytest/core.py delete mode 100644 testing/test_core.py create mode 100644 testing/test_pluginmanager.py diff --git a/_pytest/config.py b/_pytest/config.py index a7a887224..5b17d7327 100644 --- a/_pytest/config.py +++ b/_pytest/config.py @@ -9,7 +9,7 @@ import py # DON't import pytest here because it causes import cycle troubles import sys, os from _pytest import hookspec # the extension point definitions -from _pytest.core import PluginManager, hookimpl_opts, varnames +from pluggy import PluginManager, hookimpl_opts, varnames # pytest startup # diff --git a/_pytest/core.py b/_pytest/core.py deleted file mode 100644 index 5dcc30801..000000000 --- a/_pytest/core.py +++ /dev/null @@ -1,590 +0,0 @@ -""" -PluginManager, basic initialization and tracing. -""" -import sys -from inspect import isfunction, ismethod, isclass, formatargspec, getargspec -import py - -py3 = sys.version_info > (3,0) - -def hookspec_opts(firstresult=False, historic=False): - """ returns a decorator which will define a function as a hook specfication. - - If firstresult is True the 1:N hook call (N being the number of registered - hook implementation functions) will stop at I<=N when the I'th function - returns a non-None result. - - If historic is True calls to a hook will be memorized and replayed - on later registered plugins. - """ - def setattr_hookspec_opts(func): - if historic and firstresult: - raise ValueError("cannot have a historic firstresult hook") - if firstresult: - func.firstresult = firstresult - if historic: - func.historic = historic - return func - return setattr_hookspec_opts - - -def hookimpl_opts(hookwrapper=False, optionalhook=False, - tryfirst=False, trylast=False): - """ Return a decorator which marks a function as a hook implementation. - - If optionalhook is True a missing matching hook specification will not result - in an error (by default it is an error if no matching spec is found). - - If tryfirst is True this hook implementation will run as early as possible - in the chain of N hook implementations for a specfication. - - If trylast is True this hook implementation will run as late as possible - in the chain of N hook implementations. - - If hookwrapper is True the hook implementations needs to execute exactly - one "yield". The code before the yield is run early before any non-hookwrapper - function is run. The code after the yield is run after all non-hookwrapper - function have run. The yield receives an ``CallOutcome`` object representing - the exception or result outcome of the inner calls (including other hookwrapper - calls). - """ - def setattr_hookimpl_opts(func): - if hookwrapper: - func.hookwrapper = True - if optionalhook: - func.optionalhook = True - if tryfirst: - func.tryfirst = True - if trylast: - func.trylast = True - return func - return setattr_hookimpl_opts - - -class TagTracer: - def __init__(self): - self._tag2proc = {} - self.writer = None - self.indent = 0 - - def get(self, name): - return TagTracerSub(self, (name,)) - - def format_message(self, tags, args): - if isinstance(args[-1], dict): - extra = args[-1] - args = args[:-1] - else: - extra = {} - - content = " ".join(map(str, args)) - indent = " " * self.indent - - lines = [ - "%s%s [%s]\n" %(indent, content, ":".join(tags)) - ] - - for name, value in extra.items(): - lines.append("%s %s: %s\n" % (indent, name, value)) - return lines - - def processmessage(self, tags, args): - if self.writer is not None and args: - lines = self.format_message(tags, args) - self.writer(''.join(lines)) - try: - self._tag2proc[tags](tags, args) - except KeyError: - pass - - def setwriter(self, writer): - self.writer = writer - - def setprocessor(self, tags, processor): - if isinstance(tags, str): - tags = tuple(tags.split(":")) - else: - assert isinstance(tags, tuple) - self._tag2proc[tags] = processor - - -class TagTracerSub: - def __init__(self, root, tags): - self.root = root - self.tags = tags - - def __call__(self, *args): - self.root.processmessage(self.tags, args) - - def setmyprocessor(self, processor): - self.root.setprocessor(self.tags, processor) - - def get(self, name): - return self.__class__(self.root, self.tags + (name,)) - - -def raise_wrapfail(wrap_controller, msg): - co = wrap_controller.gi_code - raise RuntimeError("wrap_controller at %r %s:%d %s" % - (co.co_name, co.co_filename, co.co_firstlineno, msg)) - - -def wrapped_call(wrap_controller, func): - """ Wrap calling to a function with a generator which needs to yield - exactly once. The yield point will trigger calling the wrapped function - and return its CallOutcome to the yield point. The generator then needs - to finish (raise StopIteration) in order for the wrapped call to complete. - """ - try: - next(wrap_controller) # first yield - except StopIteration: - raise_wrapfail(wrap_controller, "did not yield") - call_outcome = CallOutcome(func) - try: - wrap_controller.send(call_outcome) - raise_wrapfail(wrap_controller, "has second yield") - except StopIteration: - pass - return call_outcome.get_result() - - -class CallOutcome: - """ Outcome of a function call, either an exception or a proper result. - Calling the ``get_result`` method will return the result or reraise - the exception raised when the function was called. """ - excinfo = None - def __init__(self, func): - try: - self.result = func() - except BaseException: - self.excinfo = sys.exc_info() - - def force_result(self, result): - self.result = result - self.excinfo = None - - def get_result(self): - if self.excinfo is None: - return self.result - else: - ex = self.excinfo - if py3: - raise ex[1].with_traceback(ex[2]) - py.builtin._reraise(*ex) - - -class TracedHookExecution: - def __init__(self, pluginmanager, before, after): - self.pluginmanager = pluginmanager - self.before = before - self.after = after - self.oldcall = pluginmanager._inner_hookexec - assert not isinstance(self.oldcall, TracedHookExecution) - self.pluginmanager._inner_hookexec = self - - def __call__(self, hook, methods, kwargs): - self.before(hook, methods, kwargs) - outcome = CallOutcome(lambda: self.oldcall(hook, methods, kwargs)) - self.after(outcome, hook, methods, kwargs) - return outcome.get_result() - - def undo(self): - self.pluginmanager._inner_hookexec = self.oldcall - - -class PluginManager(object): - """ Core Pluginmanager class which manages registration - of plugin objects and 1:N hook calling. - - You can register new hooks by calling ``addhooks(module_or_class)``. - You can register plugin objects (which contain hooks) by calling - ``register(plugin)``. The Pluginmanager is initialized with a - prefix that is searched for in the names of the dict of registered - plugin objects. An optional excludefunc allows to blacklist names which - are not considered as hooks despite a matching prefix. - - For debugging purposes you can call ``enable_tracing()`` - which will subsequently send debug information to the trace helper. - """ - - def __init__(self, prefix, excludefunc=None): - self._prefix = prefix - self._excludefunc = excludefunc - self._name2plugin = {} - self._plugin2hookcallers = {} - self._plugin_distinfo = [] - self.trace = TagTracer().get("pluginmanage") - self.hook = HookRelay(self.trace.root.get("hook")) - self._inner_hookexec = lambda hook, methods, kwargs: \ - MultiCall(methods, kwargs, hook.firstresult).execute() - - def _hookexec(self, hook, methods, kwargs): - # called from all hookcaller instances. - # enable_tracing will set its own wrapping function at self._inner_hookexec - return self._inner_hookexec(hook, methods, kwargs) - - def enable_tracing(self): - """ enable tracing of hook calls and return an undo function. """ - hooktrace = self.hook._trace - - def before(hook, methods, kwargs): - hooktrace.root.indent += 1 - hooktrace(hook.name, kwargs) - - def after(outcome, hook, methods, kwargs): - if outcome.excinfo is None: - hooktrace("finish", hook.name, "-->", outcome.result) - hooktrace.root.indent -= 1 - - return TracedHookExecution(self, before, after).undo - - def subset_hook_caller(self, name, remove_plugins): - """ Return a new HookCaller instance for the named method - which manages calls to all registered plugins except the - ones from remove_plugins. """ - orig = getattr(self.hook, name) - plugins_to_remove = [plugin for plugin in remove_plugins - if hasattr(plugin, name)] - if plugins_to_remove: - hc = HookCaller(orig.name, orig._hookexec, orig._specmodule_or_class) - for plugin in orig._plugins: - if plugin not in plugins_to_remove: - hc._add_plugin(plugin) - # we also keep track of this hook caller so it - # gets properly removed on plugin unregistration - self._plugin2hookcallers.setdefault(plugin, []).append(hc) - return hc - return orig - - def register(self, plugin, name=None): - """ Register a plugin and return its canonical name or None if the name - is blocked from registering. Raise a ValueError if the plugin is already - registered. """ - plugin_name = name or self.get_canonical_name(plugin) - - if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers: - if self._name2plugin.get(plugin_name, -1) is None: - return # blocked plugin, return None to indicate no registration - raise ValueError("Plugin already registered: %s=%s\n%s" %( - plugin_name, plugin, self._name2plugin)) - - self._name2plugin[plugin_name] = plugin - - # register prefix-matching hook specs of the plugin - self._plugin2hookcallers[plugin] = hookcallers = [] - for name in dir(plugin): - if name.startswith(self._prefix): - hook = getattr(self.hook, name, None) - if hook is None: - if self._excludefunc is not None and self._excludefunc(name): - continue - hook = HookCaller(name, self._hookexec) - setattr(self.hook, name, hook) - elif hook.has_spec(): - self._verify_hook(hook, plugin) - hook._maybe_apply_history(getattr(plugin, name)) - hookcallers.append(hook) - hook._add_plugin(plugin) - return plugin_name - - def unregister(self, plugin=None, name=None): - """ unregister a plugin object and all its contained hook implementations - from internal data structures. """ - if name is None: - assert plugin is not None, "one of name or plugin needs to be specified" - name = self.get_name(plugin) - - if plugin is None: - plugin = self.get_plugin(name) - - # if self._name2plugin[name] == None registration was blocked: ignore - if self._name2plugin.get(name): - del self._name2plugin[name] - - for hookcaller in self._plugin2hookcallers.pop(plugin, []): - hookcaller._remove_plugin(plugin) - - return plugin - - def set_blocked(self, name): - """ block registrations of the given name, unregister if already registered. """ - self.unregister(name=name) - self._name2plugin[name] = None - - def addhooks(self, module_or_class): - """ add new hook definitions from the given module_or_class using - the prefix/excludefunc with which the PluginManager was initialized. """ - names = [] - for name in dir(module_or_class): - if name.startswith(self._prefix): - hc = getattr(self.hook, name, None) - if hc is None: - hc = HookCaller(name, self._hookexec, module_or_class) - setattr(self.hook, name, hc) - else: - # plugins registered this hook without knowing the spec - hc.set_specification(module_or_class) - for plugin in hc._plugins: - self._verify_hook(hc, plugin) - names.append(name) - - if not names: - raise ValueError("did not find new %r hooks in %r" - %(self._prefix, module_or_class)) - - def get_plugins(self): - """ return the set of registered plugins. """ - return set(self._plugin2hookcallers) - - def is_registered(self, plugin): - """ Return True if the plugin is already registered. """ - return plugin in self._plugin2hookcallers - - def get_canonical_name(self, plugin): - """ Return canonical name for a plugin object. Note that a plugin - may be registered under a different name which was specified - by the caller of register(plugin, name). To obtain the name - of an registered plugin use ``get_name(plugin)`` instead.""" - return getattr(plugin, "__name__", None) or str(id(plugin)) - - def get_plugin(self, name): - """ Return a plugin or None for the given name. """ - return self._name2plugin.get(name) - - def get_name(self, plugin): - """ Return name for registered plugin or None if not registered. """ - for name, val in self._name2plugin.items(): - if plugin == val: - return name - - def _verify_hook(self, hook, plugin): - method = getattr(plugin, hook.name) - pluginname = self.get_name(plugin) - - if hook.is_historic() and hasattr(method, "hookwrapper"): - raise PluginValidationError( - "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" %( - pluginname, hook.name)) - - for arg in varnames(method): - if arg not in hook.argnames: - raise PluginValidationError( - "Plugin %r\nhook %r\nargument %r not available\n" - "plugin definition: %s\n" - "available hookargs: %s" %( - pluginname, hook.name, arg, formatdef(method), - ", ".join(hook.argnames))) - - def check_pending(self): - """ Verify that all hooks which have not been verified against - a hook specification are optional, otherwise raise PluginValidationError""" - for name in self.hook.__dict__: - if name.startswith(self._prefix): - hook = getattr(self.hook, name) - if not hook.has_spec(): - for plugin in hook._plugins: - method = getattr(plugin, hook.name) - if not getattr(method, "optionalhook", False): - raise PluginValidationError( - "unknown hook %r in plugin %r" %(name, plugin)) - - def load_setuptools_entrypoints(self, entrypoint_name): - """ Load modules from querying the specified setuptools entrypoint name. - Return the number of loaded plugins. """ - from pkg_resources import iter_entry_points, DistributionNotFound - for ep in iter_entry_points(entrypoint_name): - # is the plugin registered or blocked? - if self.get_plugin(ep.name) or ep.name in self._name2plugin: - continue - try: - plugin = ep.load() - except DistributionNotFound: - continue - self.register(plugin, name=ep.name) - self._plugin_distinfo.append((ep.dist, plugin)) - return len(self._plugin_distinfo) - - -class MultiCall: - """ execute a call into multiple python functions/methods. """ - - # XXX note that the __multicall__ argument is supported only - # for pytest compatibility reasons. It was never officially - # supported there and is explicitely deprecated since 2.8 - # so we can remove it soon, allowing to avoid the below recursion - # in execute() and simplify/speed up the execute loop. - - def __init__(self, methods, kwargs, firstresult=False): - self.methods = methods - self.kwargs = kwargs - self.kwargs["__multicall__"] = self - self.firstresult = firstresult - - def execute(self): - all_kwargs = self.kwargs - self.results = results = [] - firstresult = self.firstresult - - while self.methods: - method = self.methods.pop() - args = [all_kwargs[argname] for argname in varnames(method)] - if hasattr(method, "hookwrapper"): - return wrapped_call(method(*args), self.execute) - res = method(*args) - if res is not None: - if firstresult: - return res - results.append(res) - - if not firstresult: - return results - - def __repr__(self): - status = "%d meths" % (len(self.methods),) - if hasattr(self, "results"): - status = ("%d results, " % len(self.results)) + status - return "" %(status, self.kwargs) - - - -def varnames(func, startindex=None): - """ return argument name tuple for a function, method, class or callable. - - In case of a class, its "__init__" method is considered. - For methods the "self" parameter is not included unless you are passing - an unbound method with Python3 (which has no supports for unbound methods) - """ - cache = getattr(func, "__dict__", {}) - try: - return cache["_varnames"] - except KeyError: - pass - if isclass(func): - try: - func = func.__init__ - except AttributeError: - return () - startindex = 1 - else: - if not isfunction(func) and not ismethod(func): - func = getattr(func, '__call__', func) - if startindex is None: - startindex = int(ismethod(func)) - - rawcode = py.code.getrawcode(func) - try: - x = rawcode.co_varnames[startindex:rawcode.co_argcount] - except AttributeError: - x = () - else: - defaults = func.__defaults__ - if defaults: - x = x[:-len(defaults)] - try: - cache["_varnames"] = x - except TypeError: - pass - return x - - -class HookRelay: - def __init__(self, trace): - self._trace = trace - - -class HookCaller(object): - def __init__(self, name, hook_execute, specmodule_or_class=None): - self.name = name - self._plugins = [] - self._wrappers = [] - self._nonwrappers = [] - self._hookexec = hook_execute - if specmodule_or_class is not None: - self.set_specification(specmodule_or_class) - - def has_spec(self): - return hasattr(self, "_specmodule_or_class") - - def set_specification(self, specmodule_or_class): - assert not self.has_spec() - self._specmodule_or_class = specmodule_or_class - specfunc = getattr(specmodule_or_class, self.name) - argnames = varnames(specfunc, startindex=isclass(specmodule_or_class)) - assert "self" not in argnames # sanity check - self.argnames = ["__multicall__"] + list(argnames) - self.firstresult = getattr(specfunc, 'firstresult', False) - if hasattr(specfunc, "historic"): - self._call_history = [] - - def is_historic(self): - return hasattr(self, "_call_history") - - def _remove_plugin(self, plugin): - self._plugins.remove(plugin) - meth = getattr(plugin, self.name) - try: - self._nonwrappers.remove(meth) - except ValueError: - self._wrappers.remove(meth) - - def _add_plugin(self, plugin): - self._plugins.append(plugin) - self._add_method(getattr(plugin, self.name)) - - def _add_method(self, meth): - if hasattr(meth, 'hookwrapper'): - methods = self._wrappers - else: - methods = self._nonwrappers - - if hasattr(meth, 'trylast'): - methods.insert(0, meth) - elif hasattr(meth, 'tryfirst'): - methods.append(meth) - else: - # find last non-tryfirst method - i = len(methods) - 1 - while i >= 0 and hasattr(methods[i], "tryfirst"): - i -= 1 - methods.insert(i + 1, meth) - - def __repr__(self): - return "" %(self.name,) - - def __call__(self, **kwargs): - assert not self.is_historic() - return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs) - - def call_historic(self, proc=None, kwargs=None): - self._call_history.append((kwargs or {}, proc)) - # historizing hooks don't return results - self._hookexec(self, self._nonwrappers + self._wrappers, kwargs) - - def call_extra(self, methods, kwargs): - """ Call the hook with some additional temporarily participating - methods using the specified kwargs as call parameters. """ - old = list(self._nonwrappers), list(self._wrappers) - for method in methods: - self._add_method(method) - try: - return self(**kwargs) - finally: - self._nonwrappers, self._wrappers = old - - def _maybe_apply_history(self, method): - if self.is_historic(): - for kwargs, proc in self._call_history: - res = self._hookexec(self, [method], kwargs) - if res and proc is not None: - proc(res[0]) - - -class PluginValidationError(Exception): - """ plugin failed validation. """ - - -def formatdef(func): - return "%s%s" % ( - func.__name__, - formatargspec(*getargspec(func)) - ) diff --git a/_pytest/genscript.py b/_pytest/genscript.py index 22f3fdeae..32186cc10 100755 --- a/_pytest/genscript.py +++ b/_pytest/genscript.py @@ -4,6 +4,7 @@ import sys import pkgutil import py +import pluggy import _pytest @@ -69,7 +70,7 @@ def pytest_cmdline_main(config): genscript = config.getvalue("genscript") if genscript: tw = py.io.TerminalWriter() - deps = ['py', '_pytest', 'pytest'] + deps = ['py', '_pytest', 'pytest', 'pluggy'] if sys.version_info < (2,7): deps.append("argparse") tw.line("generated script will run on python2.6-python3.3++") diff --git a/_pytest/hookspec.py b/_pytest/hookspec.py index cf8947ada..0438b9072 100644 --- a/_pytest/hookspec.py +++ b/_pytest/hookspec.py @@ -1,6 +1,6 @@ """ hook specifications for pytest plugins, invoked from main.py and builtin plugins. """ -from _pytest.core import hookspec_opts +from pluggy import hookspec_opts # ------------------------------------------------------------------------- # Initialization hooks called for every plugin diff --git a/_pytest/pytester.py b/_pytest/pytester.py index 53a077a71..ee83bee20 100644 --- a/_pytest/pytester.py +++ b/_pytest/pytester.py @@ -13,7 +13,7 @@ import subprocess import py import pytest from py.builtin import print_ -from _pytest.core import TracedHookExecution +from pluggy import _TracedHookExecution from _pytest.main import Session, EXIT_OK @@ -198,7 +198,7 @@ class HookRecorder: self.calls.append(ParsedCall(hook.name, kwargs)) def after(outcome, hook, method, kwargs): pass - executor = TracedHookExecution(pluginmanager, before, after) + executor = _TracedHookExecution(pluginmanager, before, after) self._undo_wrapping = executor.undo def finish_recording(self): diff --git a/_pytest/python.py b/_pytest/python.py index e849ca6fe..73cce4b32 100644 --- a/_pytest/python.py +++ b/_pytest/python.py @@ -8,7 +8,11 @@ from _pytest.mark import MarkDecorator, MarkerError from py._code.code import TerminalRepr import _pytest -cutdir = py.path.local(_pytest.__file__).dirpath() +import pluggy + +cutdir2 = py.path.local(_pytest.__file__).dirpath() +cutdir1 = py.path.local(pluggy.__file__.rstrip("oc")) + NoneType = type(None) NOTSET = object() @@ -18,6 +22,11 @@ callable = py.builtin.callable # used to work around a python2 exception info leak exc_clear = getattr(sys, 'exc_clear', lambda: None) + +def filter_traceback(entry): + return entry.path != cutdir1 and not entry.path.relto(cutdir2) + + def getfslineno(obj): # xxx let decorators etc specify a sane ordering while hasattr(obj, "__wrapped__"): @@ -604,7 +613,11 @@ class FunctionMixin(PyobjMixin): if ntraceback == traceback: ntraceback = ntraceback.cut(path=path) if ntraceback == traceback: - ntraceback = ntraceback.cut(excludepath=cutdir) + #ntraceback = ntraceback.cut(excludepath=cutdir2) + ntraceback = ntraceback.filter(filter_traceback) + if not ntraceback: + ntraceback = traceback + excinfo.traceback = ntraceback.filter() # issue364: mark all but first and last frames to # only show a single-line message for each frame diff --git a/pytest.py b/pytest.py index 5979d9f2e..9ea6d63ed 100644 --- a/pytest.py +++ b/pytest.py @@ -12,7 +12,7 @@ if __name__ == '__main__': # if run as a script or by 'python -m pytest' # else we are imported from _pytest.config import main, UsageError, _preloadplugins, cmdline -from _pytest.core import hookspec_opts, hookimpl_opts +from pluggy import hookspec_opts, hookimpl_opts from _pytest import __version__ _preloadplugins() # to populate pytest.* namespace so help(pytest) works diff --git a/setup.py b/setup.py index 4671132c5..909c0957e 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ def has_environment_marker_support(): def main(): - install_requires = ['py>=1.4.27.dev2'] + install_requires = ['py>=1.4.27.dev2', 'pluggy>=0.1.0,<0.2.0'] extras_require = {} if has_environment_marker_support(): extras_require[':python_version=="2.6" or python_version=="3.0" or python_version=="3.1"'] = ['argparse'] diff --git a/testing/test_core.py b/testing/test_core.py deleted file mode 100644 index aad4760e9..000000000 --- a/testing/test_core.py +++ /dev/null @@ -1,1050 +0,0 @@ -import pytest, py, os -from _pytest.core import * # noqa -from _pytest.config import get_config - - -@pytest.fixture -def pm(): - return PluginManager("he") - -@pytest.fixture -def pytestpm(): - return PytestPluginManager() - - -class TestPluginManager: - def test_plugin_double_register(self, pm): - pm.register(42, name="abc") - with pytest.raises(ValueError): - pm.register(42, name="abc") - with pytest.raises(ValueError): - pm.register(42, name="def") - - def test_pm(self, pm): - class A: pass - a1, a2 = A(), A() - pm.register(a1) - assert pm.is_registered(a1) - pm.register(a2, "hello") - assert pm.is_registered(a2) - l = pm.get_plugins() - assert a1 in l - assert a2 in l - assert pm.get_plugin('hello') == a2 - assert pm.unregister(a1) == a1 - assert not pm.is_registered(a1) - - def test_pm_name(self, pm): - class A: pass - a1 = A() - name = pm.register(a1, name="hello") - assert name == "hello" - pm.unregister(a1) - assert pm.get_plugin(a1) is None - assert not pm.is_registered(a1) - assert not pm.get_plugins() - name2 = pm.register(a1, name="hello") - assert name2 == name - pm.unregister(name="hello") - assert pm.get_plugin(a1) is None - assert not pm.is_registered(a1) - assert not pm.get_plugins() - - def test_set_blocked(self, pm): - class A: pass - a1 = A() - name = pm.register(a1) - assert pm.is_registered(a1) - pm.set_blocked(name) - assert not pm.is_registered(a1) - - pm.set_blocked("somename") - assert not pm.register(A(), "somename") - pm.unregister(name="somename") - - def test_register_mismatch_method(self, pytestpm): - class hello: - def pytest_gurgel(self): - pass - pytestpm.register(hello()) - with pytest.raises(PluginValidationError): - pytestpm.check_pending() - - def test_register_mismatch_arg(self): - pm = get_config().pluginmanager - class hello: - def pytest_configure(self, asd): - pass - pytest.raises(Exception, lambda: pm.register(hello())) - - def test_register(self): - pm = get_config().pluginmanager - class MyPlugin: - pass - my = MyPlugin() - pm.register(my) - assert pm.get_plugins() - my2 = MyPlugin() - pm.register(my2) - assert set([my,my2]).issubset(pm.get_plugins()) - - assert pm.is_registered(my) - assert pm.is_registered(my2) - pm.unregister(my) - assert not pm.is_registered(my) - assert my not in pm.get_plugins() - - def test_register_unknown_hooks(self, pm): - class Plugin1: - def he_method1(self, arg): - return arg + 1 - - pm.register(Plugin1()) - class Hooks: - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - #assert not pm._unverified_hooks - assert pm.hook.he_method1(arg=1) == [2] - - def test_register_historic(self, pm): - class Hooks: - @hookspec_opts(historic=True) - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - - pm.hook.he_method1.call_historic(kwargs=dict(arg=1)) - l = [] - class Plugin: - def he_method1(self, arg): - l.append(arg) - - pm.register(Plugin()) - assert l == [1] - - class Plugin2: - def he_method1(self, arg): - l.append(arg*10) - pm.register(Plugin2()) - assert l == [1, 10] - pm.hook.he_method1.call_historic(kwargs=dict(arg=12)) - assert l == [1, 10, 120, 12] - - def test_with_result_memorized(self, pm): - class Hooks: - @hookspec_opts(historic=True) - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - - he_method1 = pm.hook.he_method1 - he_method1.call_historic(lambda res: l.append(res), dict(arg=1)) - l = [] - class Plugin: - def he_method1(self, arg): - return arg * 10 - - pm.register(Plugin()) - assert l == [10] - - def test_register_historic_incompat_hookwrapper(self, pm): - class Hooks: - @hookspec_opts(historic=True) - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - - l = [] - class Plugin: - @hookimpl_opts(hookwrapper=True) - def he_method1(self, arg): - l.append(arg) - - with pytest.raises(PluginValidationError): - pm.register(Plugin()) - - def test_call_extra(self, pm): - class Hooks: - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - - def he_method1(arg): - return arg * 10 - - l = pm.hook.he_method1.call_extra([he_method1], dict(arg=1)) - assert l == [10] - - def test_subset_hook_caller(self, pm): - class Hooks: - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - - l = [] - class Plugin1: - def he_method1(self, arg): - l.append(arg) - class Plugin2: - def he_method1(self, arg): - l.append(arg*10) - class PluginNo: - pass - - plugin1, plugin2, plugin3 = Plugin1(), Plugin2(), PluginNo() - pm.register(plugin1) - pm.register(plugin2) - pm.register(plugin3) - pm.hook.he_method1(arg=1) - assert l == [10, 1] - l[:] = [] - - hc = pm.subset_hook_caller("he_method1", [plugin1]) - hc(arg=2) - assert l == [20] - l[:] = [] - - hc = pm.subset_hook_caller("he_method1", [plugin2]) - hc(arg=2) - assert l == [2] - l[:] = [] - - pm.unregister(plugin1) - hc(arg=2) - assert l == [] - l[:] = [] - - pm.hook.he_method1(arg=1) - assert l == [10] - - - -class TestAddMethodOrdering: - @pytest.fixture - def hc(self, pm): - class Hooks: - def he_method1(self, arg): - pass - pm.addhooks(Hooks) - return pm.hook.he_method1 - - @pytest.fixture - def addmeth(self, hc): - def addmeth(tryfirst=False, trylast=False, hookwrapper=False): - def wrap(func): - if tryfirst: - func.tryfirst = True - if trylast: - func.trylast = True - if hookwrapper: - func.hookwrapper = True - hc._add_method(func) - return func - return wrap - return addmeth - - def test_adding_nonwrappers(self, hc, addmeth): - @addmeth() - def he_method1(): - pass - - @addmeth() - def he_method2(): - pass - - @addmeth() - def he_method3(): - pass - assert hc._nonwrappers == [he_method1, he_method2, he_method3] - - def test_adding_nonwrappers_trylast(self, hc, addmeth): - @addmeth() - def he_method1_middle(): - pass - - @addmeth(trylast=True) - def he_method1(): - pass - - @addmeth() - def he_method1_b(): - pass - assert hc._nonwrappers == [he_method1, he_method1_middle, he_method1_b] - - def test_adding_nonwrappers_trylast3(self, hc, addmeth): - @addmeth() - def he_method1_a(): - pass - - @addmeth(trylast=True) - def he_method1_b(): - pass - - @addmeth() - def he_method1_c(): - pass - - @addmeth(trylast=True) - def he_method1_d(): - pass - assert hc._nonwrappers == [he_method1_d, he_method1_b, - he_method1_a, he_method1_c] - - - def test_adding_nonwrappers_trylast2(self, hc, addmeth): - @addmeth() - def he_method1_middle(): - pass - - @addmeth() - def he_method1_b(): - pass - - @addmeth(trylast=True) - def he_method1(): - pass - assert hc._nonwrappers == [he_method1, he_method1_middle, he_method1_b] - - def test_adding_nonwrappers_tryfirst(self, hc, addmeth): - @addmeth(tryfirst=True) - def he_method1(): - pass - - @addmeth() - def he_method1_middle(): - pass - - @addmeth() - def he_method1_b(): - pass - assert hc._nonwrappers == [he_method1_middle, he_method1_b, he_method1] - - def test_adding_wrappers_ordering(self, hc, addmeth): - @addmeth(hookwrapper=True) - def he_method1(): - pass - - @addmeth() - def he_method1_middle(): - pass - - @addmeth(hookwrapper=True) - def he_method3(): - pass - - assert hc._nonwrappers == [he_method1_middle] - assert hc._wrappers == [he_method1, he_method3] - - def test_adding_wrappers_ordering_tryfirst(self, hc, addmeth): - @addmeth(hookwrapper=True, tryfirst=True) - def he_method1(): - pass - - @addmeth(hookwrapper=True) - def he_method2(): - pass - - assert hc._nonwrappers == [] - assert hc._wrappers == [he_method2, he_method1] - - - def test_hookspec_opts(self, pm): - class HookSpec: - @hookspec_opts() - def he_myhook1(self, arg1): - pass - - @hookspec_opts(firstresult=True) - def he_myhook2(self, arg1): - pass - - @hookspec_opts(firstresult=False) - def he_myhook3(self, arg1): - pass - - pm.addhooks(HookSpec) - assert not pm.hook.he_myhook1.firstresult - assert pm.hook.he_myhook2.firstresult - assert not pm.hook.he_myhook3.firstresult - - - def test_hookimpl_opts(self): - for name in ["hookwrapper", "optionalhook", "tryfirst", "trylast"]: - for val in [True, False]: - @hookimpl_opts(**{name: val}) - def he_myhook1(self, arg1): - pass - if val: - assert getattr(he_myhook1, name) - else: - assert not hasattr(he_myhook1, name) - - def test_decorator_functional(self, pm): - class HookSpec: - @hookspec_opts(firstresult=True) - def he_myhook(self, arg1): - """ add to arg1 """ - pm.addhooks(HookSpec) - - class Plugin: - @hookimpl_opts() - def he_myhook(self, arg1): - return arg1 + 1 - - pm.register(Plugin()) - results = pm.hook.he_myhook(arg1=17) - assert results == 18 - - def test_load_setuptools_instantiation(self, monkeypatch, pm): - pkg_resources = pytest.importorskip("pkg_resources") - def my_iter(name): - assert name == "hello" - class EntryPoint: - name = "myname" - dist = None - def load(self): - class PseudoPlugin: - x = 42 - return PseudoPlugin() - return iter([EntryPoint()]) - - monkeypatch.setattr(pkg_resources, 'iter_entry_points', my_iter) - num = pm.load_setuptools_entrypoints("hello") - assert num == 1 - plugin = pm.get_plugin("myname") - assert plugin.x == 42 - assert pm._plugin_distinfo == [(None, plugin)] - - def test_load_setuptools_not_installed(self, monkeypatch, pm): - monkeypatch.setitem(py.std.sys.modules, 'pkg_resources', - py.std.types.ModuleType("pkg_resources")) - with pytest.raises(ImportError): - pm.load_setuptools_entrypoints("qwe") - - -class TestPytestPluginInteractions: - - def test_addhooks_conftestplugin(self, testdir): - testdir.makepyfile(newhooks=""" - def pytest_myhook(xyz): - "new hook" - """) - conf = testdir.makeconftest(""" - import sys ; sys.path.insert(0, '.') - import newhooks - def pytest_addhooks(pluginmanager): - pluginmanager.addhooks(newhooks) - def pytest_myhook(xyz): - return xyz + 1 - """) - config = get_config() - pm = config.pluginmanager - pm.hook.pytest_addhooks.call_historic( - kwargs=dict(pluginmanager=config.pluginmanager)) - config.pluginmanager._importconftest(conf) - #print(config.pluginmanager.get_plugins()) - res = config.hook.pytest_myhook(xyz=10) - assert res == [11] - - def test_addhooks_nohooks(self, testdir): - testdir.makeconftest(""" - import sys - def pytest_addhooks(pluginmanager): - pluginmanager.addhooks(sys) - """) - res = testdir.runpytest() - assert res.ret != 0 - res.stderr.fnmatch_lines([ - "*did not find*sys*" - ]) - - def test_namespace_early_from_import(self, testdir): - p = testdir.makepyfile(""" - from pytest import Item - from pytest import Item as Item2 - assert Item is Item2 - """) - result = testdir.runpython(p) - assert result.ret == 0 - - def test_do_ext_namespace(self, testdir): - testdir.makeconftest(""" - def pytest_namespace(): - return {'hello': 'world'} - """) - p = testdir.makepyfile(""" - from pytest import hello - import pytest - def test_hello(): - assert hello == "world" - assert 'hello' in pytest.__all__ - """) - reprec = testdir.inline_run(p) - reprec.assertoutcome(passed=1) - - def test_do_option_postinitialize(self, testdir): - config = testdir.parseconfigure() - assert not hasattr(config.option, 'test123') - p = testdir.makepyfile(""" - def pytest_addoption(parser): - parser.addoption('--test123', action="store_true", - default=True) - """) - config.pluginmanager._importconftest(p) - assert config.option.test123 - - def test_configure(self, testdir): - config = testdir.parseconfig() - l = [] - class A: - def pytest_configure(self, config): - l.append(self) - - config.pluginmanager.register(A()) - assert len(l) == 0 - config._do_configure() - assert len(l) == 1 - config.pluginmanager.register(A()) # leads to a configured() plugin - assert len(l) == 2 - assert l[0] != l[1] - - config._ensure_unconfigure() - config.pluginmanager.register(A()) - assert len(l) == 2 - - def test_hook_tracing(self): - pytestpm = get_config().pluginmanager # fully initialized with plugins - saveindent = [] - class api1: - def pytest_plugin_registered(self): - saveindent.append(pytestpm.trace.root.indent) - class api2: - def pytest_plugin_registered(self): - saveindent.append(pytestpm.trace.root.indent) - raise ValueError() - l = [] - pytestpm.trace.root.setwriter(l.append) - undo = pytestpm.enable_tracing() - try: - indent = pytestpm.trace.root.indent - p = api1() - pytestpm.register(p) - assert pytestpm.trace.root.indent == indent - assert len(l) >= 2 - assert 'pytest_plugin_registered' in l[0] - assert 'finish' in l[1] - - l[:] = [] - with pytest.raises(ValueError): - pytestpm.register(api2()) - assert pytestpm.trace.root.indent == indent - assert saveindent[0] > indent - finally: - undo() - - def test_warn_on_deprecated_multicall(self, pytestpm): - class Plugin: - def pytest_configure(self, __multicall__): - pass - - before = list(pytestpm._warnings) - pytestpm.register(Plugin()) - assert len(pytestpm._warnings) == len(before) + 1 - assert "deprecated" in pytestpm._warnings[-1]["message"] - - -def test_namespace_has_default_and_env_plugins(testdir): - p = testdir.makepyfile(""" - import pytest - pytest.mark - """) - result = testdir.runpython(p) - assert result.ret == 0 - -def test_varnames(): - def f(x): - i = 3 # noqa - class A: - def f(self, y): - pass - class B(object): - def __call__(self, z): - pass - assert varnames(f) == ("x",) - assert varnames(A().f) == ('y',) - assert varnames(B()) == ('z',) - -def test_varnames_default(): - def f(x, y=3): - pass - assert varnames(f) == ("x",) - -def test_varnames_class(): - class C: - def __init__(self, x): - pass - class D: - pass - assert varnames(C) == ("x",) - assert varnames(D) == () - -class TestMultiCall: - def test_uses_copy_of_methods(self): - l = [lambda: 42] - mc = MultiCall(l, {}) - repr(mc) - l[:] = [] - res = mc.execute() - return res == 42 - - def test_call_passing(self): - class P1: - def m(self, __multicall__, x): - assert len(__multicall__.results) == 1 - assert not __multicall__.methods - return 17 - - class P2: - def m(self, __multicall__, x): - assert __multicall__.results == [] - assert __multicall__.methods - return 23 - - p1 = P1() - p2 = P2() - multicall = MultiCall([p1.m, p2.m], {'x': 23}) - assert "23" in repr(multicall) - reslist = multicall.execute() - assert len(reslist) == 2 - # ensure reversed order - assert reslist == [23, 17] - - def test_keyword_args(self): - def f(x): - return x + 1 - class A: - def f(self, x, y): - return x + y - multicall = MultiCall([f, A().f], dict(x=23, y=24)) - assert "'x': 23" in repr(multicall) - assert "'y': 24" in repr(multicall) - reslist = multicall.execute() - assert reslist == [24+23, 24] - assert "2 results" in repr(multicall) - - def test_keyword_args_with_defaultargs(self): - def f(x, z=1): - return x + z - reslist = MultiCall([f], dict(x=23, y=24)).execute() - assert reslist == [24] - - def test_tags_call_error(self): - multicall = MultiCall([lambda x: x], {}) - pytest.raises(KeyError, multicall.execute) - - def test_call_subexecute(self): - def m(__multicall__): - subresult = __multicall__.execute() - return subresult + 1 - - def n(): - return 1 - - call = MultiCall([n, m], {}, firstresult=True) - res = call.execute() - assert res == 2 - - def test_call_none_is_no_result(self): - def m1(): - return 1 - def m2(): - return None - res = MultiCall([m1, m2], {}, firstresult=True).execute() - assert res == 1 - res = MultiCall([m1, m2], {}).execute() - assert res == [1] - - def test_hookwrapper(self): - l = [] - def m1(): - l.append("m1 init") - yield None - l.append("m1 finish") - m1.hookwrapper = True - - def m2(): - l.append("m2") - return 2 - res = MultiCall([m2, m1], {}).execute() - assert res == [2] - assert l == ["m1 init", "m2", "m1 finish"] - l[:] = [] - res = MultiCall([m2, m1], {}, firstresult=True).execute() - assert res == 2 - assert l == ["m1 init", "m2", "m1 finish"] - - def test_hookwrapper_order(self): - l = [] - def m1(): - l.append("m1 init") - yield 1 - l.append("m1 finish") - m1.hookwrapper = True - - def m2(): - l.append("m2 init") - yield 2 - l.append("m2 finish") - m2.hookwrapper = True - res = MultiCall([m2, m1], {}).execute() - assert res == [] - assert l == ["m1 init", "m2 init", "m2 finish", "m1 finish"] - - def test_hookwrapper_not_yield(self): - def m1(): - pass - m1.hookwrapper = True - - mc = MultiCall([m1], {}) - with pytest.raises(TypeError): - mc.execute() - - def test_hookwrapper_too_many_yield(self): - def m1(): - yield 1 - yield 2 - m1.hookwrapper = True - - mc = MultiCall([m1], {}) - with pytest.raises(RuntimeError) as ex: - mc.execute() - assert "m1" in str(ex.value) - assert "test_core.py:" in str(ex.value) - - @pytest.mark.parametrize("exc", [ValueError, SystemExit]) - def test_hookwrapper_exception(self, exc): - l = [] - def m1(): - l.append("m1 init") - yield None - l.append("m1 finish") - m1.hookwrapper = True - - def m2(): - raise exc - with pytest.raises(exc): - MultiCall([m2, m1], {}).execute() - assert l == ["m1 init", "m1 finish"] - - -class TestHookRelay: - def test_hapmypath(self): - class Api: - def hello(self, arg): - "api hook 1" - pm = PluginManager("he") - pm.addhooks(Api) - hook = pm.hook - assert hasattr(hook, 'hello') - assert repr(hook.hello).find("hello") != -1 - class Plugin: - def hello(self, arg): - return arg + 1 - plugin = Plugin() - pm.register(plugin) - l = hook.hello(arg=3) - assert l == [4] - assert not hasattr(hook, 'world') - pm.unregister(plugin) - assert hook.hello(arg=3) == [] - - def test_argmismatch(self): - class Api: - def hello(self, arg): - "api hook 1" - pm = PluginManager("he") - pm.addhooks(Api) - class Plugin: - def hello(self, argwrong): - return arg + 1 - with pytest.raises(PluginValidationError) as exc: - pm.register(Plugin()) - assert "argwrong" in str(exc.value) - - def test_only_kwargs(self): - pm = PluginManager("he") - class Api: - def hello(self, arg): - "api hook 1" - pm.addhooks(Api) - pytest.raises(TypeError, lambda: pm.hook.hello(3)) - - def test_firstresult_definition(self): - class Api: - def hello(self, arg): - "api hook 1" - hello.firstresult = True - pm = PluginManager("he") - pm.addhooks(Api) - class Plugin: - def hello(self, arg): - return arg + 1 - pm.register(Plugin()) - res = pm.hook.hello(arg=3) - assert res == 4 - -class TestTracer: - def test_simple(self): - from _pytest.core import TagTracer - rootlogger = TagTracer() - log = rootlogger.get("pytest") - log("hello") - l = [] - rootlogger.setwriter(l.append) - log("world") - assert len(l) == 1 - assert l[0] == "world [pytest]\n" - sublog = log.get("collection") - sublog("hello") - assert l[1] == "hello [pytest:collection]\n" - - def test_indent(self): - from _pytest.core import TagTracer - rootlogger = TagTracer() - log = rootlogger.get("1") - l = [] - log.root.setwriter(lambda arg: l.append(arg)) - log("hello") - log.root.indent += 1 - log("line1") - log("line2") - log.root.indent += 1 - log("line3") - log("line4") - log.root.indent -= 1 - log("line5") - log.root.indent -= 1 - log("last") - assert len(l) == 7 - names = [x[:x.rfind(' [')] for x in l] - assert names == ['hello', ' line1', ' line2', - ' line3', ' line4', ' line5', 'last'] - - def test_readable_output_dictargs(self): - from _pytest.core import TagTracer - rootlogger = TagTracer() - - out = rootlogger.format_message(['test'], [1]) - assert out == ['1 [test]\n'] - - out2= rootlogger.format_message(['test'], ['test', {'a':1}]) - assert out2 ==[ - 'test [test]\n', - ' a: 1\n' - ] - - def test_setprocessor(self): - from _pytest.core import TagTracer - rootlogger = TagTracer() - log = rootlogger.get("1") - log2 = log.get("2") - assert log2.tags == tuple("12") - l = [] - rootlogger.setprocessor(tuple("12"), lambda *args: l.append(args)) - log("not seen") - log2("seen") - assert len(l) == 1 - tags, args = l[0] - assert "1" in tags - assert "2" in tags - assert args == ("seen",) - l2 = [] - rootlogger.setprocessor("1:2", lambda *args: l2.append(args)) - log2("seen") - tags, args = l2[0] - assert args == ("seen",) - - - def test_setmyprocessor(self): - from _pytest.core import TagTracer - rootlogger = TagTracer() - log = rootlogger.get("1") - log2 = log.get("2") - l = [] - log2.setmyprocessor(lambda *args: l.append(args)) - log("not seen") - assert not l - log2(42) - assert len(l) == 1 - tags, args = l[0] - assert "1" in tags - assert "2" in tags - assert args == (42,) - -def test_default_markers(testdir): - result = testdir.runpytest("--markers") - result.stdout.fnmatch_lines([ - "*tryfirst*first*", - "*trylast*last*", - ]) - -def test_importplugin_issue375(testdir, pytestpm): - testdir.syspathinsert(testdir.tmpdir) - testdir.makepyfile(qwe="import aaaa") - with pytest.raises(ImportError) as excinfo: - pytestpm.import_plugin("qwe") - assert "qwe" not in str(excinfo.value) - assert "aaaa" in str(excinfo.value) - - -### to be shifted to own test file -from _pytest.config import PytestPluginManager - -class TestPytestPluginManager: - def test_register_imported_modules(self): - pm = PytestPluginManager() - mod = py.std.types.ModuleType("x.y.pytest_hello") - pm.register(mod) - assert pm.is_registered(mod) - l = pm.get_plugins() - assert mod in l - pytest.raises(ValueError, "pm.register(mod)") - pytest.raises(ValueError, lambda: pm.register(mod)) - #assert not pm.is_registered(mod2) - assert pm.get_plugins() == l - - def test_canonical_import(self, monkeypatch): - mod = py.std.types.ModuleType("pytest_xyz") - monkeypatch.setitem(py.std.sys.modules, 'pytest_xyz', mod) - pm = PytestPluginManager() - pm.import_plugin('pytest_xyz') - assert pm.get_plugin('pytest_xyz') == mod - assert pm.is_registered(mod) - - def test_consider_module(self, testdir, pytestpm): - testdir.syspathinsert() - testdir.makepyfile(pytest_p1="#") - testdir.makepyfile(pytest_p2="#") - mod = py.std.types.ModuleType("temp") - mod.pytest_plugins = ["pytest_p1", "pytest_p2"] - pytestpm.consider_module(mod) - assert pytestpm.get_plugin("pytest_p1").__name__ == "pytest_p1" - assert pytestpm.get_plugin("pytest_p2").__name__ == "pytest_p2" - - def test_consider_module_import_module(self, testdir): - pytestpm = get_config().pluginmanager - mod = py.std.types.ModuleType("x") - mod.pytest_plugins = "pytest_a" - aplugin = testdir.makepyfile(pytest_a="#") - reprec = testdir.make_hook_recorder(pytestpm) - #syspath.prepend(aplugin.dirpath()) - py.std.sys.path.insert(0, str(aplugin.dirpath())) - pytestpm.consider_module(mod) - call = reprec.getcall(pytestpm.hook.pytest_plugin_registered.name) - assert call.plugin.__name__ == "pytest_a" - - # check that it is not registered twice - pytestpm.consider_module(mod) - l = reprec.getcalls("pytest_plugin_registered") - assert len(l) == 1 - - def test_consider_env_fails_to_import(self, monkeypatch, pytestpm): - monkeypatch.setenv('PYTEST_PLUGINS', 'nonexisting', prepend=",") - with pytest.raises(ImportError): - pytestpm.consider_env() - - def test_plugin_skip(self, testdir, monkeypatch): - p = testdir.makepyfile(skipping1=""" - import pytest - pytest.skip("hello") - """) - p.copy(p.dirpath("skipping2.py")) - monkeypatch.setenv("PYTEST_PLUGINS", "skipping2") - result = testdir.runpytest("-rw", "-p", "skipping1", syspathinsert=True) - assert result.ret == 0 - result.stdout.fnmatch_lines([ - "WI1*skipped plugin*skipping1*hello*", - "WI1*skipped plugin*skipping2*hello*", - ]) - - def test_consider_env_plugin_instantiation(self, testdir, monkeypatch, pytestpm): - testdir.syspathinsert() - testdir.makepyfile(xy123="#") - monkeypatch.setitem(os.environ, 'PYTEST_PLUGINS', 'xy123') - l1 = len(pytestpm.get_plugins()) - pytestpm.consider_env() - l2 = len(pytestpm.get_plugins()) - assert l2 == l1 + 1 - assert pytestpm.get_plugin('xy123') - pytestpm.consider_env() - l3 = len(pytestpm.get_plugins()) - assert l2 == l3 - - def test_pluginmanager_ENV_startup(self, testdir, monkeypatch): - testdir.makepyfile(pytest_x500="#") - p = testdir.makepyfile(""" - import pytest - def test_hello(pytestconfig): - plugin = pytestconfig.pluginmanager.get_plugin('pytest_x500') - assert plugin is not None - """) - monkeypatch.setenv('PYTEST_PLUGINS', 'pytest_x500', prepend=",") - result = testdir.runpytest(p, syspathinsert=True) - assert result.ret == 0 - result.stdout.fnmatch_lines(["*1 passed*"]) - - def test_import_plugin_importname(self, testdir, pytestpm): - pytest.raises(ImportError, 'pytestpm.import_plugin("qweqwex.y")') - pytest.raises(ImportError, 'pytestpm.import_plugin("pytest_qweqwx.y")') - - testdir.syspathinsert() - pluginname = "pytest_hello" - testdir.makepyfile(**{pluginname: ""}) - pytestpm.import_plugin("pytest_hello") - len1 = len(pytestpm.get_plugins()) - pytestpm.import_plugin("pytest_hello") - len2 = len(pytestpm.get_plugins()) - assert len1 == len2 - plugin1 = pytestpm.get_plugin("pytest_hello") - assert plugin1.__name__.endswith('pytest_hello') - plugin2 = pytestpm.get_plugin("pytest_hello") - assert plugin2 is plugin1 - - def test_import_plugin_dotted_name(self, testdir, pytestpm): - pytest.raises(ImportError, 'pytestpm.import_plugin("qweqwex.y")') - pytest.raises(ImportError, 'pytestpm.import_plugin("pytest_qweqwex.y")') - - testdir.syspathinsert() - testdir.mkpydir("pkg").join("plug.py").write("x=3") - pluginname = "pkg.plug" - pytestpm.import_plugin(pluginname) - mod = pytestpm.get_plugin("pkg.plug") - assert mod.x == 3 - - def test_consider_conftest_deps(self, testdir, pytestpm): - mod = testdir.makepyfile("pytest_plugins='xyz'").pyimport() - with pytest.raises(ImportError): - pytestpm.consider_conftest(mod) - - -class TestPytestPluginManagerBootstrapming: - def test_preparse_args(self, pytestpm): - pytest.raises(ImportError, lambda: - pytestpm.consider_preparse(["xyz", "-p", "hello123"])) - - def test_plugin_prevent_register(self, pytestpm): - pytestpm.consider_preparse(["xyz", "-p", "no:abc"]) - l1 = pytestpm.get_plugins() - pytestpm.register(42, name="abc") - l2 = pytestpm.get_plugins() - assert len(l2) == len(l1) - assert 42 not in l2 - - def test_plugin_prevent_register_unregistered_alredy_registered(self, pytestpm): - pytestpm.register(42, name="abc") - l1 = pytestpm.get_plugins() - assert 42 in l1 - pytestpm.consider_preparse(["xyz", "-p", "no:abc"]) - l2 = pytestpm.get_plugins() - assert 42 not in l2 diff --git a/testing/test_pluginmanager.py b/testing/test_pluginmanager.py new file mode 100644 index 000000000..62ecc544f --- /dev/null +++ b/testing/test_pluginmanager.py @@ -0,0 +1,311 @@ +import pytest +import py +import os + +from _pytest.config import get_config, PytestPluginManager + +@pytest.fixture +def pytestpm(): + return PytestPluginManager() + +class TestPytestPluginInteractions: + def test_addhooks_conftestplugin(self, testdir): + testdir.makepyfile(newhooks=""" + def pytest_myhook(xyz): + "new hook" + """) + conf = testdir.makeconftest(""" + import sys ; sys.path.insert(0, '.') + import newhooks + def pytest_addhooks(pluginmanager): + pluginmanager.addhooks(newhooks) + def pytest_myhook(xyz): + return xyz + 1 + """) + config = get_config() + pm = config.pluginmanager + pm.hook.pytest_addhooks.call_historic( + kwargs=dict(pluginmanager=config.pluginmanager)) + config.pluginmanager._importconftest(conf) + #print(config.pluginmanager.get_plugins()) + res = config.hook.pytest_myhook(xyz=10) + assert res == [11] + + def test_addhooks_nohooks(self, testdir): + testdir.makeconftest(""" + import sys + def pytest_addhooks(pluginmanager): + pluginmanager.addhooks(sys) + """) + res = testdir.runpytest() + assert res.ret != 0 + res.stderr.fnmatch_lines([ + "*did not find*sys*" + ]) + + def test_namespace_early_from_import(self, testdir): + p = testdir.makepyfile(""" + from pytest import Item + from pytest import Item as Item2 + assert Item is Item2 + """) + result = testdir.runpython(p) + assert result.ret == 0 + + def test_do_ext_namespace(self, testdir): + testdir.makeconftest(""" + def pytest_namespace(): + return {'hello': 'world'} + """) + p = testdir.makepyfile(""" + from pytest import hello + import pytest + def test_hello(): + assert hello == "world" + assert 'hello' in pytest.__all__ + """) + reprec = testdir.inline_run(p) + reprec.assertoutcome(passed=1) + + def test_do_option_postinitialize(self, testdir): + config = testdir.parseconfigure() + assert not hasattr(config.option, 'test123') + p = testdir.makepyfile(""" + def pytest_addoption(parser): + parser.addoption('--test123', action="store_true", + default=True) + """) + config.pluginmanager._importconftest(p) + assert config.option.test123 + + def test_configure(self, testdir): + config = testdir.parseconfig() + l = [] + class A: + def pytest_configure(self, config): + l.append(self) + + config.pluginmanager.register(A()) + assert len(l) == 0 + config._do_configure() + assert len(l) == 1 + config.pluginmanager.register(A()) # leads to a configured() plugin + assert len(l) == 2 + assert l[0] != l[1] + + config._ensure_unconfigure() + config.pluginmanager.register(A()) + assert len(l) == 2 + + def test_hook_tracing(self): + pytestpm = get_config().pluginmanager # fully initialized with plugins + saveindent = [] + class api1: + def pytest_plugin_registered(self): + saveindent.append(pytestpm.trace.root.indent) + class api2: + def pytest_plugin_registered(self): + saveindent.append(pytestpm.trace.root.indent) + raise ValueError() + l = [] + pytestpm.trace.root.setwriter(l.append) + undo = pytestpm.enable_tracing() + try: + indent = pytestpm.trace.root.indent + p = api1() + pytestpm.register(p) + assert pytestpm.trace.root.indent == indent + assert len(l) >= 2 + assert 'pytest_plugin_registered' in l[0] + assert 'finish' in l[1] + + l[:] = [] + with pytest.raises(ValueError): + pytestpm.register(api2()) + assert pytestpm.trace.root.indent == indent + assert saveindent[0] > indent + finally: + undo() + + def test_warn_on_deprecated_multicall(self, pytestpm): + class Plugin: + def pytest_configure(self, __multicall__): + pass + + before = list(pytestpm._warnings) + pytestpm.register(Plugin()) + assert len(pytestpm._warnings) == len(before) + 1 + assert "deprecated" in pytestpm._warnings[-1]["message"] + + +def test_namespace_has_default_and_env_plugins(testdir): + p = testdir.makepyfile(""" + import pytest + pytest.mark + """) + result = testdir.runpython(p) + assert result.ret == 0 + +def test_default_markers(testdir): + result = testdir.runpytest("--markers") + result.stdout.fnmatch_lines([ + "*tryfirst*first*", + "*trylast*last*", + ]) + +def test_importplugin_issue375(testdir, pytestpm): + testdir.syspathinsert(testdir.tmpdir) + testdir.makepyfile(qwe="import aaaa") + with pytest.raises(ImportError) as excinfo: + pytestpm.import_plugin("qwe") + assert "qwe" not in str(excinfo.value) + assert "aaaa" in str(excinfo.value) + + +class TestPytestPluginManager: + def test_register_imported_modules(self): + pm = PytestPluginManager() + mod = py.std.types.ModuleType("x.y.pytest_hello") + pm.register(mod) + assert pm.is_registered(mod) + l = pm.get_plugins() + assert mod in l + pytest.raises(ValueError, "pm.register(mod)") + pytest.raises(ValueError, lambda: pm.register(mod)) + #assert not pm.is_registered(mod2) + assert pm.get_plugins() == l + + def test_canonical_import(self, monkeypatch): + mod = py.std.types.ModuleType("pytest_xyz") + monkeypatch.setitem(py.std.sys.modules, 'pytest_xyz', mod) + pm = PytestPluginManager() + pm.import_plugin('pytest_xyz') + assert pm.get_plugin('pytest_xyz') == mod + assert pm.is_registered(mod) + + def test_consider_module(self, testdir, pytestpm): + testdir.syspathinsert() + testdir.makepyfile(pytest_p1="#") + testdir.makepyfile(pytest_p2="#") + mod = py.std.types.ModuleType("temp") + mod.pytest_plugins = ["pytest_p1", "pytest_p2"] + pytestpm.consider_module(mod) + assert pytestpm.get_plugin("pytest_p1").__name__ == "pytest_p1" + assert pytestpm.get_plugin("pytest_p2").__name__ == "pytest_p2" + + def test_consider_module_import_module(self, testdir): + pytestpm = get_config().pluginmanager + mod = py.std.types.ModuleType("x") + mod.pytest_plugins = "pytest_a" + aplugin = testdir.makepyfile(pytest_a="#") + reprec = testdir.make_hook_recorder(pytestpm) + #syspath.prepend(aplugin.dirpath()) + py.std.sys.path.insert(0, str(aplugin.dirpath())) + pytestpm.consider_module(mod) + call = reprec.getcall(pytestpm.hook.pytest_plugin_registered.name) + assert call.plugin.__name__ == "pytest_a" + + # check that it is not registered twice + pytestpm.consider_module(mod) + l = reprec.getcalls("pytest_plugin_registered") + assert len(l) == 1 + + def test_consider_env_fails_to_import(self, monkeypatch, pytestpm): + monkeypatch.setenv('PYTEST_PLUGINS', 'nonexisting', prepend=",") + with pytest.raises(ImportError): + pytestpm.consider_env() + + def test_plugin_skip(self, testdir, monkeypatch): + p = testdir.makepyfile(skipping1=""" + import pytest + pytest.skip("hello") + """) + p.copy(p.dirpath("skipping2.py")) + monkeypatch.setenv("PYTEST_PLUGINS", "skipping2") + result = testdir.runpytest("-rw", "-p", "skipping1", syspathinsert=True) + assert result.ret == 0 + result.stdout.fnmatch_lines([ + "WI1*skipped plugin*skipping1*hello*", + "WI1*skipped plugin*skipping2*hello*", + ]) + + def test_consider_env_plugin_instantiation(self, testdir, monkeypatch, pytestpm): + testdir.syspathinsert() + testdir.makepyfile(xy123="#") + monkeypatch.setitem(os.environ, 'PYTEST_PLUGINS', 'xy123') + l1 = len(pytestpm.get_plugins()) + pytestpm.consider_env() + l2 = len(pytestpm.get_plugins()) + assert l2 == l1 + 1 + assert pytestpm.get_plugin('xy123') + pytestpm.consider_env() + l3 = len(pytestpm.get_plugins()) + assert l2 == l3 + + def test_pluginmanager_ENV_startup(self, testdir, monkeypatch): + testdir.makepyfile(pytest_x500="#") + p = testdir.makepyfile(""" + import pytest + def test_hello(pytestconfig): + plugin = pytestconfig.pluginmanager.get_plugin('pytest_x500') + assert plugin is not None + """) + monkeypatch.setenv('PYTEST_PLUGINS', 'pytest_x500', prepend=",") + result = testdir.runpytest(p, syspathinsert=True) + assert result.ret == 0 + result.stdout.fnmatch_lines(["*1 passed*"]) + + def test_import_plugin_importname(self, testdir, pytestpm): + pytest.raises(ImportError, 'pytestpm.import_plugin("qweqwex.y")') + pytest.raises(ImportError, 'pytestpm.import_plugin("pytest_qweqwx.y")') + + testdir.syspathinsert() + pluginname = "pytest_hello" + testdir.makepyfile(**{pluginname: ""}) + pytestpm.import_plugin("pytest_hello") + len1 = len(pytestpm.get_plugins()) + pytestpm.import_plugin("pytest_hello") + len2 = len(pytestpm.get_plugins()) + assert len1 == len2 + plugin1 = pytestpm.get_plugin("pytest_hello") + assert plugin1.__name__.endswith('pytest_hello') + plugin2 = pytestpm.get_plugin("pytest_hello") + assert plugin2 is plugin1 + + def test_import_plugin_dotted_name(self, testdir, pytestpm): + pytest.raises(ImportError, 'pytestpm.import_plugin("qweqwex.y")') + pytest.raises(ImportError, 'pytestpm.import_plugin("pytest_qweqwex.y")') + + testdir.syspathinsert() + testdir.mkpydir("pkg").join("plug.py").write("x=3") + pluginname = "pkg.plug" + pytestpm.import_plugin(pluginname) + mod = pytestpm.get_plugin("pkg.plug") + assert mod.x == 3 + + def test_consider_conftest_deps(self, testdir, pytestpm): + mod = testdir.makepyfile("pytest_plugins='xyz'").pyimport() + with pytest.raises(ImportError): + pytestpm.consider_conftest(mod) + + +class TestPytestPluginManagerBootstrapming: + def test_preparse_args(self, pytestpm): + pytest.raises(ImportError, lambda: + pytestpm.consider_preparse(["xyz", "-p", "hello123"])) + + def test_plugin_prevent_register(self, pytestpm): + pytestpm.consider_preparse(["xyz", "-p", "no:abc"]) + l1 = pytestpm.get_plugins() + pytestpm.register(42, name="abc") + l2 = pytestpm.get_plugins() + assert len(l2) == len(l1) + assert 42 not in l2 + + def test_plugin_prevent_register_unregistered_alredy_registered(self, pytestpm): + pytestpm.register(42, name="abc") + l1 = pytestpm.get_plugins() + assert 42 in l1 + pytestpm.consider_preparse(["xyz", "-p", "no:abc"]) + l2 = pytestpm.get_plugins() + assert 42 not in l2