simplify internal pytester machinery

This commit is contained in:
holger krekel 2014-10-06 13:37:57 +02:00
parent 3d84f35850
commit 818a412d29
6 changed files with 96 additions and 154 deletions

View File

@ -72,7 +72,7 @@ def add_method_controller(cls, func):
oldcall = getattr(cls, name)
def wrap_exec(*args, **kwargs):
gen = func(*args, **kwargs)
gen.next() # first yield
next(gen) # first yield
res = oldcall(*args, **kwargs)
try:
gen.send(res)

View File

@ -1,5 +1,4 @@
""" (disabled by default) support for testing pytest and pytest plugins. """
import inspect
import sys
import os
import codecs
@ -12,7 +11,7 @@ import subprocess
import py
import pytest
from py.builtin import print_
from _pytest.core import HookRelay, HookCaller, Wrapping
from _pytest.core import HookCaller, add_method_controller
from _pytest.main import Session, EXIT_OK
@ -38,17 +37,6 @@ def pytest_configure(config):
_pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
_pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")
def pytest_funcarg___pytest(request):
return PytestArg(request)
class PytestArg:
def __init__(self, request):
self.request = request
def gethookrecorder(self, hook):
hookrecorder = HookRecorder(hook._pm)
self.request.addfinalizer(hookrecorder.finish_recording)
return hookrecorder
class ParsedCall:
def __init__(self, name, kwargs):
@ -65,21 +53,22 @@ class HookRecorder:
def __init__(self, pluginmanager):
self._pluginmanager = pluginmanager
self.calls = []
self.wrapping = Wrapping()
@self.wrapping.method(HookCaller)
def _docall(hookcaller, methods, kwargs):
self.calls.append(ParsedCall(hookcaller.name, kwargs))
yield
self._undo_wrapping = add_method_controller(HookCaller, _docall)
pluginmanager.add_shutdown(self._undo_wrapping)
def finish_recording(self):
self.wrapping.undo()
self._undo_wrapping()
def getcalls(self, names):
if isinstance(names, str):
names = names.split()
return [call for call in self.calls if call._name in names]
def contains(self, entries):
def assert_contains(self, entries):
__tracebackhide__ = True
i = 0
entries = list(entries)
@ -115,6 +104,69 @@ class HookRecorder:
assert len(l) == 1, (name, l)
return l[0]
# functionality for test reports
def getreports(self,
names="pytest_runtest_logreport pytest_collectreport"):
return [x.report for x in self.getcalls(names)]
def matchreport(self, inamepart="",
names="pytest_runtest_logreport pytest_collectreport", when=None):
""" return a testreport whose dotted import path matches """
l = []
for rep in self.getreports(names=names):
try:
if not when and rep.when != "call" and rep.passed:
# setup/teardown passing reports - let's ignore those
continue
except AttributeError:
pass
if when and getattr(rep, 'when', None) != when:
continue
if not inamepart or inamepart in rep.nodeid.split("::"):
l.append(rep)
if not l:
raise ValueError("could not find test report matching %r: "
"no test reports at all!" % (inamepart,))
if len(l) > 1:
raise ValueError(
"found 2 or more testreports matching %r: %s" %(inamepart, l))
return l[0]
def getfailures(self,
names='pytest_runtest_logreport pytest_collectreport'):
return [rep for rep in self.getreports(names) if rep.failed]
def getfailedcollections(self):
return self.getfailures('pytest_collectreport')
def listoutcomes(self):
passed = []
skipped = []
failed = []
for rep in self.getreports(
"pytest_collectreport pytest_runtest_logreport"):
if rep.passed:
if getattr(rep, "when", None) == "call":
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
elif rep.failed:
failed.append(rep)
return passed, skipped, failed
def countoutcomes(self):
return [len(x) for x in self.listoutcomes()]
def assertoutcome(self, passed=0, skipped=0, failed=0):
realpassed, realskipped, realfailed = self.listoutcomes()
assert passed == len(realpassed)
assert skipped == len(realskipped)
assert failed == len(realfailed)
def clear(self):
self.calls[:] = []
def pytest_funcarg__linecomp(request):
return LineComp()
@ -150,7 +202,6 @@ class TmpTestdir:
def __init__(self, request):
self.request = request
self.Config = request.config.__class__
self._pytest = request.getfuncargvalue("_pytest")
# XXX remove duplication with tmpdir plugin
basetmp = request.config._tmpdirhandler.ensuretemp("testdir")
name = request.function.__name__
@ -181,14 +232,10 @@ class TmpTestdir:
if fn and fn.startswith(str(self.tmpdir)):
del sys.modules[name]
def getreportrecorder(self, obj):
if hasattr(obj, 'config'):
obj = obj.config
if hasattr(obj, 'hook'):
obj = obj.hook
assert isinstance(obj, HookRelay)
reprec = ReportRecorder(obj)
reprec.hookrecorder = self._pytest.gethookrecorder(obj)
def make_hook_recorder(self, pluginmanager):
assert not hasattr(pluginmanager, "reprec")
pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
self.request.addfinalizer(reprec.finish_recording)
return reprec
def chdir(self):
@ -307,26 +354,23 @@ class TmpTestdir:
def inline_genitems(self, *args):
return self.inprocess_run(list(args) + ['--collectonly'])
def inline_run(self, *args):
items, rec = self.inprocess_run(args)
return rec
def inprocess_run(self, args, plugins=()):
rec = self.inline_run(*args, plugins=plugins)
items = [x.item for x in rec.getcalls("pytest_itemcollected")]
return items, rec
def inprocess_run(self, args, plugins=None):
def inline_run(self, *args, **kwargs):
rec = []
items = []
class Collect:
def pytest_configure(x, config):
rec.append(self.getreportrecorder(config))
def pytest_itemcollected(self, item):
items.append(item)
if not plugins:
plugins = []
rec.append(self.make_hook_recorder(config.pluginmanager))
plugins = kwargs.get("plugins") or []
plugins.append(Collect())
ret = pytest.main(list(args), plugins=plugins)
assert len(rec) == 1
reprec = rec[0]
reprec.ret = ret
assert len(rec) == 1
return items, reprec
return reprec
def parseconfig(self, *args):
args = [str(x) for x in args]
@ -501,86 +545,6 @@ def getdecoded(out):
return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
py.io.saferepr(out),)
class ReportRecorder(object):
def __init__(self, hook):
self.hook = hook
self.pluginmanager = hook._pm
self.pluginmanager.register(self)
def getcall(self, name):
return self.hookrecorder.getcall(name)
def popcall(self, name):
return self.hookrecorder.popcall(name)
def getcalls(self, names):
""" return list of ParsedCall instances matching the given eventname. """
return self.hookrecorder.getcalls(names)
# functionality for test reports
def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
return [x.report for x in self.getcalls(names)]
def matchreport(self, inamepart="",
names="pytest_runtest_logreport pytest_collectreport", when=None):
""" return a testreport whose dotted import path matches """
l = []
for rep in self.getreports(names=names):
try:
if not when and rep.when != "call" and rep.passed:
# setup/teardown passing reports - let's ignore those
continue
except AttributeError:
pass
if when and getattr(rep, 'when', None) != when:
continue
if not inamepart or inamepart in rep.nodeid.split("::"):
l.append(rep)
if not l:
raise ValueError("could not find test report matching %r: no test reports at all!" %
(inamepart,))
if len(l) > 1:
raise ValueError("found more than one testreport matching %r: %s" %(
inamepart, l))
return l[0]
def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'):
return [rep for rep in self.getreports(names) if rep.failed]
def getfailedcollections(self):
return self.getfailures('pytest_collectreport')
def listoutcomes(self):
passed = []
skipped = []
failed = []
for rep in self.getreports(
"pytest_collectreport pytest_runtest_logreport"):
if rep.passed:
if getattr(rep, "when", None) == "call":
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
elif rep.failed:
failed.append(rep)
return passed, skipped, failed
def countoutcomes(self):
return [len(x) for x in self.listoutcomes()]
def assertoutcome(self, passed=0, skipped=0, failed=0):
realpassed, realskipped, realfailed = self.listoutcomes()
assert passed == len(realpassed)
assert skipped == len(realskipped)
assert failed == len(realfailed)
def clear(self):
self.hookrecorder.calls[:] = []
def unregister(self):
self.pluginmanager.unregister(self)
self.hookrecorder.finish_recording()
class LineComp:
def __init__(self):

