This commit is contained in:
holger krekel 2014-03-28 09:27:44 +01:00
commit b5467645d3
16 changed files with 563 additions and 565 deletions

View File

@ -1,12 +1,13 @@
""" """
per-test stdout/stderr capturing mechanisms, per-test stdout/stderr capturing mechanism.
``capsys`` and ``capfd`` function arguments.
""" """
# note: py.io capture was where copied from from __future__ import with_statement
# pylib 1.4.20.dev2 (rev 13d9af95547e)
import sys import sys
import os import os
import tempfile from tempfile import TemporaryFile
import contextlib
import py import py
import pytest import pytest
@ -58,8 +59,18 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
method = "fd" method = "fd"
if method == "fd" and not hasattr(os, "dup"): if method == "fd" and not hasattr(os, "dup"):
method = "sys" method = "sys"
pluginmanager = early_config.pluginmanager
if method != "no":
try:
sys.stdout.fileno()
except Exception:
dupped_stdout = sys.stdout
else:
dupped_stdout = dupfile(sys.stdout, buffering=1)
pluginmanager.register(dupped_stdout, "dupped_stdout")
#pluginmanager.add_shutdown(dupped_stdout.close)
capman = CaptureManager(method) capman = CaptureManager(method)
early_config.pluginmanager.register(capman, "capturemanager") pluginmanager.register(capman, "capturemanager")
# make sure that capturemanager is properly reset at final shutdown # make sure that capturemanager is properly reset at final shutdown
def teardown(): def teardown():
@ -68,13 +79,13 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
except ValueError: except ValueError:
pass pass
early_config.pluginmanager.add_shutdown(teardown) pluginmanager.add_shutdown(teardown)
# make sure logging does not raise exceptions at the end # make sure logging does not raise exceptions at the end
def silence_logging_at_shutdown(): def silence_logging_at_shutdown():
if "logging" in sys.modules: if "logging" in sys.modules:
sys.modules["logging"].raiseExceptions = False sys.modules["logging"].raiseExceptions = False
early_config.pluginmanager.add_shutdown(silence_logging_at_shutdown) pluginmanager.add_shutdown(silence_logging_at_shutdown)
# finally trigger conftest loading but while capturing (issue93) # finally trigger conftest loading but while capturing (issue93)
capman.resumecapture() capman.resumecapture()
@ -89,53 +100,19 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
raise raise
def addouterr(rep, outerr):
for secname, content in zip(["out", "err"], outerr):
if content:
rep.sections.append(("Captured std%s" % secname, content))
class NoCapture:
def startall(self):
pass
def resume(self):
pass
def reset(self):
pass
def suspend(self):
return "", ""
class CaptureManager: class CaptureManager:
def __init__(self, defaultmethod=None): def __init__(self, defaultmethod=None):
self._method2capture = {} self._method2capture = {}
self._defaultmethod = defaultmethod self._defaultmethod = defaultmethod
def _maketempfile(self):
f = py.std.tempfile.TemporaryFile()
newf = dupfile(f, encoding="UTF-8")
f.close()
return newf
def _makestringio(self):
return TextIO()
def _getcapture(self, method): def _getcapture(self, method):
if method == "fd": if method == "fd":
return StdCaptureFD( return StdCaptureBase(out=True, err=True, Capture=FDCapture)
out=self._maketempfile(),
err=self._maketempfile(),
)
elif method == "sys": elif method == "sys":
return StdCapture( return StdCaptureBase(out=True, err=True, Capture=SysCapture)
out=self._makestringio(),
err=self._makestringio(),
)
elif method == "no": elif method == "no":
return NoCapture() return StdCaptureBase(out=False, err=False, in_=False)
else: else:
raise ValueError("unknown capturing method: %r" % method) raise ValueError("unknown capturing method: %r" % method)
@ -153,12 +130,12 @@ class CaptureManager:
def reset_capturings(self): def reset_capturings(self):
for cap in self._method2capture.values(): for cap in self._method2capture.values():
cap.reset() cap.pop_outerr_to_orig()
cap.stop_capturing()
self._method2capture.clear()
def resumecapture_item(self, item): def resumecapture_item(self, item):
method = self._getmethod(item.config, item.fspath) method = self._getmethod(item.config, item.fspath)
if not hasattr(item, 'outerr'):
item.outerr = ('', '') # we accumulate outerr on the item
return self.resumecapture(method) return self.resumecapture(method)
def resumecapture(self, method=None): def resumecapture(self, method=None):
@ -172,87 +149,85 @@ class CaptureManager:
self._capturing = method self._capturing = method
if cap is None: if cap is None:
self._method2capture[method] = cap = self._getcapture(method) self._method2capture[method] = cap = self._getcapture(method)
cap.startall() cap.start_capturing()
else: else:
cap.resume() cap.pop_outerr_to_orig()
def suspendcapture(self, item=None): def suspendcapture(self, item=None):
self.deactivate_funcargs() self.deactivate_funcargs()
if hasattr(self, '_capturing'): method = self.__dict__.pop("_capturing", None)
method = self._capturing if method is not None:
cap = self._method2capture.get(method) cap = self._method2capture.get(method)
if cap is not None: if cap is not None:
outerr = cap.suspend() return cap.readouterr()
del self._capturing
if item:
outerr = (item.outerr[0] + outerr[0],
item.outerr[1] + outerr[1])
return outerr
if hasattr(item, 'outerr'):
return item.outerr
return "", "" return "", ""
def activate_funcargs(self, pyfuncitem): def activate_funcargs(self, pyfuncitem):
funcargs = getattr(pyfuncitem, "funcargs", None) capfuncarg = pyfuncitem.__dict__.pop("_capfuncarg", None)
if funcargs is not None: if capfuncarg is not None:
for name, capfuncarg in funcargs.items(): capfuncarg._start()
if name in ('capsys', 'capfd'): self._capfuncarg = capfuncarg
assert not hasattr(self, '_capturing_funcarg')
self._capturing_funcarg = capfuncarg
capfuncarg._start()
def deactivate_funcargs(self): def deactivate_funcargs(self):
capturing_funcarg = getattr(self, '_capturing_funcarg', None) capfuncarg = self.__dict__.pop("_capfuncarg", None)
if capturing_funcarg: if capfuncarg is not None:
outerr = capturing_funcarg._finalize() capfuncarg.close()
del self._capturing_funcarg
return outerr
@pytest.mark.hookwrapper
def pytest_make_collect_report(self, __multicall__, collector): def pytest_make_collect_report(self, __multicall__, collector):
method = self._getmethod(collector.config, collector.fspath) method = self._getmethod(collector.config, collector.fspath)
try: try:
self.resumecapture(method) self.resumecapture(method)
except ValueError: except ValueError:
yield
# recursive collect, XXX refactor capturing # recursive collect, XXX refactor capturing
# to allow for more lightweight recursive capturing # to allow for more lightweight recursive capturing
return return
try: yield
rep = __multicall__.execute() out, err = self.suspendcapture()
finally: # XXX getting the report from the ongoing hook call is a bit
outerr = self.suspendcapture() # of a hack. We need to think about capturing during collection
addouterr(rep, outerr) # and find out if it's really needed fine-grained (per
return rep # collector).
if __multicall__.results:
rep = __multicall__.results[0]
if out:
rep.sections.append(("Captured stdout", out))
if err:
rep.sections.append(("Captured stderr", err))
@pytest.mark.tryfirst @pytest.mark.hookwrapper
def pytest_runtest_setup(self, item): def pytest_runtest_setup(self, item):
self.resumecapture_item(item) with self.item_capture_wrapper(item, "setup"):
yield
@pytest.mark.tryfirst @pytest.mark.hookwrapper
def pytest_runtest_call(self, item): def pytest_runtest_call(self, item):
self.resumecapture_item(item) with self.item_capture_wrapper(item, "call"):
self.activate_funcargs(item) self.activate_funcargs(item)
yield
#self.deactivate_funcargs() called from ctx's suspendcapture()
@pytest.mark.tryfirst @pytest.mark.hookwrapper
def pytest_runtest_teardown(self, item): def pytest_runtest_teardown(self, item):
self.resumecapture_item(item) with self.item_capture_wrapper(item, "teardown"):
yield
def pytest_keyboard_interrupt(self, excinfo):
if hasattr(self, '_capturing'):
self.suspendcapture()
@pytest.mark.tryfirst @pytest.mark.tryfirst
def pytest_runtest_makereport(self, __multicall__, item, call): def pytest_keyboard_interrupt(self, excinfo):
funcarg_outerr = self.deactivate_funcargs() self.reset_capturings()
rep = __multicall__.execute()
outerr = self.suspendcapture(item) @pytest.mark.tryfirst
if funcarg_outerr is not None: def pytest_internalerror(self, excinfo):
outerr = (outerr[0] + funcarg_outerr[0], self.reset_capturings()
outerr[1] + funcarg_outerr[1])
addouterr(rep, outerr) @contextlib.contextmanager
if not rep.passed or rep.when == "teardown": def item_capture_wrapper(self, item, when):
outerr = ('', '') self.resumecapture_item(item)
item.outerr = outerr yield
return rep out, err = self.suspendcapture(item)
item.add_report_section(when, "out", out)
item.add_report_section(when, "err", err)
error_capsysfderror = "cannot use capsys and capfd at the same time" error_capsysfderror = "cannot use capsys and capfd at the same time"
@ -264,8 +239,8 @@ def pytest_funcarg__capsys(request):
""" """
if "capfd" in request._funcargs: if "capfd" in request._funcargs:
raise request.raiseerror(error_capsysfderror) raise request.raiseerror(error_capsysfderror)
return CaptureFixture(StdCapture) request.node._capfuncarg = c = CaptureFixture(SysCapture)
return c
def pytest_funcarg__capfd(request): def pytest_funcarg__capfd(request):
"""enables capturing of writes to file descriptors 1 and 2 and makes """enables capturing of writes to file descriptors 1 and 2 and makes
@ -276,89 +251,30 @@ def pytest_funcarg__capfd(request):
request.raiseerror(error_capsysfderror) request.raiseerror(error_capsysfderror)
if not hasattr(os, 'dup'): if not hasattr(os, 'dup'):
pytest.skip("capfd funcarg needs os.dup") pytest.skip("capfd funcarg needs os.dup")
return CaptureFixture(StdCaptureFD) request.node._capfuncarg = c = CaptureFixture(FDCapture)
return c
class CaptureFixture: class CaptureFixture:
def __init__(self, captureclass): def __init__(self, captureclass):
self._capture = captureclass() self.captureclass = captureclass
def _start(self): def _start(self):
self._capture.startall() self._capture = StdCaptureBase(out=True, err=True, in_=False,
Capture=self.captureclass)
self._capture.start_capturing()
def _finalize(self): def close(self):
if hasattr(self, '_capture'): cap = self.__dict__.pop("_capture", None)
outerr = self._outerr = self._capture.reset() if cap is not None:
del self._capture cap.pop_outerr_to_orig()
return outerr cap.stop_capturing()
def readouterr(self): def readouterr(self):
try: try:
return self._capture.readouterr() return self._capture.readouterr()
except AttributeError: except AttributeError:
return self._outerr return "", ""
def close(self):
self._finalize()
class FDCapture:
""" Capture IO to/from a given os-level filedescriptor. """
def __init__(self, targetfd, tmpfile=None, patchsys=False):
""" save targetfd descriptor, and open a new
temporary file there. If no tmpfile is
specified a tempfile.Tempfile() will be opened
in text mode.
"""
self.targetfd = targetfd
if tmpfile is None and targetfd != 0:
f = tempfile.TemporaryFile('wb+')
tmpfile = dupfile(f, encoding="UTF-8")
f.close()
self.tmpfile = tmpfile
self._savefd = os.dup(self.targetfd)
if patchsys:
self._oldsys = getattr(sys, patchsysdict[targetfd])
def start(self):
try:
os.fstat(self._savefd)
except OSError:
raise ValueError(
"saved filedescriptor not valid, "
"did you call start() twice?")
if self.targetfd == 0 and not self.tmpfile:
fd = os.open(os.devnull, os.O_RDONLY)
os.dup2(fd, 0)
os.close(fd)
if hasattr(self, '_oldsys'):
setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
else:
os.dup2(self.tmpfile.fileno(), self.targetfd)
if hasattr(self, '_oldsys'):
setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
def done(self):
""" unpatch and clean up, returns the self.tmpfile (file object)
"""
os.dup2(self._savefd, self.targetfd)
os.close(self._savefd)
if self.targetfd != 0:
self.tmpfile.seek(0)
if hasattr(self, '_oldsys'):
setattr(sys, patchsysdict[self.targetfd], self._oldsys)
return self.tmpfile
def writeorg(self, data):
""" write a string to the original file descriptor
"""
tempfp = tempfile.TemporaryFile()
try:
os.dup2(self._savefd, tempfp.fileno())
tempfp.write(data)
finally:
tempfp.close()
def dupfile(f, mode=None, buffering=0, raising=False, encoding=None): def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
@ -408,185 +324,148 @@ class EncodedFile(object):
return getattr(self._stream, name) return getattr(self._stream, name)
class Capture(object): class StdCaptureBase(object):
def reset(self): out = err = in_ = None
""" reset sys.stdout/stderr and return captured output as strings. """
if hasattr(self, '_reset'):
raise ValueError("was already reset")
self._reset = True
outfile, errfile = self.done(save=False)
out, err = "", ""
if outfile and not outfile.closed:
out = outfile.read()
outfile.close()
if errfile and errfile != outfile and not errfile.closed:
err = errfile.read()
errfile.close()
return out, err
def suspend(self): def __init__(self, out=True, err=True, in_=True, Capture=None):
""" return current snapshot captures, memorize tempfiles. """
outerr = self.readouterr()
outfile, errfile = self.done()
return outerr
class StdCaptureFD(Capture):
""" This class allows to capture writes to FD1 and FD2
and may connect a NULL file to FD0 (and prevent
reads from sys.stdin). If any of the 0,1,2 file descriptors
is invalid it will not be captured.
"""
def __init__(self, out=True, err=True, in_=True, patchsys=True):
self._options = {
"out": out,
"err": err,
"in_": in_,
"patchsys": patchsys,
}
self._save()
def _save(self):
in_ = self._options['in_']
out = self._options['out']
err = self._options['err']
patchsys = self._options['patchsys']
if in_: if in_:
try: self.in_ = Capture(0)
self.in_ = FDCapture(
0, tmpfile=None,
patchsys=patchsys)
except OSError:
pass
if out: if out:
tmpfile = None self.out = Capture(1)
if hasattr(out, 'write'):
tmpfile = out
try:
self.out = FDCapture(
1, tmpfile=tmpfile,
patchsys=patchsys)
self._options['out'] = self.out.tmpfile
except OSError:
pass
if err: if err:
if hasattr(err, 'write'): self.err = Capture(2)
tmpfile = err
else:
tmpfile = None
try:
self.err = FDCapture(
2, tmpfile=tmpfile,
patchsys=patchsys)
self._options['err'] = self.err.tmpfile
except OSError:
pass
def startall(self): def start_capturing(self):
if hasattr(self, 'in_'): if self.in_:
self.in_.start() self.in_.start()
if hasattr(self, 'out'): if self.out:
self.out.start() self.out.start()
if hasattr(self, 'err'): if self.err:
self.err.start() self.err.start()
def resume(self): def pop_outerr_to_orig(self):
""" resume capturing with original temp files. """ """ pop current snapshot out/err capture and flush to orig streams. """
self.startall() out, err = self.readouterr()
if out:
self.out.writeorg(out)
if err:
self.err.writeorg(err)
def done(self, save=True): def stop_capturing(self):
""" return (outfile, errfile) and stop capturing. """ """ stop capturing and reset capturing streams """
outfile = errfile = None if hasattr(self, '_reset'):
if hasattr(self, 'out') and not self.out.tmpfile.closed: raise ValueError("was already stopped")
outfile = self.out.done() self._reset = True
if hasattr(self, 'err') and not self.err.tmpfile.closed: if self.out:
errfile = self.err.done() self.out.done()
if hasattr(self, 'in_'): if self.err:
self.err.done()
if self.in_:
self.in_.done() self.in_.done()
if save:
self._save()
return outfile, errfile
def readouterr(self): def readouterr(self):
""" return snapshot value of stdout/stderr capturings. """ """ return snapshot unicode value of stdout/stderr capturings. """
out = self._readsnapshot('out') return self._readsnapshot('out'), self._readsnapshot('err')
err = self._readsnapshot('err')
return out, err
def _readsnapshot(self, name): def _readsnapshot(self, name):
if hasattr(self, name): cap = getattr(self, name, None)
f = getattr(self, name).tmpfile if cap is None:
else: return ""
return '' return cap.snap()
class FDCapture:
""" Capture IO to/from a given os-level filedescriptor. """
def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd
try:
self._savefd = os.dup(self.targetfd)
except OSError:
self.start = lambda: None
self.done = lambda: None
else:
if tmpfile is None:
if targetfd == 0:
tmpfile = open(os.devnull, "r")
else:
f = TemporaryFile()
with f:
tmpfile = dupfile(f, encoding="UTF-8")
self.tmpfile = tmpfile
if targetfd in patchsysdict:
self._oldsys = getattr(sys, patchsysdict[targetfd])
def __repr__(self):
return "<FDCapture %s oldfd=%s>" % (self.targetfd, self._savefd)
def start(self):
""" Start capturing on targetfd using memorized tmpfile. """
try:
os.fstat(self._savefd)
except OSError:
raise ValueError("saved filedescriptor not valid anymore")
targetfd = self.targetfd
os.dup2(self.tmpfile.fileno(), targetfd)
if hasattr(self, '_oldsys'):
subst = self.tmpfile if targetfd != 0 else DontReadFromInput()
setattr(sys, patchsysdict[targetfd], subst)
def snap(self):
f = self.tmpfile
f.seek(0) f.seek(0)
res = f.read() res = f.read()
enc = getattr(f, "encoding", None) if res:
if enc: enc = getattr(f, "encoding", None)
res = py.builtin._totext(res, enc, "replace") if enc and isinstance(res, bytes):
res = py.builtin._totext(res, enc, "replace")
f.truncate(0)
f.seek(0)
return res
def done(self):
""" stop capturing, restore streams, return original capture file,
seeked to position zero. """
os.dup2(self._savefd, self.targetfd)
os.close(self._savefd)
if hasattr(self, '_oldsys'):
setattr(sys, patchsysdict[self.targetfd], self._oldsys)
self.tmpfile.close()
def writeorg(self, data):
""" write to original file descriptor. """
if py.builtin._istext(data):
data = data.encode("utf8") # XXX use encoding of original stream
os.write(self._savefd, data)
class SysCapture:
def __init__(self, fd):
name = patchsysdict[fd]
self._old = getattr(sys, name)
self.name = name
if name == "stdin":
self.tmpfile = DontReadFromInput()
else:
self.tmpfile = TextIO()
def start(self):
setattr(sys, self.name, self.tmpfile)
def snap(self):
f = self.tmpfile
res = f.getvalue()
f.truncate(0) f.truncate(0)
f.seek(0) f.seek(0)
return res return res
def done(self):
setattr(sys, self.name, self._old)
self.tmpfile.close()
class StdCapture(Capture): def writeorg(self, data):
""" This class allows to capture writes to sys.stdout|stderr "in-memory" self._old.write(data)
and will raise errors on tries to read from sys.stdin. It only self._old.flush()
modifies sys.stdout|stderr|stdin attributes and does not
touch underlying File Descriptors (use StdCaptureFD for that).
"""
def __init__(self, out=True, err=True, in_=True):
self._oldout = sys.stdout
self._olderr = sys.stderr
self._oldin = sys.stdin
if out and not hasattr(out, 'file'):
out = TextIO()
self.out = out
if err:
if not hasattr(err, 'write'):
err = TextIO()
self.err = err
self.in_ = in_
def startall(self):
if self.out:
sys.stdout = self.out
if self.err:
sys.stderr = self.err
if self.in_:
sys.stdin = self.in_ = DontReadFromInput()
def done(self, save=True):
""" return (outfile, errfile) and stop capturing. """
outfile = errfile = None
if self.out and not self.out.closed:
sys.stdout = self._oldout
outfile = self.out
outfile.seek(0)
if self.err and not self.err.closed:
sys.stderr = self._olderr
errfile = self.err
errfile.seek(0)
if self.in_:
sys.stdin = self._oldin
return outfile, errfile
def resume(self):
""" resume capturing with original temp files. """
self.startall()
def readouterr(self):
""" return snapshot value of stdout/stderr capturings. """
out = err = ""
if self.out:
out = self.out.getvalue()
self.out.truncate(0)
self.out.seek(0)
if self.err:
err = self.err.getvalue()
self.err.truncate(0)
self.err.seek(0)
return out, err
class DontReadFromInput: class DontReadFromInput:

