diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 13931ca10..bcc16ceb6 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -9,9 +9,11 @@ import os import sys from io import UnsupportedOperation from tempfile import TemporaryFile +from typing import Generator from typing import Optional from typing import TextIO from typing import Tuple +from typing import Union import pytest from _pytest.compat import TYPE_CHECKING @@ -46,7 +48,7 @@ def pytest_addoption(parser: Parser) -> None: ) -def _colorama_workaround(): +def _colorama_workaround() -> None: """ Ensure colorama is imported so that it attaches to the correct stdio handles on Windows. @@ -62,7 +64,7 @@ def _colorama_workaround(): pass -def _readline_workaround(): +def _readline_workaround() -> None: """ Ensure readline is imported so that it attaches to the correct stdio handles on Windows. @@ -87,7 +89,7 @@ def _readline_workaround(): pass -def _py36_windowsconsoleio_workaround(stream): +def _py36_windowsconsoleio_workaround(stream: TextIO) -> None: """ Python 3.6 implemented unicode console handling for Windows. This works by reading/writing to the raw console handle using @@ -202,7 +204,7 @@ class TeeCaptureIO(CaptureIO): self._other = other super().__init__() - def write(self, s) -> int: + def write(self, s: str) -> int: super().write(s) return self._other.write(s) @@ -222,13 +224,13 @@ class DontReadFromInput: def __iter__(self): return self - def fileno(self): + def fileno(self) -> int: raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - def isatty(self): + def isatty(self) -> bool: return False - def close(self): + def close(self) -> None: pass @property @@ -251,7 +253,7 @@ class SysCaptureBinary: EMPTY_BUFFER = b"" - def __init__(self, fd, tmpfile=None, *, tee=False): + def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None: name = patchsysdict[fd] self._old = getattr(sys, name) self.name = name @@ -288,7 +290,7 @@ class SysCaptureBinary: op, self._state, ", ".join(states) ) - def start(self): + def start(self) -> None: self._assert_state("start", ("initialized",)) setattr(sys, self.name, self.tmpfile) self._state = "started" @@ -301,7 +303,7 @@ class SysCaptureBinary: self.tmpfile.truncate() return res - def done(self): + def done(self) -> None: self._assert_state("done", ("initialized", "started", "suspended", "done")) if self._state == "done": return @@ -310,19 +312,19 @@ class SysCaptureBinary: self.tmpfile.close() self._state = "done" - def suspend(self): + def suspend(self) -> None: self._assert_state("suspend", ("started", "suspended")) setattr(sys, self.name, self._old) self._state = "suspended" - def resume(self): + def resume(self) -> None: self._assert_state("resume", ("started", "suspended")) if self._state == "started": return setattr(sys, self.name, self.tmpfile) self._state = "started" - def writeorg(self, data): + def writeorg(self, data) -> None: self._assert_state("writeorg", ("started", "suspended")) self._old.flush() self._old.buffer.write(data) @@ -352,7 +354,7 @@ class FDCaptureBinary: EMPTY_BUFFER = b"" - def __init__(self, targetfd): + def __init__(self, targetfd: int) -> None: self.targetfd = targetfd try: @@ -369,7 +371,9 @@ class FDCaptureBinary: # Further complications are the need to support suspend() and the # possibility of FD reuse (e.g. the tmpfile getting the very same # target FD). The following approach is robust, I believe. - self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) + self.targetfd_invalid = os.open( + os.devnull, os.O_RDWR + ) # type: Optional[int] os.dup2(self.targetfd_invalid, targetfd) else: self.targetfd_invalid = None @@ -380,7 +384,8 @@ class FDCaptureBinary: self.syscapture = SysCapture(targetfd) else: self.tmpfile = EncodedFile( - TemporaryFile(buffering=0), + # TODO: Remove type ignore, fixed in next mypy release. + TemporaryFile(buffering=0), # type: ignore[arg-type] encoding="utf-8", errors="replace", write_through=True, @@ -392,7 +397,7 @@ class FDCaptureBinary: self._state = "initialized" - def __repr__(self): + def __repr__(self) -> str: return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( self.__class__.__name__, self.targetfd, @@ -408,7 +413,7 @@ class FDCaptureBinary: op, self._state, ", ".join(states) ) - def start(self): + def start(self) -> None: """ Start capturing on targetfd using memorized tmpfile. """ self._assert_state("start", ("initialized",)) os.dup2(self.tmpfile.fileno(), self.targetfd) @@ -423,7 +428,7 @@ class FDCaptureBinary: self.tmpfile.truncate() return res - def done(self): + def done(self) -> None: """ stop capturing, restore streams, return original capture file, seeked to position zero. """ self._assert_state("done", ("initialized", "started", "suspended", "done")) @@ -439,7 +444,7 @@ class FDCaptureBinary: self.tmpfile.close() self._state = "done" - def suspend(self): + def suspend(self) -> None: self._assert_state("suspend", ("started", "suspended")) if self._state == "suspended": return @@ -447,7 +452,7 @@ class FDCaptureBinary: os.dup2(self.targetfd_save, self.targetfd) self._state = "suspended" - def resume(self): + def resume(self) -> None: self._assert_state("resume", ("started", "suspended")) if self._state == "started": return @@ -497,12 +502,12 @@ class MultiCapture: self.out = out self.err = err - def __repr__(self): + def __repr__(self) -> str: return "".format( self.out, self.err, self.in_, self._state, self._in_suspended, ) - def start_capturing(self): + def start_capturing(self) -> None: self._state = "started" if self.in_: self.in_.start() @@ -520,7 +525,7 @@ class MultiCapture: self.err.writeorg(err) return out, err - def suspend_capturing(self, in_=False): + def suspend_capturing(self, in_: bool = False) -> None: self._state = "suspended" if self.out: self.out.suspend() @@ -530,7 +535,7 @@ class MultiCapture: self.in_.suspend() self._in_suspended = True - def resume_capturing(self): + def resume_capturing(self) -> None: self._state = "resumed" if self.out: self.out.resume() @@ -540,7 +545,7 @@ class MultiCapture: self.in_.resume() self._in_suspended = False - def stop_capturing(self): + def stop_capturing(self) -> None: """ stop capturing and reset capturing streams """ if self._state == "stopped": raise ValueError("was already stopped") @@ -596,15 +601,15 @@ class CaptureManager: def __init__(self, method: "_CaptureMethod") -> None: self._method = method - self._global_capturing = None + self._global_capturing = None # type: Optional[MultiCapture] self._capture_fixture = None # type: Optional[CaptureFixture] - def __repr__(self): + def __repr__(self) -> str: return "".format( self._method, self._global_capturing, self._capture_fixture ) - def is_capturing(self): + def is_capturing(self) -> Union[str, bool]: if self.is_globally_capturing(): return "global" if self._capture_fixture: @@ -613,40 +618,41 @@ class CaptureManager: # Global capturing control - def is_globally_capturing(self): + def is_globally_capturing(self) -> bool: return self._method != "no" - def start_global_capturing(self): + def start_global_capturing(self) -> None: assert self._global_capturing is None self._global_capturing = _get_multicapture(self._method) self._global_capturing.start_capturing() - def stop_global_capturing(self): + def stop_global_capturing(self) -> None: if self._global_capturing is not None: self._global_capturing.pop_outerr_to_orig() self._global_capturing.stop_capturing() self._global_capturing = None - def resume_global_capture(self): + def resume_global_capture(self) -> None: # During teardown of the python process, and on rare occasions, capture # attributes can be `None` while trying to resume global capture. if self._global_capturing is not None: self._global_capturing.resume_capturing() - def suspend_global_capture(self, in_=False): + def suspend_global_capture(self, in_: bool = False) -> None: if self._global_capturing is not None: self._global_capturing.suspend_capturing(in_=in_) - def suspend(self, in_=False): + def suspend(self, in_: bool = False) -> None: # Need to undo local capsys-et-al if it exists before disabling global capture. self.suspend_fixture() self.suspend_global_capture(in_) - def resume(self): + def resume(self) -> None: self.resume_global_capture() self.resume_fixture() def read_global_capture(self): + assert self._global_capturing is not None return self._global_capturing.readouterr() # Fixture Control @@ -665,30 +671,30 @@ class CaptureManager: def unset_fixture(self) -> None: self._capture_fixture = None - def activate_fixture(self): + def activate_fixture(self) -> None: """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over the global capture. """ if self._capture_fixture: self._capture_fixture._start() - def deactivate_fixture(self): + def deactivate_fixture(self) -> None: """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" if self._capture_fixture: self._capture_fixture.close() - def suspend_fixture(self): + def suspend_fixture(self) -> None: if self._capture_fixture: self._capture_fixture._suspend() - def resume_fixture(self): + def resume_fixture(self) -> None: if self._capture_fixture: self._capture_fixture._resume() # Helper context managers @contextlib.contextmanager - def global_and_fixture_disabled(self): + def global_and_fixture_disabled(self) -> Generator[None, None, None]: """Context manager to temporarily disable global and current fixture capturing.""" self.suspend() try: @@ -697,7 +703,7 @@ class CaptureManager: self.resume() @contextlib.contextmanager - def item_capture(self, when, item): + def item_capture(self, when: str, item: Item) -> Generator[None, None, None]: self.resume_global_capture() self.activate_fixture() try: @@ -757,21 +763,21 @@ class CaptureFixture: fixtures. """ - def __init__(self, captureclass, request): + def __init__(self, captureclass, request: SubRequest) -> None: self.captureclass = captureclass self.request = request - self._capture = None + self._capture = None # type: Optional[MultiCapture] self._captured_out = self.captureclass.EMPTY_BUFFER self._captured_err = self.captureclass.EMPTY_BUFFER - def _start(self): + def _start(self) -> None: if self._capture is None: self._capture = MultiCapture( in_=None, out=self.captureclass(1), err=self.captureclass(2), ) self._capture.start_capturing() - def close(self): + def close(self) -> None: if self._capture is not None: out, err = self._capture.pop_outerr_to_orig() self._captured_out += out @@ -793,18 +799,18 @@ class CaptureFixture: self._captured_err = self.captureclass.EMPTY_BUFFER return CaptureResult(captured_out, captured_err) - def _suspend(self): + def _suspend(self) -> None: """Suspends this fixture's own capturing temporarily.""" if self._capture is not None: self._capture.suspend_capturing() - def _resume(self): + def _resume(self) -> None: """Resumes this fixture's own capturing temporarily.""" if self._capture is not None: self._capture.resume_capturing() @contextlib.contextmanager - def disabled(self): + def disabled(self) -> Generator[None, None, None]: """Temporarily disables capture while inside the 'with' block.""" capmanager = self.request.config.pluginmanager.getplugin("capturemanager") with capmanager.global_and_fixture_disabled(): @@ -815,7 +821,7 @@ class CaptureFixture: @pytest.fixture -def capsys(request): +def capsys(request: SubRequest): """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. The captured output is made available via ``capsys.readouterr()`` method @@ -832,7 +838,7 @@ def capsys(request): @pytest.fixture -def capsysbinary(request): +def capsysbinary(request: SubRequest): """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. The captured output is made available via ``capsysbinary.readouterr()`` @@ -849,7 +855,7 @@ def capsysbinary(request): @pytest.fixture -def capfd(request): +def capfd(request: SubRequest): """Enable text capturing of writes to file descriptors ``1`` and ``2``. The captured output is made available via ``capfd.readouterr()`` method @@ -866,7 +872,7 @@ def capfd(request): @pytest.fixture -def capfdbinary(request): +def capfdbinary(request: SubRequest): """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. The captured output is made available via ``capfd.readouterr()`` method