Remove safe_text_dupfile() and simplify EncodedFile
I tried to understand what the `safe_text_dupfile()` function and `EncodedFile` class do. Outside tests, `EncodedFile` is only used by `safe_text_dupfile`, and `safe_text_dupfile` is only used by `FDCaptureBinary.__init__()`. I then started to eliminate always-true conditions based on the single call site, and in the end nothing was left except of a couple workarounds that are still needed.
This commit is contained in:
parent
d7f01a90eb
commit
29e4cb5d45
|
@ -9,9 +9,7 @@ import os
|
|||
import sys
|
||||
from io import UnsupportedOperation
|
||||
from tempfile import TemporaryFile
|
||||
from typing import BinaryIO
|
||||
from typing import Generator
|
||||
from typing import Iterable
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
|
@ -382,54 +380,21 @@ class CaptureFixture:
|
|||
yield
|
||||
|
||||
|
||||
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
|
||||
""" return an open text file object that's a duplicate of f on the
|
||||
FD-level if possible.
|
||||
"""
|
||||
encoding = getattr(f, "encoding", None)
|
||||
try:
|
||||
fd = f.fileno()
|
||||
except Exception:
|
||||
if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
|
||||
# we seem to have a text stream, let's just use it
|
||||
return f
|
||||
else:
|
||||
newfd = os.dup(fd)
|
||||
if "b" not in mode:
|
||||
mode += "b"
|
||||
f = os.fdopen(newfd, mode, 0) # no buffering
|
||||
return EncodedFile(f, encoding or default_encoding)
|
||||
|
||||
|
||||
class EncodedFile:
|
||||
errors = "strict" # possibly needed by py3 code (issue555)
|
||||
|
||||
def __init__(self, buffer: BinaryIO, encoding: str) -> None:
|
||||
self.buffer = buffer
|
||||
self.encoding = encoding
|
||||
|
||||
def write(self, s: str) -> int:
|
||||
if not isinstance(s, str):
|
||||
raise TypeError(
|
||||
"write() argument must be str, not {}".format(type(s).__name__)
|
||||
)
|
||||
return self.buffer.write(s.encode(self.encoding, "replace"))
|
||||
|
||||
def writelines(self, lines: Iterable[str]) -> None:
|
||||
self.buffer.writelines(x.encode(self.encoding, "replace") for x in lines)
|
||||
class EncodedFile(io.TextIOWrapper):
|
||||
__slots__ = ()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Ensure that file.name is a string."""
|
||||
# Ensure that file.name is a string. Workaround for a Python bug
|
||||
# fixed in >=3.7.4: https://bugs.python.org/issue36015
|
||||
return repr(self.buffer)
|
||||
|
||||
@property
|
||||
def mode(self) -> str:
|
||||
# TextIOWrapper doesn't expose a mode, but at least some of our
|
||||
# tests check it.
|
||||
return self.buffer.mode.replace("b", "")
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(object.__getattribute__(self, "buffer"), name)
|
||||
|
||||
|
||||
CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"])
|
||||
|
||||
|
@ -544,9 +509,12 @@ class FDCaptureBinary:
|
|||
self.syscapture = SysCapture(targetfd)
|
||||
else:
|
||||
if tmpfile is None:
|
||||
f = TemporaryFile()
|
||||
with f:
|
||||
tmpfile = safe_text_dupfile(f, mode="wb+")
|
||||
tmpfile = EncodedFile(
|
||||
TemporaryFile(buffering=0),
|
||||
encoding="utf-8",
|
||||
errors="replace",
|
||||
write_through=True,
|
||||
)
|
||||
if targetfd in patchsysdict:
|
||||
self.syscapture = SysCapture(targetfd, tmpfile)
|
||||
else:
|
||||
|
@ -575,7 +543,7 @@ class FDCaptureBinary:
|
|||
|
||||
def snap(self):
|
||||
self.tmpfile.seek(0)
|
||||
res = self.tmpfile.read()
|
||||
res = self.tmpfile.buffer.read()
|
||||
self.tmpfile.seek(0)
|
||||
self.tmpfile.truncate()
|
||||
return res
|
||||
|
@ -617,10 +585,10 @@ class FDCapture(FDCaptureBinary):
|
|||
EMPTY_BUFFER = str() # type: ignore
|
||||
|
||||
def snap(self):
|
||||
res = super().snap()
|
||||
enc = getattr(self.tmpfile, "encoding", None)
|
||||
if enc and isinstance(res, bytes):
|
||||
res = str(res, enc, "replace")
|
||||
self.tmpfile.seek(0)
|
||||
res = self.tmpfile.read()
|
||||
self.tmpfile.seek(0)
|
||||
self.tmpfile.truncate()
|
||||
return res
|
||||
|
||||
|
||||
|
|
|
@ -1,16 +1,12 @@
|
|||
import contextlib
|
||||
import io
|
||||
import os
|
||||
import pickle
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
from io import StringIO
|
||||
from io import UnsupportedOperation
|
||||
from typing import BinaryIO
|
||||
from typing import Generator
|
||||
from typing import List
|
||||
from typing import TextIO
|
||||
|
||||
import pytest
|
||||
from _pytest import capture
|
||||
|
@ -827,48 +823,6 @@ def tmpfile(testdir) -> Generator[BinaryIO, None, None]:
|
|||
f.close()
|
||||
|
||||
|
||||
def test_dupfile(tmpfile) -> None:
|
||||
flist = [] # type: List[TextIO]
|
||||
for i in range(5):
|
||||
nf = capture.safe_text_dupfile(tmpfile, "wb")
|
||||
assert nf != tmpfile
|
||||
assert nf.fileno() != tmpfile.fileno()
|
||||
assert nf not in flist
|
||||
print(i, end="", file=nf)
|
||||
flist.append(nf)
|
||||
|
||||
fname_open = flist[0].name
|
||||
assert fname_open == repr(flist[0].buffer)
|
||||
|
||||
for i in range(5):
|
||||
f = flist[i]
|
||||
f.close()
|
||||
fname_closed = flist[0].name
|
||||
assert fname_closed == repr(flist[0].buffer)
|
||||
assert fname_closed != fname_open
|
||||
tmpfile.seek(0)
|
||||
s = tmpfile.read()
|
||||
assert "01234" in repr(s)
|
||||
tmpfile.close()
|
||||
assert fname_closed == repr(flist[0].buffer)
|
||||
|
||||
|
||||
def test_dupfile_on_bytesio():
|
||||
bio = io.BytesIO()
|
||||
f = capture.safe_text_dupfile(bio, "wb")
|
||||
f.write("hello")
|
||||
assert bio.getvalue() == b"hello"
|
||||
assert "BytesIO object" in f.name
|
||||
|
||||
|
||||
def test_dupfile_on_textio():
|
||||
sio = StringIO()
|
||||
f = capture.safe_text_dupfile(sio, "wb")
|
||||
f.write("hello")
|
||||
assert sio.getvalue() == "hello"
|
||||
assert not hasattr(f, "name")
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def lsof_check():
|
||||
pid = os.getpid()
|
||||
|
@ -1307,8 +1261,8 @@ def test_error_attribute_issue555(testdir):
|
|||
"""
|
||||
import sys
|
||||
def test_capattr():
|
||||
assert sys.stdout.errors == "strict"
|
||||
assert sys.stderr.errors == "strict"
|
||||
assert sys.stdout.errors == "replace"
|
||||
assert sys.stderr.errors == "replace"
|
||||
"""
|
||||
)
|
||||
reprec = testdir.inline_run()
|
||||
|
@ -1383,15 +1337,6 @@ def test_crash_on_closing_tmpfile_py27(testdir):
|
|||
result.stdout.no_fnmatch_line("*IOError*")
|
||||
|
||||
|
||||
def test_pickling_and_unpickling_encoded_file():
|
||||
# See https://bitbucket.org/pytest-dev/pytest/pull-request/194
|
||||
# pickle.loads() raises infinite recursion if
|
||||
# EncodedFile.__getattr__ is not implemented properly
|
||||
ef = capture.EncodedFile(None, None)
|
||||
ef_as_str = pickle.dumps(ef)
|
||||
pickle.loads(ef_as_str)
|
||||
|
||||
|
||||
def test_global_capture_with_live_logging(testdir):
|
||||
# Issue 3819
|
||||
# capture should work with live cli logging
|
||||
|
@ -1497,8 +1442,9 @@ def test_typeerror_encodedfile_write(testdir):
|
|||
result_with_capture = testdir.runpytest(str(p))
|
||||
|
||||
assert result_with_capture.ret == result_without_capture.ret
|
||||
result_with_capture.stdout.fnmatch_lines(
|
||||
["E * TypeError: write() argument must be str, not bytes"]
|
||||
out = result_with_capture.stdout.str()
|
||||
assert ("TypeError: write() argument must be str, not bytes" in out) or (
|
||||
"TypeError: unicode argument expected, got 'bytes'" in out
|
||||
)
|
||||
|
||||
|
||||
|
@ -1508,12 +1454,13 @@ def test_stderr_write_returns_len(capsys):
|
|||
|
||||
|
||||
def test_encodedfile_writelines(tmpfile: BinaryIO) -> None:
|
||||
ef = capture.EncodedFile(tmpfile, "utf-8")
|
||||
with pytest.raises(AttributeError):
|
||||
ef.writelines([b"line1", b"line2"]) # type: ignore[list-item] # noqa: F821
|
||||
assert ef.writelines(["line1", "line2"]) is None # type: ignore[func-returns-value] # noqa: F821
|
||||
ef = capture.EncodedFile(tmpfile, encoding="utf-8")
|
||||
with pytest.raises(TypeError):
|
||||
ef.writelines([b"line1", b"line2"])
|
||||
assert ef.writelines(["line3", "line4"]) is None # type: ignore[func-returns-value] # noqa: F821
|
||||
ef.flush()
|
||||
tmpfile.seek(0)
|
||||
assert tmpfile.read() == b"line1line2"
|
||||
assert tmpfile.read() == b"line3line4"
|
||||
tmpfile.close()
|
||||
with pytest.raises(ValueError):
|
||||
ef.read()
|
||||
|
|
Loading…
Reference in New Issue