remove all occurences of "__multicall__" on hook impls in pytest/*.

also simplify pytest_runtest_markereport hook in _pytest/skipping.py
while touching the code anyway.
This commit is contained in:
holger krekel 2014-10-08 20:23:40 +02:00
parent f5f924d293
commit 0253f7b8d5
11 changed files with 138 additions and 135 deletions

View File

@ -29,8 +29,8 @@ def pytest_addoption(parser):
help="shortcut for --capture=no.") help="shortcut for --capture=no.")
@pytest.mark.tryfirst @pytest.mark.hookwrapper
def pytest_load_initial_conftests(early_config, parser, args, __multicall__): def pytest_load_initial_conftests(early_config, parser, args):
ns = early_config.known_args_namespace ns = early_config.known_args_namespace
pluginmanager = early_config.pluginmanager pluginmanager = early_config.pluginmanager
capman = CaptureManager(ns.capture) capman = CaptureManager(ns.capture)
@ -47,15 +47,11 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
# finally trigger conftest loading but while capturing (issue93) # finally trigger conftest loading but while capturing (issue93)
capman.init_capturings() capman.init_capturings()
try: outcome = yield
try: out, err = capman.suspendcapture()
return __multicall__.execute() if outcome.excinfo is not None:
finally:
out, err = capman.suspendcapture()
except:
sys.stdout.write(out) sys.stdout.write(out)
sys.stderr.write(err) sys.stderr.write(err)
raise
class CaptureManager: class CaptureManager:
@ -105,20 +101,19 @@ class CaptureManager:
if capfuncarg is not None: if capfuncarg is not None:
capfuncarg.close() capfuncarg.close()
@pytest.mark.tryfirst @pytest.mark.hookwrapper
def pytest_make_collect_report(self, __multicall__, collector): def pytest_make_collect_report(self, collector):
if not isinstance(collector, pytest.File): if isinstance(collector, pytest.File):
return self.resumecapture()
self.resumecapture() outcome = yield
try:
rep = __multicall__.execute()
finally:
out, err = self.suspendcapture() out, err = self.suspendcapture()
if out: rep = outcome.get_result()
rep.sections.append(("Captured stdout", out)) if out:
if err: rep.sections.append(("Captured stdout", out))
rep.sections.append(("Captured stderr", err)) if err:
return rep rep.sections.append(("Captured stderr", err))
else:
yield
@pytest.mark.hookwrapper @pytest.mark.hookwrapper
def pytest_runtest_setup(self, item): def pytest_runtest_setup(self, item):

View File

@ -95,7 +95,10 @@ def wrapped_call(wrap_controller, func):
will trigger calling the function and receive an according CallOutcome will trigger calling the function and receive an according CallOutcome
object representing an exception or a result. object representing an exception or a result.
""" """
next(wrap_controller) # first yield try:
next(wrap_controller) # first yield
except StopIteration:
return
call_outcome = CallOutcome(func) call_outcome = CallOutcome(func)
try: try:
wrap_controller.send(call_outcome) wrap_controller.send(call_outcome)
@ -104,13 +107,7 @@ def wrapped_call(wrap_controller, func):
(co.co_name, co.co_filename, co.co_firstlineno)) (co.co_name, co.co_filename, co.co_firstlineno))
except StopIteration: except StopIteration:
pass pass
if call_outcome.excinfo is None: return call_outcome.get_result()
return call_outcome.result
else:
ex = call_outcome.excinfo
if py3:
raise ex[1].with_traceback(ex[2])
py.builtin._reraise(*ex)
class CallOutcome: class CallOutcome:
@ -125,6 +122,15 @@ class CallOutcome:
self.result = result self.result = result
self.excinfo = None self.excinfo = None
def get_result(self):
if self.excinfo is None:
return self.result
else:
ex = self.excinfo
if py3:
raise ex[1].with_traceback(ex[2])
py.builtin._reraise(*ex)
class PluginManager(object): class PluginManager(object):
def __init__(self, hookspecs=None, prefix="pytest_"): def __init__(self, hookspecs=None, prefix="pytest_"):

View File

@ -22,18 +22,21 @@ def pytest_addoption(parser):
help="store internal tracing debug information in 'pytestdebug.log'.") help="store internal tracing debug information in 'pytestdebug.log'.")
def pytest_cmdline_parse(__multicall__): @pytest.mark.hookwrapper
config = __multicall__.execute() def pytest_cmdline_parse():
outcome = yield
config = outcome.get_result()
if config.option.debug: if config.option.debug:
path = os.path.abspath("pytestdebug.log") path = os.path.abspath("pytestdebug.log")
f = open(path, 'w') f = open(path, 'w')
config._debugfile = f config._debugfile = f
f.write("versions pytest-%s, py-%s, python-%s\ncwd=%s\nargs=%s\n\n" %( f.write("versions pytest-%s, py-%s, "
pytest.__version__, py.__version__, ".".join(map(str, sys.version_info)), "python-%s\ncwd=%s\nargs=%s\n\n" %(
pytest.__version__, py.__version__,
".".join(map(str, sys.version_info)),
os.getcwd(), config._origargs)) os.getcwd(), config._origargs))
config.pluginmanager.set_tracing(f.write) config.pluginmanager.set_tracing(f.write)
sys.stderr.write("writing pytestdebug information to %s\n" % path) sys.stderr.write("writing pytestdebug information to %s\n" % path)
return config
@pytest.mark.trylast @pytest.mark.trylast
def pytest_unconfigure(config): def pytest_unconfigure(config):

View File

@ -16,7 +16,7 @@ def get_skip_exceptions():
return tuple(skip_classes) return tuple(skip_classes)
def pytest_runtest_makereport(__multicall__, item, call): def pytest_runtest_makereport(item, call):
if call.excinfo and call.excinfo.errisinstance(get_skip_exceptions()): if call.excinfo and call.excinfo.errisinstance(get_skip_exceptions()):
# let's substitute the excinfo with a pytest.skip one # let's substitute the excinfo with a pytest.skip one
call2 = call.__class__(lambda: call2 = call.__class__(lambda:

View File

@ -1,5 +1,7 @@
""" submit failure or test session information to a pastebin service. """ """ submit failure or test session information to a pastebin service. """
import pytest
import py, sys import py, sys
import tempfile
class url: class url:
base = "http://bpaste.net" base = "http://bpaste.net"
@ -13,9 +15,8 @@ def pytest_addoption(parser):
choices=['failed', 'all'], choices=['failed', 'all'],
help="send failed|all info to bpaste.net pastebin service.") help="send failed|all info to bpaste.net pastebin service.")
def pytest_configure(__multicall__, config): @pytest.mark.trylast
import tempfile def pytest_configure(config):
__multicall__.execute()
if config.option.pastebin == "all": if config.option.pastebin == "all":
config._pastebinfile = tempfile.TemporaryFile('w+') config._pastebinfile = tempfile.TemporaryFile('w+')
tr = config.pluginmanager.getplugin('terminalreporter') tr = config.pluginmanager.getplugin('terminalreporter')

View File

@ -183,17 +183,18 @@ def pytestconfig(request):
return request.config return request.config
def pytest_pyfunc_call(__multicall__, pyfuncitem): @pytest.mark.trylast
if not __multicall__.execute(): def pytest_pyfunc_call(pyfuncitem):
testfunction = pyfuncitem.obj testfunction = pyfuncitem.obj
if pyfuncitem._isyieldedfunction(): if pyfuncitem._isyieldedfunction():
testfunction(*pyfuncitem._args) testfunction(*pyfuncitem._args)
else: else:
funcargs = pyfuncitem.funcargs funcargs = pyfuncitem.funcargs
testargs = {} testargs = {}
for arg in pyfuncitem._fixtureinfo.argnames: for arg in pyfuncitem._fixtureinfo.argnames:
testargs[arg] = funcargs[arg] testargs[arg] = funcargs[arg]
testfunction(**testargs) testfunction(**testargs)
return True
def pytest_collect_file(path, parent): def pytest_collect_file(path, parent):
ext = path.ext ext = path.ext
@ -210,30 +211,31 @@ def pytest_collect_file(path, parent):
def pytest_pycollect_makemodule(path, parent): def pytest_pycollect_makemodule(path, parent):
return Module(path, parent) return Module(path, parent)
def pytest_pycollect_makeitem(__multicall__, collector, name, obj): @pytest.mark.hookwrapper
res = __multicall__.execute() def pytest_pycollect_makeitem(collector, name, obj):
outcome = yield
res = outcome.get_result()
if res is not None: if res is not None:
return res raise StopIteration
# nothing was collected elsewhere, let's do it here
if isclass(obj): if isclass(obj):
#if hasattr(collector.obj, 'unittest'):
# return # we assume it's a mixin class for a TestCase derived one
if collector.classnamefilter(name): if collector.classnamefilter(name):
Class = collector._getcustomclass("Class") Class = collector._getcustomclass("Class")
return Class(name, parent=collector) outcome.force_result(Class(name, parent=collector))
elif collector.funcnamefilter(name) and hasattr(obj, "__call__") and \ elif collector.funcnamefilter(name) and hasattr(obj, "__call__") and\
getfixturemarker(obj) is None: getfixturemarker(obj) is None:
# mock seems to store unbound methods (issue473), let's normalize it # mock seems to store unbound methods (issue473), normalize it
obj = getattr(obj, "__func__", obj) obj = getattr(obj, "__func__", obj)
if not isfunction(obj): if not isfunction(obj):
collector.warn(code="C2", message= collector.warn(code="C2", message=
"cannot collect %r because it is not a function." "cannot collect %r because it is not a function."
% name, ) % name, )
return
if getattr(obj, "__test__", True): if getattr(obj, "__test__", True):
if is_generator(obj): if is_generator(obj):
return Generator(name, parent=collector) res = Generator(name, parent=collector)
else: else:
return list(collector._genfunctions(name, obj)) res = list(collector._genfunctions(name, obj))
outcome.force_result(res)
def is_generator(func): def is_generator(func):
try: try:

View File

@ -57,7 +57,7 @@ class MarkEvaluator:
@property @property
def holder(self): def holder(self):
return self.item.keywords.get(self.name, None) return self.item.keywords.get(self.name)
def __bool__(self): def __bool__(self):
return bool(self.holder) return bool(self.holder)
@ -75,9 +75,7 @@ class MarkEvaluator:
def istrue(self): def istrue(self):
try: try:
return self._istrue() return self._istrue()
except KeyboardInterrupt: except Exception:
raise
except:
self.exc = sys.exc_info() self.exc = sys.exc_info()
if isinstance(self.exc[1], SyntaxError): if isinstance(self.exc[1], SyntaxError):
msg = [" " * (self.exc[1].offset + 4) + "^",] msg = [" " * (self.exc[1].offset + 4) + "^",]
@ -153,44 +151,32 @@ def check_xfail_no_run(item):
if not evalxfail.get('run', True): if not evalxfail.get('run', True):
pytest.xfail("[NOTRUN] " + evalxfail.getexplanation()) pytest.xfail("[NOTRUN] " + evalxfail.getexplanation())
def pytest_runtest_makereport(__multicall__, item, call): @pytest.mark.hookwrapper
def pytest_runtest_makereport(item, call):
outcome = yield
rep = outcome.get_result()
evalxfail = getattr(item, '_evalxfail', None)
# unitttest special case, see setting of _unexpectedsuccess # unitttest special case, see setting of _unexpectedsuccess
if hasattr(item, '_unexpectedsuccess'): if hasattr(item, '_unexpectedsuccess') and rep.when == "call":
rep = __multicall__.execute() # we need to translate into how pytest encodes xpass
if rep.when == "call": rep.wasxfail = "reason: " + repr(item._unexpectedsuccess)
# we need to translate into how pytest encodes xpass rep.outcome = "failed"
rep.wasxfail = "reason: " + repr(item._unexpectedsuccess) elif item.config.option.runxfail:
rep.outcome = "failed" pass # don't interefere
return rep elif call.excinfo and call.excinfo.errisinstance(pytest.xfail.Exception):
if not (call.excinfo and rep.wasxfail = "reason: " + call.excinfo.value.msg
call.excinfo.errisinstance(pytest.xfail.Exception)): rep.outcome = "skipped"
evalxfail = getattr(item, '_evalxfail', None) elif evalxfail and not rep.skipped and evalxfail.wasvalid() and \
if not evalxfail: evalxfail.istrue():
return if call.excinfo:
if call.excinfo and call.excinfo.errisinstance(pytest.xfail.Exception): if evalxfail.invalidraise(call.excinfo.value):
if not item.config.getvalue("runxfail"): rep.outcome = "failed"
rep = __multicall__.execute() else:
rep.wasxfail = "reason: " + call.excinfo.value.msg rep.outcome = "skipped"
rep.outcome = "skipped"
return rep
rep = __multicall__.execute()
evalxfail = item._evalxfail
if not rep.skipped:
if not item.config.option.runxfail:
if evalxfail.wasvalid() and evalxfail.istrue():
if call.excinfo:
if evalxfail.invalidraise(call.excinfo.value):
rep.outcome = "failed"
return rep
else:
rep.outcome = "skipped"
elif call.when == "call":
rep.outcome = "failed"
else:
return rep
rep.wasxfail = evalxfail.getexplanation() rep.wasxfail = evalxfail.getexplanation()
return rep elif call.when == "call":
return rep rep.outcome = "failed" # xpass outcome
rep.wasxfail = evalxfail.getexplanation()
# called by terminalreporter progress reporting # called by terminalreporter progress reporting
def pytest_report_teststatus(report): def pytest_report_teststatus(report):

View File

@ -345,8 +345,10 @@ class TerminalReporter:
indent = (len(stack) - 1) * " " indent = (len(stack) - 1) * " "
self._tw.line("%s%s" % (indent, col)) self._tw.line("%s%s" % (indent, col))
def pytest_sessionfinish(self, exitstatus, __multicall__): @pytest.mark.hookwrapper
__multicall__.execute() def pytest_sessionfinish(self, exitstatus):
outcome = yield
outcome.get_result()
self._tw.line("") self._tw.line("")
if exitstatus in (0, 1, 2, 4): if exitstatus in (0, 1, 2, 4):
self.summary_errors() self.summary_errors()

View File

@ -151,30 +151,33 @@ def pytest_runtest_makereport(item, call):
pass pass
# twisted trial support # twisted trial support
def pytest_runtest_protocol(item, __multicall__):
if isinstance(item, TestCaseFunction): @pytest.mark.hookwrapper
if 'twisted.trial.unittest' in sys.modules: def pytest_runtest_protocol(item):
ut = sys.modules['twisted.python.failure'] if isinstance(item, TestCaseFunction) and \
Failure__init__ = ut.Failure.__init__ 'twisted.trial.unittest' in sys.modules:
check_testcase_implements_trial_reporter() ut = sys.modules['twisted.python.failure']
def excstore(self, exc_value=None, exc_type=None, exc_tb=None, Failure__init__ = ut.Failure.__init__
captureVars=None): check_testcase_implements_trial_reporter()
if exc_value is None: def excstore(self, exc_value=None, exc_type=None, exc_tb=None,
self._rawexcinfo = sys.exc_info() captureVars=None):
else: if exc_value is None:
if exc_type is None: self._rawexcinfo = sys.exc_info()
exc_type = type(exc_value) else:
self._rawexcinfo = (exc_type, exc_value, exc_tb) if exc_type is None:
try: exc_type = type(exc_value)
Failure__init__(self, exc_value, exc_type, exc_tb, self._rawexcinfo = (exc_type, exc_value, exc_tb)
captureVars=captureVars)
except TypeError:
Failure__init__(self, exc_value, exc_type, exc_tb)
ut.Failure.__init__ = excstore
try: try:
return __multicall__.execute() Failure__init__(self, exc_value, exc_type, exc_tb,
finally: captureVars=captureVars)
ut.Failure.__init__ = Failure__init__ except TypeError:
Failure__init__(self, exc_value, exc_type, exc_tb)
ut.Failure.__init__ = excstore
yield
ut.Failure.__init__ = Failure__init__
else:
yield
def check_testcase_implements_trial_reporter(done=[]): def check_testcase_implements_trial_reporter(done=[]):
if done: if done:

View File

@ -525,12 +525,15 @@ class TestConftestCustomization:
def test_customized_pymakeitem(self, testdir): def test_customized_pymakeitem(self, testdir):
b = testdir.mkdir("a").mkdir("b") b = testdir.mkdir("a").mkdir("b")
b.join("conftest.py").write(py.code.Source(""" b.join("conftest.py").write(py.code.Source("""
def pytest_pycollect_makeitem(__multicall__): import pytest
result = __multicall__.execute() @pytest.mark.hookwrapper
if result: def pytest_pycollect_makeitem():
for func in result: outcome = yield
func._some123 = "world" if outcome.excinfo is None:
return result result = outcome.result
if result:
for func in result:
func._some123 = "world"
""")) """))
b.join("test_module.py").write(py.code.Source(""" b.join("test_module.py").write(py.code.Source("""
import pytest import pytest

View File

@ -509,11 +509,13 @@ class TestKeywordSelection:
pass pass
""") """)
testdir.makepyfile(conftest=""" testdir.makepyfile(conftest="""
def pytest_pycollect_makeitem(__multicall__, name): import pytest
@pytest.mark.hookwrapper
def pytest_pycollect_makeitem(name):
outcome = yield
if name == "TestClass": if name == "TestClass":
item = __multicall__.execute() item = outcome.get_result()
item.extra_keyword_matches.add("xxx") item.extra_keyword_matches.add("xxx")
return item
""") """)
reprec = testdir.inline_run(p.dirpath(), '-s', '-k', keyword) reprec = testdir.inline_run(p.dirpath(), '-s', '-k', keyword)
py.builtin.print_("keyword", repr(keyword)) py.builtin.print_("keyword", repr(keyword))