diff --git a/changelog/7091.improvement.rst b/changelog/7091.improvement.rst new file mode 100644 index 000000000..72f17c5e4 --- /dev/null +++ b/changelog/7091.improvement.rst @@ -0,0 +1,4 @@ +When ``fd`` capturing is used, through ``--capture=fd`` or the ``capfd`` and +``capfdbinary`` fixtures, and the file descriptor (0, 1, 2) cannot be +duplicated, FD capturing is still performed. Previously, direct writes to the +file descriptors would fail or be lost in this case. diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 7eafeb3e4..323881151 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -513,49 +513,57 @@ class FDCaptureBinary: def __init__(self, targetfd, tmpfile=None): self.targetfd = targetfd + try: - self.targetfd_save = os.dup(self.targetfd) + os.fstat(targetfd) except OSError: - self.start = lambda: None - self.done = lambda: None + # FD capturing is conceptually simple -- create a temporary file, + # redirect the FD to it, redirect back when done. But when the + # target FD is invalid it throws a wrench into this loveley scheme. + # + # Tests themselves shouldn't care if the FD is valid, FD capturing + # should work regardless of external circumstances. So falling back + # to just sys capturing is not a good option. + # + # Further complications are the need to support suspend() and the + # possibility of FD reuse (e.g. the tmpfile getting the very same + # target FD). The following approach is robust, I believe. + self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) else: - self.start = self._start - self.done = self._done - if targetfd == 0: - assert not tmpfile, "cannot set tmpfile with stdin" - tmpfile = open(os.devnull) - self.syscapture = SysCapture(targetfd) + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + if targetfd == 0: + assert not tmpfile, "cannot set tmpfile with stdin" + tmpfile = open(os.devnull) + self.syscapture = SysCapture(targetfd) + else: + if tmpfile is None: + tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + write_through=True, + ) + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, tmpfile) else: - if tmpfile is None: - tmpfile = EncodedFile( - TemporaryFile(buffering=0), - encoding="utf-8", - errors="replace", - write_through=True, - ) - if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, tmpfile) - else: - self.syscapture = NoCapture() - self.tmpfile = tmpfile - self.tmpfile_fd = tmpfile.fileno() + self.syscapture = NoCapture() + self.tmpfile = tmpfile def __repr__(self): - return "<{} {} oldfd={} _state={!r} tmpfile={}>".format( + return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( self.__class__.__name__, self.targetfd, - getattr(self, "targetfd_save", ""), + self.targetfd_save, self._state, - hasattr(self, "tmpfile") and repr(self.tmpfile) or "", + self.tmpfile, ) - def _start(self): + def start(self): """ Start capturing on targetfd using memorized tmpfile. """ - try: - os.fstat(self.targetfd_save) - except (AttributeError, OSError): - raise ValueError("saved filedescriptor not valid anymore") - os.dup2(self.tmpfile_fd, self.targetfd) + os.dup2(self.tmpfile.fileno(), self.targetfd) self.syscapture.start() self._state = "started" @@ -566,12 +574,15 @@ class FDCaptureBinary: self.tmpfile.truncate() return res - def _done(self): + def done(self): """ stop capturing, restore streams, return original capture file, seeked to position zero. """ - targetfd_save = self.__dict__.pop("targetfd_save") - os.dup2(targetfd_save, self.targetfd) - os.close(targetfd_save) + os.dup2(self.targetfd_save, self.targetfd) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) self.syscapture.done() self.tmpfile.close() self._state = "done" @@ -583,7 +594,7 @@ class FDCaptureBinary: def resume(self): self.syscapture.resume() - os.dup2(self.tmpfile_fd, self.targetfd) + os.dup2(self.tmpfile.fileno(), self.targetfd) self._state = "resumed" def writeorg(self, data): @@ -609,8 +620,7 @@ class FDCapture(FDCaptureBinary): def writeorg(self, data): """ write to original file descriptor. """ - data = data.encode("utf-8") # XXX use encoding of original stream - os.write(self.targetfd_save, data) + super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream class SysCaptureBinary: diff --git a/testing/test_capture.py b/testing/test_capture.py index c064614d2..177d72ebc 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -943,8 +943,8 @@ class TestFDCapture: pytest.raises(AttributeError, cap.suspend) assert repr(cap) == ( - " _state='done' tmpfile={!r}>".format( - cap.tmpfile + "".format( + cap.targetfd_save, cap.tmpfile ) ) # Should not crash with missing "_old". @@ -1150,6 +1150,7 @@ class TestStdCaptureFDinvalidFD: testdir.makepyfile( """ import os + from fnmatch import fnmatch from _pytest import capture def StdCaptureFD(out=True, err=True, in_=True): @@ -1158,19 +1159,25 @@ class TestStdCaptureFDinvalidFD: def test_stdout(): os.close(1) cap = StdCaptureFD(out=True, err=False, in_=False) - assert repr(cap.out) == " _state=None tmpfile=>" + assert fnmatch(repr(cap.out), "") + cap.start_capturing() + os.write(1, b"stdout") + assert cap.readouterr() == ("stdout", "") cap.stop_capturing() def test_stderr(): os.close(2) cap = StdCaptureFD(out=False, err=True, in_=False) - assert repr(cap.err) == " _state=None tmpfile=>" + assert fnmatch(repr(cap.err), "") + cap.start_capturing() + os.write(2, b"stderr") + assert cap.readouterr() == ("", "stderr") cap.stop_capturing() def test_stdin(): os.close(0) cap = StdCaptureFD(out=False, err=False, in_=True) - assert repr(cap.in_) == " _state=None tmpfile=>" + assert fnmatch(repr(cap.in_), "") cap.stop_capturing() """ ) @@ -1178,6 +1185,37 @@ class TestStdCaptureFDinvalidFD: assert result.ret == 0 assert result.parseoutcomes()["passed"] == 3 + def test_fdcapture_invalid_fd_with_fd_reuse(self, testdir): + with saved_fd(1): + os.close(1) + cap = capture.FDCaptureBinary(1) + cap.start() + os.write(1, b"started") + cap.suspend() + os.write(1, b" suspended") + cap.resume() + os.write(1, b" resumed") + assert cap.snap() == b"started resumed" + cap.done() + with pytest.raises(OSError): + os.write(1, b"done") + + def test_fdcapture_invalid_fd_without_fd_reuse(self, testdir): + with saved_fd(1), saved_fd(2): + os.close(1) + os.close(2) + cap = capture.FDCaptureBinary(2) + cap.start() + os.write(2, b"started") + cap.suspend() + os.write(2, b" suspended") + cap.resume() + os.write(2, b" resumed") + assert cap.snap() == b"started resumed" + cap.done() + with pytest.raises(OSError): + os.write(2, b"done") + def test_capture_not_started_but_reset(): capsys = StdCapture()