import contextlib import io import os import pickle import subprocess import sys import textwrap from io import StringIO from io import UnsupportedOperation from typing import BinaryIO from typing import Generator from typing import List from typing import TextIO import pytest from _pytest import capture from _pytest.capture import CaptureManager from _pytest.config import ExitCode # note: py.io capture tests where copied from # pylib 1.4.20.dev2 (rev 13d9af95547e) needsosdup = pytest.mark.skipif( not hasattr(os, "dup"), reason="test needs os.dup, not available on this platform" ) def StdCaptureFD(out=True, err=True, in_=True): return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture) def StdCapture(out=True, err=True, in_=True): return capture.MultiCapture(out, err, in_, Capture=capture.SysCapture) def TeeStdCapture(out=True, err=True, in_=True): return capture.MultiCapture(out, err, in_, Capture=capture.TeeSysCapture) class TestCaptureManager: def test_getmethod_default_no_fd(self, monkeypatch): from _pytest.capture import pytest_addoption from _pytest.config.argparsing import Parser parser = Parser() pytest_addoption(parser) default = parser._groups[0].options[0].default assert default == "fd" if hasattr(os, "dup") else "sys" parser = Parser() monkeypatch.delattr(os, "dup", raising=False) pytest_addoption(parser) assert parser._groups[0].options[0].default == "sys" @pytest.mark.parametrize( "method", ["no", "sys", pytest.param("fd", marks=needsosdup)] ) def test_capturing_basic_api(self, method): capouter = StdCaptureFD() old = sys.stdout, sys.stderr, sys.stdin try: capman = CaptureManager(method) capman.start_global_capturing() capman.suspend_global_capture() outerr = capman.read_global_capture() assert outerr == ("", "") capman.suspend_global_capture() outerr = capman.read_global_capture() assert outerr == ("", "") print("hello") capman.suspend_global_capture() out, err = capman.read_global_capture() if method == "no": assert old == (sys.stdout, sys.stderr, sys.stdin) else: assert not out capman.resume_global_capture() print("hello") capman.suspend_global_capture() out, err = capman.read_global_capture() if method != "no": assert out == "hello\n" capman.stop_global_capturing() finally: capouter.stop_capturing() @needsosdup def test_init_capturing(self): capouter = StdCaptureFD() try: capman = CaptureManager("fd") capman.start_global_capturing() pytest.raises(AssertionError, capman.start_global_capturing) capman.stop_global_capturing() finally: capouter.stop_capturing() @pytest.mark.parametrize("method", ["fd", "sys"]) def test_capturing_unicode(testdir, method): obj = "'b\u00f6y'" testdir.makepyfile( """\ # taken from issue 227 from nosetests def test_unicode(): import sys print(sys.stdout) print(%s) """ % obj ) result = testdir.runpytest("--capture=%s" % method) result.stdout.fnmatch_lines(["*1 passed*"]) @pytest.mark.parametrize("method", ["fd", "sys"]) def test_capturing_bytes_in_utf8_encoding(testdir, method): testdir.makepyfile( """\ def test_unicode(): print('b\\u00f6y') """ ) result = testdir.runpytest("--capture=%s" % method) result.stdout.fnmatch_lines(["*1 passed*"]) def test_collect_capturing(testdir): p = testdir.makepyfile( """ import sys print("collect %s failure" % 13) sys.stderr.write("collect %s_stderr failure" % 13) import xyz42123 """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( [ "*Captured stdout*", "collect 13 failure", "*Captured stderr*", "collect 13_stderr failure", ] ) class TestPerTestCapturing: def test_capture_and_fixtures(self, testdir): p = testdir.makepyfile( """ def setup_module(mod): print("setup module") def setup_function(function): print("setup " + function.__name__) def test_func1(): print("in func1") assert 0 def test_func2(): print("in func2") assert 0 """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( [ "setup module*", "setup test_func1*", "in func1*", "setup test_func2*", "in func2*", ] ) @pytest.mark.xfail(reason="unimplemented feature") def test_capture_scope_cache(self, testdir): p = testdir.makepyfile( """ import sys def setup_module(func): print("module-setup") def setup_function(func): print("function-setup") def test_func(): print("in function") assert 0 def teardown_function(func): print("in teardown") """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( [ "*test_func():*", "*Captured stdout during setup*", "module-setup*", "function-setup*", "*Captured stdout*", "in teardown*", ] ) def test_no_carry_over(self, testdir): p = testdir.makepyfile( """ def test_func1(): print("in func1") def test_func2(): print("in func2") assert 0 """ ) result = testdir.runpytest(p) s = result.stdout.str() assert "in func1" not in s assert "in func2" in s def test_teardown_capturing(self, testdir): p = testdir.makepyfile( """ def setup_function(function): print("setup func1") def teardown_function(function): print("teardown func1") assert 0 def test_func1(): print("in func1") pass """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( [ "*teardown_function*", "*Captured stdout*", "setup func1*", "in func1*", "teardown func1*", # "*1 fixture failure*" ] ) def test_teardown_capturing_final(self, testdir): p = testdir.makepyfile( """ def teardown_module(mod): print("teardown module") assert 0 def test_func(): pass """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( [ "*def teardown_module(mod):*", "*Captured stdout*", "*teardown module*", "*1 error*", ] ) def test_capturing_outerr(self, testdir): p1 = testdir.makepyfile( """\ import sys def test_capturing(): print(42) sys.stderr.write(str(23)) def test_capturing_error(): print(1) sys.stderr.write(str(2)) raise ValueError """ ) result = testdir.runpytest(p1) result.stdout.fnmatch_lines( [ "*test_capturing_outerr.py .F*", "====* FAILURES *====", "____*____", "*test_capturing_outerr.py:8: ValueError", "*--- Captured stdout *call*", "1", "*--- Captured stderr *call*", "2", ] ) class TestLoggingInteraction: def test_logging_stream_ownership(self, testdir): p = testdir.makepyfile( """\ def test_logging(): import logging import pytest stream = capture.CaptureIO() logging.basicConfig(stream=stream) stream.close() # to free memory/release resources """ ) result = testdir.runpytest_subprocess(p) assert result.stderr.str().find("atexit") == -1 def test_logging_and_immediate_setupteardown(self, testdir): p = testdir.makepyfile( """\ import logging def setup_function(function): logging.warning("hello1") def test_logging(): logging.warning("hello2") assert 0 def teardown_function(function): logging.warning("hello3") assert 0 """ ) for optargs in (("--capture=sys",), ("--capture=fd",)): print(optargs) result = testdir.runpytest_subprocess(p, *optargs) s = result.stdout.str() result.stdout.fnmatch_lines( ["*WARN*hello3", "*WARN*hello1", "*WARN*hello2"] # errors show first! ) # verify proper termination assert "closed" not in s def test_logging_and_crossscope_fixtures(self, testdir): p = testdir.makepyfile( """\ import logging def setup_module(function): logging.warning("hello1") def test_logging(): logging.warning("hello2") assert 0 def teardown_module(function): logging.warning("hello3") assert 0 """ ) for optargs in (("--capture=sys",), ("--capture=fd",)): print(optargs) result = testdir.runpytest_subprocess(p, *optargs) s = result.stdout.str() result.stdout.fnmatch_lines( ["*WARN*hello3", "*WARN*hello1", "*WARN*hello2"] # errors come first ) # verify proper termination assert "closed" not in s def test_conftestlogging_is_shown(self, testdir): testdir.makeconftest( """\ import logging logging.basicConfig() logging.warning("hello435") """ ) # make sure that logging is still captured in tests result = testdir.runpytest_subprocess("-s", "-p", "no:capturelog") assert result.ret == ExitCode.NO_TESTS_COLLECTED result.stderr.fnmatch_lines(["WARNING*hello435*"]) assert "operation on closed file" not in result.stderr.str() def test_conftestlogging_and_test_logging(self, testdir): testdir.makeconftest( """\ import logging logging.basicConfig() """ ) # make sure that logging is still captured in tests p = testdir.makepyfile( """\ def test_hello(): import logging logging.warning("hello433") assert 0 """ ) result = testdir.runpytest_subprocess(p, "-p", "no:capturelog") assert result.ret != 0 result.stdout.fnmatch_lines(["WARNING*hello433*"]) assert "something" not in result.stderr.str() assert "operation on closed file" not in result.stderr.str() def test_logging_after_cap_stopped(self, testdir): testdir.makeconftest( """\ import pytest import logging log = logging.getLogger(__name__) @pytest.fixture def log_on_teardown(): yield log.warning('Logging on teardown') """ ) # make sure that logging is still captured in tests p = testdir.makepyfile( """\ def test_hello(log_on_teardown): import logging logging.warning("hello433") assert 1 raise KeyboardInterrupt() """ ) result = testdir.runpytest_subprocess(p, "--log-cli-level", "info") assert result.ret != 0 result.stdout.fnmatch_lines( ["*WARNING*hello433*", "*WARNING*Logging on teardown*"] ) assert ( "AttributeError: 'NoneType' object has no attribute 'resume_capturing'" not in result.stderr.str() ) class TestCaptureFixture: @pytest.mark.parametrize("opt", [[], ["-s"]]) def test_std_functional(self, testdir, opt): reprec = testdir.inline_runsource( """\ def test_hello(capsys): print(42) out, err = capsys.readouterr() assert out.startswith("42") """, *opt ) reprec.assertoutcome(passed=1) def test_capsyscapfd(self, testdir): p = testdir.makepyfile( """\ def test_one(capsys, capfd): pass def test_two(capfd, capsys): pass """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( [ "*ERROR*setup*test_one*", "E*capfd*capsys*same*time*", "*ERROR*setup*test_two*", "E*capsys*capfd*same*time*", "*2 errors*", ] ) def test_capturing_getfixturevalue(self, testdir): """Test that asking for "capfd" and "capsys" using request.getfixturevalue in the same test is an error. """ testdir.makepyfile( """\ def test_one(capsys, request): request.getfixturevalue("capfd") def test_two(capfd, request): request.getfixturevalue("capsys") """ ) result = testdir.runpytest() result.stdout.fnmatch_lines( [ "*test_one*", "E * cannot use capfd and capsys at the same time", "*test_two*", "E * cannot use capsys and capfd at the same time", "*2 failed in*", ] ) def test_capsyscapfdbinary(self, testdir): p = testdir.makepyfile( """\ def test_one(capsys, capfdbinary): pass """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines( ["*ERROR*setup*test_one*", "E*capfdbinary*capsys*same*time*", "*1 error*"] ) @pytest.mark.parametrize("method", ["sys", "fd"]) def test_capture_is_represented_on_failure_issue128(self, testdir, method): p = testdir.makepyfile( """\ def test_hello(cap{}): print("xxx42xxx") assert 0 """.format( method ) ) result = testdir.runpytest(p) result.stdout.fnmatch_lines(["xxx42xxx"]) @needsosdup def test_stdfd_functional(self, testdir): reprec = testdir.inline_runsource( """\ def test_hello(capfd): import os os.write(1, b"42") out, err = capfd.readouterr() assert out.startswith("42") capfd.close() """ ) reprec.assertoutcome(passed=1) @needsosdup def test_capfdbinary(self, testdir): reprec = testdir.inline_runsource( """\ def test_hello(capfdbinary): import os # some likely un-decodable bytes os.write(1, b'\\xfe\\x98\\x20') out, err = capfdbinary.readouterr() assert out == b'\\xfe\\x98\\x20' assert err == b'' """ ) reprec.assertoutcome(passed=1) def test_capsysbinary(self, testdir): reprec = testdir.inline_runsource( """\ def test_hello(capsysbinary): import sys # some likely un-decodable bytes sys.stdout.buffer.write(b'\\xfe\\x98\\x20') out, err = capsysbinary.readouterr() assert out == b'\\xfe\\x98\\x20' assert err == b'' """ ) reprec.assertoutcome(passed=1) def test_partial_setup_failure(self, testdir): p = testdir.makepyfile( """\ def test_hello(capsys, missingarg): pass """ ) result = testdir.runpytest(p) result.stdout.fnmatch_lines(["*test_partial_setup_failure*", "*1 error*"]) @needsosdup def test_keyboardinterrupt_disables_capturing(self, testdir): p = testdir.makepyfile( """\ def test_hello(capfd): import os os.write(1, b'42') raise KeyboardInterrupt() """ ) result = testdir.runpytest_subprocess(p) result.stdout.fnmatch_lines(["*KeyboardInterrupt*"]) assert result.ret == 2 def test_capture_and_logging(self, testdir): """#14""" p = testdir.makepyfile( """\ import logging def test_log(capsys): logging.error('x') """ ) result = testdir.runpytest_subprocess(p) assert "closed" not in result.stderr.str() @pytest.mark.parametrize("fixture", ["capsys", "capfd"]) @pytest.mark.parametrize("no_capture", [True, False]) def test_disabled_capture_fixture(self, testdir, fixture, no_capture): testdir.makepyfile( """\ def test_disabled({fixture}): print('captured before') with {fixture}.disabled(): print('while capture is disabled') print('captured after') assert {fixture}.readouterr() == ('captured before\\ncaptured after\\n', '') def test_normal(): print('test_normal executed') """.format( fixture=fixture ) ) args = ("-s",) if no_capture else () result = testdir.runpytest_subprocess(*args) result.stdout.fnmatch_lines(["*while capture is disabled*", "*= 2 passed in *"]) result.stdout.no_fnmatch_line("*captured before*") result.stdout.no_fnmatch_line("*captured after*") if no_capture: assert "test_normal executed" in result.stdout.str() else: result.stdout.no_fnmatch_line("*test_normal executed*") @pytest.mark.parametrize("fixture", ["capsys", "capfd"]) def test_fixture_use_by_other_fixtures(self, testdir, fixture): """ Ensure that capsys and capfd can be used by other fixtures during setup and teardown. """ testdir.makepyfile( """\ import sys import pytest @pytest.fixture def captured_print({fixture}): print('stdout contents begin') print('stderr contents begin', file=sys.stderr) out, err = {fixture}.readouterr() yield out, err print('stdout contents end') print('stderr contents end', file=sys.stderr) out, err = {fixture}.readouterr() assert out == 'stdout contents end\\n' assert err == 'stderr contents end\\n' def test_captured_print(captured_print): out, err = captured_print assert out == 'stdout contents begin\\n' assert err == 'stderr contents begin\\n' """.format( fixture=fixture ) ) result = testdir.runpytest_subprocess() result.stdout.fnmatch_lines(["*1 passed*"]) result.stdout.no_fnmatch_line("*stdout contents begin*") result.stdout.no_fnmatch_line("*stderr contents begin*") @pytest.mark.parametrize("cap", ["capsys", "capfd"]) def test_fixture_use_by_other_fixtures_teardown(self, testdir, cap): """Ensure we can access setup and teardown buffers from teardown when using capsys/capfd (##3033)""" testdir.makepyfile( """\ import sys import pytest import os @pytest.fixture() def fix({cap}): print("setup out") sys.stderr.write("setup err\\n") yield out, err = {cap}.readouterr() assert out == 'setup out\\ncall out\\n' assert err == 'setup err\\ncall err\\n' def test_a(fix): print("call out") sys.stderr.write("call err\\n") """.format( cap=cap ) ) reprec = testdir.inline_run() reprec.assertoutcome(passed=1) def test_setup_failure_does_not_kill_capturing(testdir): sub1 = testdir.mkpydir("sub1") sub1.join("conftest.py").write( textwrap.dedent( """\ def pytest_runtest_setup(item): raise ValueError(42) """ ) ) sub1.join("test_mod.py").write("def test_func1(): pass") result = testdir.runpytest(testdir.tmpdir, "--traceconfig") result.stdout.fnmatch_lines(["*ValueError(42)*", "*1 error*"]) def test_fdfuncarg_skips_on_no_osdup(testdir): testdir.makepyfile( """ import os if hasattr(os, 'dup'): del os.dup def test_hello(capfd): pass """ ) result = testdir.runpytest_subprocess("--capture=no") result.stdout.fnmatch_lines(["*1 skipped*"]) def test_capture_conftest_runtest_setup(testdir): testdir.makeconftest( """ def pytest_runtest_setup(): print("hello19") """ ) testdir.makepyfile("def test_func(): pass") result = testdir.runpytest() assert result.ret == 0 result.stdout.no_fnmatch_line("*hello19*") def test_capture_badoutput_issue412(testdir): testdir.makepyfile( """ import os def test_func(): omg = bytearray([1,129,1]) os.write(1, omg) assert 0 """ ) result = testdir.runpytest("--capture=fd") result.stdout.fnmatch_lines( """ *def test_func* *assert 0* *Captured* *1 failed* """ ) def test_capture_early_option_parsing(testdir): testdir.makeconftest( """ def pytest_runtest_setup(): print("hello19") """ ) testdir.makepyfile("def test_func(): pass") result = testdir.runpytest("-vs") assert result.ret == 0 assert "hello19" in result.stdout.str() def test_capture_binary_output(testdir): testdir.makepyfile( r""" import pytest def test_a(): import sys import subprocess subprocess.call([sys.executable, __file__]) def test_foo(): import os;os.write(1, b'\xc3') if __name__ == '__main__': test_foo() """ ) result = testdir.runpytest("--assert=plain") result.assert_outcomes(passed=2) def test_error_during_readouterr(testdir): """Make sure we suspend capturing if errors occur during readouterr""" testdir.makepyfile( pytest_xyz=""" from _pytest.capture import FDCapture def bad_snap(self): raise Exception('boom') assert FDCapture.snap FDCapture.snap = bad_snap """ ) result = testdir.runpytest_subprocess("-p", "pytest_xyz", "--version") result.stderr.fnmatch_lines( ["*in bad_snap", " raise Exception('boom')", "Exception: boom"] ) class TestCaptureIO: def test_text(self): f = capture.CaptureIO() f.write("hello") s = f.getvalue() assert s == "hello" f.close() def test_unicode_and_str_mixture(self): f = capture.CaptureIO() f.write("\u00f6") pytest.raises(TypeError, f.write, b"hello") def test_write_bytes_to_buffer(self): """In python3, stdout / stderr are text io wrappers (exposing a buffer property of the underlying bytestream). See issue #1407 """ f = capture.CaptureIO() f.buffer.write(b"foo\r\n") assert f.getvalue() == "foo\r\n" class TestCaptureAndPassthroughIO(TestCaptureIO): def test_text(self): sio = io.StringIO() f = capture.CaptureAndPassthroughIO(sio) f.write("hello") s1 = f.getvalue() assert s1 == "hello" s2 = sio.getvalue() assert s2 == s1 f.close() sio.close() def test_unicode_and_str_mixture(self): sio = io.StringIO() f = capture.CaptureAndPassthroughIO(sio) f.write("\u00f6") pytest.raises(TypeError, f.write, b"hello") def test_dontreadfrominput(): from _pytest.capture import DontReadFromInput f = DontReadFromInput() assert f.buffer is f assert not f.isatty() pytest.raises(IOError, f.read) pytest.raises(IOError, f.readlines) iter_f = iter(f) pytest.raises(IOError, next, iter_f) pytest.raises(UnsupportedOperation, f.fileno) f.close() # just for completeness @pytest.fixture def tmpfile(testdir) -> Generator[BinaryIO, None, None]: f = testdir.makepyfile("").open("wb+") yield f if not f.closed: f.close() @needsosdup def test_dupfile(tmpfile) -> None: flist = [] # type: List[TextIO] for i in range(5): nf = capture.safe_text_dupfile(tmpfile, "wb") assert nf != tmpfile assert nf.fileno() != tmpfile.fileno() assert nf not in flist print(i, end="", file=nf) flist.append(nf) fname_open = flist[0].name assert fname_open == repr(flist[0].buffer) for i in range(5): f = flist[i] f.close() fname_closed = flist[0].name assert fname_closed == repr(flist[0].buffer) assert fname_closed != fname_open tmpfile.seek(0) s = tmpfile.read() assert "01234" in repr(s) tmpfile.close() assert fname_closed == repr(flist[0].buffer) def test_dupfile_on_bytesio(): bio = io.BytesIO() f = capture.safe_text_dupfile(bio, "wb") f.write("hello") assert bio.getvalue() == b"hello" assert "BytesIO object" in f.name def test_dupfile_on_textio(): sio = StringIO() f = capture.safe_text_dupfile(sio, "wb") f.write("hello") assert sio.getvalue() == "hello" assert not hasattr(f, "name") @contextlib.contextmanager def lsof_check(): pid = os.getpid() try: out = subprocess.check_output(("lsof", "-p", str(pid))).decode() except (OSError, subprocess.CalledProcessError, UnicodeDecodeError) as exc: # about UnicodeDecodeError, see note on pytester pytest.skip("could not run 'lsof' ({!r})".format(exc)) yield out2 = subprocess.check_output(("lsof", "-p", str(pid))).decode() len1 = len([x for x in out.split("\n") if "REG" in x]) len2 = len([x for x in out2.split("\n") if "REG" in x]) assert len2 < len1 + 3, out2 class TestFDCapture: pytestmark = needsosdup def test_simple(self, tmpfile): fd = tmpfile.fileno() cap = capture.FDCapture(fd) data = b"hello" os.write(fd, data) s = cap.snap() cap.done() assert not s cap = capture.FDCapture(fd) cap.start() os.write(fd, data) s = cap.snap() cap.done() assert s == "hello" def test_simple_many(self, tmpfile): for i in range(10): self.test_simple(tmpfile) def test_simple_many_check_open_files(self, testdir): with lsof_check(): with testdir.makepyfile("").open("wb+") as tmpfile: self.test_simple_many(tmpfile) def test_simple_fail_second_start(self, tmpfile): fd = tmpfile.fileno() cap = capture.FDCapture(fd) cap.done() pytest.raises(ValueError, cap.start) def test_stderr(self): cap = capture.FDCapture(2) cap.start() print("hello", file=sys.stderr) s = cap.snap() cap.done() assert s == "hello\n" def test_stdin(self): cap = capture.FDCapture(0) cap.start() x = os.read(0, 100).strip() cap.done() assert x == b"" def test_writeorg(self, tmpfile): data1, data2 = b"foo", b"bar" cap = capture.FDCapture(tmpfile.fileno()) cap.start() tmpfile.write(data1) tmpfile.flush() cap.writeorg(data2) scap = cap.snap() cap.done() assert scap == data1.decode("ascii") with open(tmpfile.name, "rb") as stmp_file: stmp = stmp_file.read() assert stmp == data2 def test_simple_resume_suspend(self): with saved_fd(1): cap = capture.FDCapture(1) cap.start() data = b"hello" os.write(1, data) sys.stdout.write("whatever") s = cap.snap() assert s == "hellowhatever" cap.suspend() os.write(1, b"world") sys.stdout.write("qlwkej") assert not cap.snap() cap.resume() os.write(1, b"but now") sys.stdout.write(" yes\n") s = cap.snap() assert s == "but now yes\n" cap.suspend() cap.done() pytest.raises(AttributeError, cap.suspend) def test_capfd_sys_stdout_mode(self, capfd): assert "b" not in sys.stdout.mode @contextlib.contextmanager def saved_fd(fd): new_fd = os.dup(fd) try: yield finally: os.dup2(new_fd, fd) os.close(new_fd) class TestStdCapture: captureclass = staticmethod(StdCapture) @contextlib.contextmanager def getcapture(self, **kw): cap = self.__class__.captureclass(**kw) cap.start_capturing() try: yield cap finally: cap.stop_capturing() def test_capturing_done_simple(self): with self.getcapture() as cap: sys.stdout.write("hello") sys.stderr.write("world") out, err = cap.readouterr() assert out == "hello" assert err == "world" def test_capturing_reset_simple(self): with self.getcapture() as cap: print("hello world") sys.stderr.write("hello error\n") out, err = cap.readouterr() assert out == "hello world\n" assert err == "hello error\n" def test_capturing_readouterr(self): with self.getcapture() as cap: print("hello world") sys.stderr.write("hello error\n") out, err = cap.readouterr() assert out == "hello world\n" assert err == "hello error\n" sys.stderr.write("error2") out, err = cap.readouterr() assert err == "error2" def test_capture_results_accessible_by_attribute(self): with self.getcapture() as cap: sys.stdout.write("hello") sys.stderr.write("world") capture_result = cap.readouterr() assert capture_result.out == "hello" assert capture_result.err == "world" def test_capturing_readouterr_unicode(self): with self.getcapture() as cap: print("hxąć") out, err = cap.readouterr() assert out == "hxąć\n" def test_reset_twice_error(self): with self.getcapture() as cap: print("hello") out, err = cap.readouterr() pytest.raises(ValueError, cap.stop_capturing) assert out == "hello\n" assert not err def test_capturing_modify_sysouterr_in_between(self): oldout = sys.stdout olderr = sys.stderr with self.getcapture() as cap: sys.stdout.write("hello") sys.stderr.write("world") sys.stdout = capture.CaptureIO() sys.stderr = capture.CaptureIO() print("not seen") sys.stderr.write("not seen\n") out, err = cap.readouterr() assert out == "hello" assert err == "world" assert sys.stdout == oldout assert sys.stderr == olderr def test_capturing_error_recursive(self): with self.getcapture() as cap1: print("cap1") with self.getcapture() as cap2: print("cap2") out2, err2 = cap2.readouterr() out1, err1 = cap1.readouterr() assert out1 == "cap1\n" assert out2 == "cap2\n" def test_just_out_capture(self): with self.getcapture(out=True, err=False) as cap: sys.stdout.write("hello") sys.stderr.write("world") out, err = cap.readouterr() assert out == "hello" assert not err def test_just_err_capture(self): with self.getcapture(out=False, err=True) as cap: sys.stdout.write("hello") sys.stderr.write("world") out, err = cap.readouterr() assert err == "world" assert not out def test_stdin_restored(self): old = sys.stdin with self.getcapture(in_=True): newstdin = sys.stdin assert newstdin != sys.stdin assert sys.stdin is old def test_stdin_nulled_by_default(self): print("XXX this test may well hang instead of crashing") print("XXX which indicates an error in the underlying capturing") print("XXX mechanisms") with self.getcapture(): pytest.raises(IOError, sys.stdin.read) class TestTeeStdCapture(TestStdCapture): captureclass = staticmethod(TeeStdCapture) def test_capturing_error_recursive(self): """ for TeeStdCapture since we passthrough stderr/stdout, cap1 should get all output, while cap2 should only get "cap2\n" """ with self.getcapture() as cap1: print("cap1") with self.getcapture() as cap2: print("cap2") out2, err2 = cap2.readouterr() out1, err1 = cap1.readouterr() assert out1 == "cap1\ncap2\n" assert out2 == "cap2\n" class TestStdCaptureFD(TestStdCapture): pytestmark = needsosdup captureclass = staticmethod(StdCaptureFD) def test_simple_only_fd(self, testdir): testdir.makepyfile( """\ import os def test_x(): os.write(1, b"hello\\n") assert 0 """ ) result = testdir.runpytest_subprocess() result.stdout.fnmatch_lines( """ *test_x* *assert 0* *Captured stdout* """ ) def test_intermingling(self): with self.getcapture() as cap: os.write(1, b"1") sys.stdout.write(str(2)) sys.stdout.flush() os.write(1, b"3") os.write(2, b"a") sys.stderr.write("b") sys.stderr.flush() os.write(2, b"c") out, err = cap.readouterr() assert out == "123" assert err == "abc" def test_many(self, capfd): with lsof_check(): for i in range(10): cap = StdCaptureFD() cap.stop_capturing() class TestStdCaptureFDinvalidFD: pytestmark = needsosdup def test_stdcapture_fd_invalid_fd(self, testdir): testdir.makepyfile( """ import os from _pytest import capture def StdCaptureFD(out=True, err=True, in_=True): return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture) def test_stdout(): os.close(1) cap = StdCaptureFD(out=True, err=False, in_=False) assert repr(cap.out) == "" cap.stop_capturing() def test_stderr(): os.close(2) cap = StdCaptureFD(out=False, err=True, in_=False) assert repr(cap.err) == "" cap.stop_capturing() def test_stdin(): os.close(0) cap = StdCaptureFD(out=False, err=False, in_=True) assert repr(cap.in_) == "" cap.stop_capturing() """ ) result = testdir.runpytest_subprocess("--capture=fd") assert result.ret == 0 assert result.parseoutcomes()["passed"] == 3 def test_capture_not_started_but_reset(): capsys = StdCapture() capsys.stop_capturing() def test_using_capsys_fixture_works_with_sys_stdout_encoding(capsys): test_text = "test text" print(test_text.encode(sys.stdout.encoding, "replace")) (out, err) = capsys.readouterr() assert out assert err == "" def test_capsys_results_accessible_by_attribute(capsys): sys.stdout.write("spam") sys.stderr.write("eggs") capture_result = capsys.readouterr() assert capture_result.out == "spam" assert capture_result.err == "eggs" @needsosdup @pytest.mark.parametrize("use", [True, False]) def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): if not use: tmpfile = True cap = StdCaptureFD(out=False, err=tmpfile) try: cap.start_capturing() capfile = cap.err.tmpfile cap.readouterr() finally: cap.stop_capturing() capfile2 = cap.err.tmpfile assert capfile2 == capfile @needsosdup def test_close_and_capture_again(testdir): testdir.makepyfile( """ import os def test_close(): os.close(1) def test_capture_again(): os.write(1, b"hello\\n") assert 0 """ ) result = testdir.runpytest_subprocess() result.stdout.fnmatch_lines( """ *test_capture_again* *assert 0* *stdout* *hello* """ ) @pytest.mark.parametrize("method", ["SysCapture", "FDCapture", "TeeSysCapture"]) def test_capturing_and_logging_fundamentals(testdir, method): if method == "StdCaptureFD" and not hasattr(os, "dup"): pytest.skip("need os.dup") # here we check a fundamental feature p = testdir.makepyfile( """ import sys, os import py, logging from _pytest import capture cap = capture.MultiCapture(out=False, in_=False, Capture=capture.%s) cap.start_capturing() logging.warning("hello1") outerr = cap.readouterr() print("suspend, captured %%s" %%(outerr,)) logging.warning("hello2") cap.pop_outerr_to_orig() logging.warning("hello3") outerr = cap.readouterr() print("suspend2, captured %%s" %% (outerr,)) """ % (method,) ) result = testdir.runpython(p) result.stdout.fnmatch_lines( """ suspend, captured*hello1* suspend2, captured*WARNING:root:hello3* """ ) result.stderr.fnmatch_lines( """ WARNING:root:hello2 """ ) assert "atexit" not in result.stderr.str() def test_error_attribute_issue555(testdir): testdir.makepyfile( """ import sys def test_capattr(): assert sys.stdout.errors == "strict" assert sys.stderr.errors == "strict" """ ) reprec = testdir.inline_run() reprec.assertoutcome(passed=1) @pytest.mark.skipif( not sys.platform.startswith("win") and sys.version_info[:2] >= (3, 6), reason="only py3.6+ on windows", ) def test_py36_windowsconsoleio_workaround_non_standard_streams(): """ Ensure _py36_windowsconsoleio_workaround function works with objects that do not implement the full ``io``-based stream protocol, for example execnet channels (#2666). """ from _pytest.capture import _py36_windowsconsoleio_workaround class DummyStream: def write(self, s): pass stream = DummyStream() _py36_windowsconsoleio_workaround(stream) def test_dontreadfrominput_has_encoding(testdir): testdir.makepyfile( """ import sys def test_capattr(): # should not raise AttributeError assert sys.stdout.encoding assert sys.stderr.encoding """ ) reprec = testdir.inline_run() reprec.assertoutcome(passed=1) def test_crash_on_closing_tmpfile_py27(testdir): p = testdir.makepyfile( """ import threading import sys printing = threading.Event() def spam(): f = sys.stderr print('SPAMBEFORE', end='', file=f) printing.set() while True: try: f.flush() except (OSError, ValueError): break def test_spam_in_thread(): t = threading.Thread(target=spam) t.daemon = True t.start() printing.wait() """ ) # Do not consider plugins like hypothesis, which might output to stderr. testdir.monkeypatch.setenv("PYTEST_DISABLE_PLUGIN_AUTOLOAD", "1") result = testdir.runpytest_subprocess(str(p)) assert result.ret == 0 assert result.stderr.str() == "" result.stdout.no_fnmatch_line("*IOError*") def test_pickling_and_unpickling_encoded_file(): # See https://bitbucket.org/pytest-dev/pytest/pull-request/194 # pickle.loads() raises infinite recursion if # EncodedFile.__getattr__ is not implemented properly ef = capture.EncodedFile(None, None) ef_as_str = pickle.dumps(ef) pickle.loads(ef_as_str) def test_global_capture_with_live_logging(testdir): # Issue 3819 # capture should work with live cli logging # Teardown report seems to have the capture for the whole process (setup, capture, teardown) testdir.makeconftest( """ def pytest_runtest_logreport(report): if "test_global" in report.nodeid: if report.when == "teardown": with open("caplog", "w") as f: f.write(report.caplog) with open("capstdout", "w") as f: f.write(report.capstdout) """ ) testdir.makepyfile( """ import logging import sys import pytest logger = logging.getLogger(__name__) @pytest.fixture def fix1(): print("fix setup") logging.info("fix setup") yield logging.info("fix teardown") print("fix teardown") def test_global(fix1): print("begin test") logging.info("something in test") print("end test") """ ) result = testdir.runpytest_subprocess("--log-cli-level=INFO") assert result.ret == 0 with open("caplog", "r") as f: caplog = f.read() assert "fix setup" in caplog assert "something in test" in caplog assert "fix teardown" in caplog with open("capstdout", "r") as f: capstdout = f.read() assert "fix setup" in capstdout assert "begin test" in capstdout assert "end test" in capstdout assert "fix teardown" in capstdout @pytest.mark.parametrize("capture_fixture", ["capsys", "capfd"]) def test_capture_with_live_logging(testdir, capture_fixture): # Issue 3819 # capture should work with live cli logging testdir.makepyfile( """ import logging import sys logger = logging.getLogger(__name__) def test_capture({0}): print("hello") sys.stderr.write("world\\n") captured = {0}.readouterr() assert captured.out == "hello\\n" assert captured.err == "world\\n" logging.info("something") print("next") logging.info("something") captured = {0}.readouterr() assert captured.out == "next\\n" """.format( capture_fixture ) ) result = testdir.runpytest_subprocess("--log-cli-level=INFO") assert result.ret == 0 def test_typeerror_encodedfile_write(testdir): """It should behave the same with and without output capturing (#4861).""" p = testdir.makepyfile( """ def test_fails(): import sys sys.stdout.write(b"foo") """ ) result_without_capture = testdir.runpytest("-s", str(p)) result_with_capture = testdir.runpytest(str(p)) assert result_with_capture.ret == result_without_capture.ret result_with_capture.stdout.fnmatch_lines( ["E * TypeError: write() argument must be str, not bytes"] ) def test_stderr_write_returns_len(capsys): """Write on Encoded files, namely captured stderr, should return number of characters written.""" assert sys.stderr.write("Foo") == 3 def test_encodedfile_writelines(tmpfile: BinaryIO) -> None: ef = capture.EncodedFile(tmpfile, "utf-8") with pytest.raises(AttributeError): ef.writelines([b"line1", b"line2"]) # type: ignore[list-item] # noqa: F821 assert ef.writelines(["line1", "line2"]) is None # type: ignore[func-returns-value] # noqa: F821 tmpfile.seek(0) assert tmpfile.read() == b"line1line2" tmpfile.close() with pytest.raises(ValueError): ef.read()