View File

@ -56,11 +56,15 @@ def _prepareconfig(args=None, plugins=None):
raise ValueError("not a string or argument list: %r" % (args,)) raise ValueError("not a string or argument list: %r" % (args,))
args = py.std.shlex.split(args) args = py.std.shlex.split(args)
pluginmanager = get_plugin_manager() pluginmanager = get_plugin_manager()
if plugins: try:
for plugin in plugins: if plugins:
pluginmanager.register(plugin) for plugin in plugins:
return pluginmanager.hook.pytest_cmdline_parse( pluginmanager.register(plugin)
pluginmanager=pluginmanager, args=args) return pluginmanager.hook.pytest_cmdline_parse(
pluginmanager=pluginmanager, args=args)
except Exception:
pluginmanager.ensure_shutdown()
raise
class PytestPluginManager(PluginManager): class PytestPluginManager(PluginManager):
def __init__(self, hookspecs=[hookspec]): def __init__(self, hookspecs=[hookspec]):
@ -612,6 +616,9 @@ class Config(object):
self.hook.pytest_logwarning(code=code, message=message, self.hook.pytest_logwarning(code=code, message=message,
fslocation=None, nodeid=None) fslocation=None, nodeid=None)
def get_terminal_writer(self):
return self.pluginmanager.getplugin("terminalreporter")._tw
def pytest_cmdline_parse(self, pluginmanager, args): def pytest_cmdline_parse(self, pluginmanager, args):
assert self == pluginmanager.config, (self, pluginmanager.config) assert self == pluginmanager.config, (self, pluginmanager.config)
self.parse(args) self.parse(args)

