remove non-documented per-conftest capturing option and simplify/refactor all code accordingly. Also make capturing more robust against tests closing FD1/2 and against pdb.set_trace() calls.
This commit is contained in:
parent
2e1f6c85f6
commit
ce8678e6d5
|
@ -21,9 +21,10 @@ patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
|
|||
def pytest_addoption(parser):
|
||||
group = parser.getgroup("general")
|
||||
group._addoption(
|
||||
'--capture', action="store", default=None,
|
||||
'--capture', action="store",
|
||||
default="fd" if hasattr(os, "dup") else "sys",
|
||||
metavar="method", choices=['fd', 'sys', 'no'],
|
||||
help="per-test capturing method: one of fd (default)|sys|no.")
|
||||
help="per-test capturing method: one of fd|sys|no.")
|
||||
group._addoption(
|
||||
'-s', action="store_const", const="no", dest="capture",
|
||||
help="shortcut for --capture=no.")
|
||||
|
@ -32,16 +33,13 @@ def pytest_addoption(parser):
|
|||
@pytest.mark.tryfirst
|
||||
def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
||||
ns = parser.parse_known_args(args)
|
||||
method = ns.capture
|
||||
if not method:
|
||||
method = "fd"
|
||||
if method == "fd" and not hasattr(os, "dup"):
|
||||
method = "sys"
|
||||
pluginmanager = early_config.pluginmanager
|
||||
method = ns.capture
|
||||
if method != "no":
|
||||
dupped_stdout = safe_text_dupfile(sys.stdout, "wb")
|
||||
pluginmanager.register(dupped_stdout, "dupped_stdout")
|
||||
#pluginmanager.add_shutdown(dupped_stdout.close)
|
||||
|
||||
capman = CaptureManager(method)
|
||||
pluginmanager.register(capman, "capturemanager")
|
||||
|
||||
|
@ -55,7 +53,7 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
|||
pluginmanager.add_shutdown(silence_logging_at_shutdown)
|
||||
|
||||
# finally trigger conftest loading but while capturing (issue93)
|
||||
capman.resumecapture()
|
||||
capman.init_capturings()
|
||||
try:
|
||||
try:
|
||||
return __multicall__.execute()
|
||||
|
@ -67,11 +65,9 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
|||
raise
|
||||
|
||||
|
||||
|
||||
class CaptureManager:
|
||||
def __init__(self, defaultmethod=None):
|
||||
self._method2capture = {}
|
||||
self._defaultmethod = defaultmethod
|
||||
def __init__(self, method):
|
||||
self._method = method
|
||||
|
||||
def _getcapture(self, method):
|
||||
if method == "fd":
|
||||
|
@ -83,53 +79,27 @@ class CaptureManager:
|
|||
else:
|
||||
raise ValueError("unknown capturing method: %r" % method)
|
||||
|
||||
def _getmethod(self, config, fspath):
|
||||
if config.option.capture:
|
||||
method = config.option.capture
|
||||
else:
|
||||
try:
|
||||
method = config._conftest.rget("option_capture", path=fspath)
|
||||
except KeyError:
|
||||
method = "fd"
|
||||
if method == "fd" and not hasattr(os, 'dup'): # e.g. jython
|
||||
method = "sys"
|
||||
return method
|
||||
def init_capturings(self):
|
||||
assert not hasattr(self, "_capturing")
|
||||
self._capturing = self._getcapture(self._method)
|
||||
self._capturing.start_capturing()
|
||||
|
||||
def reset_capturings(self):
|
||||
for cap in self._method2capture.values():
|
||||
cap = self.__dict__.pop("_capturing", None)
|
||||
if cap is not None:
|
||||
cap.pop_outerr_to_orig()
|
||||
cap.stop_capturing()
|
||||
self._method2capture.clear()
|
||||
|
||||
def resumecapture_item(self, item):
|
||||
method = self._getmethod(item.config, item.fspath)
|
||||
return self.resumecapture(method)
|
||||
def resumecapture(self):
|
||||
self._capturing.resume_capturing()
|
||||
|
||||
def resumecapture(self, method=None):
|
||||
if hasattr(self, '_capturing'):
|
||||
raise ValueError(
|
||||
"cannot resume, already capturing with %r" %
|
||||
(self._capturing,))
|
||||
if method is None:
|
||||
method = self._defaultmethod
|
||||
cap = self._method2capture.get(method)
|
||||
self._capturing = method
|
||||
if cap is None:
|
||||
self._method2capture[method] = cap = self._getcapture(method)
|
||||
cap.start_capturing()
|
||||
else:
|
||||
cap.resume_capturing()
|
||||
|
||||
def suspendcapture(self, item=None):
|
||||
def suspendcapture(self, in_=False):
|
||||
self.deactivate_funcargs()
|
||||
method = self.__dict__.pop("_capturing", None)
|
||||
outerr = "", ""
|
||||
if method is not None:
|
||||
cap = self._method2capture.get(method)
|
||||
if cap is not None:
|
||||
outerr = cap.readouterr()
|
||||
cap.suspend_capturing()
|
||||
return outerr
|
||||
cap = getattr(self, "_capturing", None)
|
||||
if cap is not None:
|
||||
outerr = cap.readouterr()
|
||||
cap.suspend_capturing(in_=in_)
|
||||
return outerr
|
||||
|
||||
def activate_funcargs(self, pyfuncitem):
|
||||
capfuncarg = pyfuncitem.__dict__.pop("_capfuncarg", None)
|
||||
|
@ -142,28 +112,20 @@ class CaptureManager:
|
|||
if capfuncarg is not None:
|
||||
capfuncarg.close()
|
||||
|
||||
@pytest.mark.hookwrapper
|
||||
@pytest.mark.tryfirst
|
||||
def pytest_make_collect_report(self, __multicall__, collector):
|
||||
method = self._getmethod(collector.config, collector.fspath)
|
||||
try:
|
||||
self.resumecapture(method)
|
||||
except ValueError:
|
||||
yield
|
||||
# recursive collect, XXX refactor capturing
|
||||
# to allow for more lightweight recursive capturing
|
||||
if not isinstance(collector, pytest.File):
|
||||
return
|
||||
yield
|
||||
out, err = self.suspendcapture()
|
||||
# XXX getting the report from the ongoing hook call is a bit
|
||||
# of a hack. We need to think about capturing during collection
|
||||
# and find out if it's really needed fine-grained (per
|
||||
# collector).
|
||||
if __multicall__.results:
|
||||
rep = __multicall__.results[0]
|
||||
if out:
|
||||
rep.sections.append(("Captured stdout", out))
|
||||
if err:
|
||||
rep.sections.append(("Captured stderr", err))
|
||||
self.resumecapture()
|
||||
try:
|
||||
rep = __multicall__.execute()
|
||||
finally:
|
||||
out, err = self.suspendcapture()
|
||||
if out:
|
||||
rep.sections.append(("Captured stdout", out))
|
||||
if err:
|
||||
rep.sections.append(("Captured stderr", err))
|
||||
return rep
|
||||
|
||||
@pytest.mark.hookwrapper
|
||||
def pytest_runtest_setup(self, item):
|
||||
|
@ -192,9 +154,9 @@ class CaptureManager:
|
|||
|
||||
@contextlib.contextmanager
|
||||
def item_capture_wrapper(self, item, when):
|
||||
self.resumecapture_item(item)
|
||||
self.resumecapture()
|
||||
yield
|
||||
out, err = self.suspendcapture(item)
|
||||
out, err = self.suspendcapture()
|
||||
item.add_report_section(when, "out", out)
|
||||
item.add_report_section(when, "err", err)
|
||||
|
||||
|
@ -238,14 +200,14 @@ class CaptureFixture:
|
|||
def close(self):
|
||||
cap = self.__dict__.pop("_capture", None)
|
||||
if cap is not None:
|
||||
cap.pop_outerr_to_orig()
|
||||
self._outerr = cap.pop_outerr_to_orig()
|
||||
cap.stop_capturing()
|
||||
|
||||
def readouterr(self):
|
||||
try:
|
||||
return self._capture.readouterr()
|
||||
except AttributeError:
|
||||
return "", ""
|
||||
return self._outerr
|
||||
|
||||
|
||||
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
|
||||
|
@ -311,18 +273,25 @@ class MultiCapture(object):
|
|||
self.out.writeorg(out)
|
||||
if err:
|
||||
self.err.writeorg(err)
|
||||
return out, err
|
||||
|
||||
def suspend_capturing(self):
|
||||
def suspend_capturing(self, in_=False):
|
||||
if self.out:
|
||||
self.out.suspend()
|
||||
if self.err:
|
||||
self.err.suspend()
|
||||
if in_ and self.in_:
|
||||
self.in_.suspend()
|
||||
self._in_suspended = True
|
||||
|
||||
def resume_capturing(self):
|
||||
if self.out:
|
||||
self.out.resume()
|
||||
if self.err:
|
||||
self.err.resume()
|
||||
if hasattr(self, "_in_suspended"):
|
||||
self.in_.resume()
|
||||
del self._in_suspended
|
||||
|
||||
def stop_capturing(self):
|
||||
""" stop capturing and reset capturing streams """
|
||||
|
@ -393,7 +362,8 @@ class FDCapture:
|
|||
res = py.builtin._totext(res, enc, "replace")
|
||||
f.truncate(0)
|
||||
f.seek(0)
|
||||
return res
|
||||
return res
|
||||
return ''
|
||||
|
||||
def done(self):
|
||||
""" stop capturing, restore streams, return original capture file,
|
||||
|
|
|
@ -34,7 +34,7 @@ class pytestPDB:
|
|||
if self._pluginmanager is not None:
|
||||
capman = self._pluginmanager.getplugin("capturemanager")
|
||||
if capman:
|
||||
capman.reset_capturings()
|
||||
capman.suspendcapture(in_=True)
|
||||
tw = py.io.TerminalWriter()
|
||||
tw.line()
|
||||
tw.sep(">", "PDB set_trace (IO-capturing turned off)")
|
||||
|
@ -45,8 +45,8 @@ class PdbInvoke:
|
|||
def pytest_exception_interact(self, node, call, report):
|
||||
capman = node.config.pluginmanager.getplugin("capturemanager")
|
||||
if capman:
|
||||
capman.reset_capturings()
|
||||
return _enter_pdb(node, call.excinfo, report)
|
||||
capman.suspendcapture(in_=True)
|
||||
_enter_pdb(node, call.excinfo, report)
|
||||
|
||||
def pytest_internalerror(self, excrepr, excinfo):
|
||||
for line in str(excrepr).split("\n"):
|
||||
|
|
|
@ -53,79 +53,54 @@ def StdCapture(out=True, err=True, in_=True):
|
|||
|
||||
|
||||
class TestCaptureManager:
|
||||
def test_getmethod_default_no_fd(self, testdir, monkeypatch):
|
||||
config = testdir.parseconfig(testdir.tmpdir)
|
||||
assert config.getvalue("capture") is None
|
||||
capman = CaptureManager()
|
||||
def test_getmethod_default_no_fd(self, monkeypatch):
|
||||
from _pytest.capture import pytest_addoption
|
||||
from _pytest.config import Parser
|
||||
parser = Parser()
|
||||
pytest_addoption(parser)
|
||||
default = parser._groups[0].options[0].default
|
||||
assert default == "fd" if hasattr(os, "dup") else "sys"
|
||||
parser = Parser()
|
||||
monkeypatch.delattr(os, 'dup', raising=False)
|
||||
try:
|
||||
assert capman._getmethod(config, None) == "sys"
|
||||
finally:
|
||||
monkeypatch.undo()
|
||||
|
||||
@pytest.mark.parametrize("mode", "no fd sys".split())
|
||||
def test_configure_per_fspath(self, testdir, mode):
|
||||
config = testdir.parseconfig(testdir.tmpdir)
|
||||
capman = CaptureManager()
|
||||
hasfd = hasattr(os, 'dup')
|
||||
if hasfd:
|
||||
assert capman._getmethod(config, None) == "fd"
|
||||
else:
|
||||
assert capman._getmethod(config, None) == "sys"
|
||||
|
||||
if not hasfd and mode == 'fd':
|
||||
return
|
||||
sub = testdir.tmpdir.mkdir("dir" + mode)
|
||||
sub.ensure("__init__.py")
|
||||
sub.join("conftest.py").write('option_capture = %r' % mode)
|
||||
assert capman._getmethod(config, sub.join("test_hello.py")) == mode
|
||||
pytest_addoption(parser)
|
||||
assert parser._groups[0].options[0].default == "sys"
|
||||
|
||||
@needsosdup
|
||||
@pytest.mark.parametrize("method", ['no', 'fd', 'sys'])
|
||||
@pytest.mark.parametrize("method",
|
||||
['no', 'sys', pytest.mark.skipif('not hasattr(os, "dup")', 'fd')])
|
||||
def test_capturing_basic_api(self, method):
|
||||
capouter = StdCaptureFD()
|
||||
old = sys.stdout, sys.stderr, sys.stdin
|
||||
try:
|
||||
capman = CaptureManager()
|
||||
# call suspend without resume or start
|
||||
outerr = capman.suspendcapture()
|
||||
capman = CaptureManager(method)
|
||||
capman.init_capturings()
|
||||
outerr = capman.suspendcapture()
|
||||
assert outerr == ("", "")
|
||||
outerr = capman.suspendcapture()
|
||||
assert outerr == ("", "")
|
||||
capman.resumecapture(method)
|
||||
print ("hello")
|
||||
out, err = capman.suspendcapture()
|
||||
if method == "no":
|
||||
assert old == (sys.stdout, sys.stderr, sys.stdin)
|
||||
else:
|
||||
assert out == "hello\n"
|
||||
capman.resumecapture(method)
|
||||
assert not out
|
||||
capman.resumecapture()
|
||||
print ("hello")
|
||||
out, err = capman.suspendcapture()
|
||||
assert not out and not err
|
||||
if method != "no":
|
||||
assert out == "hello\n"
|
||||
capman.reset_capturings()
|
||||
finally:
|
||||
capouter.stop_capturing()
|
||||
|
||||
@needsosdup
|
||||
def test_juggle_capturings(self, testdir):
|
||||
def test_init_capturing(self):
|
||||
capouter = StdCaptureFD()
|
||||
try:
|
||||
#config = testdir.parseconfig(testdir.tmpdir)
|
||||
capman = CaptureManager()
|
||||
try:
|
||||
capman.resumecapture("fd")
|
||||
pytest.raises(ValueError, 'capman.resumecapture("fd")')
|
||||
pytest.raises(ValueError, 'capman.resumecapture("sys")')
|
||||
os.write(1, "hello\n".encode('ascii'))
|
||||
out, err = capman.suspendcapture()
|
||||
assert out == "hello\n"
|
||||
capman.resumecapture("sys")
|
||||
os.write(1, "hello\n".encode('ascii'))
|
||||
py.builtin.print_("world", file=sys.stderr)
|
||||
out, err = capman.suspendcapture()
|
||||
assert not out
|
||||
assert err == "world\n"
|
||||
finally:
|
||||
capman.reset_capturings()
|
||||
capman = CaptureManager("fd")
|
||||
capman.init_capturings()
|
||||
pytest.raises(AssertionError, "capman.init_capturings()")
|
||||
capman.reset_capturings()
|
||||
finally:
|
||||
capouter.stop_capturing()
|
||||
|
||||
|
@ -991,7 +966,7 @@ def test_close_and_capture_again(testdir):
|
|||
def test_close():
|
||||
os.close(1)
|
||||
def test_capture_again():
|
||||
os.write(1, "hello\\n")
|
||||
os.write(1, b"hello\\n")
|
||||
assert 0
|
||||
""")
|
||||
result = testdir.runpytest()
|
||||
|
|
|
@ -167,6 +167,26 @@ class TestPDB:
|
|||
if child.isalive():
|
||||
child.wait()
|
||||
|
||||
def test_set_trace_capturing_afterwards(self, testdir):
|
||||
p1 = testdir.makepyfile("""
|
||||
import pdb
|
||||
def test_1():
|
||||
pdb.set_trace()
|
||||
def test_2():
|
||||
print ("hello")
|
||||
assert 0
|
||||
""")
|
||||
child = testdir.spawn_pytest(str(p1))
|
||||
child.expect("test_1")
|
||||
child.send("c\n")
|
||||
child.expect("test_2")
|
||||
child.expect("Captured")
|
||||
child.expect("hello")
|
||||
child.sendeof()
|
||||
child.read()
|
||||
if child.isalive():
|
||||
child.wait()
|
||||
|
||||
@xfail_if_pdbpp_installed
|
||||
def test_pdb_interaction_doctest(self, testdir):
|
||||
p1 = testdir.makepyfile("""
|
||||
|
|
Loading…
Reference in New Issue