From 491239d9b27670d55cee21f9141a2bc91cf44f16 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Sun, 12 Apr 2020 20:47:28 +0300 Subject: [PATCH 1/9] capture: remove some indirection in MultiCapture Removing this indirection enables some further clean ups. --- src/_pytest/capture.py | 22 +++++++++------------- src/_pytest/pytester.py | 5 ++--- testing/test_capture.py | 37 ++++++++++++++++++++++++++++--------- 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 323881151..201bcd962 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -71,13 +71,13 @@ def pytest_load_initial_conftests(early_config: Config): def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": if method == "fd": - return MultiCapture(out=True, err=True, Capture=FDCapture) + return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) elif method == "sys": - return MultiCapture(out=True, err=True, Capture=SysCapture) + return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2)) elif method == "no": - return MultiCapture(out=False, err=False, in_=False) + return MultiCapture(in_=None, out=None, err=None) elif method == "tee-sys": - return MultiCapture(out=True, err=True, in_=False, Capture=TeeSysCapture) + return MultiCapture(in_=None, out=TeeSysCapture(1), err=TeeSysCapture(2)) raise ValueError("unknown capturing method: {!r}".format(method)) @@ -354,7 +354,7 @@ class CaptureFixture: def _start(self): if self._capture is None: self._capture = MultiCapture( - out=True, err=True, in_=False, Capture=self.captureclass + in_=None, out=self.captureclass(1), err=self.captureclass(2), ) self._capture.start_capturing() @@ -418,17 +418,13 @@ CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) class MultiCapture: - out = err = in_ = None _state = None _in_suspended = False - def __init__(self, out=True, err=True, in_=True, Capture=None): - if in_: - self.in_ = Capture(0) - if out: - self.out = Capture(1) - if err: - self.err = Capture(2) + def __init__(self, in_, out, err) -> None: + self.in_ = in_ + self.out = out + self.err = err def __repr__(self): return "".format( diff --git a/src/_pytest/pytester.py b/src/_pytest/pytester.py index 1da478736..9df86a22f 100644 --- a/src/_pytest/pytester.py +++ b/src/_pytest/pytester.py @@ -25,8 +25,7 @@ import py import pytest from _pytest._code import Source -from _pytest.capture import MultiCapture -from _pytest.capture import SysCapture +from _pytest.capture import _get_multicapture from _pytest.compat import TYPE_CHECKING from _pytest.config import _PluggyPlugin from _pytest.config import Config @@ -972,7 +971,7 @@ class Testdir: if syspathinsert: self.syspathinsert() now = time.time() - capture = MultiCapture(Capture=SysCapture) + capture = _get_multicapture("sys") capture.start_capturing() try: try: diff --git a/testing/test_capture.py b/testing/test_capture.py index 177d72ebc..fa4f523d9 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -19,16 +19,28 @@ from _pytest.config import ExitCode # pylib 1.4.20.dev2 (rev 13d9af95547e) -def StdCaptureFD(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture) +def StdCaptureFD(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: + return capture.MultiCapture( + in_=capture.FDCapture(0) if in_ else None, + out=capture.FDCapture(1) if out else None, + err=capture.FDCapture(2) if err else None, + ) -def StdCapture(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.SysCapture) +def StdCapture(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: + return capture.MultiCapture( + in_=capture.SysCapture(0) if in_ else None, + out=capture.SysCapture(1) if out else None, + err=capture.SysCapture(2) if err else None, + ) -def TeeStdCapture(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.TeeSysCapture) +def TeeStdCapture(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: + return capture.MultiCapture( + in_=capture.TeeSysCapture(0) if in_ else None, + out=capture.TeeSysCapture(1) if out else None, + err=capture.TeeSysCapture(2) if err else None, + ) class TestCaptureManager: @@ -1154,7 +1166,11 @@ class TestStdCaptureFDinvalidFD: from _pytest import capture def StdCaptureFD(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture) + return capture.MultiCapture( + in_=capture.FDCapture(0) if in_ else None, + out=capture.FDCapture(1) if out else None, + err=capture.FDCapture(2) if err else None, + ) def test_stdout(): os.close(1) @@ -1284,8 +1300,11 @@ def test_capturing_and_logging_fundamentals(testdir, method): import sys, os import py, logging from _pytest import capture - cap = capture.MultiCapture(out=False, in_=False, - Capture=capture.%s) + cap = capture.MultiCapture( + in_=None, + out=None, + err=capture.%s(2), + ) cap.start_capturing() logging.warning("hello1") From 2695b41df3a0f69023c1736da45a9dbe02e8340c Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Sun, 12 Apr 2020 21:09:39 +0300 Subject: [PATCH 2/9] capture: inline _capturing_for_request to simplify the control flow With straight code, it is a little easier to understand, and simplify further. --- src/_pytest/capture.py | 75 +++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 201bcd962..3ff9455b0 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -9,14 +9,12 @@ import os import sys from io import UnsupportedOperation from tempfile import TemporaryFile -from typing import Generator from typing import Optional from typing import TextIO import pytest from _pytest.compat import TYPE_CHECKING from _pytest.config import Config -from _pytest.fixtures import FixtureRequest if TYPE_CHECKING: from typing_extensions import Literal @@ -150,35 +148,20 @@ class CaptureManager: def read_global_capture(self): return self._global_capturing.readouterr() - # Fixture Control (it's just forwarding, think about removing this later) + # Fixture Control - @contextlib.contextmanager - def _capturing_for_request( - self, request: FixtureRequest - ) -> Generator["CaptureFixture", None, None]: - """ - Context manager that creates a ``CaptureFixture`` instance for the - given ``request``, ensuring there is only a single one being requested - at the same time. - - This is used as a helper with ``capsys``, ``capfd`` etc. - """ + def set_fixture(self, capture_fixture: "CaptureFixture") -> None: if self._capture_fixture: - other_name = next( - k - for k, v in map_fixname_class.items() - if v is self._capture_fixture.captureclass - ) - raise request.raiseerror( + current_fixture = self._capture_fixture.request.fixturename + requested_fixture = capture_fixture.request.fixturename + capture_fixture.request.raiseerror( "cannot use {} and {} at the same time".format( - request.fixturename, other_name + requested_fixture, current_fixture ) ) - capture_class = map_fixname_class[request.fixturename] - self._capture_fixture = CaptureFixture(capture_class, request) - self.activate_fixture() - yield self._capture_fixture - self._capture_fixture.close() + self._capture_fixture = capture_fixture + + def unset_fixture(self) -> None: self._capture_fixture = None def activate_fixture(self): @@ -276,8 +259,12 @@ def capsys(request): ``out`` and ``err`` will be ``text`` objects. """ capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture + capture_fixture = CaptureFixture(SysCapture, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() @pytest.fixture @@ -289,8 +276,12 @@ def capsysbinary(request): ``out`` and ``err`` will be ``bytes`` objects. """ capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture + capture_fixture = CaptureFixture(SysCaptureBinary, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() @pytest.fixture @@ -302,8 +293,12 @@ def capfd(request): ``out`` and ``err`` will be ``text`` objects. """ capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture + capture_fixture = CaptureFixture(FDCapture, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() @pytest.fixture @@ -315,8 +310,12 @@ def capfdbinary(request): ``out`` and ``err`` will be ``byte`` objects. """ capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture + capture_fixture = CaptureFixture(FDCaptureBinary, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() class CaptureIO(io.TextIOWrapper): @@ -701,14 +700,6 @@ class TeeSysCapture(SysCapture): self.tmpfile = tmpfile -map_fixname_class = { - "capfd": FDCapture, - "capfdbinary": FDCaptureBinary, - "capsys": SysCapture, - "capsysbinary": SysCaptureBinary, -} - - class DontReadFromInput: encoding = None From 02c95ea624eded63b64611aee28a0b1e0bb4ad69 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 15 Apr 2020 16:24:07 +0300 Subject: [PATCH 3/9] capture: remove unused FDCapture tmpfile argument --- src/_pytest/capture.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 3ff9455b0..106b3fafb 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -506,7 +506,7 @@ class FDCaptureBinary: EMPTY_BUFFER = b"" _state = None - def __init__(self, targetfd, tmpfile=None): + def __init__(self, targetfd): self.targetfd = targetfd try: @@ -530,22 +530,19 @@ class FDCaptureBinary: self.targetfd_save = os.dup(targetfd) if targetfd == 0: - assert not tmpfile, "cannot set tmpfile with stdin" - tmpfile = open(os.devnull) + self.tmpfile = open(os.devnull) self.syscapture = SysCapture(targetfd) else: - if tmpfile is None: - tmpfile = EncodedFile( - TemporaryFile(buffering=0), - encoding="utf-8", - errors="replace", - write_through=True, - ) + self.tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + write_through=True, + ) if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, tmpfile) + self.syscapture = SysCapture(targetfd, self.tmpfile) else: self.syscapture = NoCapture() - self.tmpfile = tmpfile def __repr__(self): return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( From ea3f44894f8d0f944f11b782dd722232a97092ca Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Thu, 16 Apr 2020 09:49:17 +0300 Subject: [PATCH 4/9] capture: replace TeeSysCapture with SysCapture(tee=True) This is more straightforward and does not require duplicating the initialization logic. --- src/_pytest/capture.py | 21 +++++---------------- testing/test_capture.py | 14 ++++++++------ 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 106b3fafb..1eb5e1b41 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -75,7 +75,9 @@ def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": elif method == "no": return MultiCapture(in_=None, out=None, err=None) elif method == "tee-sys": - return MultiCapture(in_=None, out=TeeSysCapture(1), err=TeeSysCapture(2)) + return MultiCapture( + in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) + ) raise ValueError("unknown capturing method: {!r}".format(method)) @@ -620,7 +622,7 @@ class SysCaptureBinary: EMPTY_BUFFER = b"" _state = None - def __init__(self, fd, tmpfile=None): + def __init__(self, fd, tmpfile=None, *, tee=False): name = patchsysdict[fd] self._old = getattr(sys, name) self.name = name @@ -628,7 +630,7 @@ class SysCaptureBinary: if name == "stdin": tmpfile = DontReadFromInput() else: - tmpfile = CaptureIO() + tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) self.tmpfile = tmpfile def __repr__(self): @@ -684,19 +686,6 @@ class SysCapture(SysCaptureBinary): self._old.flush() -class TeeSysCapture(SysCapture): - def __init__(self, fd, tmpfile=None): - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = TeeCaptureIO(self._old) - self.tmpfile = tmpfile - - class DontReadFromInput: encoding = None diff --git a/testing/test_capture.py b/testing/test_capture.py index fa4f523d9..5a0998da7 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -37,9 +37,9 @@ def StdCapture(out: bool = True, err: bool = True, in_: bool = True) -> MultiCap def TeeStdCapture(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: return capture.MultiCapture( - in_=capture.TeeSysCapture(0) if in_ else None, - out=capture.TeeSysCapture(1) if out else None, - err=capture.TeeSysCapture(2) if err else None, + in_=capture.SysCapture(0, tee=True) if in_ else None, + out=capture.SysCapture(1, tee=True) if out else None, + err=capture.SysCapture(2, tee=True) if err else None, ) @@ -1292,8 +1292,10 @@ def test_close_and_capture_again(testdir): ) -@pytest.mark.parametrize("method", ["SysCapture", "FDCapture", "TeeSysCapture"]) -def test_capturing_and_logging_fundamentals(testdir, method): +@pytest.mark.parametrize( + "method", ["SysCapture(2)", "SysCapture(2, tee=True)", "FDCapture(2)"] +) +def test_capturing_and_logging_fundamentals(testdir, method: str) -> None: # here we check a fundamental feature p = testdir.makepyfile( """ @@ -1303,7 +1305,7 @@ def test_capturing_and_logging_fundamentals(testdir, method): cap = capture.MultiCapture( in_=None, out=None, - err=capture.%s(2), + err=capture.%s, ) cap.start_capturing() From 97bcf5a3a2fdabfdbc61bee275e97d09be728b08 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Thu, 16 Apr 2020 11:29:13 +0300 Subject: [PATCH 5/9] capture: reorder file into sections and avoid forward references Make it easier to read the file in progression, and avoid forward references for upcoming type annotations. There is one cycle, CaptureManager <-> CaptureFixture, which is hard to untangle. (This commit should be added to `.gitblameignore`). --- src/_pytest/capture.py | 1018 ++++++++++++++++++++-------------------- 1 file changed, 521 insertions(+), 497 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 1eb5e1b41..7a5e854ef 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -21,8 +21,6 @@ if TYPE_CHECKING: _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] -patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} - def pytest_addoption(parser): group = parser.getgroup("general") @@ -43,6 +41,105 @@ def pytest_addoption(parser): ) +def _colorama_workaround(): + """ + Ensure colorama is imported so that it attaches to the correct stdio + handles on Windows. + + colorama uses the terminal on import time. So if something does the + first import of colorama while I/O capture is active, colorama will + fail in various ways. + """ + if sys.platform.startswith("win32"): + try: + import colorama # noqa: F401 + except ImportError: + pass + + +def _readline_workaround(): + """ + Ensure readline is imported so that it attaches to the correct stdio + handles on Windows. + + Pdb uses readline support where available--when not running from the Python + prompt, the readline module is not imported until running the pdb REPL. If + running pytest with the --pdb option this means the readline module is not + imported until after I/O capture has been started. + + This is a problem for pyreadline, which is often used to implement readline + support on Windows, as it does not attach to the correct handles for stdout + and/or stdin if they have been redirected by the FDCapture mechanism. This + workaround ensures that readline is imported before I/O capture is setup so + that it can attach to the actual stdin/out for the console. + + See https://github.com/pytest-dev/pytest/pull/1281 + """ + if sys.platform.startswith("win32"): + try: + import readline # noqa: F401 + except ImportError: + pass + + +def _py36_windowsconsoleio_workaround(stream): + """ + Python 3.6 implemented unicode console handling for Windows. This works + by reading/writing to the raw console handle using + ``{Read,Write}ConsoleW``. + + The problem is that we are going to ``dup2`` over the stdio file + descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the + handles used by Python to write to the console. Though there is still some + weirdness and the console handle seems to only be closed randomly and not + on the first call to ``CloseHandle``, or maybe it gets reopened with the + same handle value when we suspend capturing. + + The workaround in this case will reopen stdio with a different fd which + also means a different handle by replicating the logic in + "Py_lifecycle.c:initstdio/create_stdio". + + :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given + here as parameter for unittesting purposes. + + See https://github.com/pytest-dev/py/issues/103 + """ + if ( + not sys.platform.startswith("win32") + or sys.version_info[:2] < (3, 6) + or hasattr(sys, "pypy_version_info") + ): + return + + # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) + if not hasattr(stream, "buffer"): + return + + buffered = hasattr(stream.buffer, "raw") + raw_stdout = stream.buffer.raw if buffered else stream.buffer + + if not isinstance(raw_stdout, io._WindowsConsoleIO): + return + + def _reopen_stdio(f, mode): + if not buffered and mode[0] == "w": + buffering = 0 + else: + buffering = -1 + + return io.TextIOWrapper( + open(os.dup(f.fileno()), mode, buffering), + f.encoding, + f.errors, + f.newlines, + f.line_buffering, + ) + + sys.stdin = _reopen_stdio(sys.stdin, "rb") + sys.stdout = _reopen_stdio(sys.stdout, "wb") + sys.stderr = _reopen_stdio(sys.stderr, "wb") + + @pytest.hookimpl(hookwrapper=True) def pytest_load_initial_conftests(early_config: Config): ns = early_config.known_args_namespace @@ -67,7 +164,362 @@ def pytest_load_initial_conftests(early_config: Config): sys.stderr.write(err) -def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": +# IO Helpers. + + +class EncodedFile(io.TextIOWrapper): + __slots__ = () + + @property + def name(self) -> str: + # Ensure that file.name is a string. Workaround for a Python bug + # fixed in >=3.7.4: https://bugs.python.org/issue36015 + return repr(self.buffer) + + @property + def mode(self) -> str: + # TextIOWrapper doesn't expose a mode, but at least some of our + # tests check it. + return self.buffer.mode.replace("b", "") + + +class CaptureIO(io.TextIOWrapper): + def __init__(self) -> None: + super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) + + def getvalue(self) -> str: + assert isinstance(self.buffer, io.BytesIO) + return self.buffer.getvalue().decode("UTF-8") + + +class TeeCaptureIO(CaptureIO): + def __init__(self, other: TextIO) -> None: + self._other = other + super().__init__() + + def write(self, s) -> int: + super().write(s) + return self._other.write(s) + + +class DontReadFromInput: + encoding = None + + def read(self, *args): + raise OSError( + "pytest: reading from stdin while output is captured! Consider using `-s`." + ) + + readline = read + readlines = read + __next__ = read + + def __iter__(self): + return self + + def fileno(self): + raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") + + def isatty(self): + return False + + def close(self): + pass + + @property + def buffer(self): + return self + + +# Capture classes. + + +patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} + + +class NoCapture: + EMPTY_BUFFER = None + __init__ = start = done = suspend = resume = lambda *args: None + + +class SysCaptureBinary: + + EMPTY_BUFFER = b"" + _state = None + + def __init__(self, fd, tmpfile=None, *, tee=False): + name = patchsysdict[fd] + self._old = getattr(sys, name) + self.name = name + if tmpfile is None: + if name == "stdin": + tmpfile = DontReadFromInput() + else: + tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) + self.tmpfile = tmpfile + + def repr(self, class_name: str) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + class_name, + self.name, + hasattr(self, "_old") and repr(self._old) or "", + self._state, + self.tmpfile, + ) + + def __repr__(self) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.name, + hasattr(self, "_old") and repr(self._old) or "", + self._state, + self.tmpfile, + ) + + def start(self): + setattr(sys, self.name, self.tmpfile) + self._state = "started" + + def snap(self): + res = self.tmpfile.buffer.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + setattr(sys, self.name, self._old) + del self._old + self.tmpfile.close() + self._state = "done" + + def suspend(self): + setattr(sys, self.name, self._old) + self._state = "suspended" + + def resume(self): + setattr(sys, self.name, self.tmpfile) + self._state = "resumed" + + def writeorg(self, data): + self._old.flush() + self._old.buffer.write(data) + self._old.buffer.flush() + + +class SysCapture(SysCaptureBinary): + EMPTY_BUFFER = "" # type: ignore[assignment] + + def snap(self): + res = self.tmpfile.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + self._old.write(data) + self._old.flush() + + +class FDCaptureBinary: + """Capture IO to/from a given os-level filedescriptor. + + snap() produces `bytes` + """ + + EMPTY_BUFFER = b"" + _state = None + + def __init__(self, targetfd): + self.targetfd = targetfd + + try: + os.fstat(targetfd) + except OSError: + # FD capturing is conceptually simple -- create a temporary file, + # redirect the FD to it, redirect back when done. But when the + # target FD is invalid it throws a wrench into this loveley scheme. + # + # Tests themselves shouldn't care if the FD is valid, FD capturing + # should work regardless of external circumstances. So falling back + # to just sys capturing is not a good option. + # + # Further complications are the need to support suspend() and the + # possibility of FD reuse (e.g. the tmpfile getting the very same + # target FD). The following approach is robust, I believe. + self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) + else: + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + if targetfd == 0: + self.tmpfile = open(os.devnull) + self.syscapture = SysCapture(targetfd) + else: + self.tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + write_through=True, + ) + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, self.tmpfile) + else: + self.syscapture = NoCapture() + + def __repr__(self): + return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.targetfd, + self.targetfd_save, + self._state, + self.tmpfile, + ) + + def start(self): + """ Start capturing on targetfd using memorized tmpfile. """ + os.dup2(self.tmpfile.fileno(), self.targetfd) + self.syscapture.start() + self._state = "started" + + def snap(self): + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + """ stop capturing, restore streams, return original capture file, + seeked to position zero. """ + os.dup2(self.targetfd_save, self.targetfd) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) + self.syscapture.done() + self.tmpfile.close() + self._state = "done" + + def suspend(self): + self.syscapture.suspend() + os.dup2(self.targetfd_save, self.targetfd) + self._state = "suspended" + + def resume(self): + self.syscapture.resume() + os.dup2(self.tmpfile.fileno(), self.targetfd) + self._state = "resumed" + + def writeorg(self, data): + """ write to original file descriptor. """ + os.write(self.targetfd_save, data) + + +class FDCapture(FDCaptureBinary): + """Capture IO to/from a given os-level filedescriptor. + + snap() produces text + """ + + # Ignore type because it doesn't match the type in the superclass (bytes). + EMPTY_BUFFER = "" # type: ignore + + def snap(self): + self.tmpfile.seek(0) + res = self.tmpfile.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + """ write to original file descriptor. """ + super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream + + +# MultiCapture + +CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) + + +class MultiCapture: + _state = None + _in_suspended = False + + def __init__(self, in_, out, err) -> None: + self.in_ = in_ + self.out = out + self.err = err + + def __repr__(self): + return "".format( + self.out, self.err, self.in_, self._state, self._in_suspended, + ) + + def start_capturing(self): + self._state = "started" + if self.in_: + self.in_.start() + if self.out: + self.out.start() + if self.err: + self.err.start() + + def pop_outerr_to_orig(self): + """ pop current snapshot out/err capture and flush to orig streams. """ + out, err = self.readouterr() + if out: + self.out.writeorg(out) + if err: + self.err.writeorg(err) + return out, err + + def suspend_capturing(self, in_=False): + self._state = "suspended" + if self.out: + self.out.suspend() + if self.err: + self.err.suspend() + if in_ and self.in_: + self.in_.suspend() + self._in_suspended = True + + def resume_capturing(self): + self._state = "resumed" + if self.out: + self.out.resume() + if self.err: + self.err.resume() + if self._in_suspended: + self.in_.resume() + self._in_suspended = False + + def stop_capturing(self): + """ stop capturing and reset capturing streams """ + if self._state == "stopped": + raise ValueError("was already stopped") + self._state = "stopped" + if self.out: + self.out.done() + if self.err: + self.err.done() + if self.in_: + self.in_.done() + + def readouterr(self) -> CaptureResult: + if self.out: + out = self.out.snap() + else: + out = "" + if self.err: + err = self.err.snap() + else: + err = "" + return CaptureResult(out, err) + + +def _get_multicapture(method: "_CaptureMethod") -> MultiCapture: if method == "fd": return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) elif method == "sys": @@ -81,6 +533,9 @@ def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": raise ValueError("unknown capturing method: {!r}".format(method)) +# CaptureManager and CaptureFixture + + class CaptureManager: """ Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each @@ -252,6 +707,69 @@ class CaptureManager: self.stop_global_capturing() +class CaptureFixture: + """ + Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` + fixtures. + """ + + def __init__(self, captureclass, request): + self.captureclass = captureclass + self.request = request + self._capture = None + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + + def _start(self): + if self._capture is None: + self._capture = MultiCapture( + in_=None, out=self.captureclass(1), err=self.captureclass(2), + ) + self._capture.start_capturing() + + def close(self): + if self._capture is not None: + out, err = self._capture.pop_outerr_to_orig() + self._captured_out += out + self._captured_err += err + self._capture.stop_capturing() + self._capture = None + + def readouterr(self): + """Read and return the captured output so far, resetting the internal buffer. + + :return: captured content as a namedtuple with ``out`` and ``err`` string attributes + """ + captured_out, captured_err = self._captured_out, self._captured_err + if self._capture is not None: + out, err = self._capture.readouterr() + captured_out += out + captured_err += err + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + return CaptureResult(captured_out, captured_err) + + def _suspend(self): + """Suspends this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.suspend_capturing() + + def _resume(self): + """Resumes this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.resume_capturing() + + @contextlib.contextmanager + def disabled(self): + """Temporarily disables capture while inside the 'with' block.""" + capmanager = self.request.config.pluginmanager.getplugin("capturemanager") + with capmanager.global_and_fixture_disabled(): + yield + + +# The fixtures. + + @pytest.fixture def capsys(request): """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. @@ -318,497 +836,3 @@ def capfdbinary(request): yield capture_fixture capture_fixture.close() capman.unset_fixture() - - -class CaptureIO(io.TextIOWrapper): - def __init__(self) -> None: - super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) - - def getvalue(self) -> str: - assert isinstance(self.buffer, io.BytesIO) - return self.buffer.getvalue().decode("UTF-8") - - -class TeeCaptureIO(CaptureIO): - def __init__(self, other: TextIO) -> None: - self._other = other - super().__init__() - - def write(self, s: str) -> int: - super().write(s) - return self._other.write(s) - - -class CaptureFixture: - """ - Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` - fixtures. - """ - - def __init__(self, captureclass, request): - self.captureclass = captureclass - self.request = request - self._capture = None - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - - def _start(self): - if self._capture is None: - self._capture = MultiCapture( - in_=None, out=self.captureclass(1), err=self.captureclass(2), - ) - self._capture.start_capturing() - - def close(self): - if self._capture is not None: - out, err = self._capture.pop_outerr_to_orig() - self._captured_out += out - self._captured_err += err - self._capture.stop_capturing() - self._capture = None - - def readouterr(self): - """Read and return the captured output so far, resetting the internal buffer. - - :return: captured content as a namedtuple with ``out`` and ``err`` string attributes - """ - captured_out, captured_err = self._captured_out, self._captured_err - if self._capture is not None: - out, err = self._capture.readouterr() - captured_out += out - captured_err += err - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - return CaptureResult(captured_out, captured_err) - - def _suspend(self): - """Suspends this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.suspend_capturing() - - def _resume(self): - """Resumes this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.resume_capturing() - - @contextlib.contextmanager - def disabled(self): - """Temporarily disables capture while inside the 'with' block.""" - capmanager = self.request.config.pluginmanager.getplugin("capturemanager") - with capmanager.global_and_fixture_disabled(): - yield - - -class EncodedFile(io.TextIOWrapper): - __slots__ = () - - @property - def name(self) -> str: - # Ensure that file.name is a string. Workaround for a Python bug - # fixed in >=3.7.4: https://bugs.python.org/issue36015 - return repr(self.buffer) - - @property - def mode(self) -> str: - # TextIOWrapper doesn't expose a mode, but at least some of our - # tests check it. - return self.buffer.mode.replace("b", "") - - -CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) - - -class MultiCapture: - _state = None - _in_suspended = False - - def __init__(self, in_, out, err) -> None: - self.in_ = in_ - self.out = out - self.err = err - - def __repr__(self): - return "".format( - self.out, self.err, self.in_, self._state, self._in_suspended, - ) - - def start_capturing(self): - self._state = "started" - if self.in_: - self.in_.start() - if self.out: - self.out.start() - if self.err: - self.err.start() - - def pop_outerr_to_orig(self): - """ pop current snapshot out/err capture and flush to orig streams. """ - out, err = self.readouterr() - if out: - self.out.writeorg(out) - if err: - self.err.writeorg(err) - return out, err - - def suspend_capturing(self, in_=False): - self._state = "suspended" - if self.out: - self.out.suspend() - if self.err: - self.err.suspend() - if in_ and self.in_: - self.in_.suspend() - self._in_suspended = True - - def resume_capturing(self): - self._state = "resumed" - if self.out: - self.out.resume() - if self.err: - self.err.resume() - if self._in_suspended: - self.in_.resume() - self._in_suspended = False - - def stop_capturing(self): - """ stop capturing and reset capturing streams """ - if self._state == "stopped": - raise ValueError("was already stopped") - self._state = "stopped" - if self.out: - self.out.done() - if self.err: - self.err.done() - if self.in_: - self.in_.done() - - def readouterr(self) -> CaptureResult: - if self.out: - out = self.out.snap() - else: - out = "" - if self.err: - err = self.err.snap() - else: - err = "" - return CaptureResult(out, err) - - -class NoCapture: - EMPTY_BUFFER = None - __init__ = start = done = suspend = resume = lambda *args: None - - -class FDCaptureBinary: - """Capture IO to/from a given os-level filedescriptor. - - snap() produces `bytes` - """ - - EMPTY_BUFFER = b"" - _state = None - - def __init__(self, targetfd): - self.targetfd = targetfd - - try: - os.fstat(targetfd) - except OSError: - # FD capturing is conceptually simple -- create a temporary file, - # redirect the FD to it, redirect back when done. But when the - # target FD is invalid it throws a wrench into this loveley scheme. - # - # Tests themselves shouldn't care if the FD is valid, FD capturing - # should work regardless of external circumstances. So falling back - # to just sys capturing is not a good option. - # - # Further complications are the need to support suspend() and the - # possibility of FD reuse (e.g. the tmpfile getting the very same - # target FD). The following approach is robust, I believe. - self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) - os.dup2(self.targetfd_invalid, targetfd) - else: - self.targetfd_invalid = None - self.targetfd_save = os.dup(targetfd) - - if targetfd == 0: - self.tmpfile = open(os.devnull) - self.syscapture = SysCapture(targetfd) - else: - self.tmpfile = EncodedFile( - TemporaryFile(buffering=0), - encoding="utf-8", - errors="replace", - write_through=True, - ) - if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, self.tmpfile) - else: - self.syscapture = NoCapture() - - def __repr__(self): - return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.targetfd, - self.targetfd_save, - self._state, - self.tmpfile, - ) - - def start(self): - """ Start capturing on targetfd using memorized tmpfile. """ - os.dup2(self.tmpfile.fileno(), self.targetfd) - self.syscapture.start() - self._state = "started" - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.buffer.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - """ stop capturing, restore streams, return original capture file, - seeked to position zero. """ - os.dup2(self.targetfd_save, self.targetfd) - os.close(self.targetfd_save) - if self.targetfd_invalid is not None: - if self.targetfd_invalid != self.targetfd: - os.close(self.targetfd) - os.close(self.targetfd_invalid) - self.syscapture.done() - self.tmpfile.close() - self._state = "done" - - def suspend(self): - self.syscapture.suspend() - os.dup2(self.targetfd_save, self.targetfd) - self._state = "suspended" - - def resume(self): - self.syscapture.resume() - os.dup2(self.tmpfile.fileno(), self.targetfd) - self._state = "resumed" - - def writeorg(self, data): - """ write to original file descriptor. """ - os.write(self.targetfd_save, data) - - -class FDCapture(FDCaptureBinary): - """Capture IO to/from a given os-level filedescriptor. - - snap() produces text - """ - - # Ignore type because it doesn't match the type in the superclass (bytes). - EMPTY_BUFFER = "" # type: ignore - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - """ write to original file descriptor. """ - super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream - - -class SysCaptureBinary: - - EMPTY_BUFFER = b"" - _state = None - - def __init__(self, fd, tmpfile=None, *, tee=False): - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) - self.tmpfile = tmpfile - - def __repr__(self): - return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.name, - hasattr(self, "_old") and repr(self._old) or "", - self._state, - self.tmpfile, - ) - - def start(self): - setattr(sys, self.name, self.tmpfile) - self._state = "started" - - def snap(self): - res = self.tmpfile.buffer.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - setattr(sys, self.name, self._old) - del self._old - self.tmpfile.close() - self._state = "done" - - def suspend(self): - setattr(sys, self.name, self._old) - self._state = "suspended" - - def resume(self): - setattr(sys, self.name, self.tmpfile) - self._state = "resumed" - - def writeorg(self, data): - self._old.flush() - self._old.buffer.write(data) - self._old.buffer.flush() - - -class SysCapture(SysCaptureBinary): - EMPTY_BUFFER = "" # type: ignore[assignment] - - def snap(self): - res = self.tmpfile.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - self._old.write(data) - self._old.flush() - - -class DontReadFromInput: - encoding = None - - def read(self, *args): - raise OSError( - "pytest: reading from stdin while output is captured! Consider using `-s`." - ) - - readline = read - readlines = read - __next__ = read - - def __iter__(self): - return self - - def fileno(self): - raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - - def isatty(self): - return False - - def close(self): - pass - - @property - def buffer(self): - return self - - -def _colorama_workaround(): - """ - Ensure colorama is imported so that it attaches to the correct stdio - handles on Windows. - - colorama uses the terminal on import time. So if something does the - first import of colorama while I/O capture is active, colorama will - fail in various ways. - """ - if sys.platform.startswith("win32"): - try: - import colorama # noqa: F401 - except ImportError: - pass - - -def _readline_workaround(): - """ - Ensure readline is imported so that it attaches to the correct stdio - handles on Windows. - - Pdb uses readline support where available--when not running from the Python - prompt, the readline module is not imported until running the pdb REPL. If - running pytest with the --pdb option this means the readline module is not - imported until after I/O capture has been started. - - This is a problem for pyreadline, which is often used to implement readline - support on Windows, as it does not attach to the correct handles for stdout - and/or stdin if they have been redirected by the FDCapture mechanism. This - workaround ensures that readline is imported before I/O capture is setup so - that it can attach to the actual stdin/out for the console. - - See https://github.com/pytest-dev/pytest/pull/1281 - """ - if sys.platform.startswith("win32"): - try: - import readline # noqa: F401 - except ImportError: - pass - - -def _py36_windowsconsoleio_workaround(stream): - """ - Python 3.6 implemented unicode console handling for Windows. This works - by reading/writing to the raw console handle using - ``{Read,Write}ConsoleW``. - - The problem is that we are going to ``dup2`` over the stdio file - descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the - handles used by Python to write to the console. Though there is still some - weirdness and the console handle seems to only be closed randomly and not - on the first call to ``CloseHandle``, or maybe it gets reopened with the - same handle value when we suspend capturing. - - The workaround in this case will reopen stdio with a different fd which - also means a different handle by replicating the logic in - "Py_lifecycle.c:initstdio/create_stdio". - - :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given - here as parameter for unittesting purposes. - - See https://github.com/pytest-dev/py/issues/103 - """ - if ( - not sys.platform.startswith("win32") - or sys.version_info[:2] < (3, 6) - or hasattr(sys, "pypy_version_info") - ): - return - - # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) - if not hasattr(stream, "buffer"): - return - - buffered = hasattr(stream.buffer, "raw") - raw_stdout = stream.buffer.raw if buffered else stream.buffer - - if not isinstance(raw_stdout, io._WindowsConsoleIO): - return - - def _reopen_stdio(f, mode): - if not buffered and mode[0] == "w": - buffering = 0 - else: - buffering = -1 - - return io.TextIOWrapper( - open(os.dup(f.fileno()), mode, buffering), - f.encoding, - f.errors, - f.newlines, - f.line_buffering, - ) - - sys.stdin = _reopen_stdio(sys.stdin, "rb") - sys.stdout = _reopen_stdio(sys.stdout, "wb") - sys.stderr = _reopen_stdio(sys.stderr, "wb") From fd3ba053cfab5d376b66a880f41834638e93fa0f Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Thu, 16 Apr 2020 20:59:27 +0300 Subject: [PATCH 6/9] capture: don't assume that the tmpfile is backed by a BytesIO Since tmpfile is a parameter to SysCapture, it shouldn't assume things unnecessarily, when there is an alternative. --- src/_pytest/capture.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 7a5e854ef..a07892563 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -281,7 +281,8 @@ class SysCaptureBinary: self._state = "started" def snap(self): - res = self.tmpfile.buffer.getvalue() + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() self.tmpfile.seek(0) self.tmpfile.truncate() return res From a35800c2e1461d32dfd4322dc21d02e46c49c776 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 17 Apr 2020 10:01:51 +0300 Subject: [PATCH 7/9] capture: formalize and check allowed state transition in capture classes There are state transitions start/done/suspend/resume and two additional operations snap/writeorg. Previously it was not well defined in what order they can be called, and which operations are idempotent. Formalize this and enforce using assert checks with informative error messages if they fail (rather than random AttributeErrors). --- src/_pytest/capture.py | 48 +++++++++++++++++++++++++++++++++++++---- testing/test_capture.py | 14 ++++++------ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index a07892563..32e83dd21 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -11,6 +11,7 @@ from io import UnsupportedOperation from tempfile import TemporaryFile from typing import Optional from typing import TextIO +from typing import Tuple import pytest from _pytest.compat import TYPE_CHECKING @@ -245,7 +246,6 @@ class NoCapture: class SysCaptureBinary: EMPTY_BUFFER = b"" - _state = None def __init__(self, fd, tmpfile=None, *, tee=False): name = patchsysdict[fd] @@ -257,6 +257,7 @@ class SysCaptureBinary: else: tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) self.tmpfile = tmpfile + self._state = "initialized" def repr(self, class_name: str) -> str: return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( @@ -276,11 +277,20 @@ class SysCaptureBinary: self.tmpfile, ) + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: + assert ( + self._state in states + ), "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + def start(self): + self._assert_state("start", ("initialized",)) setattr(sys, self.name, self.tmpfile) self._state = "started" def snap(self): + self._assert_state("snap", ("started", "suspended")) self.tmpfile.seek(0) res = self.tmpfile.buffer.read() self.tmpfile.seek(0) @@ -288,20 +298,28 @@ class SysCaptureBinary: return res def done(self): + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return setattr(sys, self.name, self._old) del self._old self.tmpfile.close() self._state = "done" def suspend(self): + self._assert_state("suspend", ("started", "suspended")) setattr(sys, self.name, self._old) self._state = "suspended" def resume(self): + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return setattr(sys, self.name, self.tmpfile) - self._state = "resumed" + self._state = "started" def writeorg(self, data): + self._assert_state("writeorg", ("started", "suspended")) self._old.flush() self._old.buffer.write(data) self._old.buffer.flush() @@ -317,6 +335,7 @@ class SysCapture(SysCaptureBinary): return res def writeorg(self, data): + self._assert_state("writeorg", ("started", "suspended")) self._old.write(data) self._old.flush() @@ -328,7 +347,6 @@ class FDCaptureBinary: """ EMPTY_BUFFER = b"" - _state = None def __init__(self, targetfd): self.targetfd = targetfd @@ -368,6 +386,8 @@ class FDCaptureBinary: else: self.syscapture = NoCapture() + self._state = "initialized" + def __repr__(self): return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( self.__class__.__name__, @@ -377,13 +397,22 @@ class FDCaptureBinary: self.tmpfile, ) + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: + assert ( + self._state in states + ), "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + def start(self): """ Start capturing on targetfd using memorized tmpfile. """ + self._assert_state("start", ("initialized",)) os.dup2(self.tmpfile.fileno(), self.targetfd) self.syscapture.start() self._state = "started" def snap(self): + self._assert_state("snap", ("started", "suspended")) self.tmpfile.seek(0) res = self.tmpfile.buffer.read() self.tmpfile.seek(0) @@ -393,6 +422,9 @@ class FDCaptureBinary: def done(self): """ stop capturing, restore streams, return original capture file, seeked to position zero. """ + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return os.dup2(self.targetfd_save, self.targetfd) os.close(self.targetfd_save) if self.targetfd_invalid is not None: @@ -404,17 +436,24 @@ class FDCaptureBinary: self._state = "done" def suspend(self): + self._assert_state("suspend", ("started", "suspended")) + if self._state == "suspended": + return self.syscapture.suspend() os.dup2(self.targetfd_save, self.targetfd) self._state = "suspended" def resume(self): + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return self.syscapture.resume() os.dup2(self.tmpfile.fileno(), self.targetfd) - self._state = "resumed" + self._state = "started" def writeorg(self, data): """ write to original file descriptor. """ + self._assert_state("writeorg", ("started", "suspended")) os.write(self.targetfd_save, data) @@ -428,6 +467,7 @@ class FDCapture(FDCaptureBinary): EMPTY_BUFFER = "" # type: ignore def snap(self): + self._assert_state("snap", ("started", "suspended")) self.tmpfile.seek(0) res = self.tmpfile.read() self.tmpfile.seek(0) diff --git a/testing/test_capture.py b/testing/test_capture.py index 5a0998da7..95f2d748a 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -878,9 +878,8 @@ class TestFDCapture: cap = capture.FDCapture(fd) data = b"hello" os.write(fd, data) - s = cap.snap() + pytest.raises(AssertionError, cap.snap) cap.done() - assert not s cap = capture.FDCapture(fd) cap.start() os.write(fd, data) @@ -901,7 +900,7 @@ class TestFDCapture: fd = tmpfile.fileno() cap = capture.FDCapture(fd) cap.done() - pytest.raises(ValueError, cap.start) + pytest.raises(AssertionError, cap.start) def test_stderr(self): cap = capture.FDCapture(2) @@ -952,7 +951,7 @@ class TestFDCapture: assert s == "but now yes\n" cap.suspend() cap.done() - pytest.raises(AttributeError, cap.suspend) + pytest.raises(AssertionError, cap.suspend) assert repr(cap) == ( "".format( @@ -1154,6 +1153,7 @@ class TestStdCaptureFD(TestStdCapture): with lsof_check(): for i in range(10): cap = StdCaptureFD() + cap.start_capturing() cap.stop_capturing() @@ -1175,7 +1175,7 @@ class TestStdCaptureFDinvalidFD: def test_stdout(): os.close(1) cap = StdCaptureFD(out=True, err=False, in_=False) - assert fnmatch(repr(cap.out), "") + assert fnmatch(repr(cap.out), "") cap.start_capturing() os.write(1, b"stdout") assert cap.readouterr() == ("stdout", "") @@ -1184,7 +1184,7 @@ class TestStdCaptureFDinvalidFD: def test_stderr(): os.close(2) cap = StdCaptureFD(out=False, err=True, in_=False) - assert fnmatch(repr(cap.err), "") + assert fnmatch(repr(cap.err), "") cap.start_capturing() os.write(2, b"stderr") assert cap.readouterr() == ("", "stderr") @@ -1193,7 +1193,7 @@ class TestStdCaptureFDinvalidFD: def test_stdin(): os.close(0) cap = StdCaptureFD(out=False, err=False, in_=True) - assert fnmatch(repr(cap.in_), "") + assert fnmatch(repr(cap.in_), "") cap.stop_capturing() """ ) From 7a704288df8ed2d416c6dd8b15df3664ad425a5a Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 17 Apr 2020 22:52:09 +0300 Subject: [PATCH 8/9] capture: remove unneeded getattr This attribute is set in __init__ and not deleted. Other methods do it already but this one wasn't updated. --- src/_pytest/capture.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 32e83dd21..64f4b8b92 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -630,9 +630,8 @@ class CaptureManager: self._global_capturing.resume_capturing() def suspend_global_capture(self, in_=False): - cap = getattr(self, "_global_capturing", None) - if cap is not None: - cap.suspend_capturing(in_=in_) + if self._global_capturing is not None: + self._global_capturing.suspend_capturing(in_=in_) def suspend(self, in_=False): # Need to undo local capsys-et-al if it exists before disabling global capture. From f93e021bc87c17528e0c1aaebceea178b22b7470 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 27 May 2020 13:04:56 +0300 Subject: [PATCH 9/9] capture: remove some unclear parametrization from a test The two cases end up doing the same (the tmpfile fixture isn't used except being truthy). --- testing/test_capture.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/testing/test_capture.py b/testing/test_capture.py index 95f2d748a..1301a0e69 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -1255,11 +1255,8 @@ def test_capsys_results_accessible_by_attribute(capsys): assert capture_result.err == "eggs" -@pytest.mark.parametrize("use", [True, False]) -def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): - if not use: - tmpfile = True - cap = StdCaptureFD(out=False, err=tmpfile) +def test_fdcapture_tmpfile_remains_the_same() -> None: + cap = StdCaptureFD(out=False, err=True) try: cap.start_capturing() capfile = cap.err.tmpfile