From 8cfec56a82749be52bc76c58121e132c751e99a8 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 6 Oct 2014 13:37:57 +0200 Subject: [PATCH] simplify internal pytester machinery --- _pytest/core.py | 2 +- _pytest/pytester.py | 200 +++++++++++++++---------------------- bench/bench.py | 2 +- testing/test_collection.py | 16 +-- testing/test_core.py | 5 +- testing/test_pytester.py | 25 +---- 6 files changed, 96 insertions(+), 154 deletions(-) diff --git a/_pytest/core.py b/_pytest/core.py index abab607b2..af888024e 100644 --- a/_pytest/core.py +++ b/_pytest/core.py @@ -72,7 +72,7 @@ def add_method_controller(cls, func): oldcall = getattr(cls, name) def wrap_exec(*args, **kwargs): gen = func(*args, **kwargs) - gen.next() # first yield + next(gen) # first yield res = oldcall(*args, **kwargs) try: gen.send(res) diff --git a/_pytest/pytester.py b/_pytest/pytester.py index f1a9e3f8e..ed4580f4a 100644 --- a/_pytest/pytester.py +++ b/_pytest/pytester.py @@ -1,5 +1,4 @@ """ (disabled by default) support for testing pytest and pytest plugins. """ -import inspect import sys import os import codecs @@ -12,7 +11,7 @@ import subprocess import py import pytest from py.builtin import print_ -from _pytest.core import HookRelay, HookCaller, Wrapping +from _pytest.core import HookCaller, add_method_controller from _pytest.main import Session, EXIT_OK @@ -38,17 +37,6 @@ def pytest_configure(config): _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc")) _pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py") -def pytest_funcarg___pytest(request): - return PytestArg(request) - -class PytestArg: - def __init__(self, request): - self.request = request - - def gethookrecorder(self, hook): - hookrecorder = HookRecorder(hook._pm) - self.request.addfinalizer(hookrecorder.finish_recording) - return hookrecorder class ParsedCall: def __init__(self, name, kwargs): @@ -65,21 +53,22 @@ class HookRecorder: def __init__(self, pluginmanager): self._pluginmanager = pluginmanager self.calls = [] - self.wrapping = Wrapping() - @self.wrapping.method(HookCaller) + def _docall(hookcaller, methods, kwargs): self.calls.append(ParsedCall(hookcaller.name, kwargs)) yield + self._undo_wrapping = add_method_controller(HookCaller, _docall) + pluginmanager.add_shutdown(self._undo_wrapping) def finish_recording(self): - self.wrapping.undo() + self._undo_wrapping() def getcalls(self, names): if isinstance(names, str): names = names.split() return [call for call in self.calls if call._name in names] - def contains(self, entries): + def assert_contains(self, entries): __tracebackhide__ = True i = 0 entries = list(entries) @@ -115,6 +104,69 @@ class HookRecorder: assert len(l) == 1, (name, l) return l[0] + # functionality for test reports + + def getreports(self, + names="pytest_runtest_logreport pytest_collectreport"): + return [x.report for x in self.getcalls(names)] + + def matchreport(self, inamepart="", + names="pytest_runtest_logreport pytest_collectreport", when=None): + """ return a testreport whose dotted import path matches """ + l = [] + for rep in self.getreports(names=names): + try: + if not when and rep.when != "call" and rep.passed: + # setup/teardown passing reports - let's ignore those + continue + except AttributeError: + pass + if when and getattr(rep, 'when', None) != when: + continue + if not inamepart or inamepart in rep.nodeid.split("::"): + l.append(rep) + if not l: + raise ValueError("could not find test report matching %r: " + "no test reports at all!" % (inamepart,)) + if len(l) > 1: + raise ValueError( + "found 2 or more testreports matching %r: %s" %(inamepart, l)) + return l[0] + + def getfailures(self, + names='pytest_runtest_logreport pytest_collectreport'): + return [rep for rep in self.getreports(names) if rep.failed] + + def getfailedcollections(self): + return self.getfailures('pytest_collectreport') + + def listoutcomes(self): + passed = [] + skipped = [] + failed = [] + for rep in self.getreports( + "pytest_collectreport pytest_runtest_logreport"): + if rep.passed: + if getattr(rep, "when", None) == "call": + passed.append(rep) + elif rep.skipped: + skipped.append(rep) + elif rep.failed: + failed.append(rep) + return passed, skipped, failed + + def countoutcomes(self): + return [len(x) for x in self.listoutcomes()] + + def assertoutcome(self, passed=0, skipped=0, failed=0): + realpassed, realskipped, realfailed = self.listoutcomes() + assert passed == len(realpassed) + assert skipped == len(realskipped) + assert failed == len(realfailed) + + def clear(self): + self.calls[:] = [] + def pytest_funcarg__linecomp(request): return LineComp() @@ -150,7 +202,6 @@ class TmpTestdir: def __init__(self, request): self.request = request self.Config = request.config.__class__ - self._pytest = request.getfuncargvalue("_pytest") # XXX remove duplication with tmpdir plugin basetmp = request.config._tmpdirhandler.ensuretemp("testdir") name = request.function.__name__ @@ -181,14 +232,10 @@ class TmpTestdir: if fn and fn.startswith(str(self.tmpdir)): del sys.modules[name] - def getreportrecorder(self, obj): - if hasattr(obj, 'config'): - obj = obj.config - if hasattr(obj, 'hook'): - obj = obj.hook - assert isinstance(obj, HookRelay) - reprec = ReportRecorder(obj) - reprec.hookrecorder = self._pytest.gethookrecorder(obj) + def make_hook_recorder(self, pluginmanager): + assert not hasattr(pluginmanager, "reprec") + pluginmanager.reprec = reprec = HookRecorder(pluginmanager) + self.request.addfinalizer(reprec.finish_recording) return reprec def chdir(self): @@ -307,26 +354,23 @@ class TmpTestdir: def inline_genitems(self, *args): return self.inprocess_run(list(args) + ['--collectonly']) - def inline_run(self, *args): - items, rec = self.inprocess_run(args) - return rec + def inprocess_run(self, args, plugins=()): + rec = self.inline_run(*args, plugins=plugins) + items = [x.item for x in rec.getcalls("pytest_itemcollected")] + return items, rec - def inprocess_run(self, args, plugins=None): + def inline_run(self, *args, **kwargs): rec = [] - items = [] class Collect: def pytest_configure(x, config): - rec.append(self.getreportrecorder(config)) - def pytest_itemcollected(self, item): - items.append(item) - if not plugins: - plugins = [] + rec.append(self.make_hook_recorder(config.pluginmanager)) + plugins = kwargs.get("plugins") or [] plugins.append(Collect()) ret = pytest.main(list(args), plugins=plugins) + assert len(rec) == 1 reprec = rec[0] reprec.ret = ret - assert len(rec) == 1 - return items, reprec + return reprec def parseconfig(self, *args): args = [str(x) for x in args] @@ -501,86 +545,6 @@ def getdecoded(out): return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( py.io.saferepr(out),) -class ReportRecorder(object): - def __init__(self, hook): - self.hook = hook - self.pluginmanager = hook._pm - self.pluginmanager.register(self) - - def getcall(self, name): - return self.hookrecorder.getcall(name) - - def popcall(self, name): - return self.hookrecorder.popcall(name) - - def getcalls(self, names): - """ return list of ParsedCall instances matching the given eventname. """ - return self.hookrecorder.getcalls(names) - - # functionality for test reports - - def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): - return [x.report for x in self.getcalls(names)] - - def matchreport(self, inamepart="", - names="pytest_runtest_logreport pytest_collectreport", when=None): - """ return a testreport whose dotted import path matches """ - l = [] - for rep in self.getreports(names=names): - try: - if not when and rep.when != "call" and rep.passed: - # setup/teardown passing reports - let's ignore those - continue - except AttributeError: - pass - if when and getattr(rep, 'when', None) != when: - continue - if not inamepart or inamepart in rep.nodeid.split("::"): - l.append(rep) - if not l: - raise ValueError("could not find test report matching %r: no test reports at all!" % - (inamepart,)) - if len(l) > 1: - raise ValueError("found more than one testreport matching %r: %s" %( - inamepart, l)) - return l[0] - - def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'): - return [rep for rep in self.getreports(names) if rep.failed] - - def getfailedcollections(self): - return self.getfailures('pytest_collectreport') - - def listoutcomes(self): - passed = [] - skipped = [] - failed = [] - for rep in self.getreports( - "pytest_collectreport pytest_runtest_logreport"): - if rep.passed: - if getattr(rep, "when", None) == "call": - passed.append(rep) - elif rep.skipped: - skipped.append(rep) - elif rep.failed: - failed.append(rep) - return passed, skipped, failed - - def countoutcomes(self): - return [len(x) for x in self.listoutcomes()] - - def assertoutcome(self, passed=0, skipped=0, failed=0): - realpassed, realskipped, realfailed = self.listoutcomes() - assert passed == len(realpassed) - assert skipped == len(realskipped) - assert failed == len(realfailed) - - def clear(self): - self.hookrecorder.calls[:] = [] - - def unregister(self): - self.pluginmanager.unregister(self) - self.hookrecorder.finish_recording() class LineComp: def __init__(self): diff --git a/bench/bench.py b/bench/bench.py index c99bc3234..ce9496417 100644 --- a/bench/bench.py +++ b/bench/bench.py @@ -9,4 +9,4 @@ if __name__ == '__main__': p = pstats.Stats("prof") p.strip_dirs() p.sort_stats('cumulative') - print(p.print_stats(250)) + print(p.print_stats(500)) diff --git a/testing/test_collection.py b/testing/test_collection.py index 4adf46886..754f3c9ab 100644 --- a/testing/test_collection.py +++ b/testing/test_collection.py @@ -334,9 +334,9 @@ class TestSession: assert item.name == "test_func" newid = item.nodeid assert newid == id - py.std.pprint.pprint(hookrec.hookrecorder.calls) + py.std.pprint.pprint(hookrec.calls) topdir = testdir.tmpdir # noqa - hookrec.hookrecorder.contains([ + hookrec.assert_contains([ ("pytest_collectstart", "collector.fspath == topdir"), ("pytest_make_collect_report", "collector.fspath == topdir"), ("pytest_collectstart", "collector.fspath == p"), @@ -381,9 +381,9 @@ class TestSession: id = p.basename items, hookrec = testdir.inline_genitems(id) - py.std.pprint.pprint(hookrec.hookrecorder.calls) + py.std.pprint.pprint(hookrec.calls) assert len(items) == 2 - hookrec.hookrecorder.contains([ + hookrec.assert_contains([ ("pytest_collectstart", "collector.fspath == collector.session.fspath"), ("pytest_collectstart", @@ -404,8 +404,8 @@ class TestSession: items, hookrec = testdir.inline_genitems() assert len(items) == 1 - py.std.pprint.pprint(hookrec.hookrecorder.calls) - hookrec.hookrecorder.contains([ + py.std.pprint.pprint(hookrec.calls) + hookrec.assert_contains([ ("pytest_collectstart", "collector.fspath == test_aaa"), ("pytest_pycollect_makeitem", "name == 'test_func'"), ("pytest_collectreport", @@ -425,8 +425,8 @@ class TestSession: items, hookrec = testdir.inline_genitems(id) assert len(items) == 2 - py.std.pprint.pprint(hookrec.hookrecorder.calls) - hookrec.hookrecorder.contains([ + py.std.pprint.pprint(hookrec.calls) + hookrec.assert_contains([ ("pytest_collectstart", "collector.fspath == test_aaa"), ("pytest_pycollect_makeitem", "name == 'test_func'"), ("pytest_collectreport", "report.nodeid == 'aaa/test_aaa.py'"), diff --git a/testing/test_core.py b/testing/test_core.py index 6cafc8812..5e7113974 100644 --- a/testing/test_core.py +++ b/testing/test_core.py @@ -149,7 +149,7 @@ class TestBootstrapping: mod.pytest_plugins = "pytest_a" aplugin = testdir.makepyfile(pytest_a="#") pluginmanager = get_plugin_manager() - reprec = testdir.getreportrecorder(pluginmanager) + reprec = testdir.make_hook_recorder(pluginmanager) #syspath.prepend(aplugin.dirpath()) py.std.sys.path.insert(0, str(aplugin.dirpath())) pluginmanager.consider_module(mod) @@ -771,11 +771,10 @@ def test_wrapping(): def f(self): return "A.f" - shutdown = [] l = [] def f(self): l.append(1) - x = yield + yield l.append(2) undo = add_method_controller(A, f) diff --git a/testing/test_pytester.py b/testing/test_pytester.py index ef817e68c..ac57f2c87 100644 --- a/testing/test_pytester.py +++ b/testing/test_pytester.py @@ -3,9 +3,9 @@ import os from _pytest.pytester import HookRecorder from _pytest.core import PluginManager -def test_reportrecorder(testdir): +def test_make_hook_recorder(testdir): item = testdir.getitem("def test_func(): pass") - recorder = testdir.getreportrecorder(item.config) + recorder = testdir.make_hook_recorder(item.config.pluginmanager) assert not recorder.getfailures() pytest.xfail("internal reportrecorder tests need refactoring") @@ -104,27 +104,6 @@ def test_hookrecorder_basic(holder): assert call._name == "pytest_xyz_noarg" -def test_functional(testdir, linecomp): - reprec = testdir.inline_runsource(""" - import pytest - from _pytest.core import HookRelay, PluginManager - pytest_plugins="pytester" - def test_func(_pytest): - class ApiClass: - def pytest_xyz(self, arg): "x" - pm = PluginManager() - pm.hook._addhooks(ApiClass, "pytest_") - rec = _pytest.gethookrecorder(pm.hook) - class Plugin: - def pytest_xyz(self, arg): - return arg + 1 - rec._pluginmanager.register(Plugin()) - res = pm.hook.pytest_xyz(arg=41) - assert res == [42] - """) - reprec.assertoutcome(passed=1) - - def test_makepyfile_unicode(testdir): global unichr try: