Merge pull request #7091 from bluetech/capture-invalid-fd

Perform FD capturing even if the FD is invalid
This commit is contained in:
Ran Benita 2020-05-22 14:36:49 +03:00 committed by GitHub
commit 4a1557fa0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 43 deletions

View File

@ -0,0 +1,4 @@
When ``fd`` capturing is used, through ``--capture=fd`` or the ``capfd`` and
``capfdbinary`` fixtures, and the file descriptor (0, 1, 2) cannot be
duplicated, FD capturing is still performed. Previously, direct writes to the
file descriptors would fail or be lost in this case.

View File

@ -513,49 +513,57 @@ class FDCaptureBinary:
def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd
try:
self.targetfd_save = os.dup(self.targetfd)
os.fstat(targetfd)
except OSError:
self.start = lambda: None
self.done = lambda: None
# FD capturing is conceptually simple -- create a temporary file,
# redirect the FD to it, redirect back when done. But when the
# target FD is invalid it throws a wrench into this loveley scheme.
#
# Tests themselves shouldn't care if the FD is valid, FD capturing
# should work regardless of external circumstances. So falling back
# to just sys capturing is not a good option.
#
# Further complications are the need to support suspend() and the
# possibility of FD reuse (e.g. the tmpfile getting the very same
# target FD). The following approach is robust, I believe.
self.targetfd_invalid = os.open(os.devnull, os.O_RDWR)
os.dup2(self.targetfd_invalid, targetfd)
else:
self.start = self._start
self.done = self._done
if targetfd == 0:
assert not tmpfile, "cannot set tmpfile with stdin"
tmpfile = open(os.devnull)
self.syscapture = SysCapture(targetfd)
self.targetfd_invalid = None
self.targetfd_save = os.dup(targetfd)
if targetfd == 0:
assert not tmpfile, "cannot set tmpfile with stdin"
tmpfile = open(os.devnull)
self.syscapture = SysCapture(targetfd)
else:
if tmpfile is None:
tmpfile = EncodedFile(
TemporaryFile(buffering=0),
encoding="utf-8",
errors="replace",
write_through=True,
)
if targetfd in patchsysdict:
self.syscapture = SysCapture(targetfd, tmpfile)
else:
if tmpfile is None:
tmpfile = EncodedFile(
TemporaryFile(buffering=0),
encoding="utf-8",
errors="replace",
write_through=True,
)
if targetfd in patchsysdict:
self.syscapture = SysCapture(targetfd, tmpfile)
else:
self.syscapture = NoCapture()
self.tmpfile = tmpfile
self.tmpfile_fd = tmpfile.fileno()
self.syscapture = NoCapture()
self.tmpfile = tmpfile
def __repr__(self):
return "<{} {} oldfd={} _state={!r} tmpfile={}>".format(
return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
self.__class__.__name__,
self.targetfd,
getattr(self, "targetfd_save", "<UNSET>"),
self.targetfd_save,
self._state,
hasattr(self, "tmpfile") and repr(self.tmpfile) or "<UNSET>",
self.tmpfile,
)
def _start(self):
def start(self):
""" Start capturing on targetfd using memorized tmpfile. """
try:
os.fstat(self.targetfd_save)
except (AttributeError, OSError):
raise ValueError("saved filedescriptor not valid anymore")
os.dup2(self.tmpfile_fd, self.targetfd)
os.dup2(self.tmpfile.fileno(), self.targetfd)
self.syscapture.start()
self._state = "started"
@ -566,12 +574,15 @@ class FDCaptureBinary:
self.tmpfile.truncate()
return res
def _done(self):
def done(self):
""" stop capturing, restore streams, return original capture file,
seeked to position zero. """
targetfd_save = self.__dict__.pop("targetfd_save")
os.dup2(targetfd_save, self.targetfd)
os.close(targetfd_save)
os.dup2(self.targetfd_save, self.targetfd)
os.close(self.targetfd_save)
if self.targetfd_invalid is not None:
if self.targetfd_invalid != self.targetfd:
os.close(self.targetfd)
os.close(self.targetfd_invalid)
self.syscapture.done()
self.tmpfile.close()
self._state = "done"
@ -583,7 +594,7 @@ class FDCaptureBinary:
def resume(self):
self.syscapture.resume()
os.dup2(self.tmpfile_fd, self.targetfd)
os.dup2(self.tmpfile.fileno(), self.targetfd)
self._state = "resumed"
def writeorg(self, data):
@ -609,8 +620,7 @@ class FDCapture(FDCaptureBinary):
def writeorg(self, data):
""" write to original file descriptor. """
data = data.encode("utf-8") # XXX use encoding of original stream
os.write(self.targetfd_save, data)
super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream
class SysCaptureBinary:

View File

@ -943,8 +943,8 @@ class TestFDCapture:
pytest.raises(AttributeError, cap.suspend)
assert repr(cap) == (
"<FDCapture 1 oldfd=<UNSET> _state='done' tmpfile={!r}>".format(
cap.tmpfile
"<FDCapture 1 oldfd={} _state='done' tmpfile={!r}>".format(
cap.targetfd_save, cap.tmpfile
)
)
# Should not crash with missing "_old".
@ -1150,6 +1150,7 @@ class TestStdCaptureFDinvalidFD:
testdir.makepyfile(
"""
import os
from fnmatch import fnmatch
from _pytest import capture
def StdCaptureFD(out=True, err=True, in_=True):
@ -1158,19 +1159,25 @@ class TestStdCaptureFDinvalidFD:
def test_stdout():
os.close(1)
cap = StdCaptureFD(out=True, err=False, in_=False)
assert repr(cap.out) == "<FDCapture 1 oldfd=<UNSET> _state=None tmpfile=<UNSET>>"
assert fnmatch(repr(cap.out), "<FDCapture 1 oldfd=* _state=None tmpfile=*>")
cap.start_capturing()
os.write(1, b"stdout")
assert cap.readouterr() == ("stdout", "")
cap.stop_capturing()
def test_stderr():
os.close(2)
cap = StdCaptureFD(out=False, err=True, in_=False)
assert repr(cap.err) == "<FDCapture 2 oldfd=<UNSET> _state=None tmpfile=<UNSET>>"
assert fnmatch(repr(cap.err), "<FDCapture 2 oldfd=* _state=None tmpfile=*>")
cap.start_capturing()
os.write(2, b"stderr")
assert cap.readouterr() == ("", "stderr")
cap.stop_capturing()
def test_stdin():
os.close(0)
cap = StdCaptureFD(out=False, err=False, in_=True)
assert repr(cap.in_) == "<FDCapture 0 oldfd=<UNSET> _state=None tmpfile=<UNSET>>"
assert fnmatch(repr(cap.in_), "<FDCapture 0 oldfd=* _state=None tmpfile=*>")
cap.stop_capturing()
"""
)
@ -1178,6 +1185,37 @@ class TestStdCaptureFDinvalidFD:
assert result.ret == 0
assert result.parseoutcomes()["passed"] == 3
def test_fdcapture_invalid_fd_with_fd_reuse(self, testdir):
with saved_fd(1):
os.close(1)
cap = capture.FDCaptureBinary(1)
cap.start()
os.write(1, b"started")
cap.suspend()
os.write(1, b" suspended")
cap.resume()
os.write(1, b" resumed")
assert cap.snap() == b"started resumed"
cap.done()
with pytest.raises(OSError):
os.write(1, b"done")
def test_fdcapture_invalid_fd_without_fd_reuse(self, testdir):
with saved_fd(1), saved_fd(2):
os.close(1)
os.close(2)
cap = capture.FDCaptureBinary(2)
cap.start()
os.write(2, b"started")
cap.suspend()
os.write(2, b" suspended")
cap.resume()
os.write(2, b" resumed")
assert cap.snap() == b"started resumed"
cap.done()
with pytest.raises(OSError):
os.write(2, b"done")
def test_capture_not_started_but_reset():
capsys = StdCapture()