- turn on capturing before early conftest loading and make terminal writer
use the original stream. - avoid resetting capture FDs/sys.stdout for each test by keeping capturing always turned on and looking at snapshotted capturing data during runtest and collection phases.
This commit is contained in:
parent
f43cda9681
commit
9777703e03
|
@ -1,12 +1,11 @@
|
|||
"""
|
||||
per-test stdout/stderr capturing mechanisms,
|
||||
``capsys`` and ``capfd`` function arguments.
|
||||
per-test stdout/stderr capturing mechanism.
|
||||
|
||||
"""
|
||||
# note: py.io capture was where copied from
|
||||
# pylib 1.4.20.dev2 (rev 13d9af95547e)
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
import contextlib
|
||||
|
||||
import py
|
||||
import pytest
|
||||
|
@ -58,8 +57,18 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
|||
method = "fd"
|
||||
if method == "fd" and not hasattr(os, "dup"):
|
||||
method = "sys"
|
||||
pluginmanager = early_config.pluginmanager
|
||||
if method != "no":
|
||||
try:
|
||||
sys.stdout.fileno()
|
||||
except Exception:
|
||||
dupped_stdout = sys.stdout
|
||||
else:
|
||||
dupped_stdout = dupfile(sys.stdout, buffering=1)
|
||||
pluginmanager.register(dupped_stdout, "dupped_stdout")
|
||||
#pluginmanager.add_shutdown(dupped_stdout.close)
|
||||
capman = CaptureManager(method)
|
||||
early_config.pluginmanager.register(capman, "capturemanager")
|
||||
pluginmanager.register(capman, "capturemanager")
|
||||
|
||||
# make sure that capturemanager is properly reset at final shutdown
|
||||
def teardown():
|
||||
|
@ -68,13 +77,13 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
|||
except ValueError:
|
||||
pass
|
||||
|
||||
early_config.pluginmanager.add_shutdown(teardown)
|
||||
pluginmanager.add_shutdown(teardown)
|
||||
|
||||
# make sure logging does not raise exceptions at the end
|
||||
def silence_logging_at_shutdown():
|
||||
if "logging" in sys.modules:
|
||||
sys.modules["logging"].raiseExceptions = False
|
||||
early_config.pluginmanager.add_shutdown(silence_logging_at_shutdown)
|
||||
pluginmanager.add_shutdown(silence_logging_at_shutdown)
|
||||
|
||||
# finally trigger conftest loading but while capturing (issue93)
|
||||
capman.resumecapture()
|
||||
|
@ -89,23 +98,20 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
|||
raise
|
||||
|
||||
|
||||
def addouterr(rep, outerr):
|
||||
for secname, content in zip(["out", "err"], outerr):
|
||||
if content:
|
||||
rep.sections.append(("Captured std%s" % secname, content))
|
||||
|
||||
|
||||
class NoCapture:
|
||||
def startall(self):
|
||||
def start_capturing(self):
|
||||
pass
|
||||
|
||||
def resume(self):
|
||||
def stop_capturing(self):
|
||||
pass
|
||||
|
||||
def pop_outerr_to_orig(self):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def suspend(self):
|
||||
def readouterr(self):
|
||||
return "", ""
|
||||
|
||||
|
||||
|
@ -147,7 +153,9 @@ class CaptureManager:
|
|||
|
||||
def reset_capturings(self):
|
||||
for cap in self._method2capture.values():
|
||||
cap.pop_outerr_to_orig()
|
||||
cap.reset()
|
||||
self._method2capture.clear()
|
||||
|
||||
def resumecapture_item(self, item):
|
||||
method = self._getmethod(item.config, item.fspath)
|
||||
|
@ -164,9 +172,9 @@ class CaptureManager:
|
|||
self._capturing = method
|
||||
if cap is None:
|
||||
self._method2capture[method] = cap = self._getcapture(method)
|
||||
cap.startall()
|
||||
cap.start_capturing()
|
||||
else:
|
||||
cap.resume()
|
||||
cap.pop_outerr_to_orig()
|
||||
|
||||
def suspendcapture(self, item=None):
|
||||
self.deactivate_funcargs()
|
||||
|
@ -175,7 +183,7 @@ class CaptureManager:
|
|||
del self._capturing
|
||||
cap = self._method2capture.get(method)
|
||||
if cap is not None:
|
||||
return cap.suspend()
|
||||
return cap.readouterr()
|
||||
return "", ""
|
||||
|
||||
def activate_funcargs(self, pyfuncitem):
|
||||
|
@ -194,47 +202,68 @@ class CaptureManager:
|
|||
del self._capturing_funcarg
|
||||
return outerr
|
||||
|
||||
@pytest.mark.hookwrapper
|
||||
def pytest_make_collect_report(self, __multicall__, collector):
|
||||
method = self._getmethod(collector.config, collector.fspath)
|
||||
try:
|
||||
self.resumecapture(method)
|
||||
except ValueError:
|
||||
yield
|
||||
# recursive collect, XXX refactor capturing
|
||||
# to allow for more lightweight recursive capturing
|
||||
return
|
||||
try:
|
||||
rep = __multicall__.execute()
|
||||
finally:
|
||||
outerr = self.suspendcapture()
|
||||
addouterr(rep, outerr)
|
||||
return rep
|
||||
yield
|
||||
out, err = self.suspendcapture()
|
||||
# XXX getting the report from the ongoing hook call is a bit
|
||||
# of a hack. We need to think about capturing during collection
|
||||
# and find out if it's really needed fine-grained (per
|
||||
# collector).
|
||||
if __multicall__.results:
|
||||
rep = __multicall__.results[0]
|
||||
if out:
|
||||
rep.sections.append(("Captured stdout", out))
|
||||
if err:
|
||||
rep.sections.append(("Captured stderr", err))
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
@pytest.mark.hookwrapper
|
||||
def pytest_runtest_setup(self, item):
|
||||
self.resumecapture_item(item)
|
||||
with self.item_capture_wrapper(item, "setup"):
|
||||
yield
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
@pytest.mark.hookwrapper
|
||||
def pytest_runtest_call(self, item):
|
||||
self.resumecapture_item(item)
|
||||
self.activate_funcargs(item)
|
||||
with self.item_capture_wrapper(item, "call"):
|
||||
yield
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
@pytest.mark.hookwrapper
|
||||
def pytest_runtest_teardown(self, item):
|
||||
self.resumecapture_item(item)
|
||||
|
||||
def pytest_keyboard_interrupt(self, excinfo):
|
||||
if hasattr(self, '_capturing'):
|
||||
self.suspendcapture()
|
||||
with self.item_capture_wrapper(item, "teardown"):
|
||||
yield
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def pytest_runtest_makereport(self, item, call):
|
||||
funcarg_outerr = self.deactivate_funcargs()
|
||||
def pytest_keyboard_interrupt(self, excinfo):
|
||||
self.reset_capturings()
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def pytest_internalerror(self, excinfo):
|
||||
self.reset_capturings()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def item_capture_wrapper(self, item, when):
|
||||
self.resumecapture_item(item)
|
||||
if when == "call":
|
||||
self.activate_funcargs(item)
|
||||
yield
|
||||
funcarg_outerr = self.deactivate_funcargs()
|
||||
else:
|
||||
yield
|
||||
funcarg_outerr = None
|
||||
out, err = self.suspendcapture(item)
|
||||
if funcarg_outerr is not None:
|
||||
out += funcarg_outerr[0]
|
||||
err += funcarg_outerr[1]
|
||||
item.add_report_section(call.when, "out", out)
|
||||
item.add_report_section(call.when, "err", err)
|
||||
item.add_report_section(when, "out", out)
|
||||
item.add_report_section(when, "err", err)
|
||||
|
||||
error_capsysfderror = "cannot use capsys and capfd at the same time"
|
||||
|
||||
|
@ -263,10 +292,10 @@ def pytest_funcarg__capfd(request):
|
|||
|
||||
class CaptureFixture:
|
||||
def __init__(self, captureclass):
|
||||
self._capture = captureclass()
|
||||
self._capture = captureclass(in_=False)
|
||||
|
||||
def _start(self):
|
||||
self._capture.startall()
|
||||
self._capture.start_capturing()
|
||||
|
||||
def _finalize(self):
|
||||
if hasattr(self, '_capture'):
|
||||
|
@ -295,6 +324,8 @@ class FDCapture:
|
|||
"""
|
||||
self.targetfd = targetfd
|
||||
if tmpfile is None and targetfd != 0:
|
||||
# this code path is covered in the tests
|
||||
# but not used by a regular pytest run
|
||||
f = tempfile.TemporaryFile('wb+')
|
||||
tmpfile = dupfile(f, encoding="UTF-8")
|
||||
f.close()
|
||||
|
@ -390,13 +421,13 @@ class EncodedFile(object):
|
|||
return getattr(self._stream, name)
|
||||
|
||||
|
||||
class Capture(object):
|
||||
class StdCaptureBase(object):
|
||||
def reset(self):
|
||||
""" reset sys.stdout/stderr and return captured output as strings. """
|
||||
if hasattr(self, '_reset'):
|
||||
raise ValueError("was already reset")
|
||||
self._reset = True
|
||||
outfile, errfile = self.done(save=False)
|
||||
outfile, errfile = self.stop_capturing(save=False)
|
||||
out, err = "", ""
|
||||
if outfile and not outfile.closed:
|
||||
out = outfile.read()
|
||||
|
@ -406,14 +437,16 @@ class Capture(object):
|
|||
errfile.close()
|
||||
return out, err
|
||||
|
||||
def suspend(self):
|
||||
""" return current snapshot captures, memorize tempfiles. """
|
||||
outerr = self.readouterr()
|
||||
outfile, errfile = self.done()
|
||||
return outerr
|
||||
def pop_outerr_to_orig(self):
|
||||
""" pop current snapshot out/err capture and flush to orig streams. """
|
||||
out, err = self.readouterr()
|
||||
if out:
|
||||
self.out.writeorg(out)
|
||||
if err:
|
||||
self.err.writeorg(err)
|
||||
|
||||
|
||||
class StdCaptureFD(Capture):
|
||||
class StdCaptureFD(StdCaptureBase):
|
||||
""" This class allows to capture writes to FD1 and FD2
|
||||
and may connect a NULL file to FD0 (and prevent
|
||||
reads from sys.stdin). If any of the 0,1,2 file descriptors
|
||||
|
@ -464,7 +497,7 @@ class StdCaptureFD(Capture):
|
|||
except OSError:
|
||||
pass
|
||||
|
||||
def startall(self):
|
||||
def start_capturing(self):
|
||||
if hasattr(self, 'in_'):
|
||||
self.in_.start()
|
||||
if hasattr(self, 'out'):
|
||||
|
@ -472,11 +505,10 @@ class StdCaptureFD(Capture):
|
|||
if hasattr(self, 'err'):
|
||||
self.err.start()
|
||||
|
||||
def resume(self):
|
||||
""" resume capturing with original temp files. """
|
||||
self.startall()
|
||||
#def pytest_sessionfinish(self):
|
||||
# self.reset_capturings()
|
||||
|
||||
def done(self, save=True):
|
||||
def stop_capturing(self, save=True):
|
||||
""" return (outfile, errfile) and stop capturing. """
|
||||
outfile = errfile = None
|
||||
if hasattr(self, 'out') and not self.out.tmpfile.closed:
|
||||
|
@ -491,16 +523,15 @@ class StdCaptureFD(Capture):
|
|||
|
||||
def readouterr(self):
|
||||
""" return snapshot value of stdout/stderr capturings. """
|
||||
out = self._readsnapshot('out')
|
||||
err = self._readsnapshot('err')
|
||||
return out, err
|
||||
return self._readsnapshot('out'), self._readsnapshot('err')
|
||||
|
||||
def _readsnapshot(self, name):
|
||||
if hasattr(self, name):
|
||||
try:
|
||||
f = getattr(self, name).tmpfile
|
||||
else:
|
||||
except AttributeError:
|
||||
return ''
|
||||
if f.tell() == 0:
|
||||
return ''
|
||||
|
||||
f.seek(0)
|
||||
res = f.read()
|
||||
enc = getattr(f, "encoding", None)
|
||||
|
@ -510,8 +541,17 @@ class StdCaptureFD(Capture):
|
|||
f.seek(0)
|
||||
return res
|
||||
|
||||
class TextCapture(TextIO):
|
||||
def __init__(self, oldout):
|
||||
super(TextCapture, self).__init__()
|
||||
self._oldout = oldout
|
||||
|
||||
class StdCapture(Capture):
|
||||
def writeorg(self, data):
|
||||
self._oldout.write(data)
|
||||
self._oldout.flush()
|
||||
|
||||
|
||||
class StdCapture(StdCaptureBase):
|
||||
""" This class allows to capture writes to sys.stdout|stderr "in-memory"
|
||||
and will raise errors on tries to read from sys.stdin. It only
|
||||
modifies sys.stdout|stderr|stdin attributes and does not
|
||||
|
@ -522,15 +562,15 @@ class StdCapture(Capture):
|
|||
self._olderr = sys.stderr
|
||||
self._oldin = sys.stdin
|
||||
if out and not hasattr(out, 'file'):
|
||||
out = TextIO()
|
||||
out = TextCapture(self._oldout)
|
||||
self.out = out
|
||||
if err:
|
||||
if not hasattr(err, 'write'):
|
||||
err = TextIO()
|
||||
err = TextCapture(self._olderr)
|
||||
self.err = err
|
||||
self.in_ = in_
|
||||
|
||||
def startall(self):
|
||||
def start_capturing(self):
|
||||
if self.out:
|
||||
sys.stdout = self.out
|
||||
if self.err:
|
||||
|
@ -538,7 +578,7 @@ class StdCapture(Capture):
|
|||
if self.in_:
|
||||
sys.stdin = self.in_ = DontReadFromInput()
|
||||
|
||||
def done(self, save=True):
|
||||
def stop_capturing(self, save=True):
|
||||
""" return (outfile, errfile) and stop capturing. """
|
||||
outfile = errfile = None
|
||||
if self.out and not self.out.closed:
|
||||
|
@ -553,9 +593,6 @@ class StdCapture(Capture):
|
|||
sys.stdin = self._oldin
|
||||
return outfile, errfile
|
||||
|
||||
def resume(self):
|
||||
""" resume capturing with original temp files. """
|
||||
self.startall()
|
||||
|
||||
def readouterr(self):
|
||||
""" return snapshot value of stdout/stderr capturings. """
|
||||
|
|
|
@ -56,11 +56,15 @@ def _prepareconfig(args=None, plugins=None):
|
|||
raise ValueError("not a string or argument list: %r" % (args,))
|
||||
args = py.std.shlex.split(args)
|
||||
pluginmanager = get_plugin_manager()
|
||||
if plugins:
|
||||
for plugin in plugins:
|
||||
pluginmanager.register(plugin)
|
||||
return pluginmanager.hook.pytest_cmdline_parse(
|
||||
pluginmanager=pluginmanager, args=args)
|
||||
try:
|
||||
if plugins:
|
||||
for plugin in plugins:
|
||||
pluginmanager.register(plugin)
|
||||
return pluginmanager.hook.pytest_cmdline_parse(
|
||||
pluginmanager=pluginmanager, args=args)
|
||||
except Exception:
|
||||
pluginmanager.ensure_shutdown()
|
||||
raise
|
||||
|
||||
class PytestPluginManager(PluginManager):
|
||||
def __init__(self, hookspecs=[hookspec]):
|
||||
|
@ -612,6 +616,9 @@ class Config(object):
|
|||
self.hook.pytest_logwarning(code=code, message=message,
|
||||
fslocation=None, nodeid=None)
|
||||
|
||||
def get_terminal_writer(self):
|
||||
return self.pluginmanager.getplugin("terminalreporter")._tw
|
||||
|
||||
def pytest_cmdline_parse(self, pluginmanager, args):
|
||||
assert self == pluginmanager.config, (self, pluginmanager.config)
|
||||
self.parse(args)
|
||||
|
|
|
@ -60,6 +60,7 @@ def pytest_addoption(parser):
|
|||
def pytest_cmdline_main(config):
|
||||
genscript = config.getvalue("genscript")
|
||||
if genscript:
|
||||
#tw = config.get_terminal_writer()
|
||||
tw = py.io.TerminalWriter()
|
||||
deps = ['py', '_pytest', 'pytest']
|
||||
if sys.version_info < (2,7):
|
||||
|
|
|
@ -47,6 +47,8 @@ def pytest_unconfigure(config):
|
|||
|
||||
def pytest_cmdline_main(config):
|
||||
if config.option.version:
|
||||
capman = config.pluginmanager.getplugin("capturemanager")
|
||||
capman.reset_capturings()
|
||||
p = py.path.local(pytest.__file__)
|
||||
sys.stderr.write("This is pytest version %s, imported from %s\n" %
|
||||
(pytest.__version__, p))
|
||||
|
@ -62,7 +64,7 @@ def pytest_cmdline_main(config):
|
|||
return 0
|
||||
|
||||
def showhelp(config):
|
||||
tw = py.io.TerminalWriter()
|
||||
tw = config.get_terminal_writer()
|
||||
tw.write(config._parser.optparser.format_help())
|
||||
tw.line()
|
||||
tw.line()
|
||||
|
|
|
@ -40,7 +40,7 @@ def pytest_addoption(parser):
|
|||
def pytest_cmdline_main(config):
|
||||
if config.option.markers:
|
||||
config.do_configure()
|
||||
tw = py.io.TerminalWriter()
|
||||
tw = config.get_terminal_writer()
|
||||
for line in config.getini("markers"):
|
||||
name, rest = line.split(":", 1)
|
||||
tw.write("@pytest.mark.%s:" % name, bold=True)
|
||||
|
|
|
@ -34,10 +34,9 @@ class pytestPDB:
|
|||
|
||||
if item is not None:
|
||||
capman = item.config.pluginmanager.getplugin("capturemanager")
|
||||
out, err = capman.suspendcapture()
|
||||
#if hasattr(item, 'outerr'):
|
||||
# item.outerr = (item.outerr[0] + out, item.outerr[1] + err)
|
||||
tw = py.io.TerminalWriter()
|
||||
if capman:
|
||||
capman.reset_capturings()
|
||||
tw = item.config.get_terminal_writer()
|
||||
tw.line()
|
||||
tw.sep(">", "PDB set_trace (IO-capturing turned off)")
|
||||
py.std.pdb.Pdb().set_trace(frame)
|
||||
|
@ -46,19 +45,20 @@ def pdbitem(item):
|
|||
pytestPDB.item = item
|
||||
pytest_runtest_setup = pytest_runtest_call = pytest_runtest_teardown = pdbitem
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def pytest_make_collect_report(__multicall__, collector):
|
||||
try:
|
||||
pytestPDB.collector = collector
|
||||
return __multicall__.execute()
|
||||
finally:
|
||||
pytestPDB.collector = None
|
||||
@pytest.mark.hookwrapper
|
||||
def pytest_make_collect_report(collector):
|
||||
pytestPDB.collector = collector
|
||||
yield
|
||||
pytestPDB.collector = None
|
||||
|
||||
def pytest_runtest_makereport():
|
||||
pytestPDB.item = None
|
||||
|
||||
class PdbInvoke:
|
||||
def pytest_exception_interact(self, node, call, report):
|
||||
capman = node.config.pluginmanager.getplugin("capturemanager")
|
||||
if capman:
|
||||
capman.reset_capturings()
|
||||
return _enter_pdb(node, call.excinfo, report)
|
||||
|
||||
def pytest_internalerror(self, excrepr, excinfo):
|
||||
|
|
|
@ -885,7 +885,7 @@ def _showfixtures_main(config, session):
|
|||
nodeid = "::".join(map(str, [curdir.bestrelpath(part[0])] + part[1:]))
|
||||
nodeid.replace(session.fspath.sep, "/")
|
||||
|
||||
tw = py.io.TerminalWriter()
|
||||
tw = config.get_terminal_writer()
|
||||
verbose = config.getvalue("verbose")
|
||||
|
||||
fm = session._fixturemanager
|
||||
|
|
|
@ -135,14 +135,13 @@ class CallInfo:
|
|||
self.when = when
|
||||
self.start = time()
|
||||
try:
|
||||
try:
|
||||
self.result = func()
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
self.excinfo = py.code.ExceptionInfo()
|
||||
finally:
|
||||
self.result = func()
|
||||
except KeyboardInterrupt:
|
||||
self.stop = time()
|
||||
raise
|
||||
except:
|
||||
self.excinfo = py.code.ExceptionInfo()
|
||||
self.stop = time()
|
||||
|
||||
def __repr__(self):
|
||||
if self.excinfo:
|
||||
|
@ -292,7 +291,8 @@ def pytest_make_collect_report(collector):
|
|||
|
||||
|
||||
class CollectReport(BaseReport):
|
||||
def __init__(self, nodeid, outcome, longrepr, result, sections=(), **extra):
|
||||
def __init__(self, nodeid, outcome, longrepr, result,
|
||||
sections=(), **extra):
|
||||
self.nodeid = nodeid
|
||||
self.outcome = outcome
|
||||
self.longrepr = longrepr
|
||||
|
|
|
@ -36,7 +36,10 @@ def pytest_addoption(parser):
|
|||
|
||||
def pytest_configure(config):
|
||||
config.option.verbose -= config.option.quiet
|
||||
reporter = TerminalReporter(config, sys.stdout)
|
||||
out = config.pluginmanager.getplugin("dupped_stdout")
|
||||
#if out is None:
|
||||
# out = sys.stdout
|
||||
reporter = TerminalReporter(config, out)
|
||||
config.pluginmanager.register(reporter, 'terminalreporter')
|
||||
if config.option.debug or config.option.traceconfig:
|
||||
def mywriter(tags, args):
|
||||
|
@ -44,6 +47,11 @@ def pytest_configure(config):
|
|||
reporter.write_line("[traceconfig] " + msg)
|
||||
config.trace.root.setprocessor("pytest:config", mywriter)
|
||||
|
||||
def get_terminal_writer(config):
|
||||
tr = config.pluginmanager.getplugin("terminalreporter")
|
||||
return tr._tw
|
||||
|
||||
|
||||
def getreportopt(config):
|
||||
reportopts = ""
|
||||
optvalue = config.option.report
|
||||
|
|
|
@ -4,8 +4,8 @@ if __name__ == '__main__':
|
|||
import cProfile
|
||||
import pytest
|
||||
import pstats
|
||||
script = sys.argv[1] if len(sys.argv) > 1 else "empty.py"
|
||||
stats = cProfile.run('pytest.cmdline.main([%r])' % script, 'prof')
|
||||
script = sys.argv[1:] if len(sys.argv) > 1 else "empty.py"
|
||||
stats = cProfile.run('pytest.cmdline.main(%r)' % script, 'prof')
|
||||
p = pstats.Stats("prof")
|
||||
p.strip_dirs()
|
||||
p.sort_stats('cumulative')
|
||||
|
|
|
@ -736,14 +736,14 @@ class TestFDCapture:
|
|||
class TestStdCapture:
|
||||
def getcapture(self, **kw):
|
||||
cap = capture.StdCapture(**kw)
|
||||
cap.startall()
|
||||
cap.start_capturing()
|
||||
return cap
|
||||
|
||||
def test_capturing_done_simple(self):
|
||||
cap = self.getcapture()
|
||||
sys.stdout.write("hello")
|
||||
sys.stderr.write("world")
|
||||
outfile, errfile = cap.done()
|
||||
outfile, errfile = cap.stop_capturing()
|
||||
s = outfile.read()
|
||||
assert s == "hello"
|
||||
s = errfile.read()
|
||||
|
@ -787,6 +787,7 @@ class TestStdCapture:
|
|||
print('\xa6')
|
||||
out, err = cap.readouterr()
|
||||
assert out == py.builtin._totext('\ufffd\n', 'unicode-escape')
|
||||
cap.reset()
|
||||
|
||||
def test_reset_twice_error(self):
|
||||
cap = self.getcapture()
|
||||
|
@ -859,12 +860,13 @@ class TestStdCapture:
|
|||
try:
|
||||
print ("hello")
|
||||
sys.stderr.write("error\n")
|
||||
out, err = cap.suspend()
|
||||
out, err = cap.readouterr()
|
||||
cap.stop_capturing()
|
||||
assert out == "hello\n"
|
||||
assert not err
|
||||
print ("in between")
|
||||
sys.stderr.write("in between\n")
|
||||
cap.resume()
|
||||
cap.start_capturing()
|
||||
print ("after")
|
||||
sys.stderr.write("error_after\n")
|
||||
finally:
|
||||
|
@ -878,7 +880,7 @@ class TestStdCaptureFD(TestStdCapture):
|
|||
|
||||
def getcapture(self, **kw):
|
||||
cap = capture.StdCaptureFD(**kw)
|
||||
cap.startall()
|
||||
cap.start_capturing()
|
||||
return cap
|
||||
|
||||
def test_intermingling(self):
|
||||
|
@ -908,7 +910,7 @@ def test_stdcapture_fd_tmpfile(tmpfile):
|
|||
try:
|
||||
os.write(1, "hello".encode("ascii"))
|
||||
os.write(2, "world".encode("ascii"))
|
||||
outf, errf = capfd.done()
|
||||
outf, errf = capfd.stop_capturing()
|
||||
finally:
|
||||
capfd.reset()
|
||||
assert outf == tmpfile
|
||||
|
@ -924,15 +926,15 @@ class TestStdCaptureFDinvalidFD:
|
|||
def test_stdout():
|
||||
os.close(1)
|
||||
cap = StdCaptureFD(out=True, err=False, in_=False)
|
||||
cap.done()
|
||||
cap.stop_capturing()
|
||||
def test_stderr():
|
||||
os.close(2)
|
||||
cap = StdCaptureFD(out=False, err=True, in_=False)
|
||||
cap.done()
|
||||
cap.stop_capturing()
|
||||
def test_stdin():
|
||||
os.close(0)
|
||||
cap = StdCaptureFD(out=False, err=False, in_=True)
|
||||
cap.done()
|
||||
cap.stop_capturing()
|
||||
""")
|
||||
result = testdir.runpytest("--capture=fd")
|
||||
assert result.ret == 0
|
||||
|
@ -941,8 +943,8 @@ class TestStdCaptureFDinvalidFD:
|
|||
|
||||
def test_capture_not_started_but_reset():
|
||||
capsys = capture.StdCapture()
|
||||
capsys.done()
|
||||
capsys.done()
|
||||
capsys.stop_capturing()
|
||||
capsys.stop_capturing()
|
||||
capsys.reset()
|
||||
|
||||
|
||||
|
@ -951,7 +953,7 @@ def test_capture_no_sys():
|
|||
capsys = capture.StdCapture()
|
||||
try:
|
||||
cap = capture.StdCaptureFD(patchsys=False)
|
||||
cap.startall()
|
||||
cap.start_capturing()
|
||||
sys.stdout.write("hello")
|
||||
sys.stderr.write("world")
|
||||
oswritebytes(1, "1")
|
||||
|
@ -970,10 +972,9 @@ def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
|
|||
tmpfile = True
|
||||
cap = capture.StdCaptureFD(out=False, err=tmpfile)
|
||||
try:
|
||||
cap.startall()
|
||||
cap.start_capturing()
|
||||
capfile = cap.err.tmpfile
|
||||
cap.suspend()
|
||||
cap.resume()
|
||||
cap.readouterr()
|
||||
finally:
|
||||
cap.reset()
|
||||
capfile2 = cap.err.tmpfile
|
||||
|
@ -990,22 +991,25 @@ def test_capturing_and_logging_fundamentals(testdir, method):
|
|||
import py, logging
|
||||
from _pytest import capture
|
||||
cap = capture.%s(out=False, in_=False)
|
||||
cap.startall()
|
||||
cap.start_capturing()
|
||||
|
||||
logging.warn("hello1")
|
||||
outerr = cap.suspend()
|
||||
outerr = cap.readouterr()
|
||||
print ("suspend, captured %%s" %%(outerr,))
|
||||
logging.warn("hello2")
|
||||
|
||||
cap.resume()
|
||||
cap.pop_outerr_to_orig()
|
||||
logging.warn("hello3")
|
||||
|
||||
outerr = cap.suspend()
|
||||
outerr = cap.readouterr()
|
||||
print ("suspend2, captured %%s" %% (outerr,))
|
||||
""" % (method,))
|
||||
result = testdir.runpython(p)
|
||||
result.stdout.fnmatch_lines([
|
||||
"suspend, captured*hello1*",
|
||||
"suspend2, captured*hello2*WARNING:root:hello3*",
|
||||
])
|
||||
result.stdout.fnmatch_lines("""
|
||||
suspend, captured*hello1*
|
||||
suspend2, captured*WARNING:root:hello3*
|
||||
""")
|
||||
result.stderr.fnmatch_lines("""
|
||||
WARNING:root:hello2
|
||||
""")
|
||||
assert "atexit" not in result.stderr.str()
|
||||
|
|
Loading…
Reference in New Issue