From c0af19d8ad30840a8ef3c0aedd436816fc86ad3a Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 1 May 2020 14:40:16 +0300 Subject: [PATCH] Type annotate more of _pytest.terminal --- src/_pytest/terminal.py | 185 +++++++++++++++++++++++++--------------- 1 file changed, 114 insertions(+), 71 deletions(-) diff --git a/src/_pytest/terminal.py b/src/_pytest/terminal.py index 1b9601a22..b37828e5a 100644 --- a/src/_pytest/terminal.py +++ b/src/_pytest/terminal.py @@ -16,7 +16,9 @@ from typing import Generator from typing import List from typing import Mapping from typing import Optional +from typing import Sequence from typing import Set +from typing import TextIO from typing import Tuple from typing import Union @@ -37,11 +39,15 @@ from _pytest.config import Config from _pytest.config import ExitCode from _pytest.config.argparsing import Parser from _pytest.deprecated import TERMINALWRITER_WRITER +from _pytest.nodes import Item +from _pytest.nodes import Node from _pytest.reports import BaseReport from _pytest.reports import CollectReport from _pytest.reports import TestReport if TYPE_CHECKING: + from typing_extensions import Literal + from _pytest.main import Session @@ -69,7 +75,14 @@ class MoreQuietAction(argparse.Action): used to unify verbosity handling """ - def __init__(self, option_strings, dest, default=None, required=False, help=None): + def __init__( + self, + option_strings: Sequence[str], + dest: str, + default: object = None, + required: bool = False, + help: Optional[str] = None, + ) -> None: super().__init__( option_strings=option_strings, dest=dest, @@ -79,7 +92,13 @@ class MoreQuietAction(argparse.Action): help=help, ) - def __call__(self, parser, namespace, values, option_string=None): + def __call__( + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[str, Sequence[object], None], + option_string: Optional[str] = None, + ) -> None: new_count = getattr(namespace, self.dest, 0) - 1 setattr(namespace, self.dest, new_count) # todo Deprecate config.quiet @@ -194,7 +213,7 @@ def pytest_configure(config: Config) -> None: def getreportopt(config: Config) -> str: - reportchars = config.option.reportchars + reportchars = config.option.reportchars # type: str old_aliases = {"F", "S"} reportopts = "" @@ -247,10 +266,12 @@ class WarningReport: message = attr.ib(type=str) nodeid = attr.ib(type=Optional[str], default=None) - fslocation = attr.ib(default=None) + fslocation = attr.ib( + type=Optional[Union[Tuple[str, int], py.path.local]], default=None + ) count_towards_summary = True - def get_location(self, config): + def get_location(self, config: Config) -> Optional[str]: """ Returns the more user-friendly information about the location of a warning, or None. @@ -270,13 +291,13 @@ class WarningReport: class TerminalReporter: - def __init__(self, config: Config, file=None) -> None: + def __init__(self, config: Config, file: Optional[TextIO] = None) -> None: import _pytest.config self.config = config self._numcollected = 0 self._session = None # type: Optional[Session] - self._showfspath = None + self._showfspath = None # type: Optional[bool] self.stats = {} # type: Dict[str, List[Any]] self._main_color = None # type: Optional[str] @@ -293,6 +314,7 @@ class TerminalReporter: self._progress_nodeids_reported = set() # type: Set[str] self._show_progress_info = self._determine_show_progress_info() self._collect_report_last_write = None # type: Optional[float] + self._already_displayed_warnings = None # type: Optional[int] @property def writer(self) -> TerminalWriter: @@ -300,11 +322,11 @@ class TerminalReporter: return self._tw @writer.setter - def writer(self, value: TerminalWriter): + def writer(self, value: TerminalWriter) -> None: warnings.warn(TERMINALWRITER_WRITER, stacklevel=2) self._tw = value - def _determine_show_progress_info(self): + def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]": """Return True if we should display progress information based on the current config""" # do not show progress if we are not capturing output (#3038) if self.config.getoption("capture", "no") == "no": @@ -312,38 +334,42 @@ class TerminalReporter: # do not show progress if we are showing fixture setup/teardown if self.config.getoption("setupshow", False): return False - cfg = self.config.getini("console_output_style") - if cfg in ("progress", "count"): - return cfg - return False + cfg = self.config.getini("console_output_style") # type: str + if cfg == "progress": + return "progress" + elif cfg == "count": + return "count" + else: + return False @property - def verbosity(self): - return self.config.option.verbose + def verbosity(self) -> int: + verbosity = self.config.option.verbose # type: int + return verbosity @property - def showheader(self): + def showheader(self) -> bool: return self.verbosity >= 0 @property - def showfspath(self): + def showfspath(self) -> bool: if self._showfspath is None: return self.verbosity >= 0 return self._showfspath @showfspath.setter - def showfspath(self, value): + def showfspath(self, value: Optional[bool]) -> None: self._showfspath = value @property - def showlongtestinfo(self): + def showlongtestinfo(self) -> bool: return self.verbosity > 0 - def hasopt(self, char): + def hasopt(self, char: str) -> bool: char = {"xfailed": "x", "skipped": "s"}.get(char, char) return char in self.reportchars - def write_fspath_result(self, nodeid, res, **markup): + def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None: fspath = self.config.rootdir.join(nodeid.split("::")[0]) # NOTE: explicitly check for None to work around py bug, and for less # overhead in general (https://github.com/pytest-dev/py/pull/207). @@ -356,7 +382,7 @@ class TerminalReporter: self._tw.write(fspath + " ") self._tw.write(res, flush=True, **markup) - def write_ensure_prefix(self, prefix, extra="", **kwargs): + def write_ensure_prefix(self, prefix, extra: str = "", **kwargs) -> None: if self.currentfspath != prefix: self._tw.line() self.currentfspath = prefix @@ -376,13 +402,13 @@ class TerminalReporter: def flush(self) -> None: self._tw.flush() - def write_line(self, line: Union[str, bytes], **markup) -> None: + def write_line(self, line: Union[str, bytes], **markup: bool) -> None: if not isinstance(line, str): line = str(line, errors="replace") self.ensure_newline() self._tw.line(line, **markup) - def rewrite(self, line, **markup): + def rewrite(self, line: str, **markup: bool) -> None: """ Rewinds the terminal cursor to the beginning and writes the given line. @@ -400,14 +426,20 @@ class TerminalReporter: line = str(line) self._tw.write("\r" + line + fill, **markup) - def write_sep(self, sep, title=None, **markup): + def write_sep( + self, + sep: str, + title: Optional[str] = None, + fullwidth: Optional[int] = None, + **markup: bool + ) -> None: self.ensure_newline() - self._tw.sep(sep, title, **markup) + self._tw.sep(sep, title, fullwidth, **markup) - def section(self, title, sep="=", **kw): + def section(self, title: str, sep: str = "=", **kw: bool) -> None: self._tw.sep(sep, title, **kw) - def line(self, msg, **kw): + def line(self, msg: str, **kw: bool) -> None: self._tw.line(msg, **kw) def _add_stats(self, category: str, items: List) -> None: @@ -421,7 +453,9 @@ class TerminalReporter: self.write_line("INTERNALERROR> " + line) return 1 - def pytest_warning_recorded(self, warning_message, nodeid): + def pytest_warning_recorded( + self, warning_message: warnings.WarningMessage, nodeid: str, + ) -> None: from _pytest.warnings import warning_record_to_str fslocation = warning_message.filename, warning_message.lineno @@ -440,10 +474,10 @@ class TerminalReporter: # which garbles our output if we use self.write_line self.write_line(msg) - def pytest_deselected(self, items): + def pytest_deselected(self, items) -> None: self._add_stats("deselected", items) - def pytest_runtest_logstart(self, nodeid, location): + def pytest_runtest_logstart(self, nodeid, location) -> None: # ensure that the path is printed before the # 1st test of a module starts running if self.showlongtestinfo: @@ -457,7 +491,9 @@ class TerminalReporter: def pytest_runtest_logreport(self, report: TestReport) -> None: self._tests_ran = True rep = report - res = self.config.hook.pytest_report_teststatus(report=rep, config=self.config) + res = self.config.hook.pytest_report_teststatus( + report=rep, config=self.config + ) # type: Tuple[str, str, str] category, letter, word = res if isinstance(word, tuple): word, markup = word @@ -504,10 +540,11 @@ class TerminalReporter: self.flush() @property - def _is_last_item(self): + def _is_last_item(self) -> bool: + assert self._session is not None return len(self._progress_nodeids_reported) == self._session.testscollected - def pytest_runtest_logfinish(self, nodeid): + def pytest_runtest_logfinish(self, nodeid) -> None: assert self._session if self.verbosity <= 0 and self._show_progress_info: if self._show_progress_info == "count": @@ -545,7 +582,7 @@ class TerminalReporter: ) return " [100%]" - def _write_progress_information_filling_space(self): + def _write_progress_information_filling_space(self) -> None: color, _ = self._get_main_color() msg = self._get_progress_information_message() w = self._width_of_current_line @@ -553,7 +590,7 @@ class TerminalReporter: self.write(msg.rjust(fill), flush=True, **{color: True}) @property - def _width_of_current_line(self): + def _width_of_current_line(self) -> int: """Return the width of current line, using the superior implementation of py-1.6 when available""" return self._tw.width_of_current_line @@ -575,7 +612,7 @@ class TerminalReporter: if self.isatty: self.report_collect() - def report_collect(self, final=False): + def report_collect(self, final: bool = False) -> None: if self.config.option.verbose < 0: return @@ -643,7 +680,9 @@ class TerminalReporter: ) self._write_report_lines_from_hooks(lines) - def _write_report_lines_from_hooks(self, lines) -> None: + def _write_report_lines_from_hooks( + self, lines: List[Union[str, List[str]]] + ) -> None: lines.reverse() for line in collapse(lines): self.write_line(line) @@ -685,7 +724,7 @@ class TerminalReporter: for rep in failed: rep.toterminal(self._tw) - def _printcollecteditems(self, items): + def _printcollecteditems(self, items: Sequence[Item]) -> None: # to print out items and their parent collectors # we take care to leave out Instances aka () # because later versions are going to get rid of them anyway @@ -701,7 +740,7 @@ class TerminalReporter: for item in items: self._tw.line(item.nodeid) return - stack = [] + stack = [] # type: List[Node] indent = "" for item in items: needed_collectors = item.listchain()[1:] # strip root node @@ -716,11 +755,8 @@ class TerminalReporter: indent = (len(stack) - 1) * " " self._tw.line("{}{}".format(indent, col)) if self.config.option.verbose >= 1: - try: - obj = col.obj # type: ignore - except AttributeError: - continue - doc = inspect.getdoc(obj) + obj = getattr(col, "obj", None) + doc = inspect.getdoc(obj) if obj else None if doc: for line in doc.splitlines(): self._tw.line("{}{}".format(indent + " ", line)) @@ -744,12 +780,12 @@ class TerminalReporter: terminalreporter=self, exitstatus=exitstatus, config=self.config ) if session.shouldfail: - self.write_sep("!", session.shouldfail, red=True) + self.write_sep("!", str(session.shouldfail), red=True) if exitstatus == ExitCode.INTERRUPTED: self._report_keyboardinterrupt() del self._keyboardinterrupt_memo elif session.shouldstop: - self.write_sep("!", session.shouldstop, red=True) + self.write_sep("!", str(session.shouldstop), red=True) self.summary_stats() @pytest.hookimpl(hookwrapper=True) @@ -770,7 +806,7 @@ class TerminalReporter: if hasattr(self, "_keyboardinterrupt_memo"): self._report_keyboardinterrupt() - def _report_keyboardinterrupt(self): + def _report_keyboardinterrupt(self) -> None: excrepr = self._keyboardinterrupt_memo msg = excrepr.reprcrash.message self.write_sep("!", msg) @@ -824,14 +860,14 @@ class TerminalReporter: # # summaries for sessionfinish # - def getreports(self, name): + def getreports(self, name: str): values = [] for x in self.stats.get(name, []): if not hasattr(x, "_pdbshown"): values.append(x) return values - def summary_warnings(self): + def summary_warnings(self) -> None: if self.hasopt("w"): all_warnings = self.stats.get( "warnings" @@ -839,7 +875,7 @@ class TerminalReporter: if not all_warnings: return - final = hasattr(self, "_already_displayed_warnings") + final = self._already_displayed_warnings is not None if final: warning_reports = all_warnings[self._already_displayed_warnings :] else: @@ -854,7 +890,7 @@ class TerminalReporter: for wr in warning_reports: reports_grouped_by_message.setdefault(wr.message, []).append(wr) - def collapsed_location_report(reports: List[WarningReport]): + def collapsed_location_report(reports: List[WarningReport]) -> str: locations = [] for w in reports: location = w.get_location(self.config) @@ -888,10 +924,10 @@ class TerminalReporter: self._tw.line() self._tw.line("-- Docs: https://docs.pytest.org/en/latest/warnings.html") - def summary_passes(self): + def summary_passes(self) -> None: if self.config.option.tbstyle != "no": if self.hasopt("P"): - reports = self.getreports("passed") + reports = self.getreports("passed") # type: List[TestReport] if not reports: return self.write_sep("=", "PASSES") @@ -903,9 +939,10 @@ class TerminalReporter: self._handle_teardown_sections(rep.nodeid) def _get_teardown_reports(self, nodeid: str) -> List[TestReport]: + reports = self.getreports("") return [ report - for report in self.getreports("") + for report in reports if report.when == "teardown" and report.nodeid == nodeid ] @@ -926,9 +963,9 @@ class TerminalReporter: content = content[:-1] self._tw.line(content) - def summary_failures(self): + def summary_failures(self) -> None: if self.config.option.tbstyle != "no": - reports = self.getreports("failed") + reports = self.getreports("failed") # type: List[BaseReport] if not reports: return self.write_sep("=", "FAILURES") @@ -943,9 +980,9 @@ class TerminalReporter: self._outrep_summary(rep) self._handle_teardown_sections(rep.nodeid) - def summary_errors(self): + def summary_errors(self) -> None: if self.config.option.tbstyle != "no": - reports = self.getreports("error") + reports = self.getreports("error") # type: List[BaseReport] if not reports: return self.write_sep("=", "ERRORS") @@ -958,7 +995,7 @@ class TerminalReporter: self.write_sep("_", msg, red=True, bold=True) self._outrep_summary(rep) - def _outrep_summary(self, rep): + def _outrep_summary(self, rep: BaseReport) -> None: rep.toterminal(self._tw) showcapture = self.config.option.showcapture if showcapture == "no": @@ -971,7 +1008,7 @@ class TerminalReporter: content = content[:-1] self._tw.line(content) - def summary_stats(self): + def summary_stats(self) -> None: if self.verbosity < -1: return @@ -1041,7 +1078,7 @@ class TerminalReporter: lines.append("{} {} {}".format(verbose_word, pos, reason)) def show_skipped(lines: List[str]) -> None: - skipped = self.stats.get("skipped", []) + skipped = self.stats.get("skipped", []) # type: List[CollectReport] fskips = _folded_skips(self.startdir, skipped) if skipped else [] if not fskips: return @@ -1125,12 +1162,14 @@ class TerminalReporter: return parts, main_color -def _get_pos(config, rep): +def _get_pos(config: Config, rep: BaseReport): nodeid = config.cwd_relative_nodeid(rep.nodeid) return nodeid -def _get_line_with_reprcrash_message(config, rep, termwidth): +def _get_line_with_reprcrash_message( + config: Config, rep: BaseReport, termwidth: int +) -> str: """Get summary line for a report, trying to add reprcrash message.""" verbose_word = rep._get_verbose_word(config) pos = _get_pos(config, rep) @@ -1143,7 +1182,8 @@ def _get_line_with_reprcrash_message(config, rep, termwidth): return line try: - msg = rep.longrepr.reprcrash.message + # Type ignored intentionally -- possible AttributeError expected. + msg = rep.longrepr.reprcrash.message # type: ignore[union-attr] # noqa: F821 except AttributeError: pass else: @@ -1166,9 +1206,12 @@ def _get_line_with_reprcrash_message(config, rep, termwidth): return line -def _folded_skips(startdir, skipped): - d = {} +def _folded_skips( + startdir: py.path.local, skipped: Sequence[CollectReport], +) -> List[Tuple[int, str, Optional[int], str]]: + d = {} # type: Dict[Tuple[str, Optional[int], str], List[CollectReport]] for event in skipped: + assert event.longrepr is not None assert len(event.longrepr) == 3, (event, event.longrepr) fspath, lineno, reason = event.longrepr # For consistency, report all fspaths in relative form. @@ -1182,13 +1225,13 @@ def _folded_skips(startdir, skipped): and "skip" in keywords and "pytestmark" not in keywords ): - key = (fspath, None, reason) + key = (fspath, None, reason) # type: Tuple[str, Optional[int], str] else: key = (fspath, lineno, reason) d.setdefault(key, []).append(event) - values = [] + values = [] # type: List[Tuple[int, str, Optional[int], str]] for key, events in d.items(): - values.append((len(events),) + key) + values.append((len(events), *key)) return values @@ -1201,7 +1244,7 @@ _color_for_type = { _color_for_type_default = "yellow" -def _make_plural(count, noun): +def _make_plural(count: int, noun: str) -> Tuple[int, str]: # No need to pluralize words such as `failed` or `passed`. if noun not in ["error", "warnings"]: return count, noun