View File

@ -9,4 +9,4 @@ if __name__ == '__main__':
p = pstats.Stats("prof")
p.strip_dirs()
p.sort_stats('cumulative')
print(p.print_stats(250))
print(p.print_stats(500))

View File

@ -334,9 +334,9 @@ class TestSession:
assert item.name == "test_func"
newid = item.nodeid
assert newid == id
py.std.pprint.pprint(hookrec.hookrecorder.calls)
py.std.pprint.pprint(hookrec.calls)
topdir = testdir.tmpdir # noqa
hookrec.hookrecorder.contains([
hookrec.assert_contains([
("pytest_collectstart", "collector.fspath == topdir"),
("pytest_make_collect_report", "collector.fspath == topdir"),
("pytest_collectstart", "collector.fspath == p"),
@ -381,9 +381,9 @@ class TestSession:
id = p.basename
items, hookrec = testdir.inline_genitems(id)
py.std.pprint.pprint(hookrec.hookrecorder.calls)
py.std.pprint.pprint(hookrec.calls)
assert len(items) == 2
hookrec.hookrecorder.contains([
hookrec.assert_contains([
("pytest_collectstart",
"collector.fspath == collector.session.fspath"),
("pytest_collectstart",
@ -404,8 +404,8 @@ class TestSession:
items, hookrec = testdir.inline_genitems()
assert len(items) == 1
py.std.pprint.pprint(hookrec.hookrecorder.calls)
hookrec.hookrecorder.contains([
py.std.pprint.pprint(hookrec.calls)
hookrec.assert_contains([
("pytest_collectstart", "collector.fspath == test_aaa"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport",
@ -425,8 +425,8 @@ class TestSession:
items, hookrec = testdir.inline_genitems(id)
assert len(items) == 2
py.std.pprint.pprint(hookrec.hookrecorder.calls)
hookrec.hookrecorder.contains([
py.std.pprint.pprint(hookrec.calls)
hookrec.assert_contains([
("pytest_collectstart", "collector.fspath == test_aaa"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport", "report.nodeid == 'aaa/test_aaa.py'"),

View File

@ -149,7 +149,7 @@ class TestBootstrapping:
mod.pytest_plugins = "pytest_a"
aplugin = testdir.makepyfile(pytest_a="#")
pluginmanager = get_plugin_manager()
reprec = testdir.getreportrecorder(pluginmanager)
reprec = testdir.make_hook_recorder(pluginmanager)
#syspath.prepend(aplugin.dirpath())
py.std.sys.path.insert(0, str(aplugin.dirpath()))
pluginmanager.consider_module(mod)
@ -771,11 +771,10 @@ def test_wrapping():
def f(self):
return "A.f"
shutdown = []
l = []
def f(self):
l.append(1)
x = yield
yield
l.append(2)
undo = add_method_controller(A, f)

View File

@ -3,9 +3,9 @@ import os
from _pytest.pytester import HookRecorder
from _pytest.core import PluginManager
def test_reportrecorder(testdir):
def test_make_hook_recorder(testdir):
item = testdir.getitem("def test_func(): pass")
recorder = testdir.getreportrecorder(item.config)
recorder = testdir.make_hook_recorder(item.config.pluginmanager)
assert not recorder.getfailures()
pytest.xfail("internal reportrecorder tests need refactoring")
@ -104,27 +104,6 @@ def test_hookrecorder_basic(holder):
assert call._name == "pytest_xyz_noarg"
def test_functional(testdir, linecomp):
reprec = testdir.inline_runsource("""
import pytest
from _pytest.core import HookRelay, PluginManager
pytest_plugins="pytester"
def test_func(_pytest):
class ApiClass:
def pytest_xyz(self, arg): "x"
pm = PluginManager()
pm.hook._addhooks(ApiClass, "pytest_")
rec = _pytest.gethookrecorder(pm.hook)
class Plugin:
def pytest_xyz(self, arg):
return arg + 1
rec._pluginmanager.register(Plugin())
res = pm.hook.pytest_xyz(arg=41)
assert res == [42]
""")
reprec.assertoutcome(passed=1)
def test_makepyfile_unicode(testdir):
global unichr
try: