Merge pull request #5322 from blueyed/pdb-wrapper
pdb: move/refactor initialization of PytestPdbWrapper
This commit is contained in:
commit
d6ce2e5858
|
@ -81,6 +81,7 @@ class pytestPDB(object):
|
||||||
_config = None
|
_config = None
|
||||||
_saved = []
|
_saved = []
|
||||||
_recursive_debug = 0
|
_recursive_debug = 0
|
||||||
|
_wrapped_pdb_cls = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _is_capturing(cls, capman):
|
def _is_capturing(cls, capman):
|
||||||
|
@ -89,43 +90,138 @@ class pytestPDB(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _import_pdb_cls(cls):
|
def _import_pdb_cls(cls, capman):
|
||||||
if not cls._config:
|
if not cls._config:
|
||||||
# Happens when using pytest.set_trace outside of a test.
|
# Happens when using pytest.set_trace outside of a test.
|
||||||
return pdb.Pdb
|
return pdb.Pdb
|
||||||
|
|
||||||
pdb_cls = cls._config.getvalue("usepdb_cls")
|
usepdb_cls = cls._config.getvalue("usepdb_cls")
|
||||||
if not pdb_cls:
|
|
||||||
return pdb.Pdb
|
|
||||||
|
|
||||||
modname, classname = pdb_cls
|
if cls._wrapped_pdb_cls and cls._wrapped_pdb_cls[0] == usepdb_cls:
|
||||||
|
return cls._wrapped_pdb_cls[1]
|
||||||
|
|
||||||
try:
|
if usepdb_cls:
|
||||||
__import__(modname)
|
modname, classname = usepdb_cls
|
||||||
mod = sys.modules[modname]
|
|
||||||
|
|
||||||
# Handle --pdbcls=pdb:pdb.Pdb (useful e.g. with pdbpp).
|
try:
|
||||||
parts = classname.split(".")
|
__import__(modname)
|
||||||
pdb_cls = getattr(mod, parts[0])
|
mod = sys.modules[modname]
|
||||||
for part in parts[1:]:
|
|
||||||
pdb_cls = getattr(pdb_cls, part)
|
|
||||||
|
|
||||||
return pdb_cls
|
# Handle --pdbcls=pdb:pdb.Pdb (useful e.g. with pdbpp).
|
||||||
except Exception as exc:
|
parts = classname.split(".")
|
||||||
value = ":".join((modname, classname))
|
pdb_cls = getattr(mod, parts[0])
|
||||||
raise UsageError("--pdbcls: could not import {!r}: {}".format(value, exc))
|
for part in parts[1:]:
|
||||||
|
pdb_cls = getattr(pdb_cls, part)
|
||||||
|
except Exception as exc:
|
||||||
|
value = ":".join((modname, classname))
|
||||||
|
raise UsageError(
|
||||||
|
"--pdbcls: could not import {!r}: {}".format(value, exc)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
pdb_cls = pdb.Pdb
|
||||||
|
|
||||||
|
wrapped_cls = cls._get_pdb_wrapper_class(pdb_cls, capman)
|
||||||
|
cls._wrapped_pdb_cls = (usepdb_cls, wrapped_cls)
|
||||||
|
return wrapped_cls
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _init_pdb(cls, *args, **kwargs):
|
def _get_pdb_wrapper_class(cls, pdb_cls, capman):
|
||||||
|
import _pytest.config
|
||||||
|
|
||||||
|
class PytestPdbWrapper(pdb_cls, object):
|
||||||
|
_pytest_capman = capman
|
||||||
|
_continued = False
|
||||||
|
|
||||||
|
def do_debug(self, arg):
|
||||||
|
cls._recursive_debug += 1
|
||||||
|
ret = super(PytestPdbWrapper, self).do_debug(arg)
|
||||||
|
cls._recursive_debug -= 1
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def do_continue(self, arg):
|
||||||
|
ret = super(PytestPdbWrapper, self).do_continue(arg)
|
||||||
|
if cls._recursive_debug == 0:
|
||||||
|
tw = _pytest.config.create_terminal_writer(cls._config)
|
||||||
|
tw.line()
|
||||||
|
|
||||||
|
capman = self._pytest_capman
|
||||||
|
capturing = pytestPDB._is_capturing(capman)
|
||||||
|
if capturing:
|
||||||
|
if capturing == "global":
|
||||||
|
tw.sep(">", "PDB continue (IO-capturing resumed)")
|
||||||
|
else:
|
||||||
|
tw.sep(
|
||||||
|
">",
|
||||||
|
"PDB continue (IO-capturing resumed for %s)"
|
||||||
|
% capturing,
|
||||||
|
)
|
||||||
|
capman.resume()
|
||||||
|
else:
|
||||||
|
tw.sep(">", "PDB continue")
|
||||||
|
cls._pluginmanager.hook.pytest_leave_pdb(config=cls._config, pdb=self)
|
||||||
|
self._continued = True
|
||||||
|
return ret
|
||||||
|
|
||||||
|
do_c = do_cont = do_continue
|
||||||
|
|
||||||
|
def do_quit(self, arg):
|
||||||
|
"""Raise Exit outcome when quit command is used in pdb.
|
||||||
|
|
||||||
|
This is a bit of a hack - it would be better if BdbQuit
|
||||||
|
could be handled, but this would require to wrap the
|
||||||
|
whole pytest run, and adjust the report etc.
|
||||||
|
"""
|
||||||
|
ret = super(PytestPdbWrapper, self).do_quit(arg)
|
||||||
|
|
||||||
|
if cls._recursive_debug == 0:
|
||||||
|
outcomes.exit("Quitting debugger")
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
do_q = do_quit
|
||||||
|
do_exit = do_quit
|
||||||
|
|
||||||
|
def setup(self, f, tb):
|
||||||
|
"""Suspend on setup().
|
||||||
|
|
||||||
|
Needed after do_continue resumed, and entering another
|
||||||
|
breakpoint again.
|
||||||
|
"""
|
||||||
|
ret = super(PytestPdbWrapper, self).setup(f, tb)
|
||||||
|
if not ret and self._continued:
|
||||||
|
# pdb.setup() returns True if the command wants to exit
|
||||||
|
# from the interaction: do not suspend capturing then.
|
||||||
|
if self._pytest_capman:
|
||||||
|
self._pytest_capman.suspend_global_capture(in_=True)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def get_stack(self, f, t):
|
||||||
|
stack, i = super(PytestPdbWrapper, self).get_stack(f, t)
|
||||||
|
if f is None:
|
||||||
|
# Find last non-hidden frame.
|
||||||
|
i = max(0, len(stack) - 1)
|
||||||
|
while i and stack[i][0].f_locals.get("__tracebackhide__", False):
|
||||||
|
i -= 1
|
||||||
|
return stack, i
|
||||||
|
|
||||||
|
return PytestPdbWrapper
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _init_pdb(cls, method, *args, **kwargs):
|
||||||
""" Initialize PDB debugging, dropping any IO capturing. """
|
""" Initialize PDB debugging, dropping any IO capturing. """
|
||||||
import _pytest.config
|
import _pytest.config
|
||||||
|
|
||||||
if cls._pluginmanager is not None:
|
if cls._pluginmanager is not None:
|
||||||
capman = cls._pluginmanager.getplugin("capturemanager")
|
capman = cls._pluginmanager.getplugin("capturemanager")
|
||||||
if capman:
|
else:
|
||||||
capman.suspend(in_=True)
|
capman = None
|
||||||
|
if capman:
|
||||||
|
capman.suspend(in_=True)
|
||||||
|
|
||||||
|
if cls._config:
|
||||||
tw = _pytest.config.create_terminal_writer(cls._config)
|
tw = _pytest.config.create_terminal_writer(cls._config)
|
||||||
tw.line()
|
tw.line()
|
||||||
|
|
||||||
if cls._recursive_debug == 0:
|
if cls._recursive_debug == 0:
|
||||||
# Handle header similar to pdb.set_trace in py37+.
|
# Handle header similar to pdb.set_trace in py37+.
|
||||||
header = kwargs.pop("header", None)
|
header = kwargs.pop("header", None)
|
||||||
|
@ -133,112 +229,28 @@ class pytestPDB(object):
|
||||||
tw.sep(">", header)
|
tw.sep(">", header)
|
||||||
else:
|
else:
|
||||||
capturing = cls._is_capturing(capman)
|
capturing = cls._is_capturing(capman)
|
||||||
if capturing:
|
if capturing == "global":
|
||||||
if capturing == "global":
|
tw.sep(">", "PDB %s (IO-capturing turned off)" % (method,))
|
||||||
tw.sep(">", "PDB set_trace (IO-capturing turned off)")
|
elif capturing:
|
||||||
else:
|
tw.sep(
|
||||||
tw.sep(
|
">",
|
||||||
">",
|
"PDB %s (IO-capturing turned off for %s)"
|
||||||
"PDB set_trace (IO-capturing turned off for %s)"
|
% (method, capturing),
|
||||||
% capturing,
|
)
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
tw.sep(">", "PDB set_trace")
|
tw.sep(">", "PDB %s" % (method,))
|
||||||
|
|
||||||
pdb_cls = cls._import_pdb_cls()
|
_pdb = cls._import_pdb_cls(capman)(**kwargs)
|
||||||
|
|
||||||
class PytestPdbWrapper(pdb_cls, object):
|
if cls._pluginmanager:
|
||||||
_pytest_capman = capman
|
|
||||||
_continued = False
|
|
||||||
|
|
||||||
def do_debug(self, arg):
|
|
||||||
cls._recursive_debug += 1
|
|
||||||
ret = super(PytestPdbWrapper, self).do_debug(arg)
|
|
||||||
cls._recursive_debug -= 1
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def do_continue(self, arg):
|
|
||||||
ret = super(PytestPdbWrapper, self).do_continue(arg)
|
|
||||||
if cls._recursive_debug == 0:
|
|
||||||
tw = _pytest.config.create_terminal_writer(cls._config)
|
|
||||||
tw.line()
|
|
||||||
|
|
||||||
capman = self._pytest_capman
|
|
||||||
capturing = pytestPDB._is_capturing(capman)
|
|
||||||
if capturing:
|
|
||||||
if capturing == "global":
|
|
||||||
tw.sep(">", "PDB continue (IO-capturing resumed)")
|
|
||||||
else:
|
|
||||||
tw.sep(
|
|
||||||
">",
|
|
||||||
"PDB continue (IO-capturing resumed for %s)"
|
|
||||||
% capturing,
|
|
||||||
)
|
|
||||||
capman.resume()
|
|
||||||
else:
|
|
||||||
tw.sep(">", "PDB continue")
|
|
||||||
cls._pluginmanager.hook.pytest_leave_pdb(
|
|
||||||
config=cls._config, pdb=self
|
|
||||||
)
|
|
||||||
self._continued = True
|
|
||||||
return ret
|
|
||||||
|
|
||||||
do_c = do_cont = do_continue
|
|
||||||
|
|
||||||
def do_quit(self, arg):
|
|
||||||
"""Raise Exit outcome when quit command is used in pdb.
|
|
||||||
|
|
||||||
This is a bit of a hack - it would be better if BdbQuit
|
|
||||||
could be handled, but this would require to wrap the
|
|
||||||
whole pytest run, and adjust the report etc.
|
|
||||||
"""
|
|
||||||
ret = super(PytestPdbWrapper, self).do_quit(arg)
|
|
||||||
|
|
||||||
if cls._recursive_debug == 0:
|
|
||||||
outcomes.exit("Quitting debugger")
|
|
||||||
|
|
||||||
return ret
|
|
||||||
|
|
||||||
do_q = do_quit
|
|
||||||
do_exit = do_quit
|
|
||||||
|
|
||||||
def setup(self, f, tb):
|
|
||||||
"""Suspend on setup().
|
|
||||||
|
|
||||||
Needed after do_continue resumed, and entering another
|
|
||||||
breakpoint again.
|
|
||||||
"""
|
|
||||||
ret = super(PytestPdbWrapper, self).setup(f, tb)
|
|
||||||
if not ret and self._continued:
|
|
||||||
# pdb.setup() returns True if the command wants to exit
|
|
||||||
# from the interaction: do not suspend capturing then.
|
|
||||||
if self._pytest_capman:
|
|
||||||
self._pytest_capman.suspend_global_capture(in_=True)
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def get_stack(self, f, t):
|
|
||||||
stack, i = super(PytestPdbWrapper, self).get_stack(f, t)
|
|
||||||
if f is None:
|
|
||||||
# Find last non-hidden frame.
|
|
||||||
i = max(0, len(stack) - 1)
|
|
||||||
while i and stack[i][0].f_locals.get(
|
|
||||||
"__tracebackhide__", False
|
|
||||||
):
|
|
||||||
i -= 1
|
|
||||||
return stack, i
|
|
||||||
|
|
||||||
_pdb = PytestPdbWrapper(**kwargs)
|
|
||||||
cls._pluginmanager.hook.pytest_enter_pdb(config=cls._config, pdb=_pdb)
|
cls._pluginmanager.hook.pytest_enter_pdb(config=cls._config, pdb=_pdb)
|
||||||
else:
|
|
||||||
pdb_cls = cls._import_pdb_cls()
|
|
||||||
_pdb = pdb_cls(**kwargs)
|
|
||||||
return _pdb
|
return _pdb
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def set_trace(cls, *args, **kwargs):
|
def set_trace(cls, *args, **kwargs):
|
||||||
"""Invoke debugging via ``Pdb.set_trace``, dropping any IO capturing."""
|
"""Invoke debugging via ``Pdb.set_trace``, dropping any IO capturing."""
|
||||||
frame = sys._getframe().f_back
|
frame = sys._getframe().f_back
|
||||||
_pdb = cls._init_pdb(*args, **kwargs)
|
_pdb = cls._init_pdb("set_trace", *args, **kwargs)
|
||||||
_pdb.set_trace(frame)
|
_pdb.set_trace(frame)
|
||||||
|
|
||||||
|
|
||||||
|
@ -265,7 +277,7 @@ class PdbTrace(object):
|
||||||
|
|
||||||
|
|
||||||
def _test_pytest_function(pyfuncitem):
|
def _test_pytest_function(pyfuncitem):
|
||||||
_pdb = pytestPDB._init_pdb()
|
_pdb = pytestPDB._init_pdb("runcall")
|
||||||
testfunction = pyfuncitem.obj
|
testfunction = pyfuncitem.obj
|
||||||
pyfuncitem.obj = _pdb.runcall
|
pyfuncitem.obj = _pdb.runcall
|
||||||
if "func" in pyfuncitem._fixtureinfo.argnames: # pragma: no branch
|
if "func" in pyfuncitem._fixtureinfo.argnames: # pragma: no branch
|
||||||
|
@ -315,7 +327,7 @@ def _postmortem_traceback(excinfo):
|
||||||
|
|
||||||
|
|
||||||
def post_mortem(t):
|
def post_mortem(t):
|
||||||
p = pytestPDB._init_pdb()
|
p = pytestPDB._init_pdb("post_mortem")
|
||||||
p.reset()
|
p.reset()
|
||||||
p.interaction(None, t)
|
p.interaction(None, t)
|
||||||
if p.quitting:
|
if p.quitting:
|
||||||
|
|
|
@ -638,36 +638,35 @@ class TestPDB(object):
|
||||||
class pytestPDBTest(_pytest.debugging.pytestPDB):
|
class pytestPDBTest(_pytest.debugging.pytestPDB):
|
||||||
@classmethod
|
@classmethod
|
||||||
def set_trace(cls, *args, **kwargs):
|
def set_trace(cls, *args, **kwargs):
|
||||||
# Init _PdbWrapper to handle capturing.
|
# Init PytestPdbWrapper to handle capturing.
|
||||||
_pdb = cls._init_pdb(*args, **kwargs)
|
_pdb = cls._init_pdb("set_trace", *args, **kwargs)
|
||||||
|
|
||||||
# Mock out pdb.Pdb.do_continue.
|
# Mock out pdb.Pdb.do_continue.
|
||||||
import pdb
|
import pdb
|
||||||
pdb.Pdb.do_continue = lambda self, arg: None
|
pdb.Pdb.do_continue = lambda self, arg: None
|
||||||
|
|
||||||
print("=== SET_TRACE ===")
|
print("===" + " SET_TRACE ===")
|
||||||
assert input() == "debug set_trace()"
|
assert input() == "debug set_trace()"
|
||||||
|
|
||||||
# Simulate _PdbWrapper.do_debug
|
# Simulate PytestPdbWrapper.do_debug
|
||||||
cls._recursive_debug += 1
|
cls._recursive_debug += 1
|
||||||
print("ENTERING RECURSIVE DEBUGGER")
|
print("ENTERING RECURSIVE DEBUGGER")
|
||||||
print("=== SET_TRACE_2 ===")
|
print("===" + " SET_TRACE_2 ===")
|
||||||
|
|
||||||
assert input() == "c"
|
assert input() == "c"
|
||||||
_pdb.do_continue("")
|
_pdb.do_continue("")
|
||||||
print("=== SET_TRACE_3 ===")
|
print("===" + " SET_TRACE_3 ===")
|
||||||
|
|
||||||
# Simulate _PdbWrapper.do_debug
|
# Simulate PytestPdbWrapper.do_debug
|
||||||
print("LEAVING RECURSIVE DEBUGGER")
|
print("LEAVING RECURSIVE DEBUGGER")
|
||||||
cls._recursive_debug -= 1
|
cls._recursive_debug -= 1
|
||||||
|
|
||||||
print("=== SET_TRACE_4 ===")
|
print("===" + " SET_TRACE_4 ===")
|
||||||
assert input() == "c"
|
assert input() == "c"
|
||||||
_pdb.do_continue("")
|
_pdb.do_continue("")
|
||||||
|
|
||||||
def do_continue(self, arg):
|
def do_continue(self, arg):
|
||||||
print("=== do_continue")
|
print("=== do_continue")
|
||||||
# _PdbWrapper.do_continue("")
|
|
||||||
|
|
||||||
monkeypatch.setattr(_pytest.debugging, "pytestPDB", pytestPDBTest)
|
monkeypatch.setattr(_pytest.debugging, "pytestPDB", pytestPDBTest)
|
||||||
|
|
||||||
|
@ -677,7 +676,7 @@ class TestPDB(object):
|
||||||
set_trace()
|
set_trace()
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
child = testdir.spawn_pytest("%s %s" % (p1, capture_arg))
|
child = testdir.spawn_pytest("--tb=short %s %s" % (p1, capture_arg))
|
||||||
child.expect("=== SET_TRACE ===")
|
child.expect("=== SET_TRACE ===")
|
||||||
before = child.before.decode("utf8")
|
before = child.before.decode("utf8")
|
||||||
if not capture_arg:
|
if not capture_arg:
|
||||||
|
@ -1207,3 +1206,33 @@ def test_raises_bdbquit_with_eoferror(testdir):
|
||||||
result = testdir.runpytest(str(p1))
|
result = testdir.runpytest(str(p1))
|
||||||
result.stdout.fnmatch_lines(["E *BdbQuit", "*= 1 failed in*"])
|
result.stdout.fnmatch_lines(["E *BdbQuit", "*= 1 failed in*"])
|
||||||
assert result.ret == 1
|
assert result.ret == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_pdb_wrapper_class_is_reused(testdir):
|
||||||
|
p1 = testdir.makepyfile(
|
||||||
|
"""
|
||||||
|
def test():
|
||||||
|
__import__("pdb").set_trace()
|
||||||
|
__import__("pdb").set_trace()
|
||||||
|
|
||||||
|
import mypdb
|
||||||
|
instances = mypdb.instances
|
||||||
|
assert len(instances) == 2
|
||||||
|
assert instances[0].__class__ is instances[1].__class__
|
||||||
|
""",
|
||||||
|
mypdb="""
|
||||||
|
instances = []
|
||||||
|
|
||||||
|
class MyPdb:
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
instances.append(self)
|
||||||
|
|
||||||
|
def set_trace(self, *args):
|
||||||
|
print("set_trace_called", args)
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
result = testdir.runpytest(str(p1), "--pdbcls=mypdb:MyPdb", syspathinsert=True)
|
||||||
|
assert result.ret == 0
|
||||||
|
result.stdout.fnmatch_lines(
|
||||||
|
["*set_trace_called*", "*set_trace_called*", "* 1 passed in *"]
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue