diff --git a/_pytest/capture.py b/_pytest/capture.py index 04fcbbd0d..60771cdf6 100644 --- a/_pytest/capture.py +++ b/_pytest/capture.py @@ -1,12 +1,13 @@ """ - per-test stdout/stderr capturing mechanisms, - ``capsys`` and ``capfd`` function arguments. +per-test stdout/stderr capturing mechanism. + """ -# note: py.io capture was where copied from -# pylib 1.4.20.dev2 (rev 13d9af95547e) +from __future__ import with_statement + import sys import os -import tempfile +from tempfile import TemporaryFile +import contextlib import py import pytest @@ -58,8 +59,18 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__): method = "fd" if method == "fd" and not hasattr(os, "dup"): method = "sys" + pluginmanager = early_config.pluginmanager + if method != "no": + try: + sys.stdout.fileno() + except Exception: + dupped_stdout = sys.stdout + else: + dupped_stdout = dupfile(sys.stdout, buffering=1) + pluginmanager.register(dupped_stdout, "dupped_stdout") + #pluginmanager.add_shutdown(dupped_stdout.close) capman = CaptureManager(method) - early_config.pluginmanager.register(capman, "capturemanager") + pluginmanager.register(capman, "capturemanager") # make sure that capturemanager is properly reset at final shutdown def teardown(): @@ -68,13 +79,13 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__): except ValueError: pass - early_config.pluginmanager.add_shutdown(teardown) + pluginmanager.add_shutdown(teardown) # make sure logging does not raise exceptions at the end def silence_logging_at_shutdown(): if "logging" in sys.modules: sys.modules["logging"].raiseExceptions = False - early_config.pluginmanager.add_shutdown(silence_logging_at_shutdown) + pluginmanager.add_shutdown(silence_logging_at_shutdown) # finally trigger conftest loading but while capturing (issue93) capman.resumecapture() @@ -89,53 +100,19 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__): raise -def addouterr(rep, outerr): - for secname, content in zip(["out", "err"], outerr): - if content: - rep.sections.append(("Captured std%s" % secname, content)) - - -class NoCapture: - def startall(self): - pass - - def resume(self): - pass - - def reset(self): - pass - - def suspend(self): - return "", "" - class CaptureManager: def __init__(self, defaultmethod=None): self._method2capture = {} self._defaultmethod = defaultmethod - def _maketempfile(self): - f = py.std.tempfile.TemporaryFile() - newf = dupfile(f, encoding="UTF-8") - f.close() - return newf - - def _makestringio(self): - return TextIO() - def _getcapture(self, method): if method == "fd": - return StdCaptureFD( - out=self._maketempfile(), - err=self._maketempfile(), - ) + return StdCaptureBase(out=True, err=True, Capture=FDCapture) elif method == "sys": - return StdCapture( - out=self._makestringio(), - err=self._makestringio(), - ) + return StdCaptureBase(out=True, err=True, Capture=SysCapture) elif method == "no": - return NoCapture() + return StdCaptureBase(out=False, err=False, in_=False) else: raise ValueError("unknown capturing method: %r" % method) @@ -153,12 +130,12 @@ class CaptureManager: def reset_capturings(self): for cap in self._method2capture.values(): - cap.reset() + cap.pop_outerr_to_orig() + cap.stop_capturing() + self._method2capture.clear() def resumecapture_item(self, item): method = self._getmethod(item.config, item.fspath) - if not hasattr(item, 'outerr'): - item.outerr = ('', '') # we accumulate outerr on the item return self.resumecapture(method) def resumecapture(self, method=None): @@ -172,87 +149,85 @@ class CaptureManager: self._capturing = method if cap is None: self._method2capture[method] = cap = self._getcapture(method) - cap.startall() + cap.start_capturing() else: - cap.resume() + cap.pop_outerr_to_orig() def suspendcapture(self, item=None): self.deactivate_funcargs() - if hasattr(self, '_capturing'): - method = self._capturing + method = self.__dict__.pop("_capturing", None) + if method is not None: cap = self._method2capture.get(method) if cap is not None: - outerr = cap.suspend() - del self._capturing - if item: - outerr = (item.outerr[0] + outerr[0], - item.outerr[1] + outerr[1]) - return outerr - if hasattr(item, 'outerr'): - return item.outerr + return cap.readouterr() return "", "" def activate_funcargs(self, pyfuncitem): - funcargs = getattr(pyfuncitem, "funcargs", None) - if funcargs is not None: - for name, capfuncarg in funcargs.items(): - if name in ('capsys', 'capfd'): - assert not hasattr(self, '_capturing_funcarg') - self._capturing_funcarg = capfuncarg - capfuncarg._start() + capfuncarg = pyfuncitem.__dict__.pop("_capfuncarg", None) + if capfuncarg is not None: + capfuncarg._start() + self._capfuncarg = capfuncarg def deactivate_funcargs(self): - capturing_funcarg = getattr(self, '_capturing_funcarg', None) - if capturing_funcarg: - outerr = capturing_funcarg._finalize() - del self._capturing_funcarg - return outerr + capfuncarg = self.__dict__.pop("_capfuncarg", None) + if capfuncarg is not None: + capfuncarg.close() + @pytest.mark.hookwrapper def pytest_make_collect_report(self, __multicall__, collector): method = self._getmethod(collector.config, collector.fspath) try: self.resumecapture(method) except ValueError: + yield # recursive collect, XXX refactor capturing # to allow for more lightweight recursive capturing return - try: - rep = __multicall__.execute() - finally: - outerr = self.suspendcapture() - addouterr(rep, outerr) - return rep + yield + out, err = self.suspendcapture() + # XXX getting the report from the ongoing hook call is a bit + # of a hack. We need to think about capturing during collection + # and find out if it's really needed fine-grained (per + # collector). + if __multicall__.results: + rep = __multicall__.results[0] + if out: + rep.sections.append(("Captured stdout", out)) + if err: + rep.sections.append(("Captured stderr", err)) - @pytest.mark.tryfirst + @pytest.mark.hookwrapper def pytest_runtest_setup(self, item): - self.resumecapture_item(item) + with self.item_capture_wrapper(item, "setup"): + yield - @pytest.mark.tryfirst + @pytest.mark.hookwrapper def pytest_runtest_call(self, item): - self.resumecapture_item(item) - self.activate_funcargs(item) + with self.item_capture_wrapper(item, "call"): + self.activate_funcargs(item) + yield + #self.deactivate_funcargs() called from ctx's suspendcapture() - @pytest.mark.tryfirst + @pytest.mark.hookwrapper def pytest_runtest_teardown(self, item): - self.resumecapture_item(item) - - def pytest_keyboard_interrupt(self, excinfo): - if hasattr(self, '_capturing'): - self.suspendcapture() + with self.item_capture_wrapper(item, "teardown"): + yield @pytest.mark.tryfirst - def pytest_runtest_makereport(self, __multicall__, item, call): - funcarg_outerr = self.deactivate_funcargs() - rep = __multicall__.execute() - outerr = self.suspendcapture(item) - if funcarg_outerr is not None: - outerr = (outerr[0] + funcarg_outerr[0], - outerr[1] + funcarg_outerr[1]) - addouterr(rep, outerr) - if not rep.passed or rep.when == "teardown": - outerr = ('', '') - item.outerr = outerr - return rep + def pytest_keyboard_interrupt(self, excinfo): + self.reset_capturings() + + @pytest.mark.tryfirst + def pytest_internalerror(self, excinfo): + self.reset_capturings() + + @contextlib.contextmanager + def item_capture_wrapper(self, item, when): + self.resumecapture_item(item) + yield + out, err = self.suspendcapture(item) + item.add_report_section(when, "out", out) + item.add_report_section(when, "err", err) error_capsysfderror = "cannot use capsys and capfd at the same time" @@ -264,8 +239,8 @@ def pytest_funcarg__capsys(request): """ if "capfd" in request._funcargs: raise request.raiseerror(error_capsysfderror) - return CaptureFixture(StdCapture) - + request.node._capfuncarg = c = CaptureFixture(SysCapture) + return c def pytest_funcarg__capfd(request): """enables capturing of writes to file descriptors 1 and 2 and makes @@ -276,89 +251,30 @@ def pytest_funcarg__capfd(request): request.raiseerror(error_capsysfderror) if not hasattr(os, 'dup'): pytest.skip("capfd funcarg needs os.dup") - return CaptureFixture(StdCaptureFD) + request.node._capfuncarg = c = CaptureFixture(FDCapture) + return c class CaptureFixture: def __init__(self, captureclass): - self._capture = captureclass() + self.captureclass = captureclass def _start(self): - self._capture.startall() + self._capture = StdCaptureBase(out=True, err=True, in_=False, + Capture=self.captureclass) + self._capture.start_capturing() - def _finalize(self): - if hasattr(self, '_capture'): - outerr = self._outerr = self._capture.reset() - del self._capture - return outerr + def close(self): + cap = self.__dict__.pop("_capture", None) + if cap is not None: + cap.pop_outerr_to_orig() + cap.stop_capturing() def readouterr(self): try: return self._capture.readouterr() except AttributeError: - return self._outerr - - def close(self): - self._finalize() - - -class FDCapture: - """ Capture IO to/from a given os-level filedescriptor. """ - - def __init__(self, targetfd, tmpfile=None, patchsys=False): - """ save targetfd descriptor, and open a new - temporary file there. If no tmpfile is - specified a tempfile.Tempfile() will be opened - in text mode. - """ - self.targetfd = targetfd - if tmpfile is None and targetfd != 0: - f = tempfile.TemporaryFile('wb+') - tmpfile = dupfile(f, encoding="UTF-8") - f.close() - self.tmpfile = tmpfile - self._savefd = os.dup(self.targetfd) - if patchsys: - self._oldsys = getattr(sys, patchsysdict[targetfd]) - - def start(self): - try: - os.fstat(self._savefd) - except OSError: - raise ValueError( - "saved filedescriptor not valid, " - "did you call start() twice?") - if self.targetfd == 0 and not self.tmpfile: - fd = os.open(os.devnull, os.O_RDONLY) - os.dup2(fd, 0) - os.close(fd) - if hasattr(self, '_oldsys'): - setattr(sys, patchsysdict[self.targetfd], DontReadFromInput()) - else: - os.dup2(self.tmpfile.fileno(), self.targetfd) - if hasattr(self, '_oldsys'): - setattr(sys, patchsysdict[self.targetfd], self.tmpfile) - - def done(self): - """ unpatch and clean up, returns the self.tmpfile (file object) - """ - os.dup2(self._savefd, self.targetfd) - os.close(self._savefd) - if self.targetfd != 0: - self.tmpfile.seek(0) - if hasattr(self, '_oldsys'): - setattr(sys, patchsysdict[self.targetfd], self._oldsys) - return self.tmpfile - - def writeorg(self, data): - """ write a string to the original file descriptor - """ - tempfp = tempfile.TemporaryFile() - try: - os.dup2(self._savefd, tempfp.fileno()) - tempfp.write(data) - finally: - tempfp.close() + return "", "" def dupfile(f, mode=None, buffering=0, raising=False, encoding=None): @@ -408,185 +324,148 @@ class EncodedFile(object): return getattr(self._stream, name) -class Capture(object): - def reset(self): - """ reset sys.stdout/stderr and return captured output as strings. """ - if hasattr(self, '_reset'): - raise ValueError("was already reset") - self._reset = True - outfile, errfile = self.done(save=False) - out, err = "", "" - if outfile and not outfile.closed: - out = outfile.read() - outfile.close() - if errfile and errfile != outfile and not errfile.closed: - err = errfile.read() - errfile.close() - return out, err +class StdCaptureBase(object): + out = err = in_ = None - def suspend(self): - """ return current snapshot captures, memorize tempfiles. """ - outerr = self.readouterr() - outfile, errfile = self.done() - return outerr - - -class StdCaptureFD(Capture): - """ This class allows to capture writes to FD1 and FD2 - and may connect a NULL file to FD0 (and prevent - reads from sys.stdin). If any of the 0,1,2 file descriptors - is invalid it will not be captured. - """ - def __init__(self, out=True, err=True, in_=True, patchsys=True): - self._options = { - "out": out, - "err": err, - "in_": in_, - "patchsys": patchsys, - } - self._save() - - def _save(self): - in_ = self._options['in_'] - out = self._options['out'] - err = self._options['err'] - patchsys = self._options['patchsys'] + def __init__(self, out=True, err=True, in_=True, Capture=None): if in_: - try: - self.in_ = FDCapture( - 0, tmpfile=None, - patchsys=patchsys) - except OSError: - pass + self.in_ = Capture(0) if out: - tmpfile = None - if hasattr(out, 'write'): - tmpfile = out - try: - self.out = FDCapture( - 1, tmpfile=tmpfile, - patchsys=patchsys) - self._options['out'] = self.out.tmpfile - except OSError: - pass + self.out = Capture(1) if err: - if hasattr(err, 'write'): - tmpfile = err - else: - tmpfile = None - try: - self.err = FDCapture( - 2, tmpfile=tmpfile, - patchsys=patchsys) - self._options['err'] = self.err.tmpfile - except OSError: - pass + self.err = Capture(2) - def startall(self): - if hasattr(self, 'in_'): + def start_capturing(self): + if self.in_: self.in_.start() - if hasattr(self, 'out'): + if self.out: self.out.start() - if hasattr(self, 'err'): + if self.err: self.err.start() - def resume(self): - """ resume capturing with original temp files. """ - self.startall() + def pop_outerr_to_orig(self): + """ pop current snapshot out/err capture and flush to orig streams. """ + out, err = self.readouterr() + if out: + self.out.writeorg(out) + if err: + self.err.writeorg(err) - def done(self, save=True): - """ return (outfile, errfile) and stop capturing. """ - outfile = errfile = None - if hasattr(self, 'out') and not self.out.tmpfile.closed: - outfile = self.out.done() - if hasattr(self, 'err') and not self.err.tmpfile.closed: - errfile = self.err.done() - if hasattr(self, 'in_'): + def stop_capturing(self): + """ stop capturing and reset capturing streams """ + if hasattr(self, '_reset'): + raise ValueError("was already stopped") + self._reset = True + if self.out: + self.out.done() + if self.err: + self.err.done() + if self.in_: self.in_.done() - if save: - self._save() - return outfile, errfile def readouterr(self): - """ return snapshot value of stdout/stderr capturings. """ - out = self._readsnapshot('out') - err = self._readsnapshot('err') - return out, err + """ return snapshot unicode value of stdout/stderr capturings. """ + return self._readsnapshot('out'), self._readsnapshot('err') def _readsnapshot(self, name): - if hasattr(self, name): - f = getattr(self, name).tmpfile - else: - return '' + cap = getattr(self, name, None) + if cap is None: + return "" + return cap.snap() + +class FDCapture: + """ Capture IO to/from a given os-level filedescriptor. """ + + def __init__(self, targetfd, tmpfile=None): + self.targetfd = targetfd + try: + self._savefd = os.dup(self.targetfd) + except OSError: + self.start = lambda: None + self.done = lambda: None + else: + if tmpfile is None: + if targetfd == 0: + tmpfile = open(os.devnull, "r") + else: + f = TemporaryFile() + with f: + tmpfile = dupfile(f, encoding="UTF-8") + self.tmpfile = tmpfile + if targetfd in patchsysdict: + self._oldsys = getattr(sys, patchsysdict[targetfd]) + + def __repr__(self): + return "" % (self.targetfd, self._savefd) + + def start(self): + """ Start capturing on targetfd using memorized tmpfile. """ + try: + os.fstat(self._savefd) + except OSError: + raise ValueError("saved filedescriptor not valid anymore") + targetfd = self.targetfd + os.dup2(self.tmpfile.fileno(), targetfd) + if hasattr(self, '_oldsys'): + subst = self.tmpfile if targetfd != 0 else DontReadFromInput() + setattr(sys, patchsysdict[targetfd], subst) + + def snap(self): + f = self.tmpfile f.seek(0) res = f.read() - enc = getattr(f, "encoding", None) - if enc: - res = py.builtin._totext(res, enc, "replace") + if res: + enc = getattr(f, "encoding", None) + if enc and isinstance(res, bytes): + res = py.builtin._totext(res, enc, "replace") + f.truncate(0) + f.seek(0) + return res + + def done(self): + """ stop capturing, restore streams, return original capture file, + seeked to position zero. """ + os.dup2(self._savefd, self.targetfd) + os.close(self._savefd) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], self._oldsys) + self.tmpfile.close() + + def writeorg(self, data): + """ write to original file descriptor. """ + if py.builtin._istext(data): + data = data.encode("utf8") # XXX use encoding of original stream + os.write(self._savefd, data) + + +class SysCapture: + def __init__(self, fd): + name = patchsysdict[fd] + self._old = getattr(sys, name) + self.name = name + if name == "stdin": + self.tmpfile = DontReadFromInput() + else: + self.tmpfile = TextIO() + + def start(self): + setattr(sys, self.name, self.tmpfile) + + def snap(self): + f = self.tmpfile + res = f.getvalue() f.truncate(0) f.seek(0) return res + def done(self): + setattr(sys, self.name, self._old) + self.tmpfile.close() -class StdCapture(Capture): - """ This class allows to capture writes to sys.stdout|stderr "in-memory" - and will raise errors on tries to read from sys.stdin. It only - modifies sys.stdout|stderr|stdin attributes and does not - touch underlying File Descriptors (use StdCaptureFD for that). - """ - def __init__(self, out=True, err=True, in_=True): - self._oldout = sys.stdout - self._olderr = sys.stderr - self._oldin = sys.stdin - if out and not hasattr(out, 'file'): - out = TextIO() - self.out = out - if err: - if not hasattr(err, 'write'): - err = TextIO() - self.err = err - self.in_ = in_ - - def startall(self): - if self.out: - sys.stdout = self.out - if self.err: - sys.stderr = self.err - if self.in_: - sys.stdin = self.in_ = DontReadFromInput() - - def done(self, save=True): - """ return (outfile, errfile) and stop capturing. """ - outfile = errfile = None - if self.out and not self.out.closed: - sys.stdout = self._oldout - outfile = self.out - outfile.seek(0) - if self.err and not self.err.closed: - sys.stderr = self._olderr - errfile = self.err - errfile.seek(0) - if self.in_: - sys.stdin = self._oldin - return outfile, errfile - - def resume(self): - """ resume capturing with original temp files. """ - self.startall() - - def readouterr(self): - """ return snapshot value of stdout/stderr capturings. """ - out = err = "" - if self.out: - out = self.out.getvalue() - self.out.truncate(0) - self.out.seek(0) - if self.err: - err = self.err.getvalue() - self.err.truncate(0) - self.err.seek(0) - return out, err + def writeorg(self, data): + self._old.write(data) + self._old.flush() class DontReadFromInput: diff --git a/_pytest/config.py b/_pytest/config.py index 397cb6fa7..eee4d085a 100644 --- a/_pytest/config.py +++ b/_pytest/config.py @@ -56,11 +56,15 @@ def _prepareconfig(args=None, plugins=None): raise ValueError("not a string or argument list: %r" % (args,)) args = py.std.shlex.split(args) pluginmanager = get_plugin_manager() - if plugins: - for plugin in plugins: - pluginmanager.register(plugin) - return pluginmanager.hook.pytest_cmdline_parse( - pluginmanager=pluginmanager, args=args) + try: + if plugins: + for plugin in plugins: + pluginmanager.register(plugin) + return pluginmanager.hook.pytest_cmdline_parse( + pluginmanager=pluginmanager, args=args) + except Exception: + pluginmanager.ensure_shutdown() + raise class PytestPluginManager(PluginManager): def __init__(self, hookspecs=[hookspec]): @@ -612,6 +616,9 @@ class Config(object): self.hook.pytest_logwarning(code=code, message=message, fslocation=None, nodeid=None) + def get_terminal_writer(self): + return self.pluginmanager.getplugin("terminalreporter")._tw + def pytest_cmdline_parse(self, pluginmanager, args): assert self == pluginmanager.config, (self, pluginmanager.config) self.parse(args) diff --git a/_pytest/core.py b/_pytest/core.py index 43d2801c0..50af3188d 100644 --- a/_pytest/core.py +++ b/_pytest/core.py @@ -240,18 +240,22 @@ class PluginManager(object): pass l = [] last = [] + wrappers = [] for plugin in plugins: try: meth = getattr(plugin, attrname) - if hasattr(meth, 'tryfirst'): - last.append(meth) - elif hasattr(meth, 'trylast'): - l.insert(0, meth) - else: - l.append(meth) except AttributeError: continue + if hasattr(meth, 'hookwrapper'): + wrappers.append(meth) + elif hasattr(meth, 'tryfirst'): + last.append(meth) + elif hasattr(meth, 'trylast'): + l.insert(0, meth) + else: + l.append(meth) l.extend(last) + l.extend(wrappers) self._listattrcache[key] = list(l) return l @@ -272,6 +276,14 @@ def importplugin(importspec): class MultiCall: """ execute a call into multiple python functions/methods. """ + + class WrongHookWrapper(Exception): + """ a hook wrapper does not behave correctly. """ + def __init__(self, func, message): + Exception.__init__(self, func, message) + self.func = func + self.message = message + def __init__(self, methods, kwargs, firstresult=False): self.methods = list(methods) self.kwargs = kwargs @@ -283,16 +295,39 @@ class MultiCall: return "" %(status, self.kwargs) def execute(self): - while self.methods: - method = self.methods.pop() - kwargs = self.getkwargs(method) - res = method(**kwargs) - if res is not None: - self.results.append(res) - if self.firstresult: - return res - if not self.firstresult: - return self.results + next_finalizers = [] + try: + while self.methods: + method = self.methods.pop() + kwargs = self.getkwargs(method) + if hasattr(method, "hookwrapper"): + it = method(**kwargs) + next = getattr(it, "next", None) + if next is None: + next = getattr(it, "__next__", None) + if next is None: + raise self.WrongHookWrapper(method, + "wrapper does not contain a yield") + res = next() + next_finalizers.append((method, next)) + else: + res = method(**kwargs) + if res is not None: + self.results.append(res) + if self.firstresult: + return res + if not self.firstresult: + return self.results + finally: + for method, fin in reversed(next_finalizers): + try: + fin() + except StopIteration: + pass + else: + raise self.WrongHookWrapper(method, + "wrapper contain more than one yield") + def getkwargs(self, method): kwargs = {} diff --git a/_pytest/genscript.py b/_pytest/genscript.py index 25e8e6a0f..b58798aec 100755 --- a/_pytest/genscript.py +++ b/_pytest/genscript.py @@ -60,6 +60,7 @@ def pytest_addoption(parser): def pytest_cmdline_main(config): genscript = config.getvalue("genscript") if genscript: + #tw = config.get_terminal_writer() tw = py.io.TerminalWriter() deps = ['py', '_pytest', 'pytest'] if sys.version_info < (2,7): diff --git a/_pytest/helpconfig.py b/_pytest/helpconfig.py index 9e31c21e7..f3cb10fe7 100644 --- a/_pytest/helpconfig.py +++ b/_pytest/helpconfig.py @@ -47,6 +47,8 @@ def pytest_unconfigure(config): def pytest_cmdline_main(config): if config.option.version: + capman = config.pluginmanager.getplugin("capturemanager") + capman.reset_capturings() p = py.path.local(pytest.__file__) sys.stderr.write("This is pytest version %s, imported from %s\n" % (pytest.__version__, p)) @@ -62,7 +64,7 @@ def pytest_cmdline_main(config): return 0 def showhelp(config): - tw = py.io.TerminalWriter() + tw = config.get_terminal_writer() tw.write(config._parser.optparser.format_help()) tw.line() tw.line() diff --git a/_pytest/junitxml.py b/_pytest/junitxml.py index 2d330870f..c87e7f34f 100644 --- a/_pytest/junitxml.py +++ b/_pytest/junitxml.py @@ -108,12 +108,14 @@ class LogXML(object): )) def _write_captured_output(self, report): - sec = dict(report.sections) - for name in ('out', 'err'): - content = sec.get("Captured std%s" % name) - if content: - tag = getattr(Junit, 'system-'+name) - self.append(tag(bin_xml_escape(content))) + for capname in ('out', 'err'): + allcontent = "" + for name, content in report.get_sections("Captured std%s" % + capname): + allcontent += content + if allcontent: + tag = getattr(Junit, 'system-'+capname) + self.append(tag(bin_xml_escape(allcontent))) def append(self, obj): self.tests[-1].append(obj) diff --git a/_pytest/main.py b/_pytest/main.py index de69d351b..b4cea1ca0 100644 --- a/_pytest/main.py +++ b/_pytest/main.py @@ -233,6 +233,7 @@ class Node(object): # used for storing artificial fixturedefs for direct parametrization self._name2pseudofixturedef = {} + #self.extrainit() @property @@ -465,6 +466,14 @@ class Item(Node): """ nextitem = None + def __init__(self, name, parent=None, config=None, session=None): + super(Item, self).__init__(name, parent, config, session) + self._report_sections = [] + + def add_report_section(self, when, key, content): + if content: + self._report_sections.append((when, key, content)) + def reportinfo(self): return self.fspath, None, "" diff --git a/_pytest/mark.py b/_pytest/mark.py index 6b66b1876..0d241625e 100644 --- a/_pytest/mark.py +++ b/_pytest/mark.py @@ -40,7 +40,7 @@ def pytest_addoption(parser): def pytest_cmdline_main(config): if config.option.markers: config.do_configure() - tw = py.io.TerminalWriter() + tw = config.get_terminal_writer() for line in config.getini("markers"): name, rest = line.split(":", 1) tw.write("@pytest.mark.%s:" % name, bold=True) diff --git a/_pytest/pdb.py b/_pytest/pdb.py index 6405773f8..33a892184 100644 --- a/_pytest/pdb.py +++ b/_pytest/pdb.py @@ -16,49 +16,36 @@ def pytest_configure(config): if config.getvalue("usepdb"): config.pluginmanager.register(PdbInvoke(), 'pdbinvoke') - old_trace = py.std.pdb.set_trace + old = (py.std.pdb.set_trace, pytestPDB._pluginmanager) def fin(): - py.std.pdb.set_trace = old_trace + py.std.pdb.set_trace, pytestPDB._pluginmanager = old py.std.pdb.set_trace = pytest.set_trace + pytestPDB._pluginmanager = config.pluginmanager config._cleanup.append(fin) class pytestPDB: """ Pseudo PDB that defers to the real pdb. """ - item = None - collector = None + _pluginmanager = None def set_trace(self): """ invoke PDB set_trace debugging, dropping any IO capturing. """ frame = sys._getframe().f_back - item = self.item or self.collector - - if item is not None: - capman = item.config.pluginmanager.getplugin("capturemanager") - out, err = capman.suspendcapture() - if hasattr(item, 'outerr'): - item.outerr = (item.outerr[0] + out, item.outerr[1] + err) + capman = None + if self._pluginmanager is not None: + capman = self._pluginmanager.getplugin("capturemanager") + if capman: + capman.reset_capturings() tw = py.io.TerminalWriter() tw.line() tw.sep(">", "PDB set_trace (IO-capturing turned off)") py.std.pdb.Pdb().set_trace(frame) -def pdbitem(item): - pytestPDB.item = item -pytest_runtest_setup = pytest_runtest_call = pytest_runtest_teardown = pdbitem - -@pytest.mark.tryfirst -def pytest_make_collect_report(__multicall__, collector): - try: - pytestPDB.collector = collector - return __multicall__.execute() - finally: - pytestPDB.collector = None - -def pytest_runtest_makereport(): - pytestPDB.item = None class PdbInvoke: def pytest_exception_interact(self, node, call, report): + capman = node.config.pluginmanager.getplugin("capturemanager") + if capman: + capman.reset_capturings() return _enter_pdb(node, call.excinfo, report) def pytest_internalerror(self, excrepr, excinfo): diff --git a/_pytest/python.py b/_pytest/python.py index 544055ec9..23a45f30d 100644 --- a/_pytest/python.py +++ b/_pytest/python.py @@ -885,7 +885,7 @@ def _showfixtures_main(config, session): nodeid = "::".join(map(str, [curdir.bestrelpath(part[0])] + part[1:])) nodeid.replace(session.fspath.sep, "/") - tw = py.io.TerminalWriter() + tw = config.get_terminal_writer() verbose = config.getvalue("verbose") fm = session._fixturemanager diff --git a/_pytest/runner.py b/_pytest/runner.py index 68719765b..8c82a3294 100644 --- a/_pytest/runner.py +++ b/_pytest/runner.py @@ -135,14 +135,13 @@ class CallInfo: self.when = when self.start = time() try: - try: - self.result = func() - except KeyboardInterrupt: - raise - except: - self.excinfo = py.code.ExceptionInfo() - finally: + self.result = func() + except KeyboardInterrupt: self.stop = time() + raise + except: + self.excinfo = py.code.ExceptionInfo() + self.stop = time() def __repr__(self): if self.excinfo: @@ -178,6 +177,11 @@ class BaseReport(object): except UnicodeEncodeError: out.line("") + def get_sections(self, prefix): + for name, content in self.sections: + if name.startswith(prefix): + yield prefix, content + passed = property(lambda x: x.outcome == "passed") failed = property(lambda x: x.outcome == "failed") skipped = property(lambda x: x.outcome == "skipped") @@ -191,6 +195,7 @@ def pytest_runtest_makereport(item, call): duration = call.stop-call.start keywords = dict([(x,1) for x in item.keywords]) excinfo = call.excinfo + sections = [] if not call.excinfo: outcome = "passed" longrepr = None @@ -209,16 +214,18 @@ def pytest_runtest_makereport(item, call): else: # exception in setup or teardown longrepr = item._repr_failure_py(excinfo, style=item.config.option.tbstyle) + for rwhen, key, content in item._report_sections: + sections.append(("Captured std%s %s" %(key, rwhen), content)) return TestReport(item.nodeid, item.location, keywords, outcome, longrepr, when, - duration=duration) + sections, duration) class TestReport(BaseReport): """ Basic test report object (also used for setup and teardown calls if they fail). """ - def __init__(self, nodeid, location, - keywords, outcome, longrepr, when, sections=(), duration=0, **extra): + def __init__(self, nodeid, location, keywords, outcome, + longrepr, when, sections=(), duration=0, **extra): #: normalized collection node id self.nodeid = nodeid @@ -286,7 +293,8 @@ def pytest_make_collect_report(collector): class CollectReport(BaseReport): - def __init__(self, nodeid, outcome, longrepr, result, sections=(), **extra): + def __init__(self, nodeid, outcome, longrepr, result, + sections=(), **extra): self.nodeid = nodeid self.outcome = outcome self.longrepr = longrepr diff --git a/_pytest/terminal.py b/_pytest/terminal.py index 9b64ba336..fb3e92517 100644 --- a/_pytest/terminal.py +++ b/_pytest/terminal.py @@ -36,7 +36,10 @@ def pytest_addoption(parser): def pytest_configure(config): config.option.verbose -= config.option.quiet - reporter = TerminalReporter(config, sys.stdout) + out = config.pluginmanager.getplugin("dupped_stdout") + #if out is None: + # out = sys.stdout + reporter = TerminalReporter(config, out) config.pluginmanager.register(reporter, 'terminalreporter') if config.option.debug or config.option.traceconfig: def mywriter(tags, args): @@ -44,6 +47,11 @@ def pytest_configure(config): reporter.write_line("[traceconfig] " + msg) config.trace.root.setprocessor("pytest:config", mywriter) +def get_terminal_writer(config): + tr = config.pluginmanager.getplugin("terminalreporter") + return tr._tw + + def getreportopt(config): reportopts = "" optvalue = config.option.report diff --git a/bench/bench.py b/bench/bench.py index bb95f86fc..c99bc3234 100644 --- a/bench/bench.py +++ b/bench/bench.py @@ -4,8 +4,8 @@ if __name__ == '__main__': import cProfile import pytest import pstats - script = sys.argv[1] if len(sys.argv) > 1 else "empty.py" - stats = cProfile.run('pytest.cmdline.main([%r])' % script, 'prof') + script = sys.argv[1:] if len(sys.argv) > 1 else "empty.py" + stats = cProfile.run('pytest.cmdline.main(%r)' % script, 'prof') p = pstats.Stats("prof") p.strip_dirs() p.sort_stats('cumulative') diff --git a/testing/test_capture.py b/testing/test_capture.py index fe28f56bb..f346ca160 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -4,6 +4,7 @@ from __future__ import with_statement import os import sys import py +import tempfile import pytest import contextlib @@ -44,6 +45,13 @@ def oswritebytes(fd, obj): +def StdCaptureFD(out=True, err=True, in_=True): + return capture.StdCaptureBase(out, err, in_, Capture=capture.FDCapture) + +def StdCapture(out=True, err=True, in_=True): + return capture.StdCaptureBase(out, err, in_, Capture=capture.SysCapture) + + class TestCaptureManager: def test_getmethod_default_no_fd(self, testdir, monkeypatch): config = testdir.parseconfig(testdir.tmpdir) @@ -75,7 +83,7 @@ class TestCaptureManager: @needsosdup @pytest.mark.parametrize("method", ['no', 'fd', 'sys']) def test_capturing_basic_api(self, method): - capouter = capture.StdCaptureFD() + capouter = StdCaptureFD() old = sys.stdout, sys.stderr, sys.stdin try: capman = CaptureManager() @@ -95,11 +103,11 @@ class TestCaptureManager: assert not out and not err capman.reset_capturings() finally: - capouter.reset() + capouter.stop_capturing() @needsosdup def test_juggle_capturings(self, testdir): - capouter = capture.StdCaptureFD() + capouter = StdCaptureFD() try: #config = testdir.parseconfig(testdir.tmpdir) capman = CaptureManager() @@ -119,7 +127,7 @@ class TestCaptureManager: finally: capman.reset_capturings() finally: - capouter.reset() + capouter.stop_capturing() @pytest.mark.parametrize("method", ['fd', 'sys']) @@ -282,9 +290,9 @@ class TestPerTestCapturing: "====* FAILURES *====", "____*____", "*test_capturing_outerr.py:8: ValueError", - "*--- Captured stdout ---*", + "*--- Captured stdout *call*", "1", - "*--- Captured stderr ---*", + "*--- Captured stderr *call*", "2", ]) @@ -688,17 +696,15 @@ class TestFDCapture: cap = capture.FDCapture(fd) data = tobytes("hello") os.write(fd, data) - f = cap.done() - s = f.read() - f.close() + s = cap.snap() + cap.done() assert not s cap = capture.FDCapture(fd) cap.start() os.write(fd, data) - f = cap.done() - s = f.read() + s = cap.snap() + cap.done() assert s == "hello" - f.close() def test_simple_many(self, tmpfile): for i in range(10): @@ -712,22 +718,21 @@ class TestFDCapture: def test_simple_fail_second_start(self, tmpfile): fd = tmpfile.fileno() cap = capture.FDCapture(fd) - f = cap.done() + cap.done() pytest.raises(ValueError, cap.start) - f.close() def test_stderr(self): - cap = capture.FDCapture(2, patchsys=True) + cap = capture.FDCapture(2) cap.start() print_("hello", file=sys.stderr) - f = cap.done() - s = f.read() + s = cap.snap() + cap.done() assert s == "hello\n" def test_stdin(self, tmpfile): tmpfile.write(tobytes("3")) tmpfile.seek(0) - cap = capture.FDCapture(0, tmpfile=tmpfile) + cap = capture.FDCapture(0, tmpfile) cap.start() # check with os.read() directly instead of raw_input(), because # sys.stdin itself may be redirected (as pytest now does by default) @@ -744,123 +749,121 @@ class TestFDCapture: cap.writeorg(data2) finally: tmpfile.close() - f = cap.done() - scap = f.read() + scap = cap.snap() + cap.done() assert scap == totext(data1) stmp = open(tmpfile.name, 'rb').read() assert stmp == data2 class TestStdCapture: + captureclass = staticmethod(StdCapture) + + @contextlib.contextmanager def getcapture(self, **kw): - cap = capture.StdCapture(**kw) - cap.startall() - return cap + cap = self.__class__.captureclass(**kw) + cap.start_capturing() + try: + yield cap + finally: + cap.stop_capturing() def test_capturing_done_simple(self): - cap = self.getcapture() - sys.stdout.write("hello") - sys.stderr.write("world") - outfile, errfile = cap.done() - s = outfile.read() - assert s == "hello" - s = errfile.read() - assert s == "world" + with self.getcapture() as cap: + sys.stdout.write("hello") + sys.stderr.write("world") + out, err = cap.readouterr() + assert out == "hello" + assert err == "world" def test_capturing_reset_simple(self): - cap = self.getcapture() - print("hello world") - sys.stderr.write("hello error\n") - out, err = cap.reset() + with self.getcapture() as cap: + print("hello world") + sys.stderr.write("hello error\n") + out, err = cap.readouterr() assert out == "hello world\n" assert err == "hello error\n" def test_capturing_readouterr(self): - cap = self.getcapture() - try: + with self.getcapture() as cap: print ("hello world") sys.stderr.write("hello error\n") out, err = cap.readouterr() assert out == "hello world\n" assert err == "hello error\n" sys.stderr.write("error2") - finally: - out, err = cap.reset() + out, err = cap.readouterr() assert err == "error2" def test_capturing_readouterr_unicode(self): - cap = self.getcapture() - try: + with self.getcapture() as cap: print ("hx\xc4\x85\xc4\x87") out, err = cap.readouterr() - finally: - cap.reset() assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8") @pytest.mark.skipif('sys.version_info >= (3,)', reason='text output different for bytes on python3') def test_capturing_readouterr_decode_error_handling(self): - cap = self.getcapture() - # triggered a internal error in pytest - print('\xa6') - out, err = cap.readouterr() + with self.getcapture() as cap: + # triggered a internal error in pytest + print('\xa6') + out, err = cap.readouterr() assert out == py.builtin._totext('\ufffd\n', 'unicode-escape') def test_reset_twice_error(self): - cap = self.getcapture() - print ("hello") - out, err = cap.reset() - pytest.raises(ValueError, cap.reset) + with self.getcapture() as cap: + print ("hello") + out, err = cap.readouterr() + pytest.raises(ValueError, cap.stop_capturing) assert out == "hello\n" assert not err def test_capturing_modify_sysouterr_in_between(self): oldout = sys.stdout olderr = sys.stderr - cap = self.getcapture() - sys.stdout.write("hello") - sys.stderr.write("world") - sys.stdout = capture.TextIO() - sys.stderr = capture.TextIO() - print ("not seen") - sys.stderr.write("not seen\n") - out, err = cap.reset() + with self.getcapture() as cap: + sys.stdout.write("hello") + sys.stderr.write("world") + sys.stdout = capture.TextIO() + sys.stderr = capture.TextIO() + print ("not seen") + sys.stderr.write("not seen\n") + out, err = cap.readouterr() assert out == "hello" assert err == "world" assert sys.stdout == oldout assert sys.stderr == olderr def test_capturing_error_recursive(self): - cap1 = self.getcapture() - print ("cap1") - cap2 = self.getcapture() - print ("cap2") - out2, err2 = cap2.reset() - out1, err1 = cap1.reset() + with self.getcapture() as cap1: + print ("cap1") + with self.getcapture() as cap2: + print ("cap2") + out2, err2 = cap2.readouterr() + out1, err1 = cap1.readouterr() assert out1 == "cap1\n" assert out2 == "cap2\n" def test_just_out_capture(self): - cap = self.getcapture(out=True, err=False) - sys.stdout.write("hello") - sys.stderr.write("world") - out, err = cap.reset() + with self.getcapture(out=True, err=False) as cap: + sys.stdout.write("hello") + sys.stderr.write("world") + out, err = cap.readouterr() assert out == "hello" assert not err def test_just_err_capture(self): - cap = self.getcapture(out=False, err=True) - sys.stdout.write("hello") - sys.stderr.write("world") - out, err = cap.reset() + with self.getcapture(out=False, err=True) as cap: + sys.stdout.write("hello") + sys.stderr.write("world") + out, err = cap.readouterr() assert err == "world" assert not out def test_stdin_restored(self): old = sys.stdin - cap = self.getcapture(in_=True) - newstdin = sys.stdin - out, err = cap.reset() + with self.getcapture(in_=True) as cap: + newstdin = sys.stdin assert newstdin != sys.stdin assert sys.stdin is old @@ -868,68 +871,47 @@ class TestStdCapture: print ("XXX this test may well hang instead of crashing") print ("XXX which indicates an error in the underlying capturing") print ("XXX mechanisms") - cap = self.getcapture() - pytest.raises(IOError, "sys.stdin.read()") - out, err = cap.reset() - - def test_suspend_resume(self): - cap = self.getcapture(out=True, err=False, in_=False) - try: - print ("hello") - sys.stderr.write("error\n") - out, err = cap.suspend() - assert out == "hello\n" - assert not err - print ("in between") - sys.stderr.write("in between\n") - cap.resume() - print ("after") - sys.stderr.write("error_after\n") - finally: - out, err = cap.reset() - assert out == "after\n" - assert not err + with self.getcapture() as cap: + pytest.raises(IOError, "sys.stdin.read()") class TestStdCaptureFD(TestStdCapture): pytestmark = needsosdup + captureclass = staticmethod(StdCaptureFD) - def getcapture(self, **kw): - cap = capture.StdCaptureFD(**kw) - cap.startall() - return cap + def test_simple_only_fd(self, testdir): + testdir.makepyfile(""" + import os + def test_x(): + os.write(1, "hello\\n".encode("ascii")) + assert 0 + """) + result = testdir.runpytest() + result.stdout.fnmatch_lines(""" + *test_x* + *assert 0* + *Captured stdout* + """) def test_intermingling(self): - cap = self.getcapture() - oswritebytes(1, "1") - sys.stdout.write(str(2)) - sys.stdout.flush() - oswritebytes(1, "3") - oswritebytes(2, "a") - sys.stderr.write("b") - sys.stderr.flush() - oswritebytes(2, "c") - out, err = cap.reset() + with self.getcapture() as cap: + oswritebytes(1, "1") + sys.stdout.write(str(2)) + sys.stdout.flush() + oswritebytes(1, "3") + oswritebytes(2, "a") + sys.stderr.write("b") + sys.stderr.flush() + oswritebytes(2, "c") + out, err = cap.readouterr() assert out == "123" assert err == "abc" def test_many(self, capfd): with lsof_check(): for i in range(10): - cap = capture.StdCaptureFD() - cap.reset() - - -@needsosdup -def test_stdcapture_fd_tmpfile(tmpfile): - capfd = capture.StdCaptureFD(out=tmpfile) - try: - os.write(1, "hello".encode("ascii")) - os.write(2, "world".encode("ascii")) - outf, errf = capfd.done() - finally: - capfd.reset() - assert outf == tmpfile + cap = StdCaptureFD() + cap.stop_capturing() class TestStdCaptureFDinvalidFD: @@ -938,19 +920,22 @@ class TestStdCaptureFDinvalidFD: def test_stdcapture_fd_invalid_fd(self, testdir): testdir.makepyfile(""" import os - from _pytest.capture import StdCaptureFD + from _pytest import capture + def StdCaptureFD(out=True, err=True, in_=True): + return capture.StdCaptureBase(out, err, in_, + Capture=capture.FDCapture) def test_stdout(): os.close(1) cap = StdCaptureFD(out=True, err=False, in_=False) - cap.done() + cap.stop_capturing() def test_stderr(): os.close(2) cap = StdCaptureFD(out=False, err=True, in_=False) - cap.done() + cap.stop_capturing() def test_stdin(): os.close(0) cap = StdCaptureFD(out=False, err=False, in_=True) - cap.done() + cap.stop_capturing() """) result = testdir.runpytest("--capture=fd") assert result.ret == 0 @@ -958,27 +943,8 @@ class TestStdCaptureFDinvalidFD: def test_capture_not_started_but_reset(): - capsys = capture.StdCapture() - capsys.done() - capsys.done() - capsys.reset() - - -@needsosdup -def test_capture_no_sys(): - capsys = capture.StdCapture() - try: - cap = capture.StdCaptureFD(patchsys=False) - cap.startall() - sys.stdout.write("hello") - sys.stderr.write("world") - oswritebytes(1, "1") - oswritebytes(2, "2") - out, err = cap.reset() - assert out == "1" - assert err == "2" - finally: - capsys.reset() + capsys = StdCapture() + capsys.stop_capturing() @needsosdup @@ -986,19 +952,18 @@ def test_capture_no_sys(): def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): if not use: tmpfile = True - cap = capture.StdCaptureFD(out=False, err=tmpfile) + cap = StdCaptureFD(out=False, err=tmpfile) try: - cap.startall() + cap.start_capturing() capfile = cap.err.tmpfile - cap.suspend() - cap.resume() + cap.readouterr() finally: - cap.reset() + cap.stop_capturing() capfile2 = cap.err.tmpfile assert capfile2 == capfile -@pytest.mark.parametrize('method', ['StdCapture', 'StdCaptureFD']) +@pytest.mark.parametrize('method', ['SysCapture', 'FDCapture']) def test_capturing_and_logging_fundamentals(testdir, method): if method == "StdCaptureFD" and not hasattr(os, 'dup'): pytest.skip("need os.dup") @@ -1007,23 +972,27 @@ def test_capturing_and_logging_fundamentals(testdir, method): import sys, os import py, logging from _pytest import capture - cap = capture.%s(out=False, in_=False) - cap.startall() + cap = capture.StdCaptureBase(out=False, in_=False, + Capture=capture.%s) + cap.start_capturing() logging.warn("hello1") - outerr = cap.suspend() + outerr = cap.readouterr() print ("suspend, captured %%s" %%(outerr,)) logging.warn("hello2") - cap.resume() + cap.pop_outerr_to_orig() logging.warn("hello3") - outerr = cap.suspend() + outerr = cap.readouterr() print ("suspend2, captured %%s" %% (outerr,)) """ % (method,)) result = testdir.runpython(p) - result.stdout.fnmatch_lines([ - "suspend, captured*hello1*", - "suspend2, captured*hello2*WARNING:root:hello3*", - ]) + result.stdout.fnmatch_lines(""" + suspend, captured*hello1* + suspend2, captured*WARNING:root:hello3* + """) + result.stderr.fnmatch_lines(""" + WARNING:root:hello2 + """) assert "atexit" not in result.stderr.str() diff --git a/testing/test_core.py b/testing/test_core.py index 7ec8d6519..e04720bb5 100644 --- a/testing/test_core.py +++ b/testing/test_core.py @@ -523,6 +523,95 @@ class TestMultiCall: res = MultiCall([m1, m2], {}).execute() assert res == [1] + def test_hookwrapper(self): + l = [] + def m1(): + l.append("m1 init") + yield None + l.append("m1 finish") + m1.hookwrapper = True + + def m2(): + l.append("m2") + return 2 + res = MultiCall([m2, m1], {}).execute() + assert res == [2] + assert l == ["m1 init", "m2", "m1 finish"] + l[:] = [] + res = MultiCall([m2, m1], {}, firstresult=True).execute() + assert res == 2 + assert l == ["m1 init", "m2", "m1 finish"] + + def test_hookwrapper_order(self): + l = [] + def m1(): + l.append("m1 init") + yield 1 + l.append("m1 finish") + m1.hookwrapper = True + + def m2(): + l.append("m2 init") + yield 2 + l.append("m2 finish") + m2.hookwrapper = True + res = MultiCall([m2, m1], {}).execute() + assert res == [1, 2] + assert l == ["m1 init", "m2 init", "m2 finish", "m1 finish"] + + def test_listattr_hookwrapper_ordering(self): + class P1: + @pytest.mark.hookwrapper + def m(self): + return 17 + + class P2: + def m(self): + return 23 + + class P3: + @pytest.mark.tryfirst + def m(self): + return 19 + + pluginmanager = PluginManager() + p1 = P1() + p2 = P2() + p3 = P3() + pluginmanager.register(p1) + pluginmanager.register(p2) + pluginmanager.register(p3) + methods = pluginmanager.listattr('m') + assert methods == [p2.m, p3.m, p1.m] + ## listattr keeps a cache and deleting + ## a function attribute requires clearing it + #pluginmanager._listattrcache.clear() + #del P1.m.__dict__['tryfirst'] + + def test_hookwrapper_not_yield(self): + def m1(): + pass + m1.hookwrapper = True + + mc = MultiCall([m1], {}) + with pytest.raises(mc.WrongHookWrapper) as ex: + mc.execute() + assert ex.value.func == m1 + assert ex.value.message + + def test_hookwrapper_too_many_yield(self): + def m1(): + yield 1 + yield 2 + m1.hookwrapper = True + + mc = MultiCall([m1], {}) + with pytest.raises(mc.WrongHookWrapper) as ex: + mc.execute() + assert ex.value.func == m1 + assert ex.value.message + + class TestHookRelay: def test_happypath(self): pm = PluginManager() diff --git a/testing/test_junitxml.py b/testing/test_junitxml.py index 05573ad68..965c444bf 100644 --- a/testing/test_junitxml.py +++ b/testing/test_junitxml.py @@ -478,10 +478,12 @@ def test_unicode_issue368(testdir): path = testdir.tmpdir.join("test.xml") log = LogXML(str(path), None) ustr = py.builtin._totext("ВНИ!", "utf-8") - class report: + from _pytest.runner import BaseReport + class Report(BaseReport): longrepr = ustr sections = [] nodeid = "something" + report = Report() # hopefully this is not too brittle ... log.pytest_sessionstart()