545 lines
20 KiB
Python
545 lines
20 KiB
Python
"""
|
|
Implements terminal reporting of the full testing process.
|
|
|
|
This is a good source for looking at the various reporting hooks.
|
|
"""
|
|
import py
|
|
import sys
|
|
|
|
optionalhook = py.test.mark.optionalhook
|
|
|
|
def pytest_addoption(parser):
|
|
group = parser.getgroup("terminal reporting", "reporting", after="general")
|
|
group._addoption('-v', '--verbose', action="count",
|
|
dest="verbose", default=0, help="increase verbosity."),
|
|
group._addoption('-r',
|
|
action="store", dest="reportchars", default=None, metavar="chars",
|
|
help="show extra test summary info as specified by chars (f)ailed, "
|
|
"(s)skipped, (x)failed, (X)passed.")
|
|
group._addoption('-l', '--showlocals',
|
|
action="store_true", dest="showlocals", default=False,
|
|
help="show locals in tracebacks (disabled by default).")
|
|
group._addoption('--report',
|
|
action="store", dest="report", default=None, metavar="opts",
|
|
help="(deprecated, use -r)")
|
|
group._addoption('--tb', metavar="style",
|
|
action="store", dest="tbstyle", default='long',
|
|
type="choice", choices=['long', 'short', 'no', 'line'],
|
|
help="traceback print mode (long/short/line/no).")
|
|
group._addoption('--fulltrace',
|
|
action="store_true", dest="fulltrace", default=False,
|
|
help="don't cut any tracebacks (default is to cut).")
|
|
group._addoption('--funcargs',
|
|
action="store_true", dest="showfuncargs", default=False,
|
|
help="show available function arguments, sorted by plugin")
|
|
|
|
def pytest_configure(config):
|
|
if config.option.collectonly:
|
|
reporter = CollectonlyReporter(config)
|
|
elif config.option.showfuncargs:
|
|
config.setsessionclass(ShowFuncargSession)
|
|
reporter = None
|
|
else:
|
|
reporter = TerminalReporter(config)
|
|
if reporter:
|
|
# XXX see remote.py's XXX
|
|
for attr in 'pytest_terminal_hasmarkup', 'pytest_terminal_fullwidth':
|
|
if hasattr(config, attr):
|
|
#print "SETTING TERMINAL OPTIONS", attr, getattr(config, attr)
|
|
name = attr.split("_")[-1]
|
|
assert hasattr(self.reporter._tw, name), name
|
|
setattr(reporter._tw, name, getattr(config, attr))
|
|
config.pluginmanager.register(reporter, 'terminalreporter')
|
|
|
|
def getreportopt(config):
|
|
reportopts = ""
|
|
optvalue = config.getvalue("report")
|
|
if optvalue:
|
|
py.builtin.print_("DEPRECATED: use -r instead of --report option.",
|
|
file=py.std.sys.stderr)
|
|
if optvalue:
|
|
for setting in optvalue.split(","):
|
|
setting = setting.strip()
|
|
if setting == "skipped":
|
|
reportopts += "s"
|
|
elif setting == "xfailed":
|
|
reportopts += "x"
|
|
reportchars = config.getvalue("reportchars")
|
|
if reportchars:
|
|
for char in reportchars:
|
|
if char not in reportopts:
|
|
reportopts += char
|
|
return reportopts
|
|
|
|
class TerminalReporter:
|
|
def __init__(self, config, file=None):
|
|
self.config = config
|
|
self.stats = {}
|
|
self.curdir = py.path.local()
|
|
if file is None:
|
|
file = py.std.sys.stdout
|
|
self._tw = py.io.TerminalWriter(file)
|
|
self.currentfspath = None
|
|
self.gateway2info = {}
|
|
self.reportchars = getreportopt(config)
|
|
|
|
def hasopt(self, char):
|
|
char = {'xfailed': 'x', 'skipped': 's'}.get(char,char)
|
|
return char in self.reportchars
|
|
|
|
def write_fspath_result(self, fspath, res):
|
|
fspath = self.curdir.bestrelpath(fspath)
|
|
if fspath != self.currentfspath:
|
|
self._tw.line()
|
|
relpath = self.curdir.bestrelpath(fspath)
|
|
self._tw.write(relpath + " ")
|
|
self.currentfspath = fspath
|
|
self._tw.write(res)
|
|
|
|
def write_ensure_prefix(self, prefix, extra="", **kwargs):
|
|
if self.currentfspath != prefix:
|
|
self._tw.line()
|
|
self.currentfspath = prefix
|
|
self._tw.write(prefix)
|
|
if extra:
|
|
self._tw.write(extra, **kwargs)
|
|
self.currentfspath = -2
|
|
|
|
def ensure_newline(self):
|
|
if self.currentfspath:
|
|
self._tw.line()
|
|
self.currentfspath = None
|
|
|
|
def write_line(self, line, **markup):
|
|
line = str(line)
|
|
self.ensure_newline()
|
|
self._tw.line(line, **markup)
|
|
|
|
def write_sep(self, sep, title=None, **markup):
|
|
self.ensure_newline()
|
|
self._tw.sep(sep, title, **markup)
|
|
|
|
def getcategoryletterword(self, rep):
|
|
res = self.config.hook.pytest_report_teststatus(report=rep)
|
|
if res:
|
|
return res
|
|
for cat in 'skipped failed passed ???'.split():
|
|
if getattr(rep, cat, None):
|
|
break
|
|
return cat, self.getoutcomeletter(rep), self.getoutcomeword(rep)
|
|
|
|
def getoutcomeletter(self, rep):
|
|
return rep.shortrepr
|
|
|
|
def getoutcomeword(self, rep):
|
|
if rep.passed:
|
|
return "PASS", dict(green=True)
|
|
elif rep.failed:
|
|
return "FAIL", dict(red=True)
|
|
elif rep.skipped:
|
|
return "SKIP"
|
|
else:
|
|
return "???", dict(red=True)
|
|
|
|
def gettestid(self, item, relative=True):
|
|
fspath = item.fspath
|
|
chain = [x for x in item.listchain() if x.fspath == fspath]
|
|
chain = chain[1:]
|
|
names = [x.name for x in chain if x.name != "()"]
|
|
path = item.fspath
|
|
if relative:
|
|
relpath = path.relto(self.curdir)
|
|
if relpath:
|
|
path = relpath
|
|
names.insert(0, str(path))
|
|
return "::".join(names)
|
|
|
|
|
|
def pytest_internalerror(self, excrepr):
|
|
for line in str(excrepr).split("\n"):
|
|
self.write_line("INTERNALERROR> " + line)
|
|
|
|
def pytest_plugin_registered(self, plugin):
|
|
if self.config.option.traceconfig:
|
|
msg = "PLUGIN registered: %s" %(plugin,)
|
|
# XXX this event may happen during setup/teardown time
|
|
# which unfortunately captures our output here
|
|
# which garbles our output if we use self.write_line
|
|
self.write_line(msg)
|
|
|
|
@optionalhook
|
|
def pytest_gwmanage_newgateway(self, gateway, platinfo):
|
|
#self.write_line("%s instantiated gateway from spec %r" %(gateway.id, gateway.spec._spec))
|
|
d = {}
|
|
d['version'] = repr_pythonversion(platinfo.version_info)
|
|
d['id'] = gateway.id
|
|
d['spec'] = gateway.spec._spec
|
|
d['platform'] = platinfo.platform
|
|
if self.config.option.verbose:
|
|
d['extra'] = "- " + platinfo.executable
|
|
else:
|
|
d['extra'] = ""
|
|
d['cwd'] = platinfo.cwd
|
|
infoline = ("[%(id)s] %(spec)s -- platform %(platform)s, "
|
|
"Python %(version)s "
|
|
"cwd: %(cwd)s"
|
|
"%(extra)s" % d)
|
|
self.write_line(infoline)
|
|
self.gateway2info[gateway] = infoline
|
|
|
|
@optionalhook
|
|
def pytest_testnodeready(self, node):
|
|
self.write_line("[%s] txnode ready to receive tests" %(node.gateway.id,))
|
|
|
|
@optionalhook
|
|
def pytest_testnodedown(self, node, error):
|
|
if error:
|
|
self.write_line("[%s] node down, error: %s" %(node.gateway.id, error))
|
|
|
|
@optionalhook
|
|
def pytest_rescheduleitems(self, items):
|
|
if self.config.option.debug:
|
|
self.write_sep("!", "RESCHEDULING %s " %(items,))
|
|
|
|
@optionalhook
|
|
def pytest_looponfailinfo(self, failreports, rootdirs):
|
|
if failreports:
|
|
self.write_sep("#", "LOOPONFAILING", red=True)
|
|
for report in failreports:
|
|
loc = self._getcrashline(report)
|
|
if loc:
|
|
self.write_line(loc, red=True)
|
|
self.write_sep("#", "waiting for changes")
|
|
for rootdir in rootdirs:
|
|
self.write_line("### Watching: %s" %(rootdir,), bold=True)
|
|
|
|
|
|
def pytest_trace(self, category, msg):
|
|
if self.config.option.debug or \
|
|
self.config.option.traceconfig and category.find("config") != -1:
|
|
self.write_line("[%s] %s" %(category, msg))
|
|
|
|
def pytest_deselected(self, items):
|
|
self.stats.setdefault('deselected', []).append(items)
|
|
|
|
def pytest_itemstart(self, item, node=None):
|
|
if getattr(self.config.option, 'dist', 'no') != "no":
|
|
# for dist-testing situations itemstart means we
|
|
# queued the item for sending, not interesting (unless debugging)
|
|
if self.config.option.debug:
|
|
line = self._reportinfoline(item)
|
|
extra = ""
|
|
if node:
|
|
extra = "-> [%s]" % node.gateway.id
|
|
self.write_ensure_prefix(line, extra)
|
|
else:
|
|
if self.config.option.verbose:
|
|
line = self._reportinfoline(item)
|
|
self.write_ensure_prefix(line, "")
|
|
else:
|
|
# ensure that the path is printed before the
|
|
# 1st test of a module starts running
|
|
|
|
self.write_fspath_result(self._getfspath(item), "")
|
|
|
|
def pytest__teardown_final_logerror(self, report):
|
|
self.stats.setdefault("error", []).append(report)
|
|
|
|
def pytest_runtest_logreport(self, report):
|
|
rep = report
|
|
cat, letter, word = self.getcategoryletterword(rep)
|
|
if not letter and not word:
|
|
# probably passed setup/teardown
|
|
return
|
|
if isinstance(word, tuple):
|
|
word, markup = word
|
|
else:
|
|
markup = {}
|
|
self.stats.setdefault(cat, []).append(rep)
|
|
if not self.config.option.verbose:
|
|
self.write_fspath_result(self._getfspath(rep.item), letter)
|
|
else:
|
|
line = self._reportinfoline(rep.item)
|
|
if not hasattr(rep, 'node'):
|
|
self.write_ensure_prefix(line, word, **markup)
|
|
else:
|
|
self.ensure_newline()
|
|
if hasattr(rep, 'node'):
|
|
self._tw.write("[%s] " % rep.node.gateway.id)
|
|
self._tw.write(word, **markup)
|
|
self._tw.write(" " + line)
|
|
self.currentfspath = -2
|
|
|
|
def pytest_collectreport(self, report):
|
|
if not report.passed:
|
|
if report.failed:
|
|
self.stats.setdefault("error", []).append(report)
|
|
msg = report.longrepr.reprcrash.message
|
|
self.write_fspath_result(report.collector.fspath, "E")
|
|
elif report.skipped:
|
|
self.stats.setdefault("skipped", []).append(report)
|
|
self.write_fspath_result(report.collector.fspath, "S")
|
|
|
|
def pytest_sessionstart(self, session):
|
|
self.write_sep("=", "test session starts", bold=True)
|
|
self._sessionstarttime = py.std.time.time()
|
|
|
|
verinfo = ".".join(map(str, sys.version_info[:3]))
|
|
msg = "platform %s -- Python %s" % (sys.platform, verinfo)
|
|
msg += " -- pytest-%s" % (py.__version__)
|
|
if self.config.option.verbose or self.config.option.debug or getattr(self.config.option, 'pastebin', None):
|
|
msg += " -- " + str(sys.executable)
|
|
self.write_line(msg)
|
|
lines = self.config.hook.pytest_report_header(config=self.config)
|
|
lines.reverse()
|
|
for line in flatten(lines):
|
|
self.write_line(line)
|
|
for i, testarg in enumerate(self.config.args):
|
|
self.write_line("test object %d: %s" %(i+1, testarg))
|
|
|
|
def pytest_sessionfinish(self, exitstatus, __multicall__):
|
|
__multicall__.execute()
|
|
self._tw.line("")
|
|
if exitstatus in (0, 1, 2):
|
|
self.summary_errors()
|
|
self.summary_failures()
|
|
self.config.hook.pytest_terminal_summary(terminalreporter=self)
|
|
if exitstatus == 2:
|
|
self._report_keyboardinterrupt()
|
|
self.summary_deselected()
|
|
self.summary_stats()
|
|
|
|
def pytest_keyboard_interrupt(self, excinfo):
|
|
self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)
|
|
|
|
def _report_keyboardinterrupt(self):
|
|
excrepr = self._keyboardinterrupt_memo
|
|
msg = excrepr.reprcrash.message
|
|
self.write_sep("!", msg)
|
|
if "KeyboardInterrupt" in msg:
|
|
if self.config.getvalue("fulltrace"):
|
|
excrepr.toterminal(self._tw)
|
|
else:
|
|
excrepr.reprcrash.toterminal(self._tw)
|
|
|
|
def _getcrashline(self, report):
|
|
try:
|
|
return report.longrepr.reprcrash
|
|
except AttributeError:
|
|
try:
|
|
return str(report.longrepr)[:50]
|
|
except AttributeError:
|
|
return ""
|
|
|
|
def _reportinfoline(self, item):
|
|
collect_fspath = self._getfspath(item)
|
|
fspath, lineno, msg = self._getreportinfo(item)
|
|
if fspath and fspath != collect_fspath:
|
|
fspath = "%s <- %s" % (
|
|
self.curdir.bestrelpath(collect_fspath),
|
|
self.curdir.bestrelpath(fspath))
|
|
elif fspath:
|
|
fspath = self.curdir.bestrelpath(fspath)
|
|
if lineno is not None:
|
|
lineno += 1
|
|
if fspath and lineno and msg:
|
|
line = "%(fspath)s:%(lineno)s: %(msg)s"
|
|
elif fspath and msg:
|
|
line = "%(fspath)s: %(msg)s"
|
|
elif fspath and lineno:
|
|
line = "%(fspath)s:%(lineno)s %(extrapath)s"
|
|
else:
|
|
line = "[noreportinfo]"
|
|
return line % locals() + " "
|
|
|
|
def _getfailureheadline(self, rep):
|
|
if hasattr(rep, "collector"):
|
|
return str(rep.collector.fspath)
|
|
elif hasattr(rep, 'item'):
|
|
fspath, lineno, msg = self._getreportinfo(rep.item)
|
|
return msg
|
|
else:
|
|
return "test session"
|
|
|
|
def _getreportinfo(self, item):
|
|
try:
|
|
return item.__reportinfo
|
|
except AttributeError:
|
|
pass
|
|
reportinfo = item.config.hook.pytest_report_iteminfo(item=item)
|
|
# cache on item
|
|
item.__reportinfo = reportinfo
|
|
return reportinfo
|
|
|
|
def _getfspath(self, item):
|
|
try:
|
|
return item.fspath
|
|
except AttributeError:
|
|
fspath, lineno, msg = self._getreportinfo(item)
|
|
return fspath
|
|
|
|
#
|
|
# summaries for sessionfinish
|
|
#
|
|
|
|
def summary_failures(self):
|
|
tbstyle = self.config.getvalue("tbstyle")
|
|
if 'failed' in self.stats and tbstyle != "no":
|
|
self.write_sep("=", "FAILURES")
|
|
for rep in self.stats['failed']:
|
|
if tbstyle == "line":
|
|
line = self._getcrashline(rep)
|
|
self.write_line(line)
|
|
else:
|
|
msg = self._getfailureheadline(rep)
|
|
self.write_sep("_", msg)
|
|
self.write_platinfo(rep)
|
|
rep.toterminal(self._tw)
|
|
|
|
def summary_errors(self):
|
|
if 'error' in self.stats and self.config.option.tbstyle != "no":
|
|
self.write_sep("=", "ERRORS")
|
|
for rep in self.stats['error']:
|
|
msg = self._getfailureheadline(rep)
|
|
if not hasattr(rep, 'when'):
|
|
# collect
|
|
msg = "ERROR during collection " + msg
|
|
elif rep.when == "setup":
|
|
msg = "ERROR at setup of " + msg
|
|
elif rep.when == "teardown":
|
|
msg = "ERROR at teardown of " + msg
|
|
self.write_sep("_", msg)
|
|
self.write_platinfo(rep)
|
|
rep.toterminal(self._tw)
|
|
|
|
def write_platinfo(self, rep):
|
|
if hasattr(rep, 'node'):
|
|
self.write_line(self.gateway2info.get(
|
|
rep.node.gateway,
|
|
"node %r (platinfo not found? strange)")
|
|
[:self._tw.fullwidth-1])
|
|
|
|
def summary_stats(self):
|
|
session_duration = py.std.time.time() - self._sessionstarttime
|
|
|
|
keys = "failed passed skipped deselected".split()
|
|
for key in self.stats.keys():
|
|
if key not in keys:
|
|
keys.append(key)
|
|
parts = []
|
|
for key in keys:
|
|
val = self.stats.get(key, None)
|
|
if val:
|
|
parts.append("%d %s" %(len(val), key))
|
|
line = ", ".join(parts)
|
|
# XXX coloring
|
|
self.write_sep("=", "%s in %.2f seconds" %(line, session_duration))
|
|
|
|
def summary_deselected(self):
|
|
if 'deselected' in self.stats:
|
|
self.write_sep("=", "%d tests deselected by %r" %(
|
|
len(self.stats['deselected']), self.config.option.keyword), bold=True)
|
|
|
|
|
|
class CollectonlyReporter:
|
|
INDENT = " "
|
|
|
|
def __init__(self, config, out=None):
|
|
self.config = config
|
|
if out is None:
|
|
out = py.std.sys.stdout
|
|
self.out = py.io.TerminalWriter(out)
|
|
self.indent = ""
|
|
self._failed = []
|
|
|
|
def outindent(self, line):
|
|
self.out.line(self.indent + str(line))
|
|
|
|
def pytest_internalerror(self, excrepr):
|
|
for line in str(excrepr).split("\n"):
|
|
self.out.line("INTERNALERROR> " + line)
|
|
|
|
def pytest_collectstart(self, collector):
|
|
self.outindent(collector)
|
|
self.indent += self.INDENT
|
|
|
|
def pytest_itemstart(self, item, node=None):
|
|
self.outindent(item)
|
|
|
|
def pytest_collectreport(self, report):
|
|
if not report.passed:
|
|
self.outindent("!!! %s !!!" % report.longrepr.reprcrash.message)
|
|
self._failed.append(report)
|
|
self.indent = self.indent[:-len(self.INDENT)]
|
|
|
|
def pytest_sessionfinish(self, session, exitstatus):
|
|
if self._failed:
|
|
self.out.sep("!", "collection failures")
|
|
for rep in self._failed:
|
|
rep.toterminal(self.out)
|
|
|
|
|
|
def repr_pythonversion(v=None):
|
|
if v is None:
|
|
v = sys.version_info
|
|
try:
|
|
return "%s.%s.%s-%s-%s" % v
|
|
except (TypeError, ValueError):
|
|
return str(v)
|
|
|
|
def flatten(l):
|
|
for x in l:
|
|
if isinstance(x, (list, tuple)):
|
|
for y in flatten(x):
|
|
yield y
|
|
else:
|
|
yield x
|
|
|
|
from py._test.session import Session
|
|
class ShowFuncargSession(Session):
|
|
def main(self, colitems):
|
|
self.fspath = py.path.local()
|
|
self.sessionstarts()
|
|
try:
|
|
self.showargs(colitems[0])
|
|
finally:
|
|
self.sessionfinishes(exitstatus=1)
|
|
|
|
def showargs(self, colitem):
|
|
tw = py.io.TerminalWriter()
|
|
from py._test.funcargs import getplugins
|
|
from py._test.funcargs import FuncargRequest
|
|
plugins = getplugins(colitem, withpy=True)
|
|
verbose = self.config.getvalue("verbose")
|
|
for plugin in plugins:
|
|
available = []
|
|
for name, factory in vars(plugin).items():
|
|
if name.startswith(FuncargRequest._argprefix):
|
|
name = name[len(FuncargRequest._argprefix):]
|
|
if name not in available:
|
|
available.append([name, factory])
|
|
if available:
|
|
pluginname = plugin.__name__
|
|
for name, factory in available:
|
|
loc = self.getlocation(factory)
|
|
if verbose:
|
|
funcargspec = "%s -- %s" %(name, loc,)
|
|
else:
|
|
funcargspec = name
|
|
tw.line(funcargspec, green=True)
|
|
doc = factory.__doc__ or ""
|
|
if doc:
|
|
for line in doc.split("\n"):
|
|
tw.line(" " + line.strip())
|
|
else:
|
|
tw.line(" %s: no docstring available" %(loc,),
|
|
red=True)
|
|
|
|
def getlocation(self, function):
|
|
import inspect
|
|
fn = py.path.local(inspect.getfile(function))
|
|
lineno = py.builtin._getcode(function).co_firstlineno
|
|
if fn.relto(self.fspath):
|
|
fn = fn.relto(self.fspath)
|
|
return "%s:%d" %(fn, lineno+1)
|