View File

@ -240,18 +240,22 @@ class PluginManager(object):
pass pass
l = [] l = []
last = [] last = []
wrappers = []
for plugin in plugins: for plugin in plugins:
try: try:
meth = getattr(plugin, attrname) meth = getattr(plugin, attrname)
if hasattr(meth, 'tryfirst'):
last.append(meth)
elif hasattr(meth, 'trylast'):
l.insert(0, meth)
else:
l.append(meth)
except AttributeError: except AttributeError:
continue continue
if hasattr(meth, 'hookwrapper'):
wrappers.append(meth)
elif hasattr(meth, 'tryfirst'):
last.append(meth)
elif hasattr(meth, 'trylast'):
l.insert(0, meth)
else:
l.append(meth)
l.extend(last) l.extend(last)
l.extend(wrappers)
self._listattrcache[key] = list(l) self._listattrcache[key] = list(l)
return l return l
@ -272,6 +276,14 @@ def importplugin(importspec):
class MultiCall: class MultiCall:
""" execute a call into multiple python functions/methods. """ """ execute a call into multiple python functions/methods. """
class WrongHookWrapper(Exception):
""" a hook wrapper does not behave correctly. """
def __init__(self, func, message):
Exception.__init__(self, func, message)
self.func = func
self.message = message
def __init__(self, methods, kwargs, firstresult=False): def __init__(self, methods, kwargs, firstresult=False):
self.methods = list(methods) self.methods = list(methods)
self.kwargs = kwargs self.kwargs = kwargs
@ -283,16 +295,39 @@ class MultiCall:
return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs) return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
def execute(self): def execute(self):
while self.methods: next_finalizers = []
method = self.methods.pop() try:
kwargs = self.getkwargs(method) while self.methods:
res = method(**kwargs) method = self.methods.pop()
if res is not None: kwargs = self.getkwargs(method)
self.results.append(res) if hasattr(method, "hookwrapper"):
if self.firstresult: it = method(**kwargs)
return res next = getattr(it, "next", None)
if not self.firstresult: if next is None:
return self.results next = getattr(it, "__next__", None)
if next is None:
raise self.WrongHookWrapper(method,
"wrapper does not contain a yield")
res = next()
next_finalizers.append((method, next))
else:
res = method(**kwargs)
if res is not None:
self.results.append(res)
if self.firstresult:
return res
if not self.firstresult:
return self.results
finally:
for method, fin in reversed(next_finalizers):
try:
fin()
except StopIteration:
pass
else:
raise self.WrongHookWrapper(method,
"wrapper contain more than one yield")
def getkwargs(self, method): def getkwargs(self, method):
kwargs = {} kwargs = {}

