Type annotate more of _pytest.terminal

This commit is contained in:
Ran Benita 2020-05-01 14:40:16 +03:00
parent 2833884688
commit c0af19d8ad
1 changed files with 114 additions and 71 deletions

View File

@ -16,7 +16,9 @@ from typing import Generator
from typing import List from typing import List
from typing import Mapping from typing import Mapping
from typing import Optional from typing import Optional
from typing import Sequence
from typing import Set from typing import Set
from typing import TextIO
from typing import Tuple from typing import Tuple
from typing import Union from typing import Union
@ -37,11 +39,15 @@ from _pytest.config import Config
from _pytest.config import ExitCode from _pytest.config import ExitCode
from _pytest.config.argparsing import Parser from _pytest.config.argparsing import Parser
from _pytest.deprecated import TERMINALWRITER_WRITER from _pytest.deprecated import TERMINALWRITER_WRITER
from _pytest.nodes import Item
from _pytest.nodes import Node
from _pytest.reports import BaseReport from _pytest.reports import BaseReport
from _pytest.reports import CollectReport from _pytest.reports import CollectReport
from _pytest.reports import TestReport from _pytest.reports import TestReport
if TYPE_CHECKING: if TYPE_CHECKING:
from typing_extensions import Literal
from _pytest.main import Session from _pytest.main import Session
@ -69,7 +75,14 @@ class MoreQuietAction(argparse.Action):
used to unify verbosity handling used to unify verbosity handling
""" """
def __init__(self, option_strings, dest, default=None, required=False, help=None): def __init__(
self,
option_strings: Sequence[str],
dest: str,
default: object = None,
required: bool = False,
help: Optional[str] = None,
) -> None:
super().__init__( super().__init__(
option_strings=option_strings, option_strings=option_strings,
dest=dest, dest=dest,
@ -79,7 +92,13 @@ class MoreQuietAction(argparse.Action):
help=help, help=help,
) )
def __call__(self, parser, namespace, values, option_string=None): def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Union[str, Sequence[object], None],
option_string: Optional[str] = None,
) -> None:
new_count = getattr(namespace, self.dest, 0) - 1 new_count = getattr(namespace, self.dest, 0) - 1
setattr(namespace, self.dest, new_count) setattr(namespace, self.dest, new_count)
# todo Deprecate config.quiet # todo Deprecate config.quiet
@ -194,7 +213,7 @@ def pytest_configure(config: Config) -> None:
def getreportopt(config: Config) -> str: def getreportopt(config: Config) -> str:
reportchars = config.option.reportchars reportchars = config.option.reportchars # type: str
old_aliases = {"F", "S"} old_aliases = {"F", "S"}
reportopts = "" reportopts = ""
@ -247,10 +266,12 @@ class WarningReport:
message = attr.ib(type=str) message = attr.ib(type=str)
nodeid = attr.ib(type=Optional[str], default=None) nodeid = attr.ib(type=Optional[str], default=None)
fslocation = attr.ib(default=None) fslocation = attr.ib(
type=Optional[Union[Tuple[str, int], py.path.local]], default=None
)
count_towards_summary = True count_towards_summary = True
def get_location(self, config): def get_location(self, config: Config) -> Optional[str]:
""" """
Returns the more user-friendly information about the location Returns the more user-friendly information about the location
of a warning, or None. of a warning, or None.
@ -270,13 +291,13 @@ class WarningReport:
class TerminalReporter: class TerminalReporter:
def __init__(self, config: Config, file=None) -> None: def __init__(self, config: Config, file: Optional[TextIO] = None) -> None:
import _pytest.config import _pytest.config
self.config = config self.config = config
self._numcollected = 0 self._numcollected = 0
self._session = None # type: Optional[Session] self._session = None # type: Optional[Session]
self._showfspath = None self._showfspath = None # type: Optional[bool]
self.stats = {} # type: Dict[str, List[Any]] self.stats = {} # type: Dict[str, List[Any]]
self._main_color = None # type: Optional[str] self._main_color = None # type: Optional[str]
@ -293,6 +314,7 @@ class TerminalReporter:
self._progress_nodeids_reported = set() # type: Set[str] self._progress_nodeids_reported = set() # type: Set[str]
self._show_progress_info = self._determine_show_progress_info() self._show_progress_info = self._determine_show_progress_info()
self._collect_report_last_write = None # type: Optional[float] self._collect_report_last_write = None # type: Optional[float]
self._already_displayed_warnings = None # type: Optional[int]
@property @property
def writer(self) -> TerminalWriter: def writer(self) -> TerminalWriter:
@ -300,11 +322,11 @@ class TerminalReporter:
return self._tw return self._tw
@writer.setter @writer.setter
def writer(self, value: TerminalWriter): def writer(self, value: TerminalWriter) -> None:
warnings.warn(TERMINALWRITER_WRITER, stacklevel=2) warnings.warn(TERMINALWRITER_WRITER, stacklevel=2)
self._tw = value self._tw = value
def _determine_show_progress_info(self): def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]":
"""Return True if we should display progress information based on the current config""" """Return True if we should display progress information based on the current config"""
# do not show progress if we are not capturing output (#3038) # do not show progress if we are not capturing output (#3038)
if self.config.getoption("capture", "no") == "no": if self.config.getoption("capture", "no") == "no":
@ -312,38 +334,42 @@ class TerminalReporter:
# do not show progress if we are showing fixture setup/teardown # do not show progress if we are showing fixture setup/teardown
if self.config.getoption("setupshow", False): if self.config.getoption("setupshow", False):
return False return False
cfg = self.config.getini("console_output_style") cfg = self.config.getini("console_output_style") # type: str
if cfg in ("progress", "count"): if cfg == "progress":
return cfg return "progress"
elif cfg == "count":
return "count"
else:
return False return False
@property @property
def verbosity(self): def verbosity(self) -> int:
return self.config.option.verbose verbosity = self.config.option.verbose # type: int
return verbosity
@property @property
def showheader(self): def showheader(self) -> bool:
return self.verbosity >= 0 return self.verbosity >= 0
@property @property
def showfspath(self): def showfspath(self) -> bool:
if self._showfspath is None: if self._showfspath is None:
return self.verbosity >= 0 return self.verbosity >= 0
return self._showfspath return self._showfspath
@showfspath.setter @showfspath.setter
def showfspath(self, value): def showfspath(self, value: Optional[bool]) -> None:
self._showfspath = value self._showfspath = value
@property @property
def showlongtestinfo(self): def showlongtestinfo(self) -> bool:
return self.verbosity > 0 return self.verbosity > 0
def hasopt(self, char): def hasopt(self, char: str) -> bool:
char = {"xfailed": "x", "skipped": "s"}.get(char, char) char = {"xfailed": "x", "skipped": "s"}.get(char, char)
return char in self.reportchars return char in self.reportchars
def write_fspath_result(self, nodeid, res, **markup): def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None:
fspath = self.config.rootdir.join(nodeid.split("::")[0]) fspath = self.config.rootdir.join(nodeid.split("::")[0])
# NOTE: explicitly check for None to work around py bug, and for less # NOTE: explicitly check for None to work around py bug, and for less
# overhead in general (https://github.com/pytest-dev/py/pull/207). # overhead in general (https://github.com/pytest-dev/py/pull/207).
@ -356,7 +382,7 @@ class TerminalReporter:
self._tw.write(fspath + " ") self._tw.write(fspath + " ")
self._tw.write(res, flush=True, **markup) self._tw.write(res, flush=True, **markup)
def write_ensure_prefix(self, prefix, extra="", **kwargs): def write_ensure_prefix(self, prefix, extra: str = "", **kwargs) -> None:
if self.currentfspath != prefix: if self.currentfspath != prefix:
self._tw.line() self._tw.line()
self.currentfspath = prefix self.currentfspath = prefix
@ -376,13 +402,13 @@ class TerminalReporter:
def flush(self) -> None: def flush(self) -> None:
self._tw.flush() self._tw.flush()
def write_line(self, line: Union[str, bytes], **markup) -> None: def write_line(self, line: Union[str, bytes], **markup: bool) -> None:
if not isinstance(line, str): if not isinstance(line, str):
line = str(line, errors="replace") line = str(line, errors="replace")
self.ensure_newline() self.ensure_newline()
self._tw.line(line, **markup) self._tw.line(line, **markup)
def rewrite(self, line, **markup): def rewrite(self, line: str, **markup: bool) -> None:
""" """
Rewinds the terminal cursor to the beginning and writes the given line. Rewinds the terminal cursor to the beginning and writes the given line.
@ -400,14 +426,20 @@ class TerminalReporter:
line = str(line) line = str(line)
self._tw.write("\r" + line + fill, **markup) self._tw.write("\r" + line + fill, **markup)
def write_sep(self, sep, title=None, **markup): def write_sep(
self,
sep: str,
title: Optional[str] = None,
fullwidth: Optional[int] = None,
**markup: bool
) -> None:
self.ensure_newline() self.ensure_newline()
self._tw.sep(sep, title, **markup) self._tw.sep(sep, title, fullwidth, **markup)
def section(self, title, sep="=", **kw): def section(self, title: str, sep: str = "=", **kw: bool) -> None:
self._tw.sep(sep, title, **kw) self._tw.sep(sep, title, **kw)
def line(self, msg, **kw): def line(self, msg: str, **kw: bool) -> None:
self._tw.line(msg, **kw) self._tw.line(msg, **kw)
def _add_stats(self, category: str, items: List) -> None: def _add_stats(self, category: str, items: List) -> None:
@ -421,7 +453,9 @@ class TerminalReporter:
self.write_line("INTERNALERROR> " + line) self.write_line("INTERNALERROR> " + line)
return 1 return 1
def pytest_warning_recorded(self, warning_message, nodeid): def pytest_warning_recorded(
self, warning_message: warnings.WarningMessage, nodeid: str,
) -> None:
from _pytest.warnings import warning_record_to_str from _pytest.warnings import warning_record_to_str
fslocation = warning_message.filename, warning_message.lineno fslocation = warning_message.filename, warning_message.lineno
@ -440,10 +474,10 @@ class TerminalReporter:
# which garbles our output if we use self.write_line # which garbles our output if we use self.write_line
self.write_line(msg) self.write_line(msg)
def pytest_deselected(self, items): def pytest_deselected(self, items) -> None:
self._add_stats("deselected", items) self._add_stats("deselected", items)
def pytest_runtest_logstart(self, nodeid, location): def pytest_runtest_logstart(self, nodeid, location) -> None:
# ensure that the path is printed before the # ensure that the path is printed before the
# 1st test of a module starts running # 1st test of a module starts running
if self.showlongtestinfo: if self.showlongtestinfo:
@ -457,7 +491,9 @@ class TerminalReporter:
def pytest_runtest_logreport(self, report: TestReport) -> None: def pytest_runtest_logreport(self, report: TestReport) -> None:
self._tests_ran = True self._tests_ran = True
rep = report rep = report
res = self.config.hook.pytest_report_teststatus(report=rep, config=self.config) res = self.config.hook.pytest_report_teststatus(
report=rep, config=self.config
) # type: Tuple[str, str, str]
category, letter, word = res category, letter, word = res
if isinstance(word, tuple): if isinstance(word, tuple):
word, markup = word word, markup = word
@ -504,10 +540,11 @@ class TerminalReporter:
self.flush() self.flush()
@property @property
def _is_last_item(self): def _is_last_item(self) -> bool:
assert self._session is not None
return len(self._progress_nodeids_reported) == self._session.testscollected return len(self._progress_nodeids_reported) == self._session.testscollected
def pytest_runtest_logfinish(self, nodeid): def pytest_runtest_logfinish(self, nodeid) -> None:
assert self._session assert self._session
if self.verbosity <= 0 and self._show_progress_info: if self.verbosity <= 0 and self._show_progress_info:
if self._show_progress_info == "count": if self._show_progress_info == "count":
@ -545,7 +582,7 @@ class TerminalReporter:
) )
return " [100%]" return " [100%]"
def _write_progress_information_filling_space(self): def _write_progress_information_filling_space(self) -> None:
color, _ = self._get_main_color() color, _ = self._get_main_color()
msg = self._get_progress_information_message() msg = self._get_progress_information_message()
w = self._width_of_current_line w = self._width_of_current_line
@ -553,7 +590,7 @@ class TerminalReporter:
self.write(msg.rjust(fill), flush=True, **{color: True}) self.write(msg.rjust(fill), flush=True, **{color: True})
@property @property
def _width_of_current_line(self): def _width_of_current_line(self) -> int:
"""Return the width of current line, using the superior implementation of py-1.6 when available""" """Return the width of current line, using the superior implementation of py-1.6 when available"""
return self._tw.width_of_current_line return self._tw.width_of_current_line
@ -575,7 +612,7 @@ class TerminalReporter:
if self.isatty: if self.isatty:
self.report_collect() self.report_collect()
def report_collect(self, final=False): def report_collect(self, final: bool = False) -> None:
if self.config.option.verbose < 0: if self.config.option.verbose < 0:
return return
@ -643,7 +680,9 @@ class TerminalReporter:
) )
self._write_report_lines_from_hooks(lines) self._write_report_lines_from_hooks(lines)
def _write_report_lines_from_hooks(self, lines) -> None: def _write_report_lines_from_hooks(
self, lines: List[Union[str, List[str]]]
) -> None:
lines.reverse() lines.reverse()
for line in collapse(lines): for line in collapse(lines):
self.write_line(line) self.write_line(line)
@ -685,7 +724,7 @@ class TerminalReporter:
for rep in failed: for rep in failed:
rep.toterminal(self._tw) rep.toterminal(self._tw)
def _printcollecteditems(self, items): def _printcollecteditems(self, items: Sequence[Item]) -> None:
# to print out items and their parent collectors # to print out items and their parent collectors
# we take care to leave out Instances aka () # we take care to leave out Instances aka ()
# because later versions are going to get rid of them anyway # because later versions are going to get rid of them anyway
@ -701,7 +740,7 @@ class TerminalReporter:
for item in items: for item in items:
self._tw.line(item.nodeid) self._tw.line(item.nodeid)
return return
stack = [] stack = [] # type: List[Node]
indent = "" indent = ""
for item in items: for item in items:
needed_collectors = item.listchain()[1:] # strip root node needed_collectors = item.listchain()[1:] # strip root node
@ -716,11 +755,8 @@ class TerminalReporter:
indent = (len(stack) - 1) * " " indent = (len(stack) - 1) * " "
self._tw.line("{}{}".format(indent, col)) self._tw.line("{}{}".format(indent, col))
if self.config.option.verbose >= 1: if self.config.option.verbose >= 1:
try: obj = getattr(col, "obj", None)
obj = col.obj # type: ignore doc = inspect.getdoc(obj) if obj else None
except AttributeError:
continue
doc = inspect.getdoc(obj)
if doc: if doc:
for line in doc.splitlines(): for line in doc.splitlines():
self._tw.line("{}{}".format(indent + " ", line)) self._tw.line("{}{}".format(indent + " ", line))
@ -744,12 +780,12 @@ class TerminalReporter:
terminalreporter=self, exitstatus=exitstatus, config=self.config terminalreporter=self, exitstatus=exitstatus, config=self.config
) )
if session.shouldfail: if session.shouldfail:
self.write_sep("!", session.shouldfail, red=True) self.write_sep("!", str(session.shouldfail), red=True)
if exitstatus == ExitCode.INTERRUPTED: if exitstatus == ExitCode.INTERRUPTED:
self._report_keyboardinterrupt() self._report_keyboardinterrupt()
del self._keyboardinterrupt_memo del self._keyboardinterrupt_memo
elif session.shouldstop: elif session.shouldstop:
self.write_sep("!", session.shouldstop, red=True) self.write_sep("!", str(session.shouldstop), red=True)
self.summary_stats() self.summary_stats()
@pytest.hookimpl(hookwrapper=True) @pytest.hookimpl(hookwrapper=True)
@ -770,7 +806,7 @@ class TerminalReporter:
if hasattr(self, "_keyboardinterrupt_memo"): if hasattr(self, "_keyboardinterrupt_memo"):
self._report_keyboardinterrupt() self._report_keyboardinterrupt()
def _report_keyboardinterrupt(self): def _report_keyboardinterrupt(self) -> None:
excrepr = self._keyboardinterrupt_memo excrepr = self._keyboardinterrupt_memo
msg = excrepr.reprcrash.message msg = excrepr.reprcrash.message
self.write_sep("!", msg) self.write_sep("!", msg)
@ -824,14 +860,14 @@ class TerminalReporter:
# #
# summaries for sessionfinish # summaries for sessionfinish
# #
def getreports(self, name): def getreports(self, name: str):
values = [] values = []
for x in self.stats.get(name, []): for x in self.stats.get(name, []):
if not hasattr(x, "_pdbshown"): if not hasattr(x, "_pdbshown"):
values.append(x) values.append(x)
return values return values
def summary_warnings(self): def summary_warnings(self) -> None:
if self.hasopt("w"): if self.hasopt("w"):
all_warnings = self.stats.get( all_warnings = self.stats.get(
"warnings" "warnings"
@ -839,7 +875,7 @@ class TerminalReporter:
if not all_warnings: if not all_warnings:
return return
final = hasattr(self, "_already_displayed_warnings") final = self._already_displayed_warnings is not None
if final: if final:
warning_reports = all_warnings[self._already_displayed_warnings :] warning_reports = all_warnings[self._already_displayed_warnings :]
else: else:
@ -854,7 +890,7 @@ class TerminalReporter:
for wr in warning_reports: for wr in warning_reports:
reports_grouped_by_message.setdefault(wr.message, []).append(wr) reports_grouped_by_message.setdefault(wr.message, []).append(wr)
def collapsed_location_report(reports: List[WarningReport]): def collapsed_location_report(reports: List[WarningReport]) -> str:
locations = [] locations = []
for w in reports: for w in reports:
location = w.get_location(self.config) location = w.get_location(self.config)
@ -888,10 +924,10 @@ class TerminalReporter:
self._tw.line() self._tw.line()
self._tw.line("-- Docs: https://docs.pytest.org/en/latest/warnings.html") self._tw.line("-- Docs: https://docs.pytest.org/en/latest/warnings.html")
def summary_passes(self): def summary_passes(self) -> None:
if self.config.option.tbstyle != "no": if self.config.option.tbstyle != "no":
if self.hasopt("P"): if self.hasopt("P"):
reports = self.getreports("passed") reports = self.getreports("passed") # type: List[TestReport]
if not reports: if not reports:
return return
self.write_sep("=", "PASSES") self.write_sep("=", "PASSES")
@ -903,9 +939,10 @@ class TerminalReporter:
self._handle_teardown_sections(rep.nodeid) self._handle_teardown_sections(rep.nodeid)
def _get_teardown_reports(self, nodeid: str) -> List[TestReport]: def _get_teardown_reports(self, nodeid: str) -> List[TestReport]:
reports = self.getreports("")
return [ return [
report report
for report in self.getreports("") for report in reports
if report.when == "teardown" and report.nodeid == nodeid if report.when == "teardown" and report.nodeid == nodeid
] ]
@ -926,9 +963,9 @@ class TerminalReporter:
content = content[:-1] content = content[:-1]
self._tw.line(content) self._tw.line(content)
def summary_failures(self): def summary_failures(self) -> None:
if self.config.option.tbstyle != "no": if self.config.option.tbstyle != "no":
reports = self.getreports("failed") reports = self.getreports("failed") # type: List[BaseReport]
if not reports: if not reports:
return return
self.write_sep("=", "FAILURES") self.write_sep("=", "FAILURES")
@ -943,9 +980,9 @@ class TerminalReporter:
self._outrep_summary(rep) self._outrep_summary(rep)
self._handle_teardown_sections(rep.nodeid) self._handle_teardown_sections(rep.nodeid)
def summary_errors(self): def summary_errors(self) -> None:
if self.config.option.tbstyle != "no": if self.config.option.tbstyle != "no":
reports = self.getreports("error") reports = self.getreports("error") # type: List[BaseReport]
if not reports: if not reports:
return return
self.write_sep("=", "ERRORS") self.write_sep("=", "ERRORS")
@ -958,7 +995,7 @@ class TerminalReporter:
self.write_sep("_", msg, red=True, bold=True) self.write_sep("_", msg, red=True, bold=True)
self._outrep_summary(rep) self._outrep_summary(rep)
def _outrep_summary(self, rep): def _outrep_summary(self, rep: BaseReport) -> None:
rep.toterminal(self._tw) rep.toterminal(self._tw)
showcapture = self.config.option.showcapture showcapture = self.config.option.showcapture
if showcapture == "no": if showcapture == "no":
@ -971,7 +1008,7 @@ class TerminalReporter:
content = content[:-1] content = content[:-1]
self._tw.line(content) self._tw.line(content)
def summary_stats(self): def summary_stats(self) -> None:
if self.verbosity < -1: if self.verbosity < -1:
return return
@ -1041,7 +1078,7 @@ class TerminalReporter:
lines.append("{} {} {}".format(verbose_word, pos, reason)) lines.append("{} {} {}".format(verbose_word, pos, reason))
def show_skipped(lines: List[str]) -> None: def show_skipped(lines: List[str]) -> None:
skipped = self.stats.get("skipped", []) skipped = self.stats.get("skipped", []) # type: List[CollectReport]
fskips = _folded_skips(self.startdir, skipped) if skipped else [] fskips = _folded_skips(self.startdir, skipped) if skipped else []
if not fskips: if not fskips:
return return
@ -1125,12 +1162,14 @@ class TerminalReporter:
return parts, main_color return parts, main_color
def _get_pos(config, rep): def _get_pos(config: Config, rep: BaseReport):
nodeid = config.cwd_relative_nodeid(rep.nodeid) nodeid = config.cwd_relative_nodeid(rep.nodeid)
return nodeid return nodeid
def _get_line_with_reprcrash_message(config, rep, termwidth): def _get_line_with_reprcrash_message(
config: Config, rep: BaseReport, termwidth: int
) -> str:
"""Get summary line for a report, trying to add reprcrash message.""" """Get summary line for a report, trying to add reprcrash message."""
verbose_word = rep._get_verbose_word(config) verbose_word = rep._get_verbose_word(config)
pos = _get_pos(config, rep) pos = _get_pos(config, rep)
@ -1143,7 +1182,8 @@ def _get_line_with_reprcrash_message(config, rep, termwidth):
return line return line
try: try:
msg = rep.longrepr.reprcrash.message # Type ignored intentionally -- possible AttributeError expected.
msg = rep.longrepr.reprcrash.message # type: ignore[union-attr] # noqa: F821
except AttributeError: except AttributeError:
pass pass
else: else:
@ -1166,9 +1206,12 @@ def _get_line_with_reprcrash_message(config, rep, termwidth):
return line return line
def _folded_skips(startdir, skipped): def _folded_skips(
d = {} startdir: py.path.local, skipped: Sequence[CollectReport],
) -> List[Tuple[int, str, Optional[int], str]]:
d = {} # type: Dict[Tuple[str, Optional[int], str], List[CollectReport]]
for event in skipped: for event in skipped:
assert event.longrepr is not None
assert len(event.longrepr) == 3, (event, event.longrepr) assert len(event.longrepr) == 3, (event, event.longrepr)
fspath, lineno, reason = event.longrepr fspath, lineno, reason = event.longrepr
# For consistency, report all fspaths in relative form. # For consistency, report all fspaths in relative form.
@ -1182,13 +1225,13 @@ def _folded_skips(startdir, skipped):
and "skip" in keywords and "skip" in keywords
and "pytestmark" not in keywords and "pytestmark" not in keywords
): ):
key = (fspath, None, reason) key = (fspath, None, reason) # type: Tuple[str, Optional[int], str]
else: else:
key = (fspath, lineno, reason) key = (fspath, lineno, reason)
d.setdefault(key, []).append(event) d.setdefault(key, []).append(event)
values = [] values = [] # type: List[Tuple[int, str, Optional[int], str]]
for key, events in d.items(): for key, events in d.items():
values.append((len(events),) + key) values.append((len(events), *key))
return values return values
@ -1201,7 +1244,7 @@ _color_for_type = {
_color_for_type_default = "yellow" _color_for_type_default = "yellow"
def _make_plural(count, noun): def _make_plural(count: int, noun: str) -> Tuple[int, str]:
# No need to pluralize words such as `failed` or `passed`. # No need to pluralize words such as `failed` or `passed`.
if noun not in ["error", "warnings"]: if noun not in ["error", "warnings"]:
return count, noun return count, noun