Merged in hpk42/pytest-capsimple/capsimple1 (pull request #115)

some simplifications in capturing code
This commit is contained in:
holger krekel 2014-01-26 12:07:45 +01:00
commit 899998cf9c
2 changed files with 28 additions and 116 deletions

View File

@ -126,13 +126,11 @@ class CaptureManager:
def _getcapture(self, method): def _getcapture(self, method):
if method == "fd": if method == "fd":
return StdCaptureFD( return StdCaptureFD(
now=False,
out=self._maketempfile(), out=self._maketempfile(),
err=self._maketempfile(), err=self._maketempfile(),
) )
elif method == "sys": elif method == "sys":
return StdCapture( return StdCapture(
now=False,
out=self._makestringio(), out=self._makestringio(),
err=self._makestringio(), err=self._makestringio(),
) )
@ -283,7 +281,7 @@ def pytest_funcarg__capfd(request):
class CaptureFixture: class CaptureFixture:
def __init__(self, captureclass): def __init__(self, captureclass):
self._capture = captureclass(now=False) self._capture = captureclass()
def _start(self): def _start(self):
self._capture.startall() self._capture.startall()
@ -307,7 +305,7 @@ class CaptureFixture:
class FDCapture: class FDCapture:
""" Capture IO to/from a given os-level filedescriptor. """ """ Capture IO to/from a given os-level filedescriptor. """
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False): def __init__(self, targetfd, tmpfile=None, patchsys=False):
""" save targetfd descriptor, and open a new """ save targetfd descriptor, and open a new
temporary file there. If no tmpfile is temporary file there. If no tmpfile is
specified a tempfile.Tempfile() will be opened specified a tempfile.Tempfile() will be opened
@ -322,8 +320,6 @@ class FDCapture:
self._savefd = os.dup(self.targetfd) self._savefd = os.dup(self.targetfd)
if patchsys: if patchsys:
self._oldsys = getattr(sys, patchsysdict[targetfd]) self._oldsys = getattr(sys, patchsysdict[targetfd])
if now:
self.start()
def start(self): def start(self):
try: try:
@ -413,21 +409,6 @@ class EncodedFile(object):
class Capture(object): class Capture(object):
def call(cls, func, *args, **kwargs):
""" return a (res, out, err) tuple where
out and err represent the output/error output
during function execution.
call the given function with args/kwargs
and capture output/error during its execution.
"""
so = cls()
try:
res = func(*args, **kwargs)
finally:
out, err = so.reset()
return res, out, err
call = classmethod(call)
def reset(self): def reset(self):
""" reset sys.stdout/stderr and return captured output as strings. """ """ reset sys.stdout/stderr and return captured output as strings. """
if hasattr(self, '_reset'): if hasattr(self, '_reset'):
@ -456,30 +437,24 @@ class StdCaptureFD(Capture):
reads from sys.stdin). If any of the 0,1,2 file descriptors reads from sys.stdin). If any of the 0,1,2 file descriptors
is invalid it will not be captured. is invalid it will not be captured.
""" """
def __init__(self, out=True, err=True, mixed=False, def __init__(self, out=True, err=True, in_=True, patchsys=True):
in_=True, patchsys=True, now=True):
self._options = { self._options = {
"out": out, "out": out,
"err": err, "err": err,
"mixed": mixed,
"in_": in_, "in_": in_,
"patchsys": patchsys, "patchsys": patchsys,
"now": now,
} }
self._save() self._save()
if now:
self.startall()
def _save(self): def _save(self):
in_ = self._options['in_'] in_ = self._options['in_']
out = self._options['out'] out = self._options['out']
err = self._options['err'] err = self._options['err']
mixed = self._options['mixed']
patchsys = self._options['patchsys'] patchsys = self._options['patchsys']
if in_: if in_:
try: try:
self.in_ = FDCapture( self.in_ = FDCapture(
0, tmpfile=None, now=False, 0, tmpfile=None,
patchsys=patchsys) patchsys=patchsys)
except OSError: except OSError:
pass pass
@ -490,21 +465,19 @@ class StdCaptureFD(Capture):
try: try:
self.out = FDCapture( self.out = FDCapture(
1, tmpfile=tmpfile, 1, tmpfile=tmpfile,
now=False, patchsys=patchsys) patchsys=patchsys)
self._options['out'] = self.out.tmpfile self._options['out'] = self.out.tmpfile
except OSError: except OSError:
pass pass
if err: if err:
if out and mixed: if hasattr(err, 'write'):
tmpfile = self.out.tmpfile
elif hasattr(err, 'write'):
tmpfile = err tmpfile = err
else: else:
tmpfile = None tmpfile = None
try: try:
self.err = FDCapture( self.err = FDCapture(
2, tmpfile=tmpfile, 2, tmpfile=tmpfile,
now=False, patchsys=patchsys) patchsys=patchsys)
self._options['err'] = self.err.tmpfile self._options['err'] = self.err.tmpfile
except OSError: except OSError:
pass pass
@ -562,7 +535,7 @@ class StdCapture(Capture):
modifies sys.stdout|stderr|stdin attributes and does not modifies sys.stdout|stderr|stdin attributes and does not
touch underlying File Descriptors (use StdCaptureFD for that). touch underlying File Descriptors (use StdCaptureFD for that).
""" """
def __init__(self, out=True, err=True, in_=True, mixed=False, now=True): def __init__(self, out=True, err=True, in_=True):
self._oldout = sys.stdout self._oldout = sys.stdout
self._olderr = sys.stderr self._olderr = sys.stderr
self._oldin = sys.stdin self._oldin = sys.stdin
@ -570,14 +543,10 @@ class StdCapture(Capture):
out = TextIO() out = TextIO()
self.out = out self.out = out
if err: if err:
if mixed: if not hasattr(err, 'write'):
err = out
elif not hasattr(err, 'write'):
err = TextIO() err = TextIO()
self.err = err self.err = err
self.in_ = in_ self.in_ = in_
if now:
self.startall()
def startall(self): def startall(self):
if self.out: if self.out:

View File

@ -661,21 +661,6 @@ def lsof_check(func):
class TestFDCapture: class TestFDCapture:
pytestmark = needsosdup pytestmark = needsosdup
def test_not_now(self, tmpfile):
fd = tmpfile.fileno()
cap = capture.FDCapture(fd, now=False)
data = tobytes("hello")
os.write(fd, data)
f = cap.done()
s = f.read()
assert not s
cap = capture.FDCapture(fd, now=False)
cap.start()
os.write(fd, data)
f = cap.done()
s = f.read()
assert s == "hello"
def test_simple(self, tmpfile): def test_simple(self, tmpfile):
fd = tmpfile.fileno() fd = tmpfile.fileno()
cap = capture.FDCapture(fd) cap = capture.FDCapture(fd)
@ -683,8 +668,13 @@ class TestFDCapture:
os.write(fd, data) os.write(fd, data)
f = cap.done() f = cap.done()
s = f.read() s = f.read()
assert not s
cap = capture.FDCapture(fd)
cap.start()
os.write(fd, data)
f = cap.done()
s = f.read()
assert s == "hello" assert s == "hello"
f.close()
def test_simple_many(self, tmpfile): def test_simple_many(self, tmpfile):
for i in range(10): for i in range(10):
@ -702,6 +692,7 @@ class TestFDCapture:
def test_stderr(self): def test_stderr(self):
cap = capture.FDCapture(2, patchsys=True) cap = capture.FDCapture(2, patchsys=True)
cap.start()
print_("hello", file=sys.stderr) print_("hello", file=sys.stderr)
f = cap.done() f = cap.done()
s = f.read() s = f.read()
@ -711,6 +702,7 @@ class TestFDCapture:
tmpfile.write(tobytes("3")) tmpfile.write(tobytes("3"))
tmpfile.seek(0) tmpfile.seek(0)
cap = capture.FDCapture(0, tmpfile=tmpfile) cap = capture.FDCapture(0, tmpfile=tmpfile)
cap.start()
# check with os.read() directly instead of raw_input(), because # check with os.read() directly instead of raw_input(), because
# sys.stdin itself may be redirected (as pytest now does by default) # sys.stdin itself may be redirected (as pytest now does by default)
x = os.read(0, 100).strip() x = os.read(0, 100).strip()
@ -721,6 +713,7 @@ class TestFDCapture:
data1, data2 = tobytes("foo"), tobytes("bar") data1, data2 = tobytes("foo"), tobytes("bar")
try: try:
cap = capture.FDCapture(tmpfile.fileno()) cap = capture.FDCapture(tmpfile.fileno())
cap.start()
tmpfile.write(data1) tmpfile.write(data1)
cap.writeorg(data2) cap.writeorg(data2)
finally: finally:
@ -734,7 +727,9 @@ class TestFDCapture:
class TestStdCapture: class TestStdCapture:
def getcapture(self, **kw): def getcapture(self, **kw):
return capture.StdCapture(**kw) cap = capture.StdCapture(**kw)
cap.startall()
return cap
def test_capturing_done_simple(self): def test_capturing_done_simple(self):
cap = self.getcapture() cap = self.getcapture()
@ -785,15 +780,6 @@ class TestStdCapture:
out, err = cap.readouterr() out, err = cap.readouterr()
assert out == py.builtin._totext('\ufffd\n', 'unicode-escape') assert out == py.builtin._totext('\ufffd\n', 'unicode-escape')
def test_capturing_mixed(self):
cap = self.getcapture(mixed=True)
sys.stdout.write("hello ")
sys.stderr.write("world")
sys.stdout.write(".")
out, err = cap.reset()
assert out.strip() == "hello world."
assert not err
def test_reset_twice_error(self): def test_reset_twice_error(self):
cap = self.getcapture() cap = self.getcapture()
print ("hello") print ("hello")
@ -879,19 +865,13 @@ class TestStdCapture:
assert not err assert not err
class TestStdCaptureNotNow(TestStdCapture):
def getcapture(self, **kw):
kw['now'] = False
cap = capture.StdCapture(**kw)
cap.startall()
return cap
class TestStdCaptureFD(TestStdCapture): class TestStdCaptureFD(TestStdCapture):
pytestmark = needsosdup pytestmark = needsosdup
def getcapture(self, **kw): def getcapture(self, **kw):
return capture.StdCaptureFD(**kw) cap = capture.StdCaptureFD(**kw)
cap.startall()
return cap
def test_intermingling(self): def test_intermingling(self):
cap = self.getcapture() cap = self.getcapture()
@ -907,17 +887,6 @@ class TestStdCaptureFD(TestStdCapture):
assert out == "123" assert out == "123"
assert err == "abc" assert err == "abc"
def test_callcapture(self):
def func(x, y):
print (x)
sys.stderr.write(str(y))
return 42
res, out, err = capture.StdCaptureFD.call(func, 3, y=4)
assert res == 42
assert out.startswith("3")
assert err.startswith("4")
def test_many(self, capfd): def test_many(self, capfd):
def f(): def f():
for i in range(10): for i in range(10):
@ -926,15 +895,6 @@ class TestStdCaptureFD(TestStdCapture):
lsof_check(f) lsof_check(f)
class TestStdCaptureFDNotNow(TestStdCaptureFD):
pytestmark = needsosdup
def getcapture(self, **kw):
kw['now'] = False
cap = capture.StdCaptureFD(**kw)
cap.startall()
return cap
@needsosdup @needsosdup
def test_stdcapture_fd_tmpfile(tmpfile): def test_stdcapture_fd_tmpfile(tmpfile):
@ -974,7 +934,7 @@ class TestStdCaptureFDinvalidFD:
def test_capture_not_started_but_reset(): def test_capture_not_started_but_reset():
capsys = capture.StdCapture(now=False) capsys = capture.StdCapture()
capsys.done() capsys.done()
capsys.done() capsys.done()
capsys.reset() capsys.reset()
@ -985,6 +945,7 @@ def test_capture_no_sys():
capsys = capture.StdCapture() capsys = capture.StdCapture()
try: try:
cap = capture.StdCaptureFD(patchsys=False) cap = capture.StdCaptureFD(patchsys=False)
cap.startall()
sys.stdout.write("hello") sys.stdout.write("hello")
sys.stderr.write("world") sys.stderr.write("world")
oswritebytes(1, "1") oswritebytes(1, "1")
@ -996,31 +957,12 @@ def test_capture_no_sys():
capsys.reset() capsys.reset()
@needsosdup
def test_callcapture_nofd():
def func(x, y):
oswritebytes(1, "hello")
oswritebytes(2, "hello")
print (x)
sys.stderr.write(str(y))
return 42
capfd = capture.StdCaptureFD(patchsys=False)
try:
res, out, err = capture.StdCapture.call(func, 3, y=4)
finally:
capfd.reset()
assert res == 42
assert out.startswith("3")
assert err.startswith("4")
@needsosdup @needsosdup
@pytest.mark.parametrize('use', [True, False]) @pytest.mark.parametrize('use', [True, False])
def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
if not use: if not use:
tmpfile = True tmpfile = True
cap = capture.StdCaptureFD(out=False, err=tmpfile, now=False) cap = capture.StdCaptureFD(out=False, err=tmpfile)
try: try:
cap.startall() cap.startall()
capfile = cap.err.tmpfile capfile = cap.err.tmpfile
@ -1042,6 +984,7 @@ def test_capturing_and_logging_fundamentals(testdir, method):
import py, logging import py, logging
from _pytest import capture from _pytest import capture
cap = capture.%s(out=False, in_=False) cap = capture.%s(out=False, in_=False)
cap.startall()
logging.warn("hello1") logging.warn("hello1")
outerr = cap.suspend() outerr = cap.suspend()