Type annotate _pytest.capture

This commit is contained in:
Ran Benita 2020-05-01 14:40:16 +03:00
parent b51ea4f1a5
commit 3e351afeb3
1 changed files with 59 additions and 53 deletions

View File

@ -9,9 +9,11 @@ import os
import sys
from io import UnsupportedOperation
from tempfile import TemporaryFile
from typing import Generator
from typing import Optional
from typing import TextIO
from typing import Tuple
from typing import Union
import pytest
from _pytest.compat import TYPE_CHECKING
@ -46,7 +48,7 @@ def pytest_addoption(parser: Parser) -> None:
)
def _colorama_workaround():
def _colorama_workaround() -> None:
"""
Ensure colorama is imported so that it attaches to the correct stdio
handles on Windows.
@ -62,7 +64,7 @@ def _colorama_workaround():
pass
def _readline_workaround():
def _readline_workaround() -> None:
"""
Ensure readline is imported so that it attaches to the correct stdio
handles on Windows.
@ -87,7 +89,7 @@ def _readline_workaround():
pass
def _py36_windowsconsoleio_workaround(stream):
def _py36_windowsconsoleio_workaround(stream: TextIO) -> None:
"""
Python 3.6 implemented unicode console handling for Windows. This works
by reading/writing to the raw console handle using
@ -202,7 +204,7 @@ class TeeCaptureIO(CaptureIO):
self._other = other
super().__init__()
def write(self, s) -> int:
def write(self, s: str) -> int:
super().write(s)
return self._other.write(s)
@ -222,13 +224,13 @@ class DontReadFromInput:
def __iter__(self):
return self
def fileno(self):
def fileno(self) -> int:
raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
def isatty(self):
def isatty(self) -> bool:
return False
def close(self):
def close(self) -> None:
pass
@property
@ -251,7 +253,7 @@ class SysCaptureBinary:
EMPTY_BUFFER = b""
def __init__(self, fd, tmpfile=None, *, tee=False):
def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None:
name = patchsysdict[fd]
self._old = getattr(sys, name)
self.name = name
@ -288,7 +290,7 @@ class SysCaptureBinary:
op, self._state, ", ".join(states)
)
def start(self):
def start(self) -> None:
self._assert_state("start", ("initialized",))
setattr(sys, self.name, self.tmpfile)
self._state = "started"
@ -301,7 +303,7 @@ class SysCaptureBinary:
self.tmpfile.truncate()
return res
def done(self):
def done(self) -> None:
self._assert_state("done", ("initialized", "started", "suspended", "done"))
if self._state == "done":
return
@ -310,19 +312,19 @@ class SysCaptureBinary:
self.tmpfile.close()
self._state = "done"
def suspend(self):
def suspend(self) -> None:
self._assert_state("suspend", ("started", "suspended"))
setattr(sys, self.name, self._old)
self._state = "suspended"
def resume(self):
def resume(self) -> None:
self._assert_state("resume", ("started", "suspended"))
if self._state == "started":
return
setattr(sys, self.name, self.tmpfile)
self._state = "started"
def writeorg(self, data):
def writeorg(self, data) -> None:
self._assert_state("writeorg", ("started", "suspended"))
self._old.flush()
self._old.buffer.write(data)
@ -352,7 +354,7 @@ class FDCaptureBinary:
EMPTY_BUFFER = b""
def __init__(self, targetfd):
def __init__(self, targetfd: int) -> None:
self.targetfd = targetfd
try:
@ -369,7 +371,9 @@ class FDCaptureBinary:
# Further complications are the need to support suspend() and the
# possibility of FD reuse (e.g. the tmpfile getting the very same
# target FD). The following approach is robust, I believe.
self.targetfd_invalid = os.open(os.devnull, os.O_RDWR)
self.targetfd_invalid = os.open(
os.devnull, os.O_RDWR
) # type: Optional[int]
os.dup2(self.targetfd_invalid, targetfd)
else:
self.targetfd_invalid = None
@ -380,7 +384,8 @@ class FDCaptureBinary:
self.syscapture = SysCapture(targetfd)
else:
self.tmpfile = EncodedFile(
TemporaryFile(buffering=0),
# TODO: Remove type ignore, fixed in next mypy release.
TemporaryFile(buffering=0), # type: ignore[arg-type]
encoding="utf-8",
errors="replace",
write_through=True,
@ -392,7 +397,7 @@ class FDCaptureBinary:
self._state = "initialized"
def __repr__(self):
def __repr__(self) -> str:
return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
self.__class__.__name__,
self.targetfd,
@ -408,7 +413,7 @@ class FDCaptureBinary:
op, self._state, ", ".join(states)
)
def start(self):
def start(self) -> None:
""" Start capturing on targetfd using memorized tmpfile. """
self._assert_state("start", ("initialized",))
os.dup2(self.tmpfile.fileno(), self.targetfd)
@ -423,7 +428,7 @@ class FDCaptureBinary:
self.tmpfile.truncate()
return res
def done(self):
def done(self) -> None:
""" stop capturing, restore streams, return original capture file,
seeked to position zero. """
self._assert_state("done", ("initialized", "started", "suspended", "done"))
@ -439,7 +444,7 @@ class FDCaptureBinary:
self.tmpfile.close()
self._state = "done"
def suspend(self):
def suspend(self) -> None:
self._assert_state("suspend", ("started", "suspended"))
if self._state == "suspended":
return
@ -447,7 +452,7 @@ class FDCaptureBinary:
os.dup2(self.targetfd_save, self.targetfd)
self._state = "suspended"
def resume(self):
def resume(self) -> None:
self._assert_state("resume", ("started", "suspended"))
if self._state == "started":
return
@ -497,12 +502,12 @@ class MultiCapture:
self.out = out
self.err = err
def __repr__(self):
def __repr__(self) -> str:
return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format(
self.out, self.err, self.in_, self._state, self._in_suspended,
)
def start_capturing(self):
def start_capturing(self) -> None:
self._state = "started"
if self.in_:
self.in_.start()
@ -520,7 +525,7 @@ class MultiCapture:
self.err.writeorg(err)
return out, err
def suspend_capturing(self, in_=False):
def suspend_capturing(self, in_: bool = False) -> None:
self._state = "suspended"
if self.out:
self.out.suspend()
@ -530,7 +535,7 @@ class MultiCapture:
self.in_.suspend()
self._in_suspended = True
def resume_capturing(self):
def resume_capturing(self) -> None:
self._state = "resumed"
if self.out:
self.out.resume()
@ -540,7 +545,7 @@ class MultiCapture:
self.in_.resume()
self._in_suspended = False
def stop_capturing(self):
def stop_capturing(self) -> None:
""" stop capturing and reset capturing streams """
if self._state == "stopped":
raise ValueError("was already stopped")
@ -596,15 +601,15 @@ class CaptureManager:
def __init__(self, method: "_CaptureMethod") -> None:
self._method = method
self._global_capturing = None
self._global_capturing = None # type: Optional[MultiCapture]
self._capture_fixture = None # type: Optional[CaptureFixture]
def __repr__(self):
def __repr__(self) -> str:
return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
self._method, self._global_capturing, self._capture_fixture
)
def is_capturing(self):
def is_capturing(self) -> Union[str, bool]:
if self.is_globally_capturing():
return "global"
if self._capture_fixture:
@ -613,40 +618,41 @@ class CaptureManager:
# Global capturing control
def is_globally_capturing(self):
def is_globally_capturing(self) -> bool:
return self._method != "no"
def start_global_capturing(self):
def start_global_capturing(self) -> None:
assert self._global_capturing is None
self._global_capturing = _get_multicapture(self._method)
self._global_capturing.start_capturing()
def stop_global_capturing(self):
def stop_global_capturing(self) -> None:
if self._global_capturing is not None:
self._global_capturing.pop_outerr_to_orig()
self._global_capturing.stop_capturing()
self._global_capturing = None
def resume_global_capture(self):
def resume_global_capture(self) -> None:
# During teardown of the python process, and on rare occasions, capture
# attributes can be `None` while trying to resume global capture.
if self._global_capturing is not None:
self._global_capturing.resume_capturing()
def suspend_global_capture(self, in_=False):
def suspend_global_capture(self, in_: bool = False) -> None:
if self._global_capturing is not None:
self._global_capturing.suspend_capturing(in_=in_)
def suspend(self, in_=False):
def suspend(self, in_: bool = False) -> None:
# Need to undo local capsys-et-al if it exists before disabling global capture.
self.suspend_fixture()
self.suspend_global_capture(in_)
def resume(self):
def resume(self) -> None:
self.resume_global_capture()
self.resume_fixture()
def read_global_capture(self):
assert self._global_capturing is not None
return self._global_capturing.readouterr()
# Fixture Control
@ -665,30 +671,30 @@ class CaptureManager:
def unset_fixture(self) -> None:
self._capture_fixture = None
def activate_fixture(self):
def activate_fixture(self) -> None:
"""If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
the global capture.
"""
if self._capture_fixture:
self._capture_fixture._start()
def deactivate_fixture(self):
def deactivate_fixture(self) -> None:
"""Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any."""
if self._capture_fixture:
self._capture_fixture.close()
def suspend_fixture(self):
def suspend_fixture(self) -> None:
if self._capture_fixture:
self._capture_fixture._suspend()
def resume_fixture(self):
def resume_fixture(self) -> None:
if self._capture_fixture:
self._capture_fixture._resume()
# Helper context managers
@contextlib.contextmanager
def global_and_fixture_disabled(self):
def global_and_fixture_disabled(self) -> Generator[None, None, None]:
"""Context manager to temporarily disable global and current fixture capturing."""
self.suspend()
try:
@ -697,7 +703,7 @@ class CaptureManager:
self.resume()
@contextlib.contextmanager
def item_capture(self, when, item):
def item_capture(self, when: str, item: Item) -> Generator[None, None, None]:
self.resume_global_capture()
self.activate_fixture()
try:
@ -757,21 +763,21 @@ class CaptureFixture:
fixtures.
"""
def __init__(self, captureclass, request):
def __init__(self, captureclass, request: SubRequest) -> None:
self.captureclass = captureclass
self.request = request
self._capture = None
self._capture = None # type: Optional[MultiCapture]
self._captured_out = self.captureclass.EMPTY_BUFFER
self._captured_err = self.captureclass.EMPTY_BUFFER
def _start(self):
def _start(self) -> None:
if self._capture is None:
self._capture = MultiCapture(
in_=None, out=self.captureclass(1), err=self.captureclass(2),
)
self._capture.start_capturing()
def close(self):
def close(self) -> None:
if self._capture is not None:
out, err = self._capture.pop_outerr_to_orig()
self._captured_out += out
@ -793,18 +799,18 @@ class CaptureFixture:
self._captured_err = self.captureclass.EMPTY_BUFFER
return CaptureResult(captured_out, captured_err)
def _suspend(self):
def _suspend(self) -> None:
"""Suspends this fixture's own capturing temporarily."""
if self._capture is not None:
self._capture.suspend_capturing()
def _resume(self):
def _resume(self) -> None:
"""Resumes this fixture's own capturing temporarily."""
if self._capture is not None:
self._capture.resume_capturing()
@contextlib.contextmanager
def disabled(self):
def disabled(self) -> Generator[None, None, None]:
"""Temporarily disables capture while inside the 'with' block."""
capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
with capmanager.global_and_fixture_disabled():
@ -815,7 +821,7 @@ class CaptureFixture:
@pytest.fixture
def capsys(request):
def capsys(request: SubRequest):
"""Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
The captured output is made available via ``capsys.readouterr()`` method
@ -832,7 +838,7 @@ def capsys(request):
@pytest.fixture
def capsysbinary(request):
def capsysbinary(request: SubRequest):
"""Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
The captured output is made available via ``capsysbinary.readouterr()``
@ -849,7 +855,7 @@ def capsysbinary(request):
@pytest.fixture
def capfd(request):
def capfd(request: SubRequest):
"""Enable text capturing of writes to file descriptors ``1`` and ``2``.
The captured output is made available via ``capfd.readouterr()`` method
@ -866,7 +872,7 @@ def capfd(request):
@pytest.fixture
def capfdbinary(request):
def capfdbinary(request: SubRequest):
"""Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
The captured output is made available via ``capfd.readouterr()`` method