485 lines
16 KiB
Python
485 lines
16 KiB
Python
""" basic collect and runtest protocol implementations """
|
|
|
|
import py
|
|
import pytest
|
|
import sys
|
|
from time import time
|
|
from py._code.code import TerminalRepr
|
|
|
|
def pytest_namespace():
|
|
return {
|
|
'fail' : fail,
|
|
'skip' : skip,
|
|
'importorskip' : importorskip,
|
|
'exit' : exit,
|
|
}
|
|
|
|
#
|
|
# pytest plugin hooks
|
|
|
|
def pytest_addoption(parser):
|
|
group = parser.getgroup("terminal reporting", "reporting", after="general")
|
|
group.addoption('--durations',
|
|
action="store", type=int, default=None, metavar="N",
|
|
help="show N slowest setup/test durations (N=0 for all)."),
|
|
|
|
def pytest_terminal_summary(terminalreporter):
|
|
durations = terminalreporter.config.option.durations
|
|
if durations is None:
|
|
return
|
|
tr = terminalreporter
|
|
dlist = []
|
|
for replist in tr.stats.values():
|
|
for rep in replist:
|
|
if hasattr(rep, 'duration'):
|
|
dlist.append(rep)
|
|
if not dlist:
|
|
return
|
|
dlist.sort(key=lambda x: x.duration)
|
|
dlist.reverse()
|
|
if not durations:
|
|
tr.write_sep("=", "slowest test durations")
|
|
else:
|
|
tr.write_sep("=", "slowest %s test durations" % durations)
|
|
dlist = dlist[:durations]
|
|
|
|
for rep in dlist:
|
|
nodeid = rep.nodeid.replace("::()::", "::")
|
|
tr.write_line("%02.2fs %-8s %s" %
|
|
(rep.duration, rep.when, nodeid))
|
|
|
|
def pytest_sessionstart(session):
|
|
session._setupstate = SetupState()
|
|
def pytest_sessionfinish(session):
|
|
session._setupstate.teardown_all()
|
|
|
|
class NodeInfo:
|
|
def __init__(self, location):
|
|
self.location = location
|
|
|
|
def pytest_runtest_protocol(item, nextitem):
|
|
item.ihook.pytest_runtest_logstart(
|
|
nodeid=item.nodeid, location=item.location,
|
|
)
|
|
runtestprotocol(item, nextitem=nextitem)
|
|
return True
|
|
|
|
def runtestprotocol(item, log=True, nextitem=None):
|
|
hasrequest = hasattr(item, "_request")
|
|
if hasrequest and not item._request:
|
|
item._initrequest()
|
|
rep = call_and_report(item, "setup", log)
|
|
reports = [rep]
|
|
if rep.passed:
|
|
reports.append(call_and_report(item, "call", log))
|
|
reports.append(call_and_report(item, "teardown", log,
|
|
nextitem=nextitem))
|
|
# after all teardown hooks have been called
|
|
# want funcargs and request info to go away
|
|
if hasrequest:
|
|
item._request = False
|
|
item.funcargs = None
|
|
return reports
|
|
|
|
def pytest_runtest_setup(item):
|
|
item.session._setupstate.prepare(item)
|
|
|
|
def pytest_runtest_call(item):
|
|
item.runtest()
|
|
|
|
def pytest_runtest_teardown(item, nextitem):
|
|
item.session._setupstate.teardown_exact(item, nextitem)
|
|
|
|
def pytest_report_teststatus(report):
|
|
if report.when in ("setup", "teardown"):
|
|
if report.failed:
|
|
# category, shortletter, verbose-word
|
|
return "error", "E", "ERROR"
|
|
elif report.skipped:
|
|
return "skipped", "s", "SKIPPED"
|
|
else:
|
|
return "", "", ""
|
|
|
|
|
|
#
|
|
# Implementation
|
|
|
|
def call_and_report(item, when, log=True, **kwds):
|
|
call = call_runtest_hook(item, when, **kwds)
|
|
hook = item.ihook
|
|
report = hook.pytest_runtest_makereport(item=item, call=call)
|
|
if log:
|
|
hook.pytest_runtest_logreport(report=report)
|
|
if check_interactive_exception(call, report):
|
|
hook.pytest_exception_interact(node=item, call=call, report=report)
|
|
return report
|
|
|
|
def check_interactive_exception(call, report):
|
|
return call.excinfo and not (
|
|
hasattr(report, "wasxfail") or
|
|
call.excinfo.errisinstance(skip.Exception) or
|
|
call.excinfo.errisinstance(py.std.bdb.BdbQuit))
|
|
|
|
def call_runtest_hook(item, when, **kwds):
|
|
hookname = "pytest_runtest_" + when
|
|
ihook = getattr(item.ihook, hookname)
|
|
return CallInfo(lambda: ihook(item=item, **kwds), when=when)
|
|
|
|
class CallInfo:
|
|
""" Result/Exception info a function invocation. """
|
|
#: None or ExceptionInfo object.
|
|
excinfo = None
|
|
def __init__(self, func, when):
|
|
#: context of invocation: one of "setup", "call",
|
|
#: "teardown", "memocollect"
|
|
self.when = when
|
|
self.start = time()
|
|
try:
|
|
try:
|
|
self.result = func()
|
|
except KeyboardInterrupt:
|
|
raise
|
|
except:
|
|
self.excinfo = py.code.ExceptionInfo()
|
|
finally:
|
|
self.stop = time()
|
|
|
|
def __repr__(self):
|
|
if self.excinfo:
|
|
status = "exception: %s" % str(self.excinfo.value)
|
|
else:
|
|
status = "result: %r" % (self.result,)
|
|
return "<CallInfo when=%r %s>" % (self.when, status)
|
|
|
|
def getslaveinfoline(node):
|
|
try:
|
|
return node._slaveinfocache
|
|
except AttributeError:
|
|
d = node.slaveinfo
|
|
ver = "%s.%s.%s" % d['version_info'][:3]
|
|
node._slaveinfocache = s = "[%s] %s -- Python %s %s" % (
|
|
d['id'], d['sysplatform'], ver, d['executable'])
|
|
return s
|
|
|
|
class BaseReport(object):
|
|
|
|
def __init__(self, **kw):
|
|
self.__dict__.update(kw)
|
|
|
|
def toterminal(self, out):
|
|
longrepr = self.longrepr
|
|
if hasattr(self, 'node'):
|
|
out.line(getslaveinfoline(self.node))
|
|
if hasattr(longrepr, 'toterminal'):
|
|
longrepr.toterminal(out)
|
|
else:
|
|
try:
|
|
out.line(longrepr)
|
|
except UnicodeEncodeError:
|
|
out.line("<unprintable longrepr>")
|
|
|
|
passed = property(lambda x: x.outcome == "passed")
|
|
failed = property(lambda x: x.outcome == "failed")
|
|
skipped = property(lambda x: x.outcome == "skipped")
|
|
|
|
@property
|
|
def fspath(self):
|
|
return self.nodeid.split("::")[0]
|
|
|
|
def pytest_runtest_makereport(item, call):
|
|
when = call.when
|
|
duration = call.stop-call.start
|
|
keywords = dict([(x,1) for x in item.keywords])
|
|
excinfo = call.excinfo
|
|
if not call.excinfo:
|
|
outcome = "passed"
|
|
longrepr = None
|
|
else:
|
|
if not isinstance(excinfo, py.code.ExceptionInfo):
|
|
outcome = "failed"
|
|
longrepr = excinfo
|
|
elif excinfo.errisinstance(pytest.skip.Exception):
|
|
outcome = "skipped"
|
|
r = excinfo._getreprcrash()
|
|
longrepr = (str(r.path), r.lineno, r.message)
|
|
else:
|
|
outcome = "failed"
|
|
if call.when == "call":
|
|
longrepr = item.repr_failure(excinfo)
|
|
else: # exception in setup or teardown
|
|
longrepr = item._repr_failure_py(excinfo,
|
|
style=item.config.option.tbstyle)
|
|
return TestReport(item.nodeid, item.location,
|
|
keywords, outcome, longrepr, when,
|
|
duration=duration)
|
|
|
|
class TestReport(BaseReport):
|
|
""" Basic test report object (also used for setup and teardown calls if
|
|
they fail).
|
|
"""
|
|
def __init__(self, nodeid, location,
|
|
keywords, outcome, longrepr, when, sections=(), duration=0, **extra):
|
|
#: normalized collection node id
|
|
self.nodeid = nodeid
|
|
|
|
#: a (filesystempath, lineno, domaininfo) tuple indicating the
|
|
#: actual location of a test item - it might be different from the
|
|
#: collected one e.g. if a method is inherited from a different module.
|
|
self.location = location
|
|
|
|
#: a name -> value dictionary containing all keywords and
|
|
#: markers associated with a test invocation.
|
|
self.keywords = keywords
|
|
|
|
#: test outcome, always one of "passed", "failed", "skipped".
|
|
self.outcome = outcome
|
|
|
|
#: None or a failure representation.
|
|
self.longrepr = longrepr
|
|
|
|
#: one of 'setup', 'call', 'teardown' to indicate runtest phase.
|
|
self.when = when
|
|
|
|
#: list of (secname, data) extra information which needs to
|
|
#: marshallable
|
|
self.sections = list(sections)
|
|
|
|
#: time it took to run just the test
|
|
self.duration = duration
|
|
|
|
self.__dict__.update(extra)
|
|
|
|
def __repr__(self):
|
|
return "<TestReport %r when=%r outcome=%r>" % (
|
|
self.nodeid, self.when, self.outcome)
|
|
|
|
class TeardownErrorReport(BaseReport):
|
|
outcome = "failed"
|
|
when = "teardown"
|
|
def __init__(self, longrepr, **extra):
|
|
self.longrepr = longrepr
|
|
self.sections = []
|
|
self.__dict__.update(extra)
|
|
|
|
def pytest_make_collect_report(collector):
|
|
call = CallInfo(collector._memocollect, "memocollect")
|
|
longrepr = None
|
|
if not call.excinfo:
|
|
outcome = "passed"
|
|
else:
|
|
if call.excinfo.errisinstance(collector.skip_exceptions):
|
|
outcome = "skipped"
|
|
r = collector._repr_failure_py(call.excinfo, "line").reprcrash
|
|
longrepr = (str(r.path), r.lineno, r.message)
|
|
else:
|
|
outcome = "failed"
|
|
errorinfo = collector.repr_failure(call.excinfo)
|
|
if not hasattr(errorinfo, "toterminal"):
|
|
errorinfo = CollectErrorRepr(errorinfo)
|
|
longrepr = errorinfo
|
|
rep = CollectReport(collector.nodeid, outcome, longrepr,
|
|
getattr(call, 'result', None))
|
|
rep.call = call # see collect_one_node
|
|
return rep
|
|
|
|
|
|
class CollectReport(BaseReport):
|
|
def __init__(self, nodeid, outcome, longrepr, result, sections=(), **extra):
|
|
self.nodeid = nodeid
|
|
self.outcome = outcome
|
|
self.longrepr = longrepr
|
|
self.result = result or []
|
|
self.sections = list(sections)
|
|
self.__dict__.update(extra)
|
|
|
|
@property
|
|
def location(self):
|
|
return (self.fspath, None, self.fspath)
|
|
|
|
def __repr__(self):
|
|
return "<CollectReport %r lenresult=%s outcome=%r>" % (
|
|
self.nodeid, len(self.result), self.outcome)
|
|
|
|
class CollectErrorRepr(TerminalRepr):
|
|
def __init__(self, msg):
|
|
self.longrepr = msg
|
|
def toterminal(self, out):
|
|
out.line(self.longrepr, red=True)
|
|
|
|
class SetupState(object):
|
|
""" shared state for setting up/tearing down test items or collectors. """
|
|
def __init__(self):
|
|
self.stack = []
|
|
self._finalizers = {}
|
|
|
|
def addfinalizer(self, finalizer, colitem):
|
|
""" attach a finalizer to the given colitem.
|
|
if colitem is None, this will add a finalizer that
|
|
is called at the end of teardown_all().
|
|
if colitem is a tuple, it will be used as a key
|
|
and needs an explicit call to _callfinalizers(key) later on.
|
|
"""
|
|
assert hasattr(finalizer, '__call__')
|
|
#assert colitem in self.stack
|
|
self._finalizers.setdefault(colitem, []).append(finalizer)
|
|
|
|
def _pop_and_teardown(self):
|
|
colitem = self.stack.pop()
|
|
self._teardown_with_finalization(colitem)
|
|
|
|
def _callfinalizers(self, colitem):
|
|
finalizers = self._finalizers.pop(colitem, None)
|
|
exc = None
|
|
while finalizers:
|
|
fin = finalizers.pop()
|
|
try:
|
|
fin()
|
|
except Exception:
|
|
# XXX Only first exception will be seen by user,
|
|
# ideally all should be reported.
|
|
if exc is None:
|
|
exc = sys.exc_info()
|
|
if exc:
|
|
py.builtin._reraise(*exc)
|
|
|
|
def _teardown_with_finalization(self, colitem):
|
|
self._callfinalizers(colitem)
|
|
if hasattr(colitem, "teardown"):
|
|
colitem.teardown()
|
|
for colitem in self._finalizers:
|
|
assert colitem is None or colitem in self.stack \
|
|
or isinstance(colitem, tuple)
|
|
|
|
def teardown_all(self):
|
|
while self.stack:
|
|
self._pop_and_teardown()
|
|
for key in list(self._finalizers):
|
|
self._teardown_with_finalization(key)
|
|
assert not self._finalizers
|
|
|
|
def teardown_exact(self, item, nextitem):
|
|
needed_collectors = nextitem and nextitem.listchain() or []
|
|
self._teardown_towards(needed_collectors)
|
|
|
|
def _teardown_towards(self, needed_collectors):
|
|
while self.stack:
|
|
if self.stack == needed_collectors[:len(self.stack)]:
|
|
break
|
|
self._pop_and_teardown()
|
|
|
|
def prepare(self, colitem):
|
|
""" setup objects along the collector chain to the test-method
|
|
and teardown previously setup objects."""
|
|
needed_collectors = colitem.listchain()
|
|
self._teardown_towards(needed_collectors)
|
|
|
|
# check if the last collection node has raised an error
|
|
for col in self.stack:
|
|
if hasattr(col, '_prepare_exc'):
|
|
py.builtin._reraise(*col._prepare_exc)
|
|
for col in needed_collectors[len(self.stack):]:
|
|
self.stack.append(col)
|
|
try:
|
|
col.setup()
|
|
except Exception:
|
|
col._prepare_exc = sys.exc_info()
|
|
raise
|
|
|
|
def collect_one_node(collector):
|
|
ihook = collector.ihook
|
|
ihook.pytest_collectstart(collector=collector)
|
|
rep = ihook.pytest_make_collect_report(collector=collector)
|
|
call = rep.__dict__.pop("call", None)
|
|
if call and check_interactive_exception(call, rep):
|
|
ihook.pytest_exception_interact(node=collector, call=call, report=rep)
|
|
return rep
|
|
|
|
|
|
# =============================================================
|
|
# Test OutcomeExceptions and helpers for creating them.
|
|
|
|
|
|
class OutcomeException(Exception):
|
|
""" OutcomeException and its subclass instances indicate and
|
|
contain info about test and collection outcomes.
|
|
"""
|
|
def __init__(self, msg=None, pytrace=True):
|
|
Exception.__init__(self, msg)
|
|
self.msg = msg
|
|
self.pytrace = pytrace
|
|
|
|
def __repr__(self):
|
|
if self.msg:
|
|
return str(self.msg)
|
|
return "<%s instance>" %(self.__class__.__name__,)
|
|
__str__ = __repr__
|
|
|
|
class Skipped(OutcomeException):
|
|
# XXX hackish: on 3k we fake to live in the builtins
|
|
# in order to have Skipped exception printing shorter/nicer
|
|
__module__ = 'builtins'
|
|
|
|
class Failed(OutcomeException):
|
|
""" raised from an explicit call to pytest.fail() """
|
|
__module__ = 'builtins'
|
|
|
|
class Exit(KeyboardInterrupt):
|
|
""" raised for immediate program exits (no tracebacks/summaries)"""
|
|
def __init__(self, msg="unknown reason"):
|
|
self.msg = msg
|
|
KeyboardInterrupt.__init__(self, msg)
|
|
|
|
# exposed helper methods
|
|
|
|
def exit(msg):
|
|
""" exit testing process as if KeyboardInterrupt was triggered. """
|
|
__tracebackhide__ = True
|
|
raise Exit(msg)
|
|
|
|
exit.Exception = Exit
|
|
|
|
def skip(msg=""):
|
|
""" skip an executing test with the given message. Note: it's usually
|
|
better to use the pytest.mark.skipif marker to declare a test to be
|
|
skipped under certain conditions like mismatching platforms or
|
|
dependencies. See the pytest_skipping plugin for details.
|
|
"""
|
|
__tracebackhide__ = True
|
|
raise Skipped(msg=msg)
|
|
skip.Exception = Skipped
|
|
|
|
def fail(msg="", pytrace=True):
|
|
""" explicitely fail an currently-executing test with the given Message.
|
|
|
|
:arg pytrace: if false the msg represents the full failure information
|
|
and no python traceback will be reported.
|
|
"""
|
|
__tracebackhide__ = True
|
|
raise Failed(msg=msg, pytrace=pytrace)
|
|
fail.Exception = Failed
|
|
|
|
|
|
def importorskip(modname, minversion=None):
|
|
""" return imported module if it has at least "minversion" as its
|
|
__version__ attribute. If no minversion is specified the a skip
|
|
is only triggered if the module can not be imported.
|
|
Note that version comparison only works with simple version strings
|
|
like "1.2.3" but not "1.2.3.dev1" or others.
|
|
"""
|
|
__tracebackhide__ = True
|
|
compile(modname, '', 'eval') # to catch syntaxerrors
|
|
try:
|
|
__import__(modname)
|
|
except ImportError:
|
|
skip("could not import %r" %(modname,))
|
|
mod = sys.modules[modname]
|
|
if minversion is None:
|
|
return mod
|
|
verattr = getattr(mod, '__version__', None)
|
|
def intver(verstring):
|
|
return [int(x) for x in verstring.split(".")]
|
|
if verattr is None or intver(verattr) < intver(minversion):
|
|
skip("module %r has __version__ %r, required is: %r" %(
|
|
modname, verattr, minversion))
|
|
return mod
|