Refactoring: Separated suspend from snapping (stopped always snapping when suspending - solves bug but still missing tests), reorganized functions and context managers.
This commit is contained in:
parent
5cf7d1dba2
commit
7d9b198f73
|
@ -62,8 +62,9 @@ def pytest_load_initial_conftests(early_config, parser, args):
|
|||
# finally trigger conftest loading but while capturing (issue93)
|
||||
capman.start_global_capturing()
|
||||
outcome = yield
|
||||
out, err = capman.suspend_global_capture()
|
||||
capman.suspend_global_capture()
|
||||
if outcome.excinfo is not None:
|
||||
out, err = capman.snap_global_capture()
|
||||
sys.stdout.write(out)
|
||||
sys.stderr.write(err)
|
||||
|
||||
|
@ -96,6 +97,8 @@ class CaptureManager(object):
|
|||
else:
|
||||
raise ValueError("unknown capturing method: %r" % method)
|
||||
|
||||
# Global capturing control
|
||||
|
||||
def start_global_capturing(self):
|
||||
assert self._global_capturing is None
|
||||
self._global_capturing = self._getcapture(self._method)
|
||||
|
@ -110,29 +113,15 @@ class CaptureManager(object):
|
|||
def resume_global_capture(self):
|
||||
self._global_capturing.resume_capturing()
|
||||
|
||||
def suspend_global_capture(self, item=None, in_=False):
|
||||
if item is not None:
|
||||
self.deactivate_fixture(item)
|
||||
def suspend_global_capture(self, in_=False):
|
||||
cap = getattr(self, "_global_capturing", None)
|
||||
if cap is not None:
|
||||
try:
|
||||
outerr = cap.readouterr()
|
||||
finally:
|
||||
cap.suspend_capturing(in_=in_)
|
||||
return outerr
|
||||
cap.suspend_capturing(in_=in_)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def global_and_fixture_disabled(self):
|
||||
"""Context manager to temporarily disables global and current fixture capturing."""
|
||||
# Need to undo local capsys-et-al if exists before disabling global capture
|
||||
fixture = getattr(self._current_item, "_capture_fixture", None)
|
||||
ctx_manager = fixture._suspend() if fixture else dummy_context_manager()
|
||||
with ctx_manager:
|
||||
self.suspend_global_capture(item=None, in_=False)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.resume_global_capture()
|
||||
def snap_global_capture(self):
|
||||
return self._global_capturing.readouterr()
|
||||
|
||||
# Fixture Control (its just forwarding, think about removing this later)
|
||||
|
||||
def activate_fixture(self, item):
|
||||
"""If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
|
||||
|
@ -148,12 +137,53 @@ class CaptureManager(object):
|
|||
if fixture is not None:
|
||||
fixture.close()
|
||||
|
||||
def suspend_fixture(self, item):
|
||||
fixture = getattr(item, "_capture_fixture", None)
|
||||
if fixture is not None:
|
||||
fixture._suspend()
|
||||
|
||||
def resume_fixture(self, item):
|
||||
fixture = getattr(item, "_capture_fixture", None)
|
||||
if fixture is not None:
|
||||
fixture._resume()
|
||||
|
||||
# Helper context managers
|
||||
|
||||
@contextlib.contextmanager
|
||||
def global_and_fixture_disabled(self):
|
||||
"""Context manager to temporarily disables global and current fixture capturing."""
|
||||
# Need to undo local capsys-et-al if exists before disabling global capture
|
||||
self.suspend_fixture(self._current_item)
|
||||
self.suspend_global_capture(in_=False)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.resume_global_capture()
|
||||
self.resume_fixture(self._current_item)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def item_capture(self, when, item):
|
||||
self.resume_global_capture()
|
||||
self.activate_fixture(item)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.deactivate_fixture(item)
|
||||
self.suspend_global_capture(in_=False)
|
||||
|
||||
out, err = self.snap_global_capture()
|
||||
item.add_report_section(when, "stdout", out)
|
||||
item.add_report_section(when, "stderr", err)
|
||||
|
||||
# Hooks
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_make_collect_report(self, collector):
|
||||
if isinstance(collector, pytest.File):
|
||||
self.resume_global_capture()
|
||||
outcome = yield
|
||||
out, err = self.suspend_global_capture()
|
||||
self.suspend_global_capture()
|
||||
out, err = self.snap_global_capture()
|
||||
rep = outcome.get_result()
|
||||
if out:
|
||||
rep.sections.append(("Captured stdout", out))
|
||||
|
@ -163,35 +193,27 @@ class CaptureManager(object):
|
|||
yield
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_setup(self, item):
|
||||
def pytest_runtest_logstart(self, item):
|
||||
self._current_item = item
|
||||
self.resume_global_capture()
|
||||
# no need to activate a capture fixture because they activate themselves during creation; this
|
||||
# only makes sense when a fixture uses a capture fixture, otherwise the capture fixture will
|
||||
# be activated during pytest_runtest_call
|
||||
yield
|
||||
self.suspend_capture_item(item, "setup")
|
||||
self._current_item = None
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_logfinish(self, item):
|
||||
self._current_item = item
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_setup(self, item):
|
||||
with self.item_capture("setup", item):
|
||||
yield
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_call(self, item):
|
||||
self._current_item = item
|
||||
self.resume_global_capture()
|
||||
# it is important to activate this fixture during the call phase so it overwrites the "global"
|
||||
# capture
|
||||
self.activate_fixture(item)
|
||||
yield
|
||||
self.suspend_capture_item(item, "call")
|
||||
self._current_item = None
|
||||
with self.item_capture("call", item):
|
||||
yield
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_teardown(self, item):
|
||||
self._current_item = item
|
||||
self.resume_global_capture()
|
||||
self.activate_fixture(item)
|
||||
yield
|
||||
self.suspend_capture_item(item, "teardown")
|
||||
self._current_item = None
|
||||
with self.item_capture("teardown", item):
|
||||
yield
|
||||
|
||||
@pytest.hookimpl(tryfirst=True)
|
||||
def pytest_keyboard_interrupt(self, excinfo):
|
||||
|
@ -201,11 +223,6 @@ class CaptureManager(object):
|
|||
def pytest_internalerror(self, excinfo):
|
||||
self.stop_global_capturing()
|
||||
|
||||
def suspend_capture_item(self, item, when, in_=False):
|
||||
out, err = self.suspend_global_capture(item, in_=in_)
|
||||
item.add_report_section(when, "stdout", out)
|
||||
item.add_report_section(when, "stderr", err)
|
||||
|
||||
|
||||
capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"}
|
||||
|
||||
|
@ -311,10 +328,12 @@ class CaptureFixture(object):
|
|||
self.request = request
|
||||
|
||||
def _start(self):
|
||||
self._capture = MultiCapture(
|
||||
out=True, err=True, in_=False, Capture=self.captureclass
|
||||
)
|
||||
self._capture.start_capturing()
|
||||
# Start if not started yet
|
||||
if getattr(self, "_capture", None) is not None:
|
||||
self._capture = MultiCapture(
|
||||
out=True, err=True, in_=False, Capture=self.captureclass
|
||||
)
|
||||
self._capture.start_capturing()
|
||||
|
||||
def close(self):
|
||||
cap = self.__dict__.pop("_capture", None)
|
||||
|
@ -332,14 +351,13 @@ class CaptureFixture(object):
|
|||
except AttributeError:
|
||||
return self._outerr
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _suspend(self):
|
||||
"""Suspends this fixture's own capturing temporarily."""
|
||||
self._capture.suspend_capturing()
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self._capture.resume_capturing()
|
||||
|
||||
def _resume(self):
|
||||
"""Resumes this fixture's own capturing temporarily."""
|
||||
self._capture.resume_capturing()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def disabled(self):
|
||||
|
@ -743,3 +761,4 @@ def _attempt_to_close_capture_file(f):
|
|||
pass
|
||||
else:
|
||||
f.close()
|
||||
|
||||
|
|
Loading…
Reference in New Issue