View File

@ -60,6 +60,7 @@ def pytest_addoption(parser):
def pytest_cmdline_main(config): def pytest_cmdline_main(config):
genscript = config.getvalue("genscript") genscript = config.getvalue("genscript")
if genscript: if genscript:
#tw = config.get_terminal_writer()
tw = py.io.TerminalWriter() tw = py.io.TerminalWriter()
deps = ['py', '_pytest', 'pytest'] deps = ['py', '_pytest', 'pytest']
if sys.version_info < (2,7): if sys.version_info < (2,7):

View File

@ -47,6 +47,8 @@ def pytest_unconfigure(config):
def pytest_cmdline_main(config): def pytest_cmdline_main(config):
if config.option.version: if config.option.version:
capman = config.pluginmanager.getplugin("capturemanager")
capman.reset_capturings()
p = py.path.local(pytest.__file__) p = py.path.local(pytest.__file__)
sys.stderr.write("This is pytest version %s, imported from %s\n" % sys.stderr.write("This is pytest version %s, imported from %s\n" %
(pytest.__version__, p)) (pytest.__version__, p))
@ -62,7 +64,7 @@ def pytest_cmdline_main(config):
return 0 return 0
def showhelp(config): def showhelp(config):
tw = py.io.TerminalWriter() tw = config.get_terminal_writer()
tw.write(config._parser.optparser.format_help()) tw.write(config._parser.optparser.format_help())
tw.line() tw.line()
tw.line() tw.line()

View File

@ -108,12 +108,14 @@ class LogXML(object):
)) ))
def _write_captured_output(self, report): def _write_captured_output(self, report):
sec = dict(report.sections) for capname in ('out', 'err'):
for name in ('out', 'err'): allcontent = ""
content = sec.get("Captured std%s" % name) for name, content in report.get_sections("Captured std%s" %
if content: capname):
tag = getattr(Junit, 'system-'+name) allcontent += content
self.append(tag(bin_xml_escape(content))) if allcontent:
tag = getattr(Junit, 'system-'+capname)
self.append(tag(bin_xml_escape(allcontent)))
def append(self, obj): def append(self, obj):
self.tests[-1].append(obj) self.tests[-1].append(obj)

View File

@ -233,6 +233,7 @@ class Node(object):
# used for storing artificial fixturedefs for direct parametrization # used for storing artificial fixturedefs for direct parametrization
self._name2pseudofixturedef = {} self._name2pseudofixturedef = {}
#self.extrainit() #self.extrainit()
@property @property
@ -465,6 +466,14 @@ class Item(Node):
""" """
nextitem = None nextitem = None
def __init__(self, name, parent=None, config=None, session=None):
super(Item, self).__init__(name, parent, config, session)
self._report_sections = []
def add_report_section(self, when, key, content):
if content:
self._report_sections.append((when, key, content))
def reportinfo(self): def reportinfo(self):
return self.fspath, None, "" return self.fspath, None, ""

View File

@ -40,7 +40,7 @@ def pytest_addoption(parser):
def pytest_cmdline_main(config): def pytest_cmdline_main(config):
if config.option.markers: if config.option.markers:
config.do_configure() config.do_configure()
tw = py.io.TerminalWriter() tw = config.get_terminal_writer()
for line in config.getini("markers"): for line in config.getini("markers"):
name, rest = line.split(":", 1) name, rest = line.split(":", 1)
tw.write("@pytest.mark.%s:" % name, bold=True) tw.write("@pytest.mark.%s:" % name, bold=True)

View File

