diff --git a/src/_pytest/doctest.py b/src/_pytest/doctest.py index acedd389b..c744bb369 100644 --- a/src/_pytest/doctest.py +++ b/src/_pytest/doctest.py @@ -32,6 +32,7 @@ from _pytest.compat import TYPE_CHECKING from _pytest.config import Config from _pytest.config.argparsing import Parser from _pytest.fixtures import FixtureRequest +from _pytest.nodes import Collector from _pytest.outcomes import OutcomeException from _pytest.pathlib import import_path from _pytest.python_api import approx @@ -118,7 +119,7 @@ def pytest_unconfigure() -> None: def pytest_collect_file( - path: py.path.local, parent + path: py.path.local, parent: Collector, ) -> Optional[Union["DoctestModule", "DoctestTextfile"]]: config = parent.config if path.ext == ".py": diff --git a/src/_pytest/hookspec.py b/src/_pytest/hookspec.py index 8e6be7d56..d4aaba1ec 100644 --- a/src/_pytest/hookspec.py +++ b/src/_pytest/hookspec.py @@ -274,10 +274,12 @@ def pytest_ignore_collect(path: py.path.local, config: "Config") -> Optional[boo """ -def pytest_collect_file(path: py.path.local, parent) -> "Optional[Collector]": - """Return collection Node or None for the given path. +def pytest_collect_file( + path: py.path.local, parent: "Collector" +) -> "Optional[Collector]": + """Create a Collector for the given path, or None if not relevant. - Any new node needs to have the specified ``parent`` as a parent. + The new node needs to have the specified ``parent`` as a parent. :param py.path.local path: The path to collect. """ diff --git a/src/_pytest/main.py b/src/_pytest/main.py index 479f34cdc..29b1e5f70 100644 --- a/src/_pytest/main.py +++ b/src/_pytest/main.py @@ -45,8 +45,6 @@ if TYPE_CHECKING: from typing import Type from typing_extensions import Literal - from _pytest.python import Package - def pytest_addoption(parser: Parser) -> None: parser.addini( @@ -402,10 +400,6 @@ class FSHookProxy: return x -class NoMatch(Exception): - """Matching cannot locate matching names.""" - - class Interrupted(KeyboardInterrupt): """Signals that the test run was interrupted.""" @@ -447,20 +441,6 @@ class Session(nodes.FSCollector): self.startdir = config.invocation_dir self._initialpaths = frozenset() # type: FrozenSet[py.path.local] - # Keep track of any collected nodes in here, so we don't duplicate fixtures. - self._collection_node_cache1 = ( - {} - ) # type: Dict[py.path.local, Sequence[nodes.Collector]] - self._collection_node_cache2 = ( - {} - ) # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector] - self._collection_node_cache3 = ( - {} - ) # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport] - - # Dirnames of pkgs with dunder-init files. - self._collection_pkg_roots = {} # type: Dict[str, Package] - self._bestrelpathcache = _bestrelpath_cache( config.rootdir ) # type: Dict[py.path.local, str] @@ -523,6 +503,42 @@ class Session(nodes.FSCollector): proxy = self.config.hook return proxy + def _recurse(self, direntry: "os.DirEntry[str]") -> bool: + if direntry.name == "__pycache__": + return False + path = py.path.local(direntry.path) + ihook = self.gethookproxy(path.dirpath()) + if ihook.pytest_ignore_collect(path=path, config=self.config): + return False + norecursepatterns = self.config.getini("norecursedirs") + if any(path.check(fnmatch=pat) for pat in norecursepatterns): + return False + return True + + def _collectfile( + self, path: py.path.local, handle_dupes: bool = True + ) -> Sequence[nodes.Collector]: + assert ( + path.isfile() + ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( + path, path.isdir(), path.exists(), path.islink() + ) + ihook = self.gethookproxy(path) + if not self.isinitpath(path): + if ihook.pytest_ignore_collect(path=path, config=self.config): + return () + + if handle_dupes: + keepduplicates = self.config.getoption("keepduplicates") + if not keepduplicates: + duplicate_paths = self.config.pluginmanager._duplicatepaths + if path in duplicate_paths: + return () + else: + duplicate_paths.add(path) + + return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return] + @overload def perform_collect( self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ... @@ -552,219 +568,206 @@ class Session(nodes.FSCollector): in which case the return value contains these collectors unexpanded, and ``session.items`` is empty. """ + if args is None: + args = self.config.args + + self.trace("perform_collect", self, args) + self.trace.root.indent += 1 + + self._notfound = [] # type: List[Tuple[str, Sequence[nodes.Collector]]] + self._initial_parts = [] # type: List[Tuple[py.path.local, List[str]]] + self.items = [] # type: List[nodes.Item] + hook = self.config.hook + + items = self.items # type: Sequence[Union[nodes.Item, nodes.Collector]] try: - items = self._perform_collect(args, genitems) + initialpaths = [] # type: List[py.path.local] + for arg in args: + fspath, parts = resolve_collection_argument( + self.config.invocation_dir, arg, as_pypath=self.config.option.pyargs + ) + self._initial_parts.append((fspath, parts)) + initialpaths.append(fspath) + self._initialpaths = frozenset(initialpaths) + rep = collect_one_node(self) + self.ihook.pytest_collectreport(report=rep) + self.trace.root.indent -= 1 + if self._notfound: + errors = [] + for arg, cols in self._notfound: + line = "(no name {!r} in any of {!r})".format(arg, cols) + errors.append("not found: {}\n{}".format(arg, line)) + raise UsageError(*errors) + if not genitems: + items = rep.result + else: + if rep.passed: + for node in rep.result: + self.items.extend(self.genitems(node)) + self.config.pluginmanager.check_pending() hook.pytest_collection_modifyitems( session=self, config=self.config, items=items ) finally: hook.pytest_collection_finish(session=self) + self.testscollected = len(items) return items - @overload - def _perform_collect( - self, args: Optional[Sequence[str]], genitems: "Literal[True]" - ) -> List[nodes.Item]: - ... - - @overload # noqa: F811 - def _perform_collect( # noqa: F811 - self, args: Optional[Sequence[str]], genitems: bool - ) -> Union[List[Union[nodes.Item]], List[Union[nodes.Item, nodes.Collector]]]: - ... - - def _perform_collect( # noqa: F811 - self, args: Optional[Sequence[str]], genitems: bool - ) -> Union[List[Union[nodes.Item]], List[Union[nodes.Item, nodes.Collector]]]: - if args is None: - args = self.config.args - self.trace("perform_collect", self, args) - self.trace.root.indent += 1 - self._notfound = [] # type: List[Tuple[str, NoMatch]] - initialpaths = [] # type: List[py.path.local] - self._initial_parts = [] # type: List[Tuple[py.path.local, List[str]]] - self.items = items = [] # type: List[nodes.Item] - for arg in args: - fspath, parts = resolve_collection_argument( - self.config.invocation_dir, arg, as_pypath=self.config.option.pyargs - ) - self._initial_parts.append((fspath, parts)) - initialpaths.append(fspath) - self._initialpaths = frozenset(initialpaths) - rep = collect_one_node(self) - self.ihook.pytest_collectreport(report=rep) - self.trace.root.indent -= 1 - if self._notfound: - errors = [] - for arg, exc in self._notfound: - line = "(no name {!r} in any of {!r})".format(arg, exc.args[0]) - errors.append("not found: {}\n{}".format(arg, line)) - raise UsageError(*errors) - if not genitems: - return rep.result - else: - if rep.passed: - for node in rep.result: - self.items.extend(self.genitems(node)) - return items - def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: - for fspath, parts in self._initial_parts: - self.trace("processing argument", (fspath, parts)) - self.trace.root.indent += 1 - try: - yield from self._collect(fspath, parts) - except NoMatch as exc: - report_arg = "::".join((str(fspath), *parts)) - # we are inside a make_report hook so - # we cannot directly pass through the exception - self._notfound.append((report_arg, exc)) - - self.trace.root.indent -= 1 - self._collection_node_cache1.clear() - self._collection_node_cache2.clear() - self._collection_node_cache3.clear() - self._collection_pkg_roots.clear() - - def _collect( - self, argpath: py.path.local, names: List[str] - ) -> Iterator[Union[nodes.Item, nodes.Collector]]: from _pytest.python import Package - # Start with a Session root, and delve to argpath item (dir or file) - # and stack all Packages found on the way. - # No point in finding packages when collecting doctests. - if not self.config.getoption("doctestmodules", False): - pm = self.config.pluginmanager - for parent in reversed(argpath.parts()): - if pm._confcutdir and pm._confcutdir.relto(parent): - break + # Keep track of any collected nodes in here, so we don't duplicate fixtures. + node_cache1 = {} # type: Dict[py.path.local, Sequence[nodes.Collector]] + node_cache2 = ( + {} + ) # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector] - if parent.isdir(): - pkginit = parent.join("__init__.py") - if pkginit.isfile(): - if pkginit not in self._collection_node_cache1: + # Keep track of any collected collectors in matchnodes paths, so they + # are not collected more than once. + matchnodes_cache = ( + {} + ) # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport] + + # Dirnames of pkgs with dunder-init files. + pkg_roots = {} # type: Dict[str, Package] + + for argpath, names in self._initial_parts: + self.trace("processing argument", (argpath, names)) + self.trace.root.indent += 1 + + # Start with a Session root, and delve to argpath item (dir or file) + # and stack all Packages found on the way. + # No point in finding packages when collecting doctests. + if not self.config.getoption("doctestmodules", False): + pm = self.config.pluginmanager + for parent in reversed(argpath.parts()): + if pm._confcutdir and pm._confcutdir.relto(parent): + break + + if parent.isdir(): + pkginit = parent.join("__init__.py") + if pkginit.isfile() and pkginit not in node_cache1: col = self._collectfile(pkginit, handle_dupes=False) if col: if isinstance(col[0], Package): - self._collection_pkg_roots[str(parent)] = col[0] - # Always store a list in the cache, matchnodes expects it. - self._collection_node_cache1[col[0].fspath] = [col[0]] + pkg_roots[str(parent)] = col[0] + node_cache1[col[0].fspath] = [col[0]] - # If it's a directory argument, recurse and look for any Subpackages. - # Let the Package collector deal with subnodes, don't collect here. - if argpath.check(dir=1): - assert not names, "invalid arg {!r}".format((argpath, names)) + # If it's a directory argument, recurse and look for any Subpackages. + # Let the Package collector deal with subnodes, don't collect here. + if argpath.check(dir=1): + assert not names, "invalid arg {!r}".format((argpath, names)) - seen_dirs = set() # type: Set[py.path.local] - for direntry in visit(str(argpath), self._recurse): - if not direntry.is_file(): - continue + seen_dirs = set() # type: Set[py.path.local] + for direntry in visit(str(argpath), self._recurse): + if not direntry.is_file(): + continue - path = py.path.local(direntry.path) - dirpath = path.dirpath() + path = py.path.local(direntry.path) + dirpath = path.dirpath() - if dirpath not in seen_dirs: - # Collect packages first. - seen_dirs.add(dirpath) - pkginit = dirpath.join("__init__.py") - if pkginit.exists(): - for x in self._collectfile(pkginit): + if dirpath not in seen_dirs: + # Collect packages first. + seen_dirs.add(dirpath) + pkginit = dirpath.join("__init__.py") + if pkginit.exists(): + for x in self._collectfile(pkginit): + yield x + if isinstance(x, Package): + pkg_roots[str(dirpath)] = x + if str(dirpath) in pkg_roots: + # Do not collect packages here. + continue + + for x in self._collectfile(path): + key = (type(x), x.fspath) + if key in node_cache2: + yield node_cache2[key] + else: + node_cache2[key] = x yield x - if isinstance(x, Package): - self._collection_pkg_roots[str(dirpath)] = x - if str(dirpath) in self._collection_pkg_roots: - # Do not collect packages here. + else: + assert argpath.check(file=1) + + if argpath in node_cache1: + col = node_cache1[argpath] + else: + collect_root = pkg_roots.get(argpath.dirname, self) + col = collect_root._collectfile(argpath, handle_dupes=False) + if col: + node_cache1[argpath] = col + + matching = [] + work = [ + (col, names) + ] # type: List[Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]] + while work: + self.trace("matchnodes", col, names) + self.trace.root.indent += 1 + + matchnodes, matchnames = work.pop() + for node in matchnodes: + if not matchnames: + matching.append(node) + continue + if not isinstance(node, nodes.Collector): + continue + key = (type(node), node.nodeid) + if key in matchnodes_cache: + rep = matchnodes_cache[key] + else: + rep = collect_one_node(node) + matchnodes_cache[key] = rep + if rep.passed: + submatchnodes = [] + for r in rep.result: + # TODO: Remove parametrized workaround once collection structure contains + # parametrization. + if ( + r.name == matchnames[0] + or r.name.split("[")[0] == matchnames[0] + ): + submatchnodes.append(r) + if submatchnodes: + work.append((submatchnodes, matchnames[1:])) + # XXX Accept IDs that don't have "()" for class instances. + elif len(rep.result) == 1 and rep.result[0].name == "()": + work.append((rep.result, matchnames)) + else: + # Report collection failures here to avoid failing to run some test + # specified in the command line because the module could not be + # imported (#134). + node.ihook.pytest_collectreport(report=rep) + + self.trace("matchnodes finished -> ", len(matching), "nodes") + self.trace.root.indent -= 1 + + if not matching: + report_arg = "::".join((str(argpath), *names)) + self._notfound.append((report_arg, col)) continue - for x in self._collectfile(path): - key = (type(x), x.fspath) - if key in self._collection_node_cache2: - yield self._collection_node_cache2[key] - else: - self._collection_node_cache2[key] = x - yield x - else: - assert argpath.check(file=1) + # If __init__.py was the only file requested, then the matched node will be + # the corresponding Package, and the first yielded item will be the __init__ + # Module itself, so just use that. If this special case isn't taken, then all + # the files in the package will be yielded. + if argpath.basename == "__init__.py": + assert isinstance(matching[0], nodes.Collector) + try: + yield next(iter(matching[0].collect())) + except StopIteration: + # The package collects nothing with only an __init__.py + # file in it, which gets ignored by the default + # "python_files" option. + pass + continue - if argpath in self._collection_node_cache1: - col = self._collection_node_cache1[argpath] - else: - collect_root = self._collection_pkg_roots.get(argpath.dirname, self) - col = collect_root._collectfile(argpath, handle_dupes=False) - if col: - self._collection_node_cache1[argpath] = col - m = self.matchnodes(col, names) - # If __init__.py was the only file requested, then the matched node will be - # the corresponding Package, and the first yielded item will be the __init__ - # Module itself, so just use that. If this special case isn't taken, then all - # the files in the package will be yielded. - if argpath.basename == "__init__.py": - assert isinstance(m[0], nodes.Collector) - try: - yield next(iter(m[0].collect())) - except StopIteration: - # The package collects nothing with only an __init__.py - # file in it, which gets ignored by the default - # "python_files" option. - pass - return - yield from m + yield from matching - def matchnodes( - self, matching: Sequence[Union[nodes.Item, nodes.Collector]], names: List[str], - ) -> Sequence[Union[nodes.Item, nodes.Collector]]: - self.trace("matchnodes", matching, names) - self.trace.root.indent += 1 - nodes = self._matchnodes(matching, names) - num = len(nodes) - self.trace("matchnodes finished -> ", num, "nodes") - self.trace.root.indent -= 1 - if num == 0: - raise NoMatch(matching, names[:1]) - return nodes - - def _matchnodes( - self, matching: Sequence[Union[nodes.Item, nodes.Collector]], names: List[str], - ) -> Sequence[Union[nodes.Item, nodes.Collector]]: - if not matching or not names: - return matching - name = names[0] - assert name - nextnames = names[1:] - resultnodes = [] # type: List[Union[nodes.Item, nodes.Collector]] - for node in matching: - if isinstance(node, nodes.Item): - if not names: - resultnodes.append(node) - continue - assert isinstance(node, nodes.Collector) - key = (type(node), node.nodeid) - if key in self._collection_node_cache3: - rep = self._collection_node_cache3[key] - else: - rep = collect_one_node(node) - self._collection_node_cache3[key] = rep - if rep.passed: - has_matched = False - for x in rep.result: - # TODO: Remove parametrized workaround once collection structure contains parametrization. - if x.name == name or x.name.split("[")[0] == name: - resultnodes.extend(self.matchnodes([x], nextnames)) - has_matched = True - # XXX Accept IDs that don't have "()" for class instances. - if not has_matched and len(rep.result) == 1 and x.name == "()": - nextnames.insert(0, name) - resultnodes.extend(self.matchnodes([x], nextnames)) - else: - # Report collection failures here to avoid failing to run some test - # specified in the command line because the module could not be - # imported (#134). - node.ihook.pytest_collectreport(report=rep) - return resultnodes + self.trace.root.indent -= 1 def genitems( self, node: Union[nodes.Item, nodes.Collector] diff --git a/src/_pytest/nodes.py b/src/_pytest/nodes.py index addeaf1c2..26fab67fe 100644 --- a/src/_pytest/nodes.py +++ b/src/_pytest/nodes.py @@ -8,7 +8,6 @@ from typing import Iterable from typing import Iterator from typing import List from typing import Optional -from typing import Sequence from typing import Set from typing import Tuple from typing import TypeVar @@ -528,8 +527,6 @@ class FSCollector(Collector): super().__init__(name, parent, config, session, nodeid=nodeid, fspath=fspath) - self._norecursepatterns = self.config.getini("norecursedirs") - @classmethod def from_parent(cls, parent, *, fspath, **kw): """The public constructor.""" @@ -543,42 +540,6 @@ class FSCollector(Collector): warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2) return self.session.isinitpath(path) - def _recurse(self, direntry: "os.DirEntry[str]") -> bool: - if direntry.name == "__pycache__": - return False - path = py.path.local(direntry.path) - ihook = self.session.gethookproxy(path.dirpath()) - if ihook.pytest_ignore_collect(path=path, config=self.config): - return False - for pat in self._norecursepatterns: - if path.check(fnmatch=pat): - return False - return True - - def _collectfile( - self, path: py.path.local, handle_dupes: bool = True - ) -> Sequence[Collector]: - assert ( - path.isfile() - ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( - path, path.isdir(), path.exists(), path.islink() - ) - ihook = self.session.gethookproxy(path) - if not self.session.isinitpath(path): - if ihook.pytest_ignore_collect(path=path, config=self.config): - return () - - if handle_dupes: - keepduplicates = self.config.getoption("keepduplicates") - if not keepduplicates: - duplicate_paths = self.config.pluginmanager._duplicatepaths - if path in duplicate_paths: - return () - else: - duplicate_paths.add(path) - - return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return] - class File(FSCollector): """Base class for collecting tests from a file.""" diff --git a/src/_pytest/python.py b/src/_pytest/python.py index ae5108e76..f792acbd8 100644 --- a/src/_pytest/python.py +++ b/src/_pytest/python.py @@ -184,7 +184,9 @@ def pytest_pyfunc_call(pyfuncitem: "Function") -> Optional[object]: return True -def pytest_collect_file(path: py.path.local, parent) -> Optional["Module"]: +def pytest_collect_file( + path: py.path.local, parent: nodes.Collector +) -> Optional["Module"]: ext = path.ext if ext == ".py": if not parent.session.isinitpath(path): @@ -634,6 +636,42 @@ class Package(Module): warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2) return self.session.isinitpath(path) + def _recurse(self, direntry: "os.DirEntry[str]") -> bool: + if direntry.name == "__pycache__": + return False + path = py.path.local(direntry.path) + ihook = self.session.gethookproxy(path.dirpath()) + if ihook.pytest_ignore_collect(path=path, config=self.config): + return False + norecursepatterns = self.config.getini("norecursedirs") + if any(path.check(fnmatch=pat) for pat in norecursepatterns): + return False + return True + + def _collectfile( + self, path: py.path.local, handle_dupes: bool = True + ) -> typing.Sequence[nodes.Collector]: + assert ( + path.isfile() + ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( + path, path.isdir(), path.exists(), path.islink() + ) + ihook = self.session.gethookproxy(path) + if not self.session.isinitpath(path): + if ihook.pytest_ignore_collect(path=path, config=self.config): + return () + + if handle_dupes: + keepduplicates = self.config.getoption("keepduplicates") + if not keepduplicates: + duplicate_paths = self.config.pluginmanager._duplicatepaths + if path in duplicate_paths: + return () + else: + duplicate_paths.add(path) + + return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return] + def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]: this_path = self.fspath.dirpath() init_module = this_path.join("__init__.py")