fix issue412 and other encoding issues. Streamline dupfile() into

a new more thoughtful safe_text_dupfile helper.
This commit is contained in:
holger krekel 2014-03-28 11:27:02 +01:00
parent 923dcfd620
commit 9d716a39d6
3 changed files with 38 additions and 60 deletions

View File

@ -13,6 +13,7 @@ import py
import pytest import pytest
from py.io import TextIO from py.io import TextIO
unicode = py.builtin.text
patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'} patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
@ -38,12 +39,7 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
method = "sys" method = "sys"
pluginmanager = early_config.pluginmanager pluginmanager = early_config.pluginmanager
if method != "no": if method != "no":
try: dupped_stdout = safe_text_dupfile(sys.stdout, "wb")
sys.stdout.fileno()
except Exception:
dupped_stdout = sys.stdout
else:
dupped_stdout = dupfile(sys.stdout, buffering=1)
pluginmanager.register(dupped_stdout, "dupped_stdout") pluginmanager.register(dupped_stdout, "dupped_stdout")
#pluginmanager.add_shutdown(dupped_stdout.close) #pluginmanager.add_shutdown(dupped_stdout.close)
capman = CaptureManager(method) capman = CaptureManager(method)
@ -256,51 +252,41 @@ class CaptureFixture:
return "", "" return "", ""
def dupfile(f, mode=None, buffering=0, raising=False, encoding=None): def safe_text_dupfile(f, mode, default_encoding="UTF8"):
""" return a new open file object that's a duplicate of f """ return a open text file object that's a duplicate of f on the
FD-level if possible.
mode is duplicated if not given, 'buffering' controls
buffer size (defaulting to no buffering) and 'raising'
defines whether an exception is raised when an incompatible
file object is passed in (if raising is False, the file
object itself will be returned)
""" """
encoding = getattr(f, "encoding", None)
try: try:
fd = f.fileno() fd = f.fileno()
mode = mode or f.mode except Exception:
except AttributeError: if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
if raising: # we seem to have a text stream, let's just use it
raise
return f return f
newfd = os.dup(fd)
if sys.version_info >= (3, 0):
if encoding is not None:
mode = mode.replace("b", "")
buffering = True
return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
else: else:
f = os.fdopen(newfd, mode, buffering) newfd = os.dup(fd)
if encoding is not None: if "b" not in mode:
return EncodedFile(f, encoding) mode += "b"
return f f = os.fdopen(newfd, mode, 0) # no buffering
return EncodedFile(f, encoding or default_encoding)
class EncodedFile(object): class EncodedFile(object):
def __init__(self, _stream, encoding): def __init__(self, buffer, encoding):
self._stream = _stream self.buffer = buffer
self.encoding = encoding self.encoding = encoding
def write(self, obj): def write(self, obj):
if isinstance(obj, unicode): if isinstance(obj, unicode):
obj = obj.encode(self.encoding) obj = obj.encode(self.encoding, "replace")
self._stream.write(obj) self.buffer.write(obj)
def writelines(self, linelist): def writelines(self, linelist):
data = ''.join(linelist) data = ''.join(linelist)
self.write(data) self.write(data)
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self._stream, name) return getattr(self.buffer, name)
class StdCaptureBase(object): class StdCaptureBase(object):
@ -344,13 +330,8 @@ class StdCaptureBase(object):
def readouterr(self): def readouterr(self):
""" return snapshot unicode value of stdout/stderr capturings. """ """ return snapshot unicode value of stdout/stderr capturings. """
return self._readsnapshot('out'), self._readsnapshot('err') return (self.out.snap() if self.out is not None else "",
self.err.snap() if self.err is not None else "")
def _readsnapshot(self, name):
cap = getattr(self, name, None)
if cap is None:
return ""
return cap.snap()
class FDCapture: class FDCapture:
@ -370,7 +351,7 @@ class FDCapture:
else: else:
f = TemporaryFile() f = TemporaryFile()
with f: with f:
tmpfile = dupfile(f, encoding="UTF-8") tmpfile = safe_text_dupfile(f, mode="wb+")
self.tmpfile = tmpfile self.tmpfile = tmpfile
if targetfd in patchsysdict: if targetfd in patchsysdict:
self._oldsys = getattr(sys, patchsysdict[targetfd]) self._oldsys = getattr(sys, patchsysdict[targetfd])

View File

@ -104,6 +104,7 @@ class TerminalReporter:
self.startdir = self.curdir = py.path.local() self.startdir = self.curdir = py.path.local()
if file is None: if file is None:
file = py.std.sys.stdout file = py.std.sys.stdout
#assert file.encoding
self._tw = self.writer = py.io.TerminalWriter(file) self._tw = self.writer = py.io.TerminalWriter(file)
if self.config.option.color == 'yes': if self.config.option.color == 'yes':
self._tw.hasmarkup = True self._tw.hasmarkup = True
@ -144,7 +145,8 @@ class TerminalReporter:
self._tw.write(content, **markup) self._tw.write(content, **markup)
def write_line(self, line, **markup): def write_line(self, line, **markup):
line = str(line) if not py.builtin._istext(line):
line = py.builtin.text(line, errors="replace")
self.ensure_newline() self.ensure_newline()
self._tw.line(line, **markup) self._tw.line(line, **markup)
@ -163,7 +165,7 @@ class TerminalReporter:
self._tw.line(msg, **kw) self._tw.line(msg, **kw)
def pytest_internalerror(self, excrepr): def pytest_internalerror(self, excrepr):
for line in str(excrepr).split("\n"): for line in py.builtin.text(excrepr).split("\n"):
self.write_line("INTERNALERROR> " + line) self.write_line("INTERNALERROR> " + line)
return 1 return 1

View File

@ -541,7 +541,6 @@ def test_capture_conftest_runtest_setup(testdir):
assert 'hello19' not in result.stdout.str() assert 'hello19' not in result.stdout.str()
@pytest.mark.xfail("sys.version_info >= (3,)")
def test_capture_badoutput_issue412(testdir): def test_capture_badoutput_issue412(testdir):
testdir.makepyfile(""" testdir.makepyfile("""
import os import os
@ -571,8 +570,6 @@ def test_capture_early_option_parsing(testdir):
assert 'hello19' in result.stdout.str() assert 'hello19' in result.stdout.str()
@pytest.mark.xfail(sys.version_info >= (3, 0), reason='encoding issues')
@pytest.mark.xfail(sys.version_info < (2, 6), reason='test not run on py25')
def test_capture_binary_output(testdir): def test_capture_binary_output(testdir):
testdir.makepyfile(r""" testdir.makepyfile(r"""
import pytest import pytest
@ -646,7 +643,7 @@ def tmpfile(testdir):
def test_dupfile(tmpfile): def test_dupfile(tmpfile):
flist = [] flist = []
for i in range(5): for i in range(5):
nf = capture.dupfile(tmpfile, encoding="utf-8") nf = capture.safe_text_dupfile(tmpfile, "wb")
assert nf != tmpfile assert nf != tmpfile
assert nf.fileno() != tmpfile.fileno() assert nf.fileno() != tmpfile.fileno()
assert nf not in flist assert nf not in flist
@ -660,19 +657,17 @@ def test_dupfile(tmpfile):
assert "01234" in repr(s) assert "01234" in repr(s)
tmpfile.close() tmpfile.close()
def test_dupfile_on_bytesio():
io = py.io.BytesIO()
f = capture.safe_text_dupfile(io, "wb")
f.write("hello")
assert io.getvalue() == b"hello"
def test_dupfile_no_mode(): def test_dupfile_on_textio():
""" io = py.io.TextIO()
dupfile should trap an AttributeError and return f if no mode is supplied. f = capture.safe_text_dupfile(io, "wb")
""" f.write("hello")
class SomeFileWrapper(object): assert io.getvalue() == "hello"
"An object with a fileno method but no mode attribute"
def fileno(self):
return 1
tmpfile = SomeFileWrapper()
assert capture.dupfile(tmpfile) is tmpfile
with pytest.raises(AttributeError):
capture.dupfile(tmpfile, raising=True)
@contextlib.contextmanager @contextlib.contextmanager