@ -16,49 +16,36 @@ def pytest_configure(config):
if config.getvalue("usepdb"): if config.getvalue("usepdb"):
config.pluginmanager.register(PdbInvoke(), 'pdbinvoke') config.pluginmanager.register(PdbInvoke(), 'pdbinvoke')
old_trace = py.std.pdb.set_trace old = (py.std.pdb.set_trace, pytestPDB._pluginmanager)
def fin(): def fin():
py.std.pdb.set_trace = old_trace py.std.pdb.set_trace, pytestPDB._pluginmanager = old
py.std.pdb.set_trace = pytest.set_trace py.std.pdb.set_trace = pytest.set_trace
pytestPDB._pluginmanager = config.pluginmanager
config._cleanup.append(fin) config._cleanup.append(fin)
class pytestPDB: class pytestPDB:
""" Pseudo PDB that defers to the real pdb. """ """ Pseudo PDB that defers to the real pdb. """
item = None _pluginmanager = None
collector = None
def set_trace(self): def set_trace(self):
""" invoke PDB set_trace debugging, dropping any IO capturing. """ """ invoke PDB set_trace debugging, dropping any IO capturing. """
frame = sys._getframe().f_back frame = sys._getframe().f_back
item = self.item or self.collector capman = None
if self._pluginmanager is not None:
if item is not None: capman = self._pluginmanager.getplugin("capturemanager")
capman = item.config.pluginmanager.getplugin("capturemanager") if capman:
out, err = capman.suspendcapture() capman.reset_capturings()
if hasattr(item, 'outerr'):
item.outerr = (item.outerr[0] + out, item.outerr[1] + err)
tw = py.io.TerminalWriter() tw = py.io.TerminalWriter()
tw.line() tw.line()
tw.sep(">", "PDB set_trace (IO-capturing turned off)") tw.sep(">", "PDB set_trace (IO-capturing turned off)")
py.std.pdb.Pdb().set_trace(frame) py.std.pdb.Pdb().set_trace(frame)
def pdbitem(item):
pytestPDB.item = item
pytest_runtest_setup = pytest_runtest_call = pytest_runtest_teardown = pdbitem
@pytest.mark.tryfirst
def pytest_make_collect_report(__multicall__, collector):
try:
pytestPDB.collector = collector
return __multicall__.execute()
finally:
pytestPDB.collector = None
def pytest_runtest_makereport():
pytestPDB.item = None
class PdbInvoke: class PdbInvoke:
def pytest_exception_interact(self, node, call, report): def pytest_exception_interact(self, node, call, report):
capman = node.config.pluginmanager.getplugin("capturemanager")
if capman:
capman.reset_capturings()
return _enter_pdb(node, call.excinfo, report) return _enter_pdb(node, call.excinfo, report)
def pytest_internalerror(self, excrepr, excinfo): def pytest_internalerror(self, excrepr, excinfo):

View File

@ -885,7 +885,7 @@ def _showfixtures_main(config, session):
nodeid = "::".join(map(str, [curdir.bestrelpath(part[0])] + part[1:])) nodeid = "::".join(map(str, [curdir.bestrelpath(part[0])] + part[1:]))
nodeid.replace(session.fspath.sep, "/") nodeid.replace(session.fspath.sep, "/")
tw = py.io.TerminalWriter() tw = config.get_terminal_writer()
verbose = config.getvalue("verbose") verbose = config.getvalue("verbose")
fm = session._fixturemanager fm = session._fixturemanager

View File

@ -135,14 +135,13 @@ class CallInfo:
self.when = when self.when = when
self.start = time() self.start = time()
try: try:
try: self.result = func()
self.result = func() except KeyboardInterrupt:
except KeyboardInterrupt:
raise
except:
self.excinfo = py.code.ExceptionInfo()
finally:
self.stop = time() self.stop = time()
raise
except:
self.excinfo = py.code.ExceptionInfo()
self.stop = time()
def __repr__(self): def __repr__(self):
if self.excinfo: if self.excinfo:
@ -178,6 +177,11 @@ class BaseReport(object):
except UnicodeEncodeError: except UnicodeEncodeError:
out.line("<unprintable longrepr>") out.line("<unprintable longrepr>")
def get_sections(self, prefix):
for name, content in self.sections:
if name.startswith(prefix):
yield prefix, content
passed = property(lambda x: x.outcome == "passed") passed = property(lambda x: x.outcome == "passed")
failed = property(lambda x: x.outcome == "failed") failed = property(lambda x: x.outcome == "failed")
skipped = property(lambda x: x.outcome == "skipped") skipped = property(lambda x: x.outcome == "skipped")
@ -191,6 +195,7 @@ def pytest_runtest_makereport(item, call):
duration = call.stop-call.start duration = call.stop-call.start
keywords = dict([(x,1) for x in item.keywords]) keywords = dict([(x,1) for x in item.keywords])
excinfo = call.excinfo excinfo = call.excinfo
sections = []
if not call.excinfo: if not call.excinfo:
outcome = "passed" outcome = "passed"
longrepr = None longrepr = None
@ -209,16 +214,18 @@ def pytest_runtest_makereport(item, call):
else: # exception in setup or teardown else: # exception in setup or teardown
longrepr = item._repr_failure_py(excinfo, longrepr = item._repr_failure_py(excinfo,
style=item.config.option.tbstyle) style=item.config.option.tbstyle)
for rwhen, key, content in item._report_sections:
sections.append(("Captured std%s %s" %(key, rwhen), content))
return TestReport(item.nodeid, item.location, return TestReport(item.nodeid, item.location,
keywords, outcome, longrepr, when, keywords, outcome, longrepr, when,
duration=duration) sections, duration)
class TestReport(BaseReport): class TestReport(BaseReport):
""" Basic test report object (also used for setup and teardown calls if """ Basic test report object (also used for setup and teardown calls if
they fail). they fail).
""" """
def __init__(self, nodeid, location, def __init__(self, nodeid, location, keywords, outcome,
keywords, outcome, longrepr, when, sections=(), duration=0, **extra): longrepr, when, sections=(), duration=0, **extra):
#: normalized collection node id #: normalized collection node id
self.nodeid = nodeid self.nodeid = nodeid
@ -286,7 +293,8 @@ def pytest_make_collect_report(collector):
class CollectReport(BaseReport): class CollectReport(BaseReport):
def __init__(self, nodeid, outcome, longrepr, result, sections=(), **extra): def __init__(self, nodeid, outcome, longrepr, result,
sections=(), **extra):
self.nodeid = nodeid self.nodeid = nodeid
self.outcome = outcome self.outcome = outcome
self.longrepr = longrepr self.longrepr = longrepr

View File

@ -36,7 +36,10 @@ def pytest_addoption(parser):
def pytest_configure(config): def pytest_configure(config):
config.option.verbose -= config.option.quiet config.option.verbose -= config.option.quiet
reporter = TerminalReporter(config, sys.stdout) out = config.pluginmanager.getplugin("dupped_stdout")
#if out is None:
# out = sys.stdout
reporter = TerminalReporter(config, out)
config.pluginmanager.register(reporter, 'terminalreporter') config.pluginmanager.register(reporter, 'terminalreporter')
if config.option.debug or config.option.traceconfig: if config.option.debug or config.option.traceconfig:
def mywriter(tags, args): def mywriter(tags, args):
@ -44,6 +47,11 @@ def pytest_configure(config):
reporter.write_line("[traceconfig] " + msg) reporter.write_line("[traceconfig] " + msg)
config.trace.root.setprocessor("pytest:config", mywriter) config.trace.root.setprocessor("pytest:config", mywriter)
def get_terminal_writer(config):
tr = config.pluginmanager.getplugin("terminalreporter")
return tr._tw
def getreportopt(config): def getreportopt(config):
reportopts = "" reportopts = ""
optvalue = config.option.report optvalue = config.option.report

View File

@ -4,8 +4,8 @@ if __name__ == '__main__':
import cProfile import cProfile
import pytest import pytest
import pstats import pstats
script = sys.argv[1] if len(sys.argv) > 1 else "empty.py" script = sys.argv[1:] if len(sys.argv) > 1 else "empty.py"
stats = cProfile.run('pytest.cmdline.main([%r])' % script, 'prof') stats = cProfile.run('pytest.cmdline.main(%r)' % script, 'prof')
p = pstats.Stats("prof") p = pstats.Stats("prof")
p.strip_dirs() p.strip_dirs()
p.sort_stats('cumulative') p.sort_stats('cumulative')

View File

@ -4,6 +4,7 @@ from __future__ import with_statement
import os import os
import sys import sys
import py import py
import tempfile
import pytest import pytest
import contextlib import contextlib
@ -44,6 +45,13 @@ def oswritebytes(fd, obj):
def StdCaptureFD(out=True, err=True, in_=True):
return capture.StdCaptureBase(out, err, in_, Capture=capture.FDCapture)
def StdCapture(out=True, err=True, in_=True):
return capture.StdCaptureBase(out, err, in_, Capture=capture.SysCapture)
class TestCaptureManager: class TestCaptureManager:
def test_getmethod_default_no_fd(self, testdir, monkeypatch): def test_getmethod_default_no_fd(self, testdir, monkeypatch):
config = testdir.parseconfig(testdir.tmpdir) config = testdir.parseconfig(testdir.tmpdir)
@ -75,7 +83,7 @@ class TestCaptureManager:
@needsosdup @needsosdup
@pytest.mark.parametrize("method", ['no', 'fd', 'sys']) @pytest.mark.parametrize("method", ['no', 'fd', 'sys'])
def test_capturing_basic_api(self, method): def test_capturing_basic_api(self, method):
capouter = capture.StdCaptureFD() capouter = StdCaptureFD()
old = sys.stdout, sys.stderr, sys.stdin old = sys.stdout, sys.stderr, sys.stdin
try: try:
capman = CaptureManager() capman = CaptureManager()
@ -95,11 +103,11 @@ class TestCaptureManager:
assert not out and not err assert not out and not err
capman.reset_capturings() capman.reset_capturings()
finally: finally:
capouter.reset() capouter.stop_capturing()
@needsosdup @needsosdup
def test_juggle_capturings(self, testdir): def test_juggle_capturings(self, testdir):
capouter = capture.StdCaptureFD() capouter = StdCaptureFD()
try: try:
#config = testdir.parseconfig(testdir.tmpdir) #config = testdir.parseconfig(testdir.tmpdir)
capman = CaptureManager() capman = CaptureManager()
@ -119,7 +127,7 @@ class TestCaptureManager:
finally: finally:
capman.reset_capturings() capman.reset_capturings()
finally: finally:
capouter.reset() capouter.stop_capturing()
@pytest.mark.parametrize("method", ['fd', 'sys']) @pytest.mark.parametrize("method", ['fd', 'sys'])
@ -282,9 +290,9 @@ class TestPerTestCapturing:
"====* FAILURES *====", "====* FAILURES *====",
"____*____", "____*____",
"*test_capturing_outerr.py:8: ValueError", "*test_capturing_outerr.py:8: ValueError",
"*--- Captured stdout ---*", "*--- Captured stdout *call*",
"1", "1",
"*--- Captured stderr ---*", "*--- Captured stderr *call*",
"2", "2",
]) ])
@ -688,17 +696,15 @@ class TestFDCapture:
cap = capture.FDCapture(fd) cap = capture.FDCapture(fd)
data = tobytes("hello") data = tobytes("hello")
os.write(fd, data) os.write(fd, data)
f = cap.done() s = cap.snap()
s = f.read() cap.done()
f.close()
assert not s assert not s
cap = capture.FDCapture(fd) cap = capture.FDCapture(fd)
cap.start() cap.start()
os.write(fd, data) os.write(fd, data)
f = cap.done() s = cap.snap()
s = f.read() cap.done()
assert s == "hello" assert s == "hello"
f.close()
def test_simple_many(self, tmpfile): def test_simple_many(self, tmpfile):
for i in range(10): for i in range(10):
@ -712,22 +718,21 @@ class TestFDCapture:
def test_simple_fail_second_start(self, tmpfile): def test_simple_fail_second_start(self, tmpfile):
fd = tmpfile.fileno() fd = tmpfile.fileno()
cap = capture.FDCapture(fd) cap = capture.FDCapture(fd)
f = cap.done() cap.done()
pytest.raises(ValueError, cap.start) pytest.raises(ValueError, cap.start)
f.close()
def test_stderr(self): def test_stderr(self):
cap = capture.FDCapture(2, patchsys=True) cap = capture.FDCapture(2)
cap.start() cap.start()
print_("hello", file=sys.stderr) print_("hello", file=sys.stderr)
f = cap.done() s = cap.snap()
s = f.read() cap.done()
assert s == "hello\n" assert s == "hello\n"
def test_stdin(self, tmpfile): def test_stdin(self, tmpfile):
tmpfile.write(tobytes("3")) tmpfile.write(tobytes("3"))
tmpfile.seek(0) tmpfile.seek(0)
cap = capture.FDCapture(0, tmpfile=tmpfile) cap = capture.FDCapture(0, tmpfile)
cap.start() cap.start()
# check with os.read() directly instead of raw_input(), because # check with os.read() directly instead of raw_input(), because
# sys.stdin itself may be redirected (as pytest now does by default) # sys.stdin itself may be redirected (as pytest now does by default)
@ -744,123 +749,121 @@ class TestFDCapture:
cap.writeorg(data2) cap.writeorg(data2)
finally: finally:
tmpfile.close() tmpfile.close()
f = cap.done() scap = cap.snap()
scap = f.read() cap.done()
assert scap == totext(data1) assert scap == totext(data1)
stmp = open(tmpfile.name, 'rb').read() stmp = open(tmpfile.name, 'rb').read()
assert stmp == data2 assert stmp == data2
class TestStdCapture: class TestStdCapture:
captureclass = staticmethod(StdCapture)
@contextlib.contextmanager
def getcapture(self, **kw): def getcapture(self, **kw):
cap = capture.StdCapture(**kw) cap = self.__class__.captureclass(**kw)
cap.startall() cap.start_capturing()
return cap try:
yield cap
finally:
cap.stop_capturing()
def test_capturing_done_simple(self): def test_capturing_done_simple(self):
cap = self.getcapture() with self.getcapture() as cap:
sys.stdout.write("hello") sys.stdout.write("hello")
sys.stderr.write("world") sys.stderr.write("world")
outfile, errfile = cap.done() out, err = cap.readouterr()
s = outfile.read() assert out == "hello"
assert s == "hello" assert err == "world"
s = errfile.read()
assert s == "world"
def test_capturing_reset_simple(self): def test_capturing_reset_simple(self):
cap = self.getcapture() with self.getcapture() as cap:
print("hello world") print("hello world")
sys.stderr.write("hello error\n") sys.stderr.write("hello error\n")
out, err = cap.reset() out, err = cap.readouterr()
assert out == "hello world\n" assert out == "hello world\n"
assert err == "hello error\n" assert err == "hello error\n"
def test_capturing_readouterr(self): def test_capturing_readouterr(self):
cap = self.getcapture() with self.getcapture() as cap:
try:
print ("hello world") print ("hello world")
sys.stderr.write("hello error\n") sys.stderr.write("hello error\n")
out, err = cap.readouterr() out, err = cap.readouterr()
assert out == "hello world\n" assert out == "hello world\n"
assert err == "hello error\n" assert err == "hello error\n"
sys.stderr.write("error2") sys.stderr.write("error2")
finally: out, err = cap.readouterr()
out, err = cap.reset()
assert err == "error2" assert err == "error2"
def test_capturing_readouterr_unicode(self): def test_capturing_readouterr_unicode(self):
cap = self.getcapture() with self.getcapture() as cap:
try:
print ("hx\xc4\x85\xc4\x87") print ("hx\xc4\x85\xc4\x87")
out, err = cap.readouterr() out, err = cap.readouterr()
finally:
cap.reset()
assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8") assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8")
@pytest.mark.skipif('sys.version_info >= (3,)', @pytest.mark.skipif('sys.version_info >= (3,)',
reason='text output different for bytes on python3') reason='text output different for bytes on python3')
def test_capturing_readouterr_decode_error_handling(self): def test_capturing_readouterr_decode_error_handling(self):
cap = self.getcapture() with self.getcapture() as cap:
# triggered a internal error in pytest # triggered a internal error in pytest
print('\xa6') print('\xa6')
out, err = cap.readouterr() out, err = cap.readouterr()
assert out == py.builtin._totext('\ufffd\n', 'unicode-escape') assert out == py.builtin._totext('\ufffd\n', 'unicode-escape')
def test_reset_twice_error(self): def test_reset_twice_error(self):
cap = self.getcapture() with self.getcapture() as cap:
print ("hello") print ("hello")
out, err = cap.reset() out, err = cap.readouterr()
pytest.raises(ValueError, cap.reset) pytest.raises(ValueError, cap.stop_capturing)
assert out == "hello\n" assert out == "hello\n"
assert not err assert not err
def test_capturing_modify_sysouterr_in_between(self): def test_capturing_modify_sysouterr_in_between(self):
oldout = sys.stdout oldout = sys.stdout
olderr = sys.stderr olderr = sys.stderr
cap = self.getcapture() with self.getcapture() as cap:
sys.stdout.write("hello") sys.stdout.write("hello")
sys.stderr.write("world") sys.stderr.write("world")
sys.stdout = capture.TextIO() sys.stdout = capture.TextIO()
sys.stderr = capture.TextIO() sys.stderr = capture.TextIO()
print ("not seen") print ("not seen")
sys.stderr.write("not seen\n") sys.stderr.write("not seen\n")
out, err = cap.reset() out, err = cap.readouterr()
assert out == "hello" assert out == "hello"
assert err == "world" assert err == "world"
assert sys.stdout == oldout assert sys.stdout == oldout
assert sys.stderr == olderr assert sys.stderr == olderr
def test_capturing_error_recursive(self): def test_capturing_error_recursive(self):
cap1 = self.getcapture() with self.getcapture() as cap1:
print ("cap1") print ("cap1")
cap2 = self.getcapture() with self.getcapture() as cap2:
print ("cap2") print ("cap2")
out2, err2 = cap2.reset() out2, err2 = cap2.readouterr()
out1, err1 = cap1.reset() out1, err1 = cap1.readouterr()
assert out1 == "cap1\n" assert out1 == "cap1\n"
assert out2 == "cap2\n" assert out2 == "cap2\n"
def test_just_out_capture(self): def test_just_out_capture(self):
cap = self.getcapture(out=True, err=False) with self.getcapture(out=True, err=False) as cap:
sys.stdout.write("hello") sys.stdout.write("hello")
sys.stderr.write("world") sys.stderr.write("world")
out, err = cap.reset() out, err = cap.readouterr()
assert out == "hello" assert out == "hello"
assert not err assert not err
def test_just_err_capture(self): def test_just_err_capture(self):
cap = self.getcapture(out=False, err=True) with self.getcapture(out=False, err=True) as cap:
sys.stdout.write("hello") sys.stdout.write("hello")
sys.stderr.write("world") sys.stderr.write("world")
out, err = cap.reset() out, err = cap.readouterr()
assert err == "world" assert err == "world"
assert not out assert not out
def test_stdin_restored(self): def test_stdin_restored(self):
old = sys.stdin old = sys.stdin
cap = self.getcapture(in_=True) with self.getcapture(in_=True) as cap:
newstdin = sys.stdin newstdin = sys.stdin
out, err = cap.reset()
assert newstdin != sys.stdin assert newstdin != sys.stdin
assert sys.stdin is old assert sys.stdin is old
@ -868,68 +871,47 @@ class TestStdCapture:
print ("XXX this test may well hang instead of crashing") print ("XXX this test may well hang instead of crashing")
print ("XXX which indicates an error in the underlying capturing") print ("XXX which indicates an error in the underlying capturing")
print ("XXX mechanisms") print ("XXX mechanisms")
cap = self.getcapture() with self.getcapture() as cap:
pytest.raises(IOError, "sys.stdin.read()") pytest.raises(IOError, "sys.stdin.read()")
out, err = cap.reset()
def test_suspend_resume(self):
cap = self.getcapture(out=True, err=False, in_=False)
try:
print ("hello")
sys.stderr.write("error\n")
out, err = cap.suspend()
assert out == "hello\n"
assert not err
print ("in between")
sys.stderr.write("in between\n")
cap.resume()
print ("after")
sys.stderr.write("error_after\n")
finally:
out, err = cap.reset()
assert out == "after\n"
assert not err
class TestStdCaptureFD(TestStdCapture): class TestStdCaptureFD(TestStdCapture):
pytestmark = needsosdup pytestmark = needsosdup
captureclass = staticmethod(StdCaptureFD)
def getcapture(self, **kw): def test_simple_only_fd(self, testdir):
cap = capture.StdCaptureFD(**kw) testdir.makepyfile("""
cap.startall() import os
return cap def test_x():
os.write(1, "hello\\n".encode("ascii"))
assert 0
""")
result = testdir.runpytest()
result.stdout.fnmatch_lines("""
*test_x*
*assert 0*
*Captured stdout*
""")
def test_intermingling(self): def test_intermingling(self):
cap = self.getcapture() with self.getcapture() as cap:
oswritebytes(1, "1") oswritebytes(1, "1")
sys.stdout.write(str(2)) sys.stdout.write(str(2))
sys.stdout.flush() sys.stdout.flush()
oswritebytes(1, "3") oswritebytes(1, "3")
oswritebytes(2, "a") oswritebytes(2, "a")
sys.stderr.write("b") sys.stderr.write("b")
sys.stderr.flush() sys.stderr.flush()
oswritebytes(2, "c") oswritebytes(2, "c")
out, err = cap.reset() out, err = cap.readouterr()
assert out == "123" assert out == "123"
assert err == "abc" assert err == "abc"
def test_many(self, capfd): def test_many(self, capfd):
with lsof_check(): with lsof_check():
for i in range(10): for i in range(10):
cap = capture.StdCaptureFD() cap = StdCaptureFD()
cap.reset() cap.stop_capturing()
@needsosdup
def test_stdcapture_fd_tmpfile(tmpfile):
capfd = capture.StdCaptureFD(out=tmpfile)
try:
os.write(1, "hello".encode("ascii"))
os.write(2, "world".encode("ascii"))
outf, errf = capfd.done()
finally:
capfd.reset()
assert outf == tmpfile
class TestStdCaptureFDinvalidFD: class TestStdCaptureFDinvalidFD:
@ -938,19 +920,22 @@ class TestStdCaptureFDinvalidFD:
def test_stdcapture_fd_invalid_fd(self, testdir): def test_stdcapture_fd_invalid_fd(self, testdir):
testdir.makepyfile(""" testdir.makepyfile("""
import os import os
from _pytest.capture import StdCaptureFD from _pytest import capture
def StdCaptureFD(out=True, err=True, in_=True):
return capture.StdCaptureBase(out, err, in_,
Capture=capture.FDCapture)
def test_stdout(): def test_stdout():
os.close(1) os.close(1)
cap = StdCaptureFD(out=True, err=False, in_=False) cap = StdCaptureFD(out=True, err=False, in_=False)
cap.done() cap.stop_capturing()
def test_stderr(): def test_stderr():
os.close(2) os.close(2)
cap = StdCaptureFD(out=False, err=True, in_=False) cap = StdCaptureFD(out=False, err=True, in_=False)
cap.done() cap.stop_capturing()
def test_stdin(): def test_stdin():
os.close(0) os.close(0)
cap = StdCaptureFD(out=False, err=False, in_=True) cap = StdCaptureFD(out=False, err=False, in_=True)
cap.done() cap.stop_capturing()
""") """)
result = testdir.runpytest("--capture=fd") result = testdir.runpytest("--capture=fd")
assert result.ret == 0 assert result.ret == 0
@ -958,27 +943,8 @@ class TestStdCaptureFDinvalidFD:
def test_capture_not_started_but_reset(): def test_capture_not_started_but_reset():
capsys = capture.StdCapture() capsys = StdCapture()
capsys.done() capsys.stop_capturing()
capsys.done()
capsys.reset()
@needsosdup
def test_capture_no_sys():
capsys = capture.StdCapture()
try:
cap = capture.StdCaptureFD(patchsys=False)
cap.startall()
sys.stdout.write("hello")
sys.stderr.write("world")
oswritebytes(1, "1")
oswritebytes(2, "2")
out, err = cap.reset()
assert out == "1"
assert err == "2"
finally:
capsys.reset()
@needsosdup @needsosdup
@ -986,19 +952,18 @@ def test_capture_no_sys():
def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
if not use: if not use:
tmpfile = True tmpfile = True
cap = capture.StdCaptureFD(out=False, err=tmpfile) cap = StdCaptureFD(out=False, err=tmpfile)
try: try:
cap.startall() cap.start_capturing()
capfile = cap.err.tmpfile capfile = cap.err.tmpfile
cap.suspend() cap.readouterr()
cap.resume()
finally: finally:
cap.reset() cap.stop_capturing()
capfile2 = cap.err.tmpfile capfile2 = cap.err.tmpfile
assert capfile2 == capfile assert capfile2 == capfile
@pytest.mark.parametrize('method', ['StdCapture', 'StdCaptureFD']) @pytest.mark.parametrize('method', ['SysCapture', 'FDCapture'])
def test_capturing_and_logging_fundamentals(testdir, method): def test_capturing_and_logging_fundamentals(testdir, method):
if method == "StdCaptureFD" and not hasattr(os, 'dup'): if method == "StdCaptureFD" and not hasattr(os, 'dup'):
pytest.skip("need os.dup") pytest.skip("need os.dup")
@ -1007,23 +972,27 @@ def test_capturing_and_logging_fundamentals(testdir, method):
import sys, os import sys, os
import py, logging import py, logging
from _pytest import capture from _pytest import capture
cap = capture.%s(out=False, in_=False) cap = capture.StdCaptureBase(out=False, in_=False,
cap.startall() Capture=capture.%s)
cap.start_capturing()
logging.warn("hello1") logging.warn("hello1")
outerr = cap.suspend() outerr = cap.readouterr()
print ("suspend, captured %%s" %%(outerr,)) print ("suspend, captured %%s" %%(outerr,))
logging.warn("hello2") logging.warn("hello2")
cap.resume() cap.pop_outerr_to_orig()
logging.warn("hello3") logging.warn("hello3")
outerr = cap.suspend() outerr = cap.readouterr()
print ("suspend2, captured %%s" %% (outerr,)) print ("suspend2, captured %%s" %% (outerr,))
""" % (method,)) """ % (method,))
result = testdir.runpython(p) result = testdir.runpython(p)
result.stdout.fnmatch_lines([ result.stdout.fnmatch_lines("""
"suspend, captured*hello1*", suspend, captured*hello1*
"suspend2, captured*hello2*WARNING:root:hello3*", suspend2, captured*WARNING:root:hello3*
]) """)
result.stderr.fnmatch_lines("""
WARNING:root:hello2
""")
assert "atexit" not in result.stderr.str() assert "atexit" not in result.stderr.str()

View File

@ -523,6 +523,95 @@ class TestMultiCall:
res = MultiCall([m1, m2], {}).execute() res = MultiCall([m1, m2], {}).execute()
assert res == [1] assert res == [1]
def test_hookwrapper(self):
l = []
def m1():
l.append("m1 init")
yield None
l.append("m1 finish")
m1.hookwrapper = True
def m2():
l.append("m2")
return 2
res = MultiCall([m2, m1], {}).execute()
assert res == [2]
assert l == ["m1 init", "m2", "m1 finish"]
l[:] = []
res = MultiCall([m2, m1], {}, firstresult=True).execute()
assert res == 2
assert l == ["m1 init", "m2", "m1 finish"]
def test_hookwrapper_order(self):
l = []
def m1():
l.append("m1 init")
yield 1
l.append("m1 finish")
m1.hookwrapper = True
def m2():
l.append("m2 init")
yield 2
l.append("m2 finish")
m2.hookwrapper = True
res = MultiCall([m2, m1], {}).execute()
assert res == [1, 2]
assert l == ["m1 init", "m2 init", "m2 finish", "m1 finish"]
def test_listattr_hookwrapper_ordering(self):
class P1:
@pytest.mark.hookwrapper
def m(self):
return 17
class P2:
def m(self):
return 23
class P3:
@pytest.mark.tryfirst
def m(self):
return 19
pluginmanager = PluginManager()
p1 = P1()
p2 = P2()
p3 = P3()
pluginmanager.register(p1)
pluginmanager.register(p2)
pluginmanager.register(p3)
methods = pluginmanager.listattr('m')
assert methods == [p2.m, p3.m, p1.m]
## listattr keeps a cache and deleting
## a function attribute requires clearing it
#pluginmanager._listattrcache.clear()
#del P1.m.__dict__['tryfirst']
def test_hookwrapper_not_yield(self):
def m1():
pass
m1.hookwrapper = True
mc = MultiCall([m1], {})
with pytest.raises(mc.WrongHookWrapper) as ex:
mc.execute()
assert ex.value.func == m1
assert ex.value.message
def test_hookwrapper_too_many_yield(self):
def m1():
yield 1
yield 2
m1.hookwrapper = True
mc = MultiCall([m1], {})
with pytest.raises(mc.WrongHookWrapper) as ex:
mc.execute()
assert ex.value.func == m1
assert ex.value.message
class TestHookRelay: class TestHookRelay:
def test_happypath(self): def test_happypath(self):
pm = PluginManager() pm = PluginManager()

View File

@ -478,10 +478,12 @@ def test_unicode_issue368(testdir):
path = testdir.tmpdir.join("test.xml") path = testdir.tmpdir.join("test.xml")
log = LogXML(str(path), None) log = LogXML(str(path), None)
ustr = py.builtin._totext("ВНИ!", "utf-8") ustr = py.builtin._totext("ВНИ!", "utf-8")
class report: from _pytest.runner import BaseReport
class Report(BaseReport):
longrepr = ustr longrepr = ustr
sections = [] sections = []
nodeid = "something" nodeid = "something"
report = Report()
# hopefully this is not too brittle ... # hopefully this is not too brittle ...
log.pytest_sessionstart() log.pytest_sessionstart()