introduce a minimal tag-based tracer, to be extended if needed, strike pytest_trace hook.
This commit is contained in:
parent
132eeeeade
commit
6a734efe44
|
@ -13,11 +13,53 @@ default_plugins = (
|
||||||
|
|
||||||
IMPORTPREFIX = "pytest_"
|
IMPORTPREFIX = "pytest_"
|
||||||
|
|
||||||
|
class TagTracer:
|
||||||
|
def __init__(self):
|
||||||
|
self._tag2proc = {}
|
||||||
|
self.writer = None
|
||||||
|
|
||||||
|
def get(self, name):
|
||||||
|
return TagTracerSub(self, (name,))
|
||||||
|
|
||||||
|
def processmessage(self, tags, args):
|
||||||
|
if self.writer is not None:
|
||||||
|
prefix = ":".join(tags)
|
||||||
|
content = " ".join(map(str, args))
|
||||||
|
self.writer("[%s] %s\n" %(prefix, content))
|
||||||
|
try:
|
||||||
|
self._tag2proc[tags](tags, args)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def setwriter(self, writer):
|
||||||
|
self.writer = writer
|
||||||
|
|
||||||
|
def setprocessor(self, tags, processor):
|
||||||
|
if isinstance(tags, str):
|
||||||
|
tags = tuple(tags.split(":"))
|
||||||
|
else:
|
||||||
|
assert isinstance(tags, tuple)
|
||||||
|
self._tag2proc[tags] = processor
|
||||||
|
|
||||||
|
class TagTracerSub:
|
||||||
|
def __init__(self, root, tags):
|
||||||
|
self.root = root
|
||||||
|
self.tags = tags
|
||||||
|
def __call__(self, *args):
|
||||||
|
self.root.processmessage(self.tags, args)
|
||||||
|
def setmyprocessor(self, processor):
|
||||||
|
self.root.setprocessor(self.tags, processor)
|
||||||
|
def get(self, name):
|
||||||
|
return self.__class__(self.root, self.tags + (name,))
|
||||||
|
|
||||||
class PluginManager(object):
|
class PluginManager(object):
|
||||||
def __init__(self, load=False):
|
def __init__(self, load=False):
|
||||||
self._name2plugin = {}
|
self._name2plugin = {}
|
||||||
self._plugins = []
|
self._plugins = []
|
||||||
self._hints = []
|
self._hints = []
|
||||||
|
self.trace = TagTracer().get("pytest")
|
||||||
|
if os.environ.get('PYTEST_DEBUG'):
|
||||||
|
self.trace.root.setwriter(sys.stderr.write)
|
||||||
self.hook = HookRelay([hookspec], pm=self)
|
self.hook = HookRelay([hookspec], pm=self)
|
||||||
self.register(self)
|
self.register(self)
|
||||||
if load:
|
if load:
|
||||||
|
@ -41,6 +83,7 @@ class PluginManager(object):
|
||||||
self._name2plugin[name] = plugin
|
self._name2plugin[name] = plugin
|
||||||
self.call_plugin(plugin, "pytest_addhooks", {'pluginmanager': self})
|
self.call_plugin(plugin, "pytest_addhooks", {'pluginmanager': self})
|
||||||
self.hook.pytest_plugin_registered(manager=self, plugin=plugin)
|
self.hook.pytest_plugin_registered(manager=self, plugin=plugin)
|
||||||
|
self.trace("registered", plugin)
|
||||||
if not prepend:
|
if not prepend:
|
||||||
self._plugins.append(plugin)
|
self._plugins.append(plugin)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -8,6 +8,8 @@ import pytest
|
||||||
def pytest_cmdline_parse(pluginmanager, args):
|
def pytest_cmdline_parse(pluginmanager, args):
|
||||||
config = Config(pluginmanager)
|
config = Config(pluginmanager)
|
||||||
config.parse(args)
|
config.parse(args)
|
||||||
|
if config.option.debug:
|
||||||
|
config.trace.root.setwriter(sys.stderr.write)
|
||||||
return config
|
return config
|
||||||
|
|
||||||
class Parser:
|
class Parser:
|
||||||
|
@ -253,6 +255,7 @@ class Config(object):
|
||||||
)
|
)
|
||||||
#: a pluginmanager instance
|
#: a pluginmanager instance
|
||||||
self.pluginmanager = pluginmanager or PluginManager(load=True)
|
self.pluginmanager = pluginmanager or PluginManager(load=True)
|
||||||
|
self.trace = self.pluginmanager.trace.get("config")
|
||||||
self._conftest = Conftest(onimport=self._onimportconftest)
|
self._conftest = Conftest(onimport=self._onimportconftest)
|
||||||
self.hook = self.pluginmanager.hook
|
self.hook = self.pluginmanager.hook
|
||||||
|
|
||||||
|
@ -272,10 +275,6 @@ class Config(object):
|
||||||
plugins += self._conftest.getconftestmodules(fspath)
|
plugins += self._conftest.getconftestmodules(fspath)
|
||||||
return plugins
|
return plugins
|
||||||
|
|
||||||
def trace(self, msg):
|
|
||||||
if getattr(self.option, 'traceconfig', None):
|
|
||||||
self.hook.pytest_trace(category="config", msg=msg)
|
|
||||||
|
|
||||||
def _setinitialconftest(self, args):
|
def _setinitialconftest(self, args):
|
||||||
# capture output during conftest init (#issue93)
|
# capture output during conftest init (#issue93)
|
||||||
name = hasattr(os, 'dup') and 'StdCaptureFD' or 'StdCapture'
|
name = hasattr(os, 'dup') and 'StdCaptureFD' or 'StdCapture'
|
||||||
|
|
|
@ -50,6 +50,11 @@ def pytest_configure(config):
|
||||||
config._toclose = stdout
|
config._toclose = stdout
|
||||||
reporter = TerminalReporter(config, stdout)
|
reporter = TerminalReporter(config, stdout)
|
||||||
config.pluginmanager.register(reporter, 'terminalreporter')
|
config.pluginmanager.register(reporter, 'terminalreporter')
|
||||||
|
if config.option.debug or config.option.traceconfig:
|
||||||
|
def mywriter(tags, args):
|
||||||
|
msg = " ".join(map(str, args))
|
||||||
|
reporter.write_line("[traceconfig] " + msg)
|
||||||
|
config.trace.root.setprocessor("pytest:config", mywriter)
|
||||||
|
|
||||||
def pytest_unconfigure(config):
|
def pytest_unconfigure(config):
|
||||||
if hasattr(config, '_toclose'):
|
if hasattr(config, '_toclose'):
|
||||||
|
@ -152,11 +157,6 @@ class TerminalReporter:
|
||||||
# which garbles our output if we use self.write_line
|
# which garbles our output if we use self.write_line
|
||||||
self.write_line(msg)
|
self.write_line(msg)
|
||||||
|
|
||||||
def pytest_trace(self, category, msg):
|
|
||||||
if self.config.option.debug or \
|
|
||||||
self.config.option.traceconfig and category.find("config") != -1:
|
|
||||||
self.write_line("[%s] %s" %(category, msg))
|
|
||||||
|
|
||||||
def pytest_deselected(self, items):
|
def pytest_deselected(self, items):
|
||||||
self.stats.setdefault('deselected', []).extend(items)
|
self.stats.setdefault('deselected', []).extend(items)
|
||||||
|
|
||||||
|
|
|
@ -511,20 +511,28 @@ def test_tbstyle_short(testdir):
|
||||||
assert 'x = 0' in s
|
assert 'x = 0' in s
|
||||||
assert 'assert x' in s
|
assert 'assert x' in s
|
||||||
|
|
||||||
def test_trace_reporting(testdir):
|
def test_traceconfig(testdir, monkeypatch):
|
||||||
result = testdir.runpytest("--traceconfig")
|
result = testdir.runpytest("--traceconfig")
|
||||||
result.stdout.fnmatch_lines([
|
result.stdout.fnmatch_lines([
|
||||||
"*active plugins*"
|
"*active plugins*"
|
||||||
])
|
])
|
||||||
assert result.ret == 0
|
assert result.ret == 0
|
||||||
|
|
||||||
def test_trace_reporting(testdir):
|
def test_debug(testdir, monkeypatch):
|
||||||
result = testdir.runpytest("--traceconfig")
|
result = testdir.runpytest("--debug")
|
||||||
result.stdout.fnmatch_lines([
|
result.stderr.fnmatch_lines([
|
||||||
"*active plugins*"
|
"*registered*session*",
|
||||||
])
|
])
|
||||||
assert result.ret == 0
|
assert result.ret == 0
|
||||||
|
|
||||||
|
def test_PYTEST_DEBUG(testdir, monkeypatch):
|
||||||
|
monkeypatch.setenv("PYTEST_DEBUG", "1")
|
||||||
|
result = testdir.runpytest()
|
||||||
|
assert result.ret == 0
|
||||||
|
result.stderr.fnmatch_lines([
|
||||||
|
"*registered*PluginManager*"
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
class TestGenericReporting:
|
class TestGenericReporting:
|
||||||
""" this test class can be subclassed with a different option
|
""" this test class can be subclassed with a different option
|
||||||
|
|
|
@ -75,6 +75,14 @@ class TestConfigTmpdir:
|
||||||
|
|
||||||
class TestConfigAPI:
|
class TestConfigAPI:
|
||||||
|
|
||||||
|
def test_config_trace(self, testdir):
|
||||||
|
config = testdir.Config()
|
||||||
|
l = []
|
||||||
|
config.trace.root.setwriter(l.append)
|
||||||
|
config.trace("hello")
|
||||||
|
assert len(l) == 1
|
||||||
|
assert l[0] == "[pytest:config] hello\n"
|
||||||
|
|
||||||
def test_config_getvalue_honours_conftest(self, testdir):
|
def test_config_getvalue_honours_conftest(self, testdir):
|
||||||
testdir.makepyfile(conftest="x=1")
|
testdir.makepyfile(conftest="x=1")
|
||||||
testdir.mkdir("sub").join("conftest.py").write("x=2 ; y = 3")
|
testdir.mkdir("sub").join("conftest.py").write("x=2 ; y = 3")
|
||||||
|
|
|
@ -266,6 +266,19 @@ class TestBootstrapping:
|
||||||
l = list(plugins.listattr('x'))
|
l = list(plugins.listattr('x'))
|
||||||
assert l == [41, 42, 43]
|
assert l == [41, 42, 43]
|
||||||
|
|
||||||
|
def test_register_trace(self):
|
||||||
|
pm = PluginManager()
|
||||||
|
class api1:
|
||||||
|
x = 41
|
||||||
|
l = []
|
||||||
|
pm.trace.setmyprocessor(lambda kw, args: l.append((kw, args)))
|
||||||
|
p = api1()
|
||||||
|
pm.register(p)
|
||||||
|
assert len(l) == 1
|
||||||
|
kw, args = l[0]
|
||||||
|
assert args[0] == "registered"
|
||||||
|
assert args[1] == p
|
||||||
|
|
||||||
class TestPytestPluginInteractions:
|
class TestPytestPluginInteractions:
|
||||||
|
|
||||||
def test_addhooks_conftestplugin(self, testdir):
|
def test_addhooks_conftestplugin(self, testdir):
|
||||||
|
@ -470,7 +483,7 @@ class TestMultiCall:
|
||||||
reslist = MultiCall([f], dict(x=23, z=2)).execute()
|
reslist = MultiCall([f], dict(x=23, z=2)).execute()
|
||||||
assert reslist == [25]
|
assert reslist == [25]
|
||||||
|
|
||||||
def test_keywords_call_error(self):
|
def test_tags_call_error(self):
|
||||||
multicall = MultiCall([lambda x: x], {})
|
multicall = MultiCall([lambda x: x], {})
|
||||||
py.test.raises(TypeError, "multicall.execute()")
|
py.test.raises(TypeError, "multicall.execute()")
|
||||||
|
|
||||||
|
@ -537,3 +550,55 @@ class TestHookRelay:
|
||||||
res = mcm.hello(arg=3)
|
res = mcm.hello(arg=3)
|
||||||
assert res == 4
|
assert res == 4
|
||||||
|
|
||||||
|
class TestTracer:
|
||||||
|
def test_simple(self):
|
||||||
|
from pytest._core import TagTracer
|
||||||
|
rootlogger = TagTracer()
|
||||||
|
log = rootlogger.get("pytest")
|
||||||
|
log("hello")
|
||||||
|
l = []
|
||||||
|
rootlogger.setwriter(l.append)
|
||||||
|
log("world")
|
||||||
|
assert len(l) == 1
|
||||||
|
assert l[0] == "[pytest] world\n"
|
||||||
|
sublog = log.get("collection")
|
||||||
|
sublog("hello")
|
||||||
|
assert l[1] == "[pytest:collection] hello\n"
|
||||||
|
|
||||||
|
def test_setprocessor(self):
|
||||||
|
from pytest._core import TagTracer
|
||||||
|
rootlogger = TagTracer()
|
||||||
|
log = rootlogger.get("1")
|
||||||
|
log2 = log.get("2")
|
||||||
|
assert log2.tags == tuple("12")
|
||||||
|
l = []
|
||||||
|
rootlogger.setprocessor(tuple("12"), lambda *args: l.append(args))
|
||||||
|
log("not seen")
|
||||||
|
log2("seen")
|
||||||
|
assert len(l) == 1
|
||||||
|
tags, args = l[0]
|
||||||
|
assert "1" in tags
|
||||||
|
assert "2" in tags
|
||||||
|
assert args == ("seen",)
|
||||||
|
l2 = []
|
||||||
|
rootlogger.setprocessor("1:2", lambda *args: l2.append(args))
|
||||||
|
log2("seen")
|
||||||
|
tags, args = l2[0]
|
||||||
|
assert args == ("seen",)
|
||||||
|
|
||||||
|
|
||||||
|
def test_setmyprocessor(self):
|
||||||
|
from pytest._core import TagTracer
|
||||||
|
rootlogger = TagTracer()
|
||||||
|
log = rootlogger.get("1")
|
||||||
|
log2 = log.get("2")
|
||||||
|
l = []
|
||||||
|
log2.setmyprocessor(lambda *args: l.append(args))
|
||||||
|
log("not seen")
|
||||||
|
assert not l
|
||||||
|
log2(42)
|
||||||
|
assert len(l) == 1
|
||||||
|
tags, args = l[0]
|
||||||
|
assert "1" in tags
|
||||||
|
assert "2" in tags
|
||||||
|
assert args == (42,)
|
||||||
|
|
Loading…
Reference in New Issue