From 9d716a39d672b98e1b40c92004eb3ac46dbed1df Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 28 Mar 2014 11:27:02 +0100 Subject: [PATCH] fix issue412 and other encoding issues. Streamline dupfile() into a new more thoughtful safe_text_dupfile helper. --- _pytest/capture.py | 65 +++++++++++++++-------------------------- _pytest/terminal.py | 6 ++-- testing/test_capture.py | 27 +++++++---------- 3 files changed, 38 insertions(+), 60 deletions(-) diff --git a/_pytest/capture.py b/_pytest/capture.py index 25151ccd5..2d93c64d6 100644 --- a/_pytest/capture.py +++ b/_pytest/capture.py @@ -13,6 +13,7 @@ import py import pytest from py.io import TextIO +unicode = py.builtin.text patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'} @@ -38,12 +39,7 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__): method = "sys" pluginmanager = early_config.pluginmanager if method != "no": - try: - sys.stdout.fileno() - except Exception: - dupped_stdout = sys.stdout - else: - dupped_stdout = dupfile(sys.stdout, buffering=1) + dupped_stdout = safe_text_dupfile(sys.stdout, "wb") pluginmanager.register(dupped_stdout, "dupped_stdout") #pluginmanager.add_shutdown(dupped_stdout.close) capman = CaptureManager(method) @@ -256,51 +252,41 @@ class CaptureFixture: return "", "" -def dupfile(f, mode=None, buffering=0, raising=False, encoding=None): - """ return a new open file object that's a duplicate of f - - mode is duplicated if not given, 'buffering' controls - buffer size (defaulting to no buffering) and 'raising' - defines whether an exception is raised when an incompatible - file object is passed in (if raising is False, the file - object itself will be returned) +def safe_text_dupfile(f, mode, default_encoding="UTF8"): + """ return a open text file object that's a duplicate of f on the + FD-level if possible. """ + encoding = getattr(f, "encoding", None) try: fd = f.fileno() - mode = mode or f.mode - except AttributeError: - if raising: - raise - return f - newfd = os.dup(fd) - if sys.version_info >= (3, 0): - if encoding is not None: - mode = mode.replace("b", "") - buffering = True - return os.fdopen(newfd, mode, buffering, encoding, closefd=True) + except Exception: + if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"): + # we seem to have a text stream, let's just use it + return f else: - f = os.fdopen(newfd, mode, buffering) - if encoding is not None: - return EncodedFile(f, encoding) - return f + newfd = os.dup(fd) + if "b" not in mode: + mode += "b" + f = os.fdopen(newfd, mode, 0) # no buffering + return EncodedFile(f, encoding or default_encoding) class EncodedFile(object): - def __init__(self, _stream, encoding): - self._stream = _stream + def __init__(self, buffer, encoding): + self.buffer = buffer self.encoding = encoding def write(self, obj): if isinstance(obj, unicode): - obj = obj.encode(self.encoding) - self._stream.write(obj) + obj = obj.encode(self.encoding, "replace") + self.buffer.write(obj) def writelines(self, linelist): data = ''.join(linelist) self.write(data) def __getattr__(self, name): - return getattr(self._stream, name) + return getattr(self.buffer, name) class StdCaptureBase(object): @@ -344,13 +330,8 @@ class StdCaptureBase(object): def readouterr(self): """ return snapshot unicode value of stdout/stderr capturings. """ - return self._readsnapshot('out'), self._readsnapshot('err') - - def _readsnapshot(self, name): - cap = getattr(self, name, None) - if cap is None: - return "" - return cap.snap() + return (self.out.snap() if self.out is not None else "", + self.err.snap() if self.err is not None else "") class FDCapture: @@ -370,7 +351,7 @@ class FDCapture: else: f = TemporaryFile() with f: - tmpfile = dupfile(f, encoding="UTF-8") + tmpfile = safe_text_dupfile(f, mode="wb+") self.tmpfile = tmpfile if targetfd in patchsysdict: self._oldsys = getattr(sys, patchsysdict[targetfd]) diff --git a/_pytest/terminal.py b/_pytest/terminal.py index fb3e92517..3647fa7f9 100644 --- a/_pytest/terminal.py +++ b/_pytest/terminal.py @@ -104,6 +104,7 @@ class TerminalReporter: self.startdir = self.curdir = py.path.local() if file is None: file = py.std.sys.stdout + #assert file.encoding self._tw = self.writer = py.io.TerminalWriter(file) if self.config.option.color == 'yes': self._tw.hasmarkup = True @@ -144,7 +145,8 @@ class TerminalReporter: self._tw.write(content, **markup) def write_line(self, line, **markup): - line = str(line) + if not py.builtin._istext(line): + line = py.builtin.text(line, errors="replace") self.ensure_newline() self._tw.line(line, **markup) @@ -163,7 +165,7 @@ class TerminalReporter: self._tw.line(msg, **kw) def pytest_internalerror(self, excrepr): - for line in str(excrepr).split("\n"): + for line in py.builtin.text(excrepr).split("\n"): self.write_line("INTERNALERROR> " + line) return 1 diff --git a/testing/test_capture.py b/testing/test_capture.py index 6274920de..4175d243b 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -541,7 +541,6 @@ def test_capture_conftest_runtest_setup(testdir): assert 'hello19' not in result.stdout.str() -@pytest.mark.xfail("sys.version_info >= (3,)") def test_capture_badoutput_issue412(testdir): testdir.makepyfile(""" import os @@ -571,8 +570,6 @@ def test_capture_early_option_parsing(testdir): assert 'hello19' in result.stdout.str() -@pytest.mark.xfail(sys.version_info >= (3, 0), reason='encoding issues') -@pytest.mark.xfail(sys.version_info < (2, 6), reason='test not run on py25') def test_capture_binary_output(testdir): testdir.makepyfile(r""" import pytest @@ -646,7 +643,7 @@ def tmpfile(testdir): def test_dupfile(tmpfile): flist = [] for i in range(5): - nf = capture.dupfile(tmpfile, encoding="utf-8") + nf = capture.safe_text_dupfile(tmpfile, "wb") assert nf != tmpfile assert nf.fileno() != tmpfile.fileno() assert nf not in flist @@ -660,19 +657,17 @@ def test_dupfile(tmpfile): assert "01234" in repr(s) tmpfile.close() +def test_dupfile_on_bytesio(): + io = py.io.BytesIO() + f = capture.safe_text_dupfile(io, "wb") + f.write("hello") + assert io.getvalue() == b"hello" -def test_dupfile_no_mode(): - """ - dupfile should trap an AttributeError and return f if no mode is supplied. - """ - class SomeFileWrapper(object): - "An object with a fileno method but no mode attribute" - def fileno(self): - return 1 - tmpfile = SomeFileWrapper() - assert capture.dupfile(tmpfile) is tmpfile - with pytest.raises(AttributeError): - capture.dupfile(tmpfile, raising=True) +def test_dupfile_on_textio(): + io = py.io.TextIO() + f = capture.safe_text_dupfile(io, "wb") + f.write("hello") + assert io.getvalue() == "hello" @contextlib.contextmanager