From 29e4cb5d45f44379aba948c2cd791b3b97210e31 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Sat, 7 Mar 2020 18:38:22 +0200 Subject: [PATCH] Remove safe_text_dupfile() and simplify EncodedFile I tried to understand what the `safe_text_dupfile()` function and `EncodedFile` class do. Outside tests, `EncodedFile` is only used by `safe_text_dupfile`, and `safe_text_dupfile` is only used by `FDCaptureBinary.__init__()`. I then started to eliminate always-true conditions based on the single call site, and in the end nothing was left except of a couple workarounds that are still needed. --- src/_pytest/capture.py | 66 ++++++++++-------------------------- testing/test_capture.py | 75 ++++++----------------------------------- 2 files changed, 28 insertions(+), 113 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 2af207e21..a64c72b5a 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -9,9 +9,7 @@ import os import sys from io import UnsupportedOperation from tempfile import TemporaryFile -from typing import BinaryIO from typing import Generator -from typing import Iterable from typing import Optional import pytest @@ -382,54 +380,21 @@ class CaptureFixture: yield -def safe_text_dupfile(f, mode, default_encoding="UTF8"): - """ return an open text file object that's a duplicate of f on the - FD-level if possible. - """ - encoding = getattr(f, "encoding", None) - try: - fd = f.fileno() - except Exception: - if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"): - # we seem to have a text stream, let's just use it - return f - else: - newfd = os.dup(fd) - if "b" not in mode: - mode += "b" - f = os.fdopen(newfd, mode, 0) # no buffering - return EncodedFile(f, encoding or default_encoding) - - -class EncodedFile: - errors = "strict" # possibly needed by py3 code (issue555) - - def __init__(self, buffer: BinaryIO, encoding: str) -> None: - self.buffer = buffer - self.encoding = encoding - - def write(self, s: str) -> int: - if not isinstance(s, str): - raise TypeError( - "write() argument must be str, not {}".format(type(s).__name__) - ) - return self.buffer.write(s.encode(self.encoding, "replace")) - - def writelines(self, lines: Iterable[str]) -> None: - self.buffer.writelines(x.encode(self.encoding, "replace") for x in lines) +class EncodedFile(io.TextIOWrapper): + __slots__ = () @property def name(self) -> str: - """Ensure that file.name is a string.""" + # Ensure that file.name is a string. Workaround for a Python bug + # fixed in >=3.7.4: https://bugs.python.org/issue36015 return repr(self.buffer) @property def mode(self) -> str: + # TextIOWrapper doesn't expose a mode, but at least some of our + # tests check it. return self.buffer.mode.replace("b", "") - def __getattr__(self, name): - return getattr(object.__getattribute__(self, "buffer"), name) - CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) @@ -544,9 +509,12 @@ class FDCaptureBinary: self.syscapture = SysCapture(targetfd) else: if tmpfile is None: - f = TemporaryFile() - with f: - tmpfile = safe_text_dupfile(f, mode="wb+") + tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + write_through=True, + ) if targetfd in patchsysdict: self.syscapture = SysCapture(targetfd, tmpfile) else: @@ -575,7 +543,7 @@ class FDCaptureBinary: def snap(self): self.tmpfile.seek(0) - res = self.tmpfile.read() + res = self.tmpfile.buffer.read() self.tmpfile.seek(0) self.tmpfile.truncate() return res @@ -617,10 +585,10 @@ class FDCapture(FDCaptureBinary): EMPTY_BUFFER = str() # type: ignore def snap(self): - res = super().snap() - enc = getattr(self.tmpfile, "encoding", None) - if enc and isinstance(res, bytes): - res = str(res, enc, "replace") + self.tmpfile.seek(0) + res = self.tmpfile.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() return res diff --git a/testing/test_capture.py b/testing/test_capture.py index a2b100fda..652461515 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -1,16 +1,12 @@ import contextlib import io import os -import pickle import subprocess import sys import textwrap -from io import StringIO from io import UnsupportedOperation from typing import BinaryIO from typing import Generator -from typing import List -from typing import TextIO import pytest from _pytest import capture @@ -827,48 +823,6 @@ def tmpfile(testdir) -> Generator[BinaryIO, None, None]: f.close() -def test_dupfile(tmpfile) -> None: - flist = [] # type: List[TextIO] - for i in range(5): - nf = capture.safe_text_dupfile(tmpfile, "wb") - assert nf != tmpfile - assert nf.fileno() != tmpfile.fileno() - assert nf not in flist - print(i, end="", file=nf) - flist.append(nf) - - fname_open = flist[0].name - assert fname_open == repr(flist[0].buffer) - - for i in range(5): - f = flist[i] - f.close() - fname_closed = flist[0].name - assert fname_closed == repr(flist[0].buffer) - assert fname_closed != fname_open - tmpfile.seek(0) - s = tmpfile.read() - assert "01234" in repr(s) - tmpfile.close() - assert fname_closed == repr(flist[0].buffer) - - -def test_dupfile_on_bytesio(): - bio = io.BytesIO() - f = capture.safe_text_dupfile(bio, "wb") - f.write("hello") - assert bio.getvalue() == b"hello" - assert "BytesIO object" in f.name - - -def test_dupfile_on_textio(): - sio = StringIO() - f = capture.safe_text_dupfile(sio, "wb") - f.write("hello") - assert sio.getvalue() == "hello" - assert not hasattr(f, "name") - - @contextlib.contextmanager def lsof_check(): pid = os.getpid() @@ -1307,8 +1261,8 @@ def test_error_attribute_issue555(testdir): """ import sys def test_capattr(): - assert sys.stdout.errors == "strict" - assert sys.stderr.errors == "strict" + assert sys.stdout.errors == "replace" + assert sys.stderr.errors == "replace" """ ) reprec = testdir.inline_run() @@ -1383,15 +1337,6 @@ def test_crash_on_closing_tmpfile_py27(testdir): result.stdout.no_fnmatch_line("*IOError*") -def test_pickling_and_unpickling_encoded_file(): - # See https://bitbucket.org/pytest-dev/pytest/pull-request/194 - # pickle.loads() raises infinite recursion if - # EncodedFile.__getattr__ is not implemented properly - ef = capture.EncodedFile(None, None) - ef_as_str = pickle.dumps(ef) - pickle.loads(ef_as_str) - - def test_global_capture_with_live_logging(testdir): # Issue 3819 # capture should work with live cli logging @@ -1497,8 +1442,9 @@ def test_typeerror_encodedfile_write(testdir): result_with_capture = testdir.runpytest(str(p)) assert result_with_capture.ret == result_without_capture.ret - result_with_capture.stdout.fnmatch_lines( - ["E * TypeError: write() argument must be str, not bytes"] + out = result_with_capture.stdout.str() + assert ("TypeError: write() argument must be str, not bytes" in out) or ( + "TypeError: unicode argument expected, got 'bytes'" in out ) @@ -1508,12 +1454,13 @@ def test_stderr_write_returns_len(capsys): def test_encodedfile_writelines(tmpfile: BinaryIO) -> None: - ef = capture.EncodedFile(tmpfile, "utf-8") - with pytest.raises(AttributeError): - ef.writelines([b"line1", b"line2"]) # type: ignore[list-item] # noqa: F821 - assert ef.writelines(["line1", "line2"]) is None # type: ignore[func-returns-value] # noqa: F821 + ef = capture.EncodedFile(tmpfile, encoding="utf-8") + with pytest.raises(TypeError): + ef.writelines([b"line1", b"line2"]) + assert ef.writelines(["line3", "line4"]) is None # type: ignore[func-returns-value] # noqa: F821 + ef.flush() tmpfile.seek(0) - assert tmpfile.read() == b"line1line2" + assert tmpfile.read() == b"line3line4" tmpfile.close() with pytest.raises(ValueError): ef.read()