import contextlib import io import os import subprocess import sys import textwrap from io import UnsupportedOperation from typing import BinaryIO from typing import cast from typing import Generator from typing import TextIO import pytest from _pytest import capture from _pytest.capture import _get_multicapture from _pytest.capture import CaptureFixture from _pytest.capture import CaptureManager from _pytest.capture import CaptureResult from _pytest.capture import MultiCapture from _pytest.config import ExitCode from _pytest.monkeypatch import MonkeyPatch from _pytest.pytester import Pytester # note: py.io capture tests where copied from # pylib 1.4.20.dev2 (rev 13d9af95547e) def StdCaptureFD( out: bool = True, err: bool = True, in_: bool = True ) -> MultiCapture[str]: return capture.MultiCapture( in_=capture.FDCapture(0) if in_ else None, out=capture.FDCapture(1) if out else None, err=capture.FDCapture(2) if err else None, ) def StdCapture( out: bool = True, err: bool = True, in_: bool = True ) -> MultiCapture[str]: return capture.MultiCapture( in_=capture.SysCapture(0) if in_ else None, out=capture.SysCapture(1) if out else None, err=capture.SysCapture(2) if err else None, ) def TeeStdCapture( out: bool = True, err: bool = True, in_: bool = True ) -> MultiCapture[str]: return capture.MultiCapture( in_=capture.SysCapture(0, tee=True) if in_ else None, out=capture.SysCapture(1, tee=True) if out else None, err=capture.SysCapture(2, tee=True) if err else None, ) class TestCaptureManager: @pytest.mark.parametrize("method", ["no", "sys", "fd"]) def test_capturing_basic_api(self, method) -> None: capouter = StdCaptureFD() old = sys.stdout, sys.stderr, sys.stdin try: capman = CaptureManager(method) capman.start_global_capturing() capman.suspend_global_capture() outerr = capman.read_global_capture() assert outerr == ("", "") capman.suspend_global_capture() outerr = capman.read_global_capture() assert outerr == ("", "") print("hello") capman.suspend_global_capture() out, err = capman.read_global_capture() if method == "no": assert old == (sys.stdout, sys.stderr, sys.stdin) else: assert not out capman.resume_global_capture() print("hello") capman.suspend_global_capture() out, err = capman.read_global_capture() if method != "no": assert out == "hello\n" capman.stop_global_capturing() finally: capouter.stop_capturing() def test_init_capturing(self): capouter = StdCaptureFD() try: capman = CaptureManager("fd") capman.start_global_capturing() pytest.raises(AssertionError, capman.start_global_capturing) capman.stop_global_capturing() finally: capouter.stop_capturing() @pytest.mark.parametrize("method", ["fd", "sys"]) def test_capturing_unicode(pytester: Pytester, method: str) -> None: obj = "'b\u00f6y'" pytester.makepyfile( """\ # taken from issue 227 from nosetests def test_unicode(): import sys print(sys.stdout) print(%s) """ % obj ) result = pytester.runpytest("--capture=%s" % method) result.stdout.fnmatch_lines(["*1 passed*"]) @pytest.mark.parametrize("method", ["fd", "sys"]) def test_capturing_bytes_in_utf8_encoding(pytester: Pytester, method: str) -> None: pytester.makepyfile( """\ def test_unicode(): print('b\\u00f6y') """ ) result = pytester.runpytest("--capture=%s" % method) result.stdout.fnmatch_lines(["*1 passed*"]) def test_collect_capturing(pytester: Pytester) -> None: p = pytester.makepyfile( """ import sys print("collect %s failure" % 13) sys.stderr.write("collect %s_stderr failure" % 13) import xyz42123 """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( [ "*Captured stdout*", "collect 13 failure", "*Captured stderr*", "collect 13_stderr failure", ] ) class TestPerTestCapturing: def test_capture_and_fixtures(self, pytester: Pytester) -> None: p = pytester.makepyfile( """ def setup_module(mod): print("setup module") def setup_function(function): print("setup " + function.__name__) def test_func1(): print("in func1") assert 0 def test_func2(): print("in func2") assert 0 """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( [ "setup module*", "setup test_func1*", "in func1*", "setup test_func2*", "in func2*", ] ) @pytest.mark.xfail(reason="unimplemented feature") def test_capture_scope_cache(self, pytester: Pytester) -> None: p = pytester.makepyfile( """ import sys def setup_module(func): print("module-setup") def setup_function(func): print("function-setup") def test_func(): print("in function") assert 0 def teardown_function(func): print("in teardown") """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( [ "*test_func():*", "*Captured stdout during setup*", "module-setup*", "function-setup*", "*Captured stdout*", "in teardown*", ] ) def test_no_carry_over(self, pytester: Pytester) -> None: p = pytester.makepyfile( """ def test_func1(): print("in func1") def test_func2(): print("in func2") assert 0 """ ) result = pytester.runpytest(p) s = result.stdout.str() assert "in func1" not in s assert "in func2" in s def test_teardown_capturing(self, pytester: Pytester) -> None: p = pytester.makepyfile( """ def setup_function(function): print("setup func1") def teardown_function(function): print("teardown func1") assert 0 def test_func1(): print("in func1") pass """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( [ "*teardown_function*", "*Captured stdout*", "setup func1*", "in func1*", "teardown func1*", # "*1 fixture failure*" ] ) def test_teardown_capturing_final(self, pytester: Pytester) -> None: p = pytester.makepyfile( """ def teardown_module(mod): print("teardown module") assert 0 def test_func(): pass """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( [ "*def teardown_module(mod):*", "*Captured stdout*", "*teardown module*", "*1 error*", ] ) def test_capturing_outerr(self, pytester: Pytester) -> None: p1 = pytester.makepyfile( """\ import sys def test_capturing(): print(42) sys.stderr.write(str(23)) def test_capturing_error(): print(1) sys.stderr.write(str(2)) raise ValueError """ ) result = pytester.runpytest(p1) result.stdout.fnmatch_lines( [ "*test_capturing_outerr.py .F*", "====* FAILURES *====", "____*____", "*test_capturing_outerr.py:8: ValueError", "*--- Captured stdout *call*", "1", "*--- Captured stderr *call*", "2", ] ) class TestLoggingInteraction: def test_logging_stream_ownership(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ def test_logging(): import logging import pytest stream = capture.CaptureIO() logging.basicConfig(stream=stream) stream.close() # to free memory/release resources """ ) result = pytester.runpytest_subprocess(p) assert result.stderr.str().find("atexit") == -1 def test_logging_and_immediate_setupteardown(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ import logging def setup_function(function): logging.warning("hello1") def test_logging(): logging.warning("hello2") assert 0 def teardown_function(function): logging.warning("hello3") assert 0 """ ) for optargs in (("--capture=sys",), ("--capture=fd",)): print(optargs) result = pytester.runpytest_subprocess(p, *optargs) s = result.stdout.str() result.stdout.fnmatch_lines( ["*WARN*hello3", "*WARN*hello1", "*WARN*hello2"] # errors show first! ) # verify proper termination assert "closed" not in s def test_logging_and_crossscope_fixtures(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ import logging def setup_module(function): logging.warning("hello1") def test_logging(): logging.warning("hello2") assert 0 def teardown_module(function): logging.warning("hello3") assert 0 """ ) for optargs in (("--capture=sys",), ("--capture=fd",)): print(optargs) result = pytester.runpytest_subprocess(p, *optargs) s = result.stdout.str() result.stdout.fnmatch_lines( ["*WARN*hello3", "*WARN*hello1", "*WARN*hello2"] # errors come first ) # verify proper termination assert "closed" not in s def test_conftestlogging_is_shown(self, pytester: Pytester) -> None: pytester.makeconftest( """\ import logging logging.basicConfig() logging.warning("hello435") """ ) # make sure that logging is still captured in tests result = pytester.runpytest_subprocess("-s", "-p", "no:capturelog") assert result.ret == ExitCode.NO_TESTS_COLLECTED result.stderr.fnmatch_lines(["WARNING*hello435*"]) assert "operation on closed file" not in result.stderr.str() def test_conftestlogging_and_test_logging(self, pytester: Pytester) -> None: pytester.makeconftest( """\ import logging logging.basicConfig() """ ) # make sure that logging is still captured in tests p = pytester.makepyfile( """\ def test_hello(): import logging logging.warning("hello433") assert 0 """ ) result = pytester.runpytest_subprocess(p, "-p", "no:capturelog") assert result.ret != 0 result.stdout.fnmatch_lines(["WARNING*hello433*"]) assert "something" not in result.stderr.str() assert "operation on closed file" not in result.stderr.str() def test_logging_after_cap_stopped(self, pytester: Pytester) -> None: pytester.makeconftest( """\ import pytest import logging log = logging.getLogger(__name__) @pytest.fixture def log_on_teardown(): yield log.warning('Logging on teardown') """ ) # make sure that logging is still captured in tests p = pytester.makepyfile( """\ def test_hello(log_on_teardown): import logging logging.warning("hello433") assert 1 raise KeyboardInterrupt() """ ) result = pytester.runpytest_subprocess(p, "--log-cli-level", "info") assert result.ret != 0 result.stdout.fnmatch_lines( ["*WARNING*hello433*", "*WARNING*Logging on teardown*"] ) assert ( "AttributeError: 'NoneType' object has no attribute 'resume_capturing'" not in result.stderr.str() ) class TestCaptureFixture: @pytest.mark.parametrize("opt", [[], ["-s"]]) def test_std_functional(self, pytester: Pytester, opt) -> None: reprec = pytester.inline_runsource( """\ def test_hello(capsys): print(42) out, err = capsys.readouterr() assert out.startswith("42") """, *opt, ) reprec.assertoutcome(passed=1) def test_capsyscapfd(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ def test_one(capsys, capfd): pass def test_two(capfd, capsys): pass """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( [ "*ERROR*setup*test_one*", "E*capfd*capsys*same*time*", "*ERROR*setup*test_two*", "E*capsys*capfd*same*time*", "*2 errors*", ] ) def test_capturing_getfixturevalue(self, pytester: Pytester) -> None: """Test that asking for "capfd" and "capsys" using request.getfixturevalue in the same test is an error. """ pytester.makepyfile( """\ def test_one(capsys, request): request.getfixturevalue("capfd") def test_two(capfd, request): request.getfixturevalue("capsys") """ ) result = pytester.runpytest() result.stdout.fnmatch_lines( [ "*test_one*", "E * cannot use capfd and capsys at the same time", "*test_two*", "E * cannot use capsys and capfd at the same time", "*2 failed in*", ] ) def test_capsyscapfdbinary(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ def test_one(capsys, capfdbinary): pass """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines( ["*ERROR*setup*test_one*", "E*capfdbinary*capsys*same*time*", "*1 error*"] ) @pytest.mark.parametrize("method", ["sys", "fd"]) def test_capture_is_represented_on_failure_issue128( self, pytester: Pytester, method ) -> None: p = pytester.makepyfile( """\ def test_hello(cap{}): print("xxx42xxx") assert 0 """.format( method ) ) result = pytester.runpytest(p) result.stdout.fnmatch_lines(["xxx42xxx"]) def test_stdfd_functional(self, pytester: Pytester) -> None: reprec = pytester.inline_runsource( """\ def test_hello(capfd): import os os.write(1, b"42") out, err = capfd.readouterr() assert out.startswith("42") capfd.close() """ ) reprec.assertoutcome(passed=1) @pytest.mark.parametrize("nl", ("\n", "\r\n", "\r")) def test_cafd_preserves_newlines(self, capfd, nl) -> None: print("test", end=nl) out, err = capfd.readouterr() assert out.endswith(nl) def test_capfdbinary(self, pytester: Pytester) -> None: reprec = pytester.inline_runsource( """\ def test_hello(capfdbinary): import os # some likely un-decodable bytes os.write(1, b'\\xfe\\x98\\x20') out, err = capfdbinary.readouterr() assert out == b'\\xfe\\x98\\x20' assert err == b'' """ ) reprec.assertoutcome(passed=1) def test_capsysbinary(self, pytester: Pytester) -> None: p1 = pytester.makepyfile( r""" def test_hello(capsysbinary): import sys sys.stdout.buffer.write(b'hello') # Some likely un-decodable bytes. sys.stdout.buffer.write(b'\xfe\x98\x20') sys.stdout.buffer.flush() # Ensure writing in text mode still works and is captured. # https://github.com/pytest-dev/pytest/issues/6871 print("world", flush=True) out, err = capsysbinary.readouterr() assert out == b'hello\xfe\x98\x20world\n' assert err == b'' print("stdout after") print("stderr after", file=sys.stderr) """ ) result = pytester.runpytest(str(p1), "-rA") result.stdout.fnmatch_lines( [ "*- Captured stdout call -*", "stdout after", "*- Captured stderr call -*", "stderr after", "*= 1 passed in *", ] ) def test_partial_setup_failure(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ def test_hello(capsys, missingarg): pass """ ) result = pytester.runpytest(p) result.stdout.fnmatch_lines(["*test_partial_setup_failure*", "*1 error*"]) def test_keyboardinterrupt_disables_capturing(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ def test_hello(capfd): import os os.write(1, b'42') raise KeyboardInterrupt() """ ) result = pytester.runpytest_subprocess(p) result.stdout.fnmatch_lines(["*KeyboardInterrupt*"]) assert result.ret == 2 def test_capture_and_logging(self, pytester: Pytester) -> None: """#14""" p = pytester.makepyfile( """\ import logging def test_log(capsys): logging.error('x') """ ) result = pytester.runpytest_subprocess(p) assert "closed" not in result.stderr.str() @pytest.mark.parametrize("fixture", ["capsys", "capfd"]) @pytest.mark.parametrize("no_capture", [True, False]) def test_disabled_capture_fixture( self, pytester: Pytester, fixture: str, no_capture: bool ) -> None: pytester.makepyfile( """\ def test_disabled({fixture}): print('captured before') with {fixture}.disabled(): print('while capture is disabled') print('captured after') assert {fixture}.readouterr() == ('captured before\\ncaptured after\\n', '') def test_normal(): print('test_normal executed') """.format( fixture=fixture ) ) args = ("-s",) if no_capture else () result = pytester.runpytest_subprocess(*args) result.stdout.fnmatch_lines(["*while capture is disabled*", "*= 2 passed in *"]) result.stdout.no_fnmatch_line("*captured before*") result.stdout.no_fnmatch_line("*captured after*") if no_capture: assert "test_normal executed" in result.stdout.str() else: result.stdout.no_fnmatch_line("*test_normal executed*") def test_disabled_capture_fixture_twice(self, pytester: Pytester) -> None: """Test that an inner disabled() exit doesn't undo an outer disabled(). Issue #7148. """ pytester.makepyfile( """ def test_disabled(capfd): print('captured before') with capfd.disabled(): print('while capture is disabled 1') with capfd.disabled(): print('while capture is disabled 2') print('while capture is disabled 1 after') print('captured after') assert capfd.readouterr() == ('captured before\\ncaptured after\\n', '') """ ) result = pytester.runpytest_subprocess() result.stdout.fnmatch_lines( [ "*while capture is disabled 1", "*while capture is disabled 2", "*while capture is disabled 1 after", ], consecutive=True, ) @pytest.mark.parametrize("fixture", ["capsys", "capfd"]) def test_fixture_use_by_other_fixtures(self, pytester: Pytester, fixture) -> None: """Ensure that capsys and capfd can be used by other fixtures during setup and teardown.""" pytester.makepyfile( """\ import sys import pytest @pytest.fixture def captured_print({fixture}): print('stdout contents begin') print('stderr contents begin', file=sys.stderr) out, err = {fixture}.readouterr() yield out, err print('stdout contents end') print('stderr contents end', file=sys.stderr) out, err = {fixture}.readouterr() assert out == 'stdout contents end\\n' assert err == 'stderr contents end\\n' def test_captured_print(captured_print): out, err = captured_print assert out == 'stdout contents begin\\n' assert err == 'stderr contents begin\\n' """.format( fixture=fixture ) ) result = pytester.runpytest_subprocess() result.stdout.fnmatch_lines(["*1 passed*"]) result.stdout.no_fnmatch_line("*stdout contents begin*") result.stdout.no_fnmatch_line("*stderr contents begin*") @pytest.mark.parametrize("cap", ["capsys", "capfd"]) def test_fixture_use_by_other_fixtures_teardown( self, pytester: Pytester, cap ) -> None: """Ensure we can access setup and teardown buffers from teardown when using capsys/capfd (##3033)""" pytester.makepyfile( """\ import sys import pytest import os @pytest.fixture() def fix({cap}): print("setup out") sys.stderr.write("setup err\\n") yield out, err = {cap}.readouterr() assert out == 'setup out\\ncall out\\n' assert err == 'setup err\\ncall err\\n' def test_a(fix): print("call out") sys.stderr.write("call err\\n") """.format( cap=cap ) ) reprec = pytester.inline_run() reprec.assertoutcome(passed=1) def test_setup_failure_does_not_kill_capturing(pytester: Pytester) -> None: sub1 = pytester.mkpydir("sub1") sub1.joinpath("conftest.py").write_text( textwrap.dedent( """\ def pytest_runtest_setup(item): raise ValueError(42) """ ) ) sub1.joinpath("test_mod.py").write_text("def test_func1(): pass") result = pytester.runpytest(pytester.path, "--traceconfig") result.stdout.fnmatch_lines(["*ValueError(42)*", "*1 error*"]) def test_capture_conftest_runtest_setup(pytester: Pytester) -> None: pytester.makeconftest( """ def pytest_runtest_setup(): print("hello19") """ ) pytester.makepyfile("def test_func(): pass") result = pytester.runpytest() assert result.ret == 0 result.stdout.no_fnmatch_line("*hello19*") def test_capture_badoutput_issue412(pytester: Pytester) -> None: pytester.makepyfile( """ import os def test_func(): omg = bytearray([1,129,1]) os.write(1, omg) assert 0 """ ) result = pytester.runpytest("--capture=fd") result.stdout.fnmatch_lines( """ *def test_func* *assert 0* *Captured* *1 failed* """ ) def test_capture_early_option_parsing(pytester: Pytester) -> None: pytester.makeconftest( """ def pytest_runtest_setup(): print("hello19") """ ) pytester.makepyfile("def test_func(): pass") result = pytester.runpytest("-vs") assert result.ret == 0 assert "hello19" in result.stdout.str() def test_capture_binary_output(pytester: Pytester) -> None: pytester.makepyfile( r""" import pytest def test_a(): import sys import subprocess subprocess.call([sys.executable, __file__]) def test_foo(): import os;os.write(1, b'\xc3') if __name__ == '__main__': test_foo() """ ) result = pytester.runpytest("--assert=plain") result.assert_outcomes(passed=2) def test_error_during_readouterr(pytester: Pytester) -> None: """Make sure we suspend capturing if errors occur during readouterr""" pytester.makepyfile( pytest_xyz=""" from _pytest.capture import FDCapture def bad_snap(self): raise Exception('boom') assert FDCapture.snap FDCapture.snap = bad_snap """ ) result = pytester.runpytest_subprocess("-p", "pytest_xyz", "--version") result.stderr.fnmatch_lines( ["*in bad_snap", " raise Exception('boom')", "Exception: boom"] ) class TestCaptureIO: def test_text(self) -> None: f = capture.CaptureIO() f.write("hello") s = f.getvalue() assert s == "hello" f.close() def test_unicode_and_str_mixture(self) -> None: f = capture.CaptureIO() f.write("\u00f6") pytest.raises(TypeError, f.write, b"hello") def test_write_bytes_to_buffer(self) -> None: """In python3, stdout / stderr are text io wrappers (exposing a buffer property of the underlying bytestream). See issue #1407 """ f = capture.CaptureIO() f.buffer.write(b"foo\r\n") assert f.getvalue() == "foo\r\n" class TestTeeCaptureIO(TestCaptureIO): def test_text(self) -> None: sio = io.StringIO() f = capture.TeeCaptureIO(sio) f.write("hello") s1 = f.getvalue() assert s1 == "hello" s2 = sio.getvalue() assert s2 == s1 f.close() sio.close() def test_unicode_and_str_mixture(self) -> None: sio = io.StringIO() f = capture.TeeCaptureIO(sio) f.write("\u00f6") pytest.raises(TypeError, f.write, b"hello") def test_dontreadfrominput() -> None: from _pytest.capture import DontReadFromInput f = DontReadFromInput() assert f.buffer is f assert not f.isatty() pytest.raises(OSError, f.read) pytest.raises(OSError, f.readlines) iter_f = iter(f) pytest.raises(OSError, next, iter_f) pytest.raises(UnsupportedOperation, f.fileno) pytest.raises(UnsupportedOperation, f.flush) assert not f.readable() pytest.raises(UnsupportedOperation, f.seek, 0) assert not f.seekable() pytest.raises(UnsupportedOperation, f.tell) pytest.raises(UnsupportedOperation, f.truncate, 0) pytest.raises(UnsupportedOperation, f.write, b"") pytest.raises(UnsupportedOperation, f.writelines, []) assert not f.writable() f.close() # just for completeness def test_captureresult() -> None: cr = CaptureResult("out", "err") assert len(cr) == 2 assert cr.out == "out" assert cr.err == "err" out, err = cr assert out == "out" assert err == "err" assert cr[0] == "out" assert cr[1] == "err" assert cr == cr assert cr == CaptureResult("out", "err") assert cr != CaptureResult("wrong", "err") assert cr == ("out", "err") assert cr != ("out", "wrong") assert hash(cr) == hash(CaptureResult("out", "err")) assert hash(cr) == hash(("out", "err")) assert hash(cr) != hash(("out", "wrong")) assert cr < ("z",) assert cr < ("z", "b") assert cr < ("z", "b", "c") assert cr.count("err") == 1 assert cr.count("wrong") == 0 assert cr.index("err") == 1 with pytest.raises(ValueError): assert cr.index("wrong") == 0 assert next(iter(cr)) == "out" assert cr._replace(err="replaced") == ("out", "replaced") @pytest.fixture def tmpfile(pytester: Pytester) -> Generator[BinaryIO, None, None]: f = pytester.makepyfile("").open("wb+") yield f if not f.closed: f.close() @contextlib.contextmanager def lsof_check(): pid = os.getpid() try: out = subprocess.check_output(("lsof", "-p", str(pid))).decode() except (OSError, subprocess.CalledProcessError, UnicodeDecodeError) as exc: # about UnicodeDecodeError, see note on pytester pytest.skip(f"could not run 'lsof' ({exc!r})") yield out2 = subprocess.check_output(("lsof", "-p", str(pid))).decode() len1 = len([x for x in out.split("\n") if "REG" in x]) len2 = len([x for x in out2.split("\n") if "REG" in x]) assert len2 < len1 + 3, out2 class TestFDCapture: def test_simple(self, tmpfile: BinaryIO) -> None: fd = tmpfile.fileno() cap = capture.FDCapture(fd) data = b"hello" os.write(fd, data) pytest.raises(AssertionError, cap.snap) cap.done() cap = capture.FDCapture(fd) cap.start() os.write(fd, data) s = cap.snap() cap.done() assert s == "hello" def test_simple_many(self, tmpfile: BinaryIO) -> None: for i in range(10): self.test_simple(tmpfile) def test_simple_many_check_open_files(self, pytester: Pytester) -> None: with lsof_check(): with pytester.makepyfile("").open("wb+") as tmpfile: self.test_simple_many(tmpfile) def test_simple_fail_second_start(self, tmpfile: BinaryIO) -> None: fd = tmpfile.fileno() cap = capture.FDCapture(fd) cap.done() pytest.raises(AssertionError, cap.start) def test_stderr(self) -> None: cap = capture.FDCapture(2) cap.start() print("hello", file=sys.stderr) s = cap.snap() cap.done() assert s == "hello\n" def test_stdin(self) -> None: cap = capture.FDCapture(0) cap.start() x = os.read(0, 100).strip() cap.done() assert x == b"" def test_writeorg(self, tmpfile: BinaryIO) -> None: data1, data2 = b"foo", b"bar" cap = capture.FDCapture(tmpfile.fileno()) cap.start() tmpfile.write(data1) tmpfile.flush() cap.writeorg(data2.decode("ascii")) scap = cap.snap() cap.done() assert scap == data1.decode("ascii") with open(tmpfile.name, "rb") as stmp_file: stmp = stmp_file.read() assert stmp == data2 def test_simple_resume_suspend(self) -> None: with saved_fd(1): cap = capture.FDCapture(1) cap.start() data = b"hello" os.write(1, data) sys.stdout.write("whatever") s = cap.snap() assert s == "hellowhatever" cap.suspend() os.write(1, b"world") sys.stdout.write("qlwkej") assert not cap.snap() cap.resume() os.write(1, b"but now") sys.stdout.write(" yes\n") s = cap.snap() assert s == "but now yes\n" cap.suspend() cap.done() pytest.raises(AssertionError, cap.suspend) assert repr(cap) == ( "".format( cap.targetfd_save, cap.tmpfile ) ) # Should not crash with missing "_old". assert repr(cap.syscapture) == ( " _state='done' tmpfile={!r}>".format( cap.syscapture.tmpfile ) ) def test_capfd_sys_stdout_mode(self, capfd) -> None: assert "b" not in sys.stdout.mode @contextlib.contextmanager def saved_fd(fd): new_fd = os.dup(fd) try: yield finally: os.dup2(new_fd, fd) os.close(new_fd) class TestStdCapture: captureclass = staticmethod(StdCapture) @contextlib.contextmanager def getcapture(self, **kw): cap = self.__class__.captureclass(**kw) cap.start_capturing() try: yield cap finally: cap.stop_capturing() def test_capturing_done_simple(self) -> None: with self.getcapture() as cap: sys.stdout.write("hello") sys.stderr.write("world") out, err = cap.readouterr() assert out == "hello" assert err == "world" def test_capturing_reset_simple(self) -> None: with self.getcapture() as cap: print("hello world") sys.stderr.write("hello error\n") out, err = cap.readouterr() assert out == "hello world\n" assert err == "hello error\n" def test_capturing_readouterr(self) -> None: with self.getcapture() as cap: print("hello world") sys.stderr.write("hello error\n") out, err = cap.readouterr() assert out == "hello world\n" assert err == "hello error\n" sys.stderr.write("error2") out, err = cap.readouterr() assert err == "error2" def test_capture_results_accessible_by_attribute(self) -> None: with self.getcapture() as cap: sys.stdout.write("hello") sys.stderr.write("world") capture_result = cap.readouterr() assert capture_result.out == "hello" assert capture_result.err == "world" def test_capturing_readouterr_unicode(self) -> None: with self.getcapture() as cap: print("hxąć") out, err = cap.readouterr() assert out == "hxąć\n" def test_reset_twice_error(self) -> None: with self.getcapture() as cap: print("hello") out, err = cap.readouterr() pytest.raises(ValueError, cap.stop_capturing) assert out == "hello\n" assert not err def test_capturing_modify_sysouterr_in_between(self) -> None: oldout = sys.stdout olderr = sys.stderr with self.getcapture() as cap: sys.stdout.write("hello") sys.stderr.write("world") sys.stdout = capture.CaptureIO() sys.stderr = capture.CaptureIO() print("not seen") sys.stderr.write("not seen\n") out, err = cap.readouterr() assert out == "hello" assert err == "world" assert sys.stdout == oldout assert sys.stderr == olderr def test_capturing_error_recursive(self) -> None: with self.getcapture() as cap1: print("cap1") with self.getcapture() as cap2: print("cap2") out2, err2 = cap2.readouterr() out1, err1 = cap1.readouterr() assert out1 == "cap1\n" assert out2 == "cap2\n" def test_just_out_capture(self) -> None: with self.getcapture(out=True, err=False) as cap: sys.stdout.write("hello") sys.stderr.write("world") out, err = cap.readouterr() assert out == "hello" assert not err def test_just_err_capture(self) -> None: with self.getcapture(out=False, err=True) as cap: sys.stdout.write("hello") sys.stderr.write("world") out, err = cap.readouterr() assert err == "world" assert not out def test_stdin_restored(self) -> None: old = sys.stdin with self.getcapture(in_=True): newstdin = sys.stdin assert newstdin != sys.stdin assert sys.stdin is old def test_stdin_nulled_by_default(self) -> None: print("XXX this test may well hang instead of crashing") print("XXX which indicates an error in the underlying capturing") print("XXX mechanisms") with self.getcapture(): pytest.raises(OSError, sys.stdin.read) class TestTeeStdCapture(TestStdCapture): captureclass = staticmethod(TeeStdCapture) def test_capturing_error_recursive(self) -> None: r"""For TeeStdCapture since we passthrough stderr/stdout, cap1 should get all output, while cap2 should only get "cap2\n".""" with self.getcapture() as cap1: print("cap1") with self.getcapture() as cap2: print("cap2") out2, err2 = cap2.readouterr() out1, err1 = cap1.readouterr() assert out1 == "cap1\ncap2\n" assert out2 == "cap2\n" class TestStdCaptureFD(TestStdCapture): captureclass = staticmethod(StdCaptureFD) def test_simple_only_fd(self, pytester: Pytester) -> None: pytester.makepyfile( """\ import os def test_x(): os.write(1, b"hello\\n") assert 0 """ ) result = pytester.runpytest_subprocess() result.stdout.fnmatch_lines( """ *test_x* *assert 0* *Captured stdout* """ ) def test_intermingling(self): with self.getcapture() as cap: os.write(1, b"1") sys.stdout.write(str(2)) sys.stdout.flush() os.write(1, b"3") os.write(2, b"a") sys.stderr.write("b") sys.stderr.flush() os.write(2, b"c") out, err = cap.readouterr() assert out == "123" assert err == "abc" def test_many(self, capfd): with lsof_check(): for i in range(10): cap = StdCaptureFD() cap.start_capturing() cap.stop_capturing() class TestStdCaptureFDinvalidFD: def test_stdcapture_fd_invalid_fd(self, pytester: Pytester) -> None: pytester.makepyfile( """ import os from fnmatch import fnmatch from _pytest import capture def StdCaptureFD(out=True, err=True, in_=True): return capture.MultiCapture( in_=capture.FDCapture(0) if in_ else None, out=capture.FDCapture(1) if out else None, err=capture.FDCapture(2) if err else None, ) def test_stdout(): os.close(1) cap = StdCaptureFD(out=True, err=False, in_=False) assert fnmatch(repr(cap.out), "") cap.start_capturing() os.write(1, b"stdout") assert cap.readouterr() == ("stdout", "") cap.stop_capturing() def test_stderr(): os.close(2) cap = StdCaptureFD(out=False, err=True, in_=False) assert fnmatch(repr(cap.err), "") cap.start_capturing() os.write(2, b"stderr") assert cap.readouterr() == ("", "stderr") cap.stop_capturing() def test_stdin(): os.close(0) cap = StdCaptureFD(out=False, err=False, in_=True) assert fnmatch(repr(cap.in_), "") cap.stop_capturing() """ ) result = pytester.runpytest_subprocess("--capture=fd") assert result.ret == 0 assert result.parseoutcomes()["passed"] == 3 def test_fdcapture_invalid_fd_with_fd_reuse(self, pytester: Pytester) -> None: with saved_fd(1): os.close(1) cap = capture.FDCaptureBinary(1) cap.start() os.write(1, b"started") cap.suspend() os.write(1, b" suspended") cap.resume() os.write(1, b" resumed") assert cap.snap() == b"started resumed" cap.done() with pytest.raises(OSError): os.write(1, b"done") def test_fdcapture_invalid_fd_without_fd_reuse(self, pytester: Pytester) -> None: with saved_fd(1), saved_fd(2): os.close(1) os.close(2) cap = capture.FDCaptureBinary(2) cap.start() os.write(2, b"started") cap.suspend() os.write(2, b" suspended") cap.resume() os.write(2, b" resumed") assert cap.snap() == b"started resumed" cap.done() with pytest.raises(OSError): os.write(2, b"done") def test_capture_not_started_but_reset() -> None: capsys = StdCapture() capsys.stop_capturing() def test_using_capsys_fixture_works_with_sys_stdout_encoding( capsys: CaptureFixture[str], ) -> None: test_text = "test text" print(test_text.encode(sys.stdout.encoding, "replace")) (out, err) = capsys.readouterr() assert out assert err == "" def test_capsys_results_accessible_by_attribute(capsys: CaptureFixture[str]) -> None: sys.stdout.write("spam") sys.stderr.write("eggs") capture_result = capsys.readouterr() assert capture_result.out == "spam" assert capture_result.err == "eggs" def test_fdcapture_tmpfile_remains_the_same() -> None: cap = StdCaptureFD(out=False, err=True) try: cap.start_capturing() capfile = cap.err.tmpfile cap.readouterr() finally: cap.stop_capturing() capfile2 = cap.err.tmpfile assert capfile2 == capfile def test_close_and_capture_again(pytester: Pytester) -> None: pytester.makepyfile( """ import os def test_close(): os.close(1) def test_capture_again(): os.write(1, b"hello\\n") assert 0 """ ) result = pytester.runpytest_subprocess() result.stdout.fnmatch_lines( """ *test_capture_again* *assert 0* *stdout* *hello* """ ) @pytest.mark.parametrize( "method", ["SysCapture(2)", "SysCapture(2, tee=True)", "FDCapture(2)"] ) def test_capturing_and_logging_fundamentals(pytester: Pytester, method: str) -> None: # here we check a fundamental feature p = pytester.makepyfile( """ import sys, os, logging from _pytest import capture cap = capture.MultiCapture( in_=None, out=None, err=capture.%s, ) cap.start_capturing() logging.warning("hello1") outerr = cap.readouterr() print("suspend, captured %%s" %%(outerr,)) logging.warning("hello2") cap.pop_outerr_to_orig() logging.warning("hello3") outerr = cap.readouterr() print("suspend2, captured %%s" %% (outerr,)) """ % (method,) ) result = pytester.runpython(p) result.stdout.fnmatch_lines( """ suspend, captured*hello1* suspend2, captured*WARNING:root:hello3* """ ) result.stderr.fnmatch_lines( """ WARNING:root:hello2 """ ) assert "atexit" not in result.stderr.str() def test_error_attribute_issue555(pytester: Pytester) -> None: pytester.makepyfile( """ import sys def test_capattr(): assert sys.stdout.errors == "replace" assert sys.stderr.errors == "replace" """ ) reprec = pytester.inline_run() reprec.assertoutcome(passed=1) @pytest.mark.skipif( not sys.platform.startswith("win"), reason="only on windows", ) def test_windowsconsoleio_workaround_non_standard_streams() -> None: """ Ensure _windowsconsoleio_workaround function works with objects that do not implement the full ``io``-based stream protocol, for example execnet channels (#2666). """ from _pytest.capture import _windowsconsoleio_workaround class DummyStream: def write(self, s): pass stream = cast(TextIO, DummyStream()) _windowsconsoleio_workaround(stream) def test_dontreadfrominput_has_encoding(pytester: Pytester) -> None: pytester.makepyfile( """ import sys def test_capattr(): # should not raise AttributeError assert sys.stdout.encoding assert sys.stderr.encoding """ ) reprec = pytester.inline_run() reprec.assertoutcome(passed=1) def test_crash_on_closing_tmpfile_py27( pytester: Pytester, monkeypatch: MonkeyPatch ) -> None: p = pytester.makepyfile( """ import threading import sys printing = threading.Event() def spam(): f = sys.stderr print('SPAMBEFORE', end='', file=f) printing.set() while True: try: f.flush() except (OSError, ValueError): break def test_spam_in_thread(): t = threading.Thread(target=spam) t.daemon = True t.start() printing.wait() """ ) # Do not consider plugins like hypothesis, which might output to stderr. monkeypatch.setenv("PYTEST_DISABLE_PLUGIN_AUTOLOAD", "1") result = pytester.runpytest_subprocess(str(p)) assert result.ret == 0 assert result.stderr.str() == "" result.stdout.no_fnmatch_line("*OSError*") def test_global_capture_with_live_logging(pytester: Pytester) -> None: # Issue 3819 # capture should work with live cli logging # Teardown report seems to have the capture for the whole process (setup, capture, teardown) pytester.makeconftest( """ def pytest_runtest_logreport(report): if "test_global" in report.nodeid: if report.when == "teardown": with open("caplog", "w") as f: f.write(report.caplog) with open("capstdout", "w") as f: f.write(report.capstdout) """ ) pytester.makepyfile( """ import logging import sys import pytest logger = logging.getLogger(__name__) @pytest.fixture def fix1(): print("fix setup") logging.info("fix setup") yield logging.info("fix teardown") print("fix teardown") def test_global(fix1): print("begin test") logging.info("something in test") print("end test") """ ) result = pytester.runpytest_subprocess("--log-cli-level=INFO") assert result.ret == 0 with open("caplog") as f: caplog = f.read() assert "fix setup" in caplog assert "something in test" in caplog assert "fix teardown" in caplog with open("capstdout") as f: capstdout = f.read() assert "fix setup" in capstdout assert "begin test" in capstdout assert "end test" in capstdout assert "fix teardown" in capstdout @pytest.mark.parametrize("capture_fixture", ["capsys", "capfd"]) def test_capture_with_live_logging( pytester: Pytester, capture_fixture: CaptureFixture[str] ) -> None: # Issue 3819 # capture should work with live cli logging pytester.makepyfile( """ import logging import sys logger = logging.getLogger(__name__) def test_capture({0}): print("hello") sys.stderr.write("world\\n") captured = {0}.readouterr() assert captured.out == "hello\\n" assert captured.err == "world\\n" logging.info("something") print("next") logging.info("something") captured = {0}.readouterr() assert captured.out == "next\\n" """.format( capture_fixture ) ) result = pytester.runpytest_subprocess("--log-cli-level=INFO") assert result.ret == 0 def test_typeerror_encodedfile_write(pytester: Pytester) -> None: """It should behave the same with and without output capturing (#4861).""" p = pytester.makepyfile( """ def test_fails(): import sys sys.stdout.write(b"foo") """ ) result_without_capture = pytester.runpytest("-s", str(p)) result_with_capture = pytester.runpytest(str(p)) assert result_with_capture.ret == result_without_capture.ret out = result_with_capture.stdout.str() assert ("TypeError: write() argument must be str, not bytes" in out) or ( "TypeError: unicode argument expected, got 'bytes'" in out ) def test_stderr_write_returns_len(capsys: CaptureFixture[str]) -> None: """Write on Encoded files, namely captured stderr, should return number of characters written.""" assert sys.stderr.write("Foo") == 3 def test_encodedfile_writelines(tmpfile: BinaryIO) -> None: ef = capture.EncodedFile(tmpfile, encoding="utf-8") with pytest.raises(TypeError): ef.writelines([b"line1", b"line2"]) # type: ignore[list-item] assert ef.writelines(["line3", "line4"]) is None # type: ignore[func-returns-value] ef.flush() tmpfile.seek(0) assert tmpfile.read() == b"line3line4" tmpfile.close() with pytest.raises(ValueError): ef.read() def test__get_multicapture() -> None: assert isinstance(_get_multicapture("no"), MultiCapture) pytest.raises(ValueError, _get_multicapture, "unknown").match( r"^unknown capturing method: 'unknown'" ) def test_logging_while_collecting(pytester: Pytester) -> None: """Issue #6240: Calls to logging.xxx() during collection causes all logging calls to be duplicated to stderr""" p = pytester.makepyfile( """\ import logging logging.warning("during collection") def test_logging(): logging.warning("during call") assert False """ ) result = pytester.runpytest_subprocess(p) assert result.ret == ExitCode.TESTS_FAILED result.stdout.fnmatch_lines( [ "*test_*.py F*", "====* FAILURES *====", "____*____", "*--- Captured log call*", "WARNING * during call", "*1 failed*", ] ) result.stdout.no_fnmatch_line("*Captured stderr call*") result.stdout.no_fnmatch_line("*during collection*")