Merge pull request #5083 from blueyed/capture-_suspended

capture: store _state
This commit is contained in:
Daniel Hahler 2019-04-19 17:32:54 +02:00 committed by GitHub
commit e3e57a755b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 9 deletions

View File

@ -456,6 +456,7 @@ CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"])
class MultiCapture(object):
out = err = in_ = None
_state = None
def __init__(self, out=True, err=True, in_=True, Capture=None):
if in_:
@ -466,9 +467,16 @@ class MultiCapture(object):
self.err = Capture(2)
def __repr__(self):
return "<MultiCapture out=%r err=%r in_=%r>" % (self.out, self.err, self.in_)
return "<MultiCapture out=%r err=%r in_=%r _state=%r _in_suspended=%r>" % (
self.out,
self.err,
self.in_,
self._state,
getattr(self, "_in_suspended", "<UNSET>"),
)
def start_capturing(self):
self._state = "started"
if self.in_:
self.in_.start()
if self.out:
@ -486,6 +494,7 @@ class MultiCapture(object):
return out, err
def suspend_capturing(self, in_=False):
self._state = "suspended"
if self.out:
self.out.suspend()
if self.err:
@ -495,6 +504,7 @@ class MultiCapture(object):
self._in_suspended = True
def resume_capturing(self):
self._state = "resumed"
if self.out:
self.out.resume()
if self.err:
@ -505,9 +515,9 @@ class MultiCapture(object):
def stop_capturing(self):
""" stop capturing and reset capturing streams """
if hasattr(self, "_reset"):
if self._state == "stopped":
raise ValueError("was already stopped")
self._reset = True
self._state = "stopped"
if self.out:
self.out.done()
if self.err:
@ -535,6 +545,7 @@ class FDCaptureBinary(object):
"""
EMPTY_BUFFER = b""
_state = None
def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd
@ -561,9 +572,10 @@ class FDCaptureBinary(object):
self.tmpfile_fd = tmpfile.fileno()
def __repr__(self):
return "<FDCapture %s oldfd=%s>" % (
return "<FDCapture %s oldfd=%s _state=%r>" % (
self.targetfd,
getattr(self, "targetfd_save", None),
self._state,
)
def start(self):
@ -574,6 +586,7 @@ class FDCaptureBinary(object):
raise ValueError("saved filedescriptor not valid anymore")
os.dup2(self.tmpfile_fd, self.targetfd)
self.syscapture.start()
self._state = "started"
def snap(self):
self.tmpfile.seek(0)
@ -590,14 +603,17 @@ class FDCaptureBinary(object):
os.close(targetfd_save)
self.syscapture.done()
_attempt_to_close_capture_file(self.tmpfile)
self._state = "done"
def suspend(self):
self.syscapture.suspend()
os.dup2(self.targetfd_save, self.targetfd)
self._state = "suspended"
def resume(self):
self.syscapture.resume()
os.dup2(self.tmpfile_fd, self.targetfd)
self._state = "resumed"
def writeorg(self, data):
""" write to original file descriptor. """
@ -625,6 +641,7 @@ class FDCapture(FDCaptureBinary):
class SysCapture(object):
EMPTY_BUFFER = str()
_state = None
def __init__(self, fd, tmpfile=None):
name = patchsysdict[fd]
@ -637,8 +654,17 @@ class SysCapture(object):
tmpfile = CaptureIO()
self.tmpfile = tmpfile
def __repr__(self):
return "<SysCapture %s _old=%r, tmpfile=%r _state=%r>" % (
self.name,
self._old,
self.tmpfile,
self._state,
)
def start(self):
setattr(sys, self.name, self.tmpfile)
self._state = "started"
def snap(self):
res = self.tmpfile.getvalue()
@ -650,12 +676,15 @@ class SysCapture(object):
setattr(sys, self.name, self._old)
del self._old
_attempt_to_close_capture_file(self.tmpfile)
self._state = "done"
def suspend(self):
setattr(sys, self.name, self._old)
self._state = "suspended"
def resume(self):
setattr(sys, self.name, self.tmpfile)
self._state = "resumed"
def writeorg(self, data):
self._old.write(data)

View File

@ -1243,25 +1243,24 @@ class TestStdCaptureFDinvalidFD(object):
from _pytest import capture
def StdCaptureFD(out=True, err=True, in_=True):
return capture.MultiCapture(out, err, in_,
Capture=capture.FDCapture)
return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture)
def test_stdout():
os.close(1)
cap = StdCaptureFD(out=True, err=False, in_=False)
assert repr(cap.out) == "<FDCapture 1 oldfd=None>"
assert repr(cap.out) == "<FDCapture 1 oldfd=None _state=None>"
cap.stop_capturing()
def test_stderr():
os.close(2)
cap = StdCaptureFD(out=False, err=True, in_=False)
assert repr(cap.err) == "<FDCapture 2 oldfd=None>"
assert repr(cap.err) == "<FDCapture 2 oldfd=None _state=None>"
cap.stop_capturing()
def test_stdin():
os.close(0)
cap = StdCaptureFD(out=False, err=False, in_=True)
assert repr(cap.in_) == "<FDCapture 0 oldfd=None>"
assert repr(cap.in_) == "<FDCapture 0 oldfd=None _state=None>"
cap.stop_capturing()
"""
)