- try to fix the nightly failures by refining internal capturing mechanism

and adding tests, including a "lsof" test for making sure the number of
  open file descriptors does not increase.
- also move a py.io related logging test to testing/io

--HG--
branch : trunk
This commit is contained in:
holger krekel 2010-05-18 16:01:58 +02:00
parent 1a97c59439
commit 4f5d7948f7
3 changed files with 111 additions and 64 deletions

View File

@ -42,11 +42,16 @@ class FDCapture:
tmpfile = dupfile(f, encoding="UTF-8")
f.close()
self.tmpfile = tmpfile
self._savefd = os.dup(self.targetfd)
if now:
self.start()
def start(self):
self._savefd = os.dup(self.targetfd)
try:
os.fstat(self._savefd)
except OSError:
raise ValueError("saved filedescriptor not valid, "
"did you call start() twice?")
os.dup2(self.tmpfile.fileno(), self.targetfd)
def setasfile(self, name, module=sys):
@ -66,12 +71,9 @@ class FDCapture:
def done(self):
""" unpatch and clean up, returns the self.tmpfile (file object)
"""
try:
os.dup2(self._savefd, self.targetfd)
os.close(self._savefd)
self.tmpfile.seek(0)
except (AttributeError, ValueError, OSError):
pass
os.dup2(self._savefd, self.targetfd)
os.close(self._savefd)
self.tmpfile.seek(0)
self.unsetfiles()
return self.tmpfile
@ -107,7 +109,7 @@ def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
if encoding is not None:
mode = mode.replace("b", "")
buffering = True
return os.fdopen(newfd, mode, buffering, encoding, closefd=False)
return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
else:
f = os.fdopen(newfd, mode, buffering)
if encoding is not None:
@ -177,15 +179,26 @@ class StdCaptureFD(Capture):
"""
def __init__(self, out=True, err=True, mixed=False,
in_=True, patchsys=True, now=True):
self._options = locals()
self._save()
self.patchsys = patchsys
if now:
self.startall()
def _save(self):
in_ = self._options['in_']
out = self._options['out']
err = self._options['err']
mixed = self._options['mixed']
self.in_ = in_
if in_:
self._oldin = (sys.stdin, os.dup(0))
if out:
tmpfile = None
if hasattr(out, 'write'):
tmpfile = None
tmpfile = out
self.out = py.io.FDCapture(1, tmpfile=tmpfile, now=False)
self.out_tmpfile = tmpfile
self._options['out'] = self.out.tmpfile
if err:
if out and mixed:
tmpfile = self.out.tmpfile
@ -194,10 +207,7 @@ class StdCaptureFD(Capture):
else:
tmpfile = None
self.err = py.io.FDCapture(2, tmpfile=tmpfile, now=False)
self.err_tmpfile = tmpfile
self.patchsys = patchsys
if now:
self.startall()
self._options['err'] = self.err.tmpfile
def startall(self):
if self.in_:
@ -219,27 +229,21 @@ class StdCaptureFD(Capture):
def resume(self):
""" resume capturing with original temp files. """
#if hasattr(self, 'out'):
# self.out.restart()
#if hasattr(self, 'err'):
# self.err.restart()
self.startall()
def done(self):
""" return (outfile, errfile) and stop capturing. """
outfile = errfile = None
if hasattr(self, 'out'):
if hasattr(self, 'out') and not self.out.tmpfile.closed:
outfile = self.out.done()
if hasattr(self, 'err'):
if hasattr(self, 'err') and not self.err.tmpfile.closed:
errfile = self.err.done()
if hasattr(self, '_oldin'):
oldsys, oldfd = self._oldin
try:
os.dup2(oldfd, 0)
os.close(oldfd)
except OSError:
pass
os.dup2(oldfd, 0)
os.close(oldfd)
sys.stdin = oldsys
self._save()
return outfile, errfile
def readouterr(self):

View File

@ -76,22 +76,21 @@ def pytest_funcarg__tmpfile(request):
@needsdup
def test_dupfile(tmpfile):
somefile = tmpfile
flist = []
for i in range(5):
nf = py.io.dupfile(somefile, encoding="utf-8")
assert nf != somefile
assert nf.fileno() != somefile.fileno()
nf = py.io.dupfile(tmpfile, encoding="utf-8")
assert nf != tmpfile
assert nf.fileno() != tmpfile.fileno()
assert nf not in flist
print_(i, end="", file=nf)
flist.append(nf)
for i in range(5):
f = flist[i]
f.close()
somefile.seek(0)
s = somefile.read()
tmpfile.seek(0)
s = tmpfile.read()
assert "01234" in repr(s)
somefile.close()
tmpfile.close()
class TestFDCapture:
pytestmark = needsdup
@ -111,7 +110,7 @@ class TestFDCapture:
s = f.read()
assert s == "hello"
def test_stdout(self, tmpfile):
def test_simple(self, tmpfile):
fd = tmpfile.fileno()
cap = py.io.FDCapture(fd)
data = tobytes("hello")
@ -119,6 +118,30 @@ class TestFDCapture:
f = cap.done()
s = f.read()
assert s == "hello"
f.close()
def test_simple_many(self, tmpfile):
for i in range(10):
self.test_simple(tmpfile)
def test_simple_many_check_open_files(self, tmpfile):
pid = os.getpid()
try:
out = py.process.cmdexec("lsof -p %d" % pid)
except py.process.cmdexec.Error:
py.test.skip("could not run 'lsof'")
self.test_simple_many(tmpfile)
out2 = py.process.cmdexec("lsof -p %d" % pid)
len1 = len([x for x in out.split("\n") if "REG" in x])
len2 = len([x for x in out2.split("\n") if "REG" in x])
assert len2 < len1 + 3, out2
def test_simple_fail_second_start(self, tmpfile):
fd = tmpfile.fileno()
cap = py.io.FDCapture(fd)
f = cap.done()
py.test.raises(ValueError, cap.start)
f.close()
def test_stderr(self):
cap = py.io.FDCapture(2)
@ -329,6 +352,14 @@ class TestStdCaptureFDNotNow(TestStdCaptureFD):
cap.startall()
return cap
@needsdup
def test_stdcapture_fd_tmpfile(tmpfile):
capfd = py.io.StdCaptureFD(out=tmpfile)
os.write(1, "hello".encode("ascii"))
os.write(2, "world".encode("ascii"))
outf, errf = capfd.done()
assert outf == tmpfile
def test_capture_not_started_but_reset():
capsys = py.io.StdCapture(now=False)
capsys.done()
@ -368,3 +399,45 @@ def test_callcapture_nofd():
assert res == 42
assert out.startswith("3")
assert err.startswith("4")
@needsdup
@py.test.mark.multi(use=[True, False])
def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
if not use:
tmpfile = True
cap = py.io.StdCaptureFD(out=False, err=tmpfile, now=False)
cap.startall()
capfile = cap.err.tmpfile
cap.suspend()
cap.resume()
capfile2 = cap.err.tmpfile
assert capfile2 == capfile
@py.test.mark.multi(method=['StdCapture', 'StdCaptureFD'])
def test_capturing_and_logging_fundamentals(testdir, method):
if method == "StdCaptureFD" and not hasattr(os, 'dup'):
py.test.skip("need os.dup")
# here we check a fundamental feature
p = testdir.makepyfile("""
import sys, os
import py, logging
cap = py.io.%s(out=False, in_=False)
logging.warn("hello1")
outerr = cap.suspend()
print ("suspend, captured %%s" %%(outerr,))
logging.warn("hello2")
cap.resume()
logging.warn("hello3")
outerr = cap.suspend()
print ("suspend2, captured %%s" %% (outerr,))
""" % (method,))
result = testdir.runpython(p)
result.stdout.fnmatch_lines([
"suspend, captured*hello1*",
"suspend2, captured*hello2*WARNING:root:hello3*",
])
assert "atexit" not in result.stderr.str()

View File

@ -251,36 +251,6 @@ class TestLoggingInteraction:
""")
result = testdir.runpytest(p)
result.stderr.str().find("atexit") == -1
def test_capturing_and_logging_fundamentals(self, testdir):
# here we check a fundamental feature
p = testdir.makepyfile("""
import sys, os
import py, logging
if hasattr(os, 'dup'):
cap = py.io.StdCaptureFD(out=False, in_=False)
else:
cap = py.io.StdCapture(out=False, in_=False)
logging.warn("hello1")
outerr = cap.suspend()
print ("suspeneded and captured %s" % (outerr,))
logging.warn("hello2")
cap.resume()
logging.warn("hello3")
outerr = cap.suspend()
print ("suspend2 and captured %s" % (outerr,))
""")
result = testdir.runpython(p)
result.stdout.fnmatch_lines([
"suspeneded and captured*hello1*",
"suspend2 and captured*hello2*WARNING:root:hello3*",
])
assert "atexit" not in result.stderr.str()
def test_logging_and_immediate_setupteardown(self, testdir):
p = testdir.makepyfile("""
@ -402,7 +372,7 @@ def test_fdfuncarg_skips_on_no_osdup(testdir):
def test_hello(capfd):
pass
""")
result = testdir.runpytest()
result = testdir.runpytest("--capture=no")
result.stdout.fnmatch_lines([
"*1 skipped*"
])