Merge pull request #7670 from bluetech/session-inline
Start simplifying collection code in Session
commit ff41e7ad5d
@@ -32,6 +32,7 @@ from _pytest.compat import TYPE_CHECKING
 from _pytest.config import Config
 from _pytest.config.argparsing import Parser
 from _pytest.fixtures import FixtureRequest
+from _pytest.nodes import Collector
 from _pytest.outcomes import OutcomeException
 from _pytest.pathlib import import_path
 from _pytest.python_api import approx
@@ -118,7 +119,7 @@ def pytest_unconfigure() -> None:


 def pytest_collect_file(
-    path: py.path.local, parent
+    path: py.path.local, parent: Collector,
 ) -> Optional[Union["DoctestModule", "DoctestTextfile"]]:
     config = parent.config
     if path.ext == ".py":
@@ -274,10 +274,12 @@ def pytest_ignore_collect(path: py.path.local, config: "Config") -> Optional[bool]:
     """


-def pytest_collect_file(path: py.path.local, parent) -> "Optional[Collector]":
-    """Return collection Node or None for the given path.
+def pytest_collect_file(
+    path: py.path.local, parent: "Collector"
+) -> "Optional[Collector]":
+    """Create a Collector for the given path, or None if not relevant.

-    Any new node needs to have the specified ``parent`` as a parent.
+    The new node needs to have the specified ``parent`` as a parent.

     :param py.path.local path: The path to collect.
     """
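For plugin authors the hook keeps the same call signature; only the annotation of ``parent`` changes. Below is a minimal, hypothetical ``conftest.py`` sketch of implementing this hook with the py.path-based API that this commit still targets (the file type, class name, and match rule are made up for illustration, not part of this change):

```python
# conftest.py -- hypothetical example, not part of this commit
import py
import pytest


class YamlFile(pytest.File):
    """Collect test*.yaml files -- illustration only."""

    def collect(self):
        # A real plugin would parse self.fspath and yield Item instances here.
        return []


def pytest_collect_file(path: py.path.local, parent):
    # 'parent' is the enclosing Collector (the annotation this commit adds).
    if path.ext == ".yaml" and path.basename.startswith("test"):
        return YamlFile.from_parent(parent, fspath=path)
    return None
```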
@@ -45,8 +45,6 @@ if TYPE_CHECKING:
     from typing import Type
     from typing_extensions import Literal

-    from _pytest.python import Package
-

 def pytest_addoption(parser: Parser) -> None:
     parser.addini(
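The ``TYPE_CHECKING``-only import of ``Package`` can go away because the rewritten ``collect()`` imports it at runtime inside the function body, and a function-local import also satisfies the ``# type:`` comments that mention the name. A generic sketch of that pattern, assuming made-up module and function names (not pytest code):

```python
# registry.py -- generic sketch of the "local import feeds the type comment" pattern
from typing import TYPE_CHECKING, Dict

if TYPE_CHECKING:
    # Imports needed only by static annotations can stay here.
    from typing import Type  # noqa: F401


def collect_prices():
    # Runtime import inside the function (e.g. to avoid an import cycle);
    # the name is in scope here, so the type comment below resolves too.
    from decimal import Decimal

    prices = {}  # type: Dict[str, Decimal]
    prices["widget"] = Decimal("1.50")
    return prices


if __name__ == "__main__":
    print(collect_prices())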
@@ -402,10 +400,6 @@ class FSHookProxy:
         return x


-class NoMatch(Exception):
-    """Matching cannot locate matching names."""
-
-
 class Interrupted(KeyboardInterrupt):
     """Signals that the test run was interrupted."""
@@ -447,20 +441,6 @@ class Session(nodes.FSCollector):
         self.startdir = config.invocation_dir
         self._initialpaths = frozenset()  # type: FrozenSet[py.path.local]

-        # Keep track of any collected nodes in here, so we don't duplicate fixtures.
-        self._collection_node_cache1 = (
-            {}
-        )  # type: Dict[py.path.local, Sequence[nodes.Collector]]
-        self._collection_node_cache2 = (
-            {}
-        )  # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector]
-        self._collection_node_cache3 = (
-            {}
-        )  # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport]
-
-        # Dirnames of pkgs with dunder-init files.
-        self._collection_pkg_roots = {}  # type: Dict[str, Package]
-
         self._bestrelpathcache = _bestrelpath_cache(
             config.rootdir
         )  # type: Dict[py.path.local, str]
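These four mappings stop being long-lived ``Session`` attributes and reappear as locals of ``collect()`` further down in this diff, so each collection pass starts from empty caches instead of relying on explicit ``.clear()`` calls. A stripped-down sketch of the same idea, with made-up names rather than pytest's types:

```python
# cache_scope.py -- illustration of moving per-run caches into the method itself
from typing import Dict, List


class Collector:
    def collect(self, paths: List[str]) -> List[str]:
        # Caches live only for the duration of one collect() call, so a second
        # run cannot observe stale entries left over from the first one.
        seen_dirs = {}  # type: Dict[str, int]
        results = []  # type: List[str]
        for path in paths:
            directory = path.rsplit("/", 1)[0]
            seen_dirs[directory] = seen_dirs.get(directory, 0) + 1
            if seen_dirs[directory] == 1:
                results.append(directory)
            results.append(path)
        return results


if __name__ == "__main__":
    c = Collector()
    print(c.collect(["pkg/test_a.py", "pkg/test_b.py"]))
```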
@@ -523,6 +503,42 @@ class Session(nodes.FSCollector):
         proxy = self.config.hook
         return proxy

+    def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
+        if direntry.name == "__pycache__":
+            return False
+        path = py.path.local(direntry.path)
+        ihook = self.gethookproxy(path.dirpath())
+        if ihook.pytest_ignore_collect(path=path, config=self.config):
+            return False
+        norecursepatterns = self.config.getini("norecursedirs")
+        if any(path.check(fnmatch=pat) for pat in norecursepatterns):
+            return False
+        return True
+
+    def _collectfile(
+        self, path: py.path.local, handle_dupes: bool = True
+    ) -> Sequence[nodes.Collector]:
+        assert (
+            path.isfile()
+        ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
+            path, path.isdir(), path.exists(), path.islink()
+        )
+        ihook = self.gethookproxy(path)
+        if not self.isinitpath(path):
+            if ihook.pytest_ignore_collect(path=path, config=self.config):
+                return ()
+
+        if handle_dupes:
+            keepduplicates = self.config.getoption("keepduplicates")
+            if not keepduplicates:
+                duplicate_paths = self.config.pluginmanager._duplicatepaths
+                if path in duplicate_paths:
+                    return ()
+                else:
+                    duplicate_paths.add(path)
+
+        return ihook.pytest_collect_file(path=path, parent=self)  # type: ignore[no-any-return]
+
     @overload
     def perform_collect(
         self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ...
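``_recurse`` is the predicate that the internal directory walker (``visit``) calls for every subdirectory; returning ``False`` prunes that subtree (``__pycache__``, ignored paths, and anything matching the ``norecursedirs`` patterns). A self-contained sketch of the same pruning idea using only ``os.scandir`` and ``fnmatch`` instead of pytest's internal helpers (all names here are illustrative):

```python
# walk_sketch.py -- directory walk with a prune predicate, illustrative only
import fnmatch
import os
from typing import Iterator, List

NORECURSE_PATTERNS = ["*.egg", ".*", "venv"]  # type: List[str]  # stand-in for norecursedirs


def recurse(entry: "os.DirEntry[str]") -> bool:
    # Skip caches and anything matching a no-recurse pattern.
    if entry.name == "__pycache__":
        return False
    if any(fnmatch.fnmatch(entry.name, pat) for pat in NORECURSE_PATTERNS):
        return False
    return True


def visit(path: str) -> Iterator["os.DirEntry[str]"]:
    """Yield files under *path*, descending only where recurse() allows."""
    entries = sorted(os.scandir(path), key=lambda e: e.name)
    for entry in entries:
        if entry.is_file():
            yield entry
        elif entry.is_dir() and recurse(entry):
            yield from visit(entry.path)


if __name__ == "__main__":
    for e in visit("."):
        print(e.path)
```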
@@ -552,219 +568,206 @@ class Session(nodes.FSCollector):
         in which case the return value contains these collectors unexpanded,
         and ``session.items`` is empty.
         """
+        if args is None:
+            args = self.config.args
+
+        self.trace("perform_collect", self, args)
+        self.trace.root.indent += 1
+
+        self._notfound = []  # type: List[Tuple[str, Sequence[nodes.Collector]]]
+        self._initial_parts = []  # type: List[Tuple[py.path.local, List[str]]]
+        self.items = []  # type: List[nodes.Item]
+
         hook = self.config.hook
+
+        items = self.items  # type: Sequence[Union[nodes.Item, nodes.Collector]]
         try:
-            items = self._perform_collect(args, genitems)
+            initialpaths = []  # type: List[py.path.local]
+            for arg in args:
+                fspath, parts = resolve_collection_argument(
+                    self.config.invocation_dir, arg, as_pypath=self.config.option.pyargs
+                )
+                self._initial_parts.append((fspath, parts))
+                initialpaths.append(fspath)
+            self._initialpaths = frozenset(initialpaths)
+            rep = collect_one_node(self)
+            self.ihook.pytest_collectreport(report=rep)
+            self.trace.root.indent -= 1
+            if self._notfound:
+                errors = []
+                for arg, cols in self._notfound:
+                    line = "(no name {!r} in any of {!r})".format(arg, cols)
+                    errors.append("not found: {}\n{}".format(arg, line))
+                raise UsageError(*errors)
+            if not genitems:
+                items = rep.result
+            else:
+                if rep.passed:
+                    for node in rep.result:
+                        self.items.extend(self.genitems(node))
+
             self.config.pluginmanager.check_pending()
             hook.pytest_collection_modifyitems(
                 session=self, config=self.config, items=items
             )
         finally:
             hook.pytest_collection_finish(session=self)
+
         self.testscollected = len(items)
         return items

-    @overload
-    def _perform_collect(
-        self, args: Optional[Sequence[str]], genitems: "Literal[True]"
-    ) -> List[nodes.Item]:
-        ...
-
-    @overload  # noqa: F811
-    def _perform_collect(  # noqa: F811
-        self, args: Optional[Sequence[str]], genitems: bool
-    ) -> Union[List[Union[nodes.Item]], List[Union[nodes.Item, nodes.Collector]]]:
-        ...
-
-    def _perform_collect(  # noqa: F811
-        self, args: Optional[Sequence[str]], genitems: bool
-    ) -> Union[List[Union[nodes.Item]], List[Union[nodes.Item, nodes.Collector]]]:
-        if args is None:
-            args = self.config.args
-        self.trace("perform_collect", self, args)
-        self.trace.root.indent += 1
-        self._notfound = []  # type: List[Tuple[str, NoMatch]]
-        initialpaths = []  # type: List[py.path.local]
-        self._initial_parts = []  # type: List[Tuple[py.path.local, List[str]]]
-        self.items = items = []  # type: List[nodes.Item]
-        for arg in args:
-            fspath, parts = resolve_collection_argument(
-                self.config.invocation_dir, arg, as_pypath=self.config.option.pyargs
-            )
-            self._initial_parts.append((fspath, parts))
-            initialpaths.append(fspath)
-        self._initialpaths = frozenset(initialpaths)
-        rep = collect_one_node(self)
-        self.ihook.pytest_collectreport(report=rep)
-        self.trace.root.indent -= 1
-        if self._notfound:
-            errors = []
-            for arg, exc in self._notfound:
-                line = "(no name {!r} in any of {!r})".format(arg, exc.args[0])
-                errors.append("not found: {}\n{}".format(arg, line))
-            raise UsageError(*errors)
-        if not genitems:
-            return rep.result
-        else:
-            if rep.passed:
-                for node in rep.result:
-                    self.items.extend(self.genitems(node))
-            return items
-
     def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
-        for fspath, parts in self._initial_parts:
-            self.trace("processing argument", (fspath, parts))
-            self.trace.root.indent += 1
-            try:
-                yield from self._collect(fspath, parts)
-            except NoMatch as exc:
-                report_arg = "::".join((str(fspath), *parts))
-                # we are inside a make_report hook so
-                # we cannot directly pass through the exception
-                self._notfound.append((report_arg, exc))
-
-            self.trace.root.indent -= 1
-        self._collection_node_cache1.clear()
-        self._collection_node_cache2.clear()
-        self._collection_node_cache3.clear()
-        self._collection_pkg_roots.clear()
-
-    def _collect(
-        self, argpath: py.path.local, names: List[str]
-    ) -> Iterator[Union[nodes.Item, nodes.Collector]]:
         from _pytest.python import Package

-        # Start with a Session root, and delve to argpath item (dir or file)
-        # and stack all Packages found on the way.
-        # No point in finding packages when collecting doctests.
-        if not self.config.getoption("doctestmodules", False):
-            pm = self.config.pluginmanager
-            for parent in reversed(argpath.parts()):
-                if pm._confcutdir and pm._confcutdir.relto(parent):
-                    break
+        # Keep track of any collected nodes in here, so we don't duplicate fixtures.
+        node_cache1 = {}  # type: Dict[py.path.local, Sequence[nodes.Collector]]
+        node_cache2 = (
+            {}
+        )  # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector]

-                if parent.isdir():
-                    pkginit = parent.join("__init__.py")
-                    if pkginit.isfile():
-                        if pkginit not in self._collection_node_cache1:
+        # Keep track of any collected collectors in matchnodes paths, so they
+        # are not collected more than once.
+        matchnodes_cache = (
+            {}
+        )  # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport]
+
+        # Dirnames of pkgs with dunder-init files.
+        pkg_roots = {}  # type: Dict[str, Package]
+
+        for argpath, names in self._initial_parts:
+            self.trace("processing argument", (argpath, names))
+            self.trace.root.indent += 1
+
+            # Start with a Session root, and delve to argpath item (dir or file)
+            # and stack all Packages found on the way.
+            # No point in finding packages when collecting doctests.
+            if not self.config.getoption("doctestmodules", False):
+                pm = self.config.pluginmanager
+                for parent in reversed(argpath.parts()):
+                    if pm._confcutdir and pm._confcutdir.relto(parent):
+                        break
+
+                    if parent.isdir():
+                        pkginit = parent.join("__init__.py")
+                        if pkginit.isfile() and pkginit not in node_cache1:
                             col = self._collectfile(pkginit, handle_dupes=False)
                             if col:
                                 if isinstance(col[0], Package):
-                                    self._collection_pkg_roots[str(parent)] = col[0]
+                                    pkg_roots[str(parent)] = col[0]
                                 # Always store a list in the cache, matchnodes expects it.
-                                self._collection_node_cache1[col[0].fspath] = [col[0]]
+                                node_cache1[col[0].fspath] = [col[0]]

-        # If it's a directory argument, recurse and look for any Subpackages.
-        # Let the Package collector deal with subnodes, don't collect here.
-        if argpath.check(dir=1):
-            assert not names, "invalid arg {!r}".format((argpath, names))
+            # If it's a directory argument, recurse and look for any Subpackages.
+            # Let the Package collector deal with subnodes, don't collect here.
+            if argpath.check(dir=1):
+                assert not names, "invalid arg {!r}".format((argpath, names))

-            seen_dirs = set()  # type: Set[py.path.local]
-            for direntry in visit(str(argpath), self._recurse):
-                if not direntry.is_file():
-                    continue
+                seen_dirs = set()  # type: Set[py.path.local]
+                for direntry in visit(str(argpath), self._recurse):
+                    if not direntry.is_file():
+                        continue

-                path = py.path.local(direntry.path)
-                dirpath = path.dirpath()
+                    path = py.path.local(direntry.path)
+                    dirpath = path.dirpath()

-                if dirpath not in seen_dirs:
-                    # Collect packages first.
-                    seen_dirs.add(dirpath)
-                    pkginit = dirpath.join("__init__.py")
-                    if pkginit.exists():
-                        for x in self._collectfile(pkginit):
-                            yield x
-                            if isinstance(x, Package):
-                                self._collection_pkg_roots[str(dirpath)] = x
-                if str(dirpath) in self._collection_pkg_roots:
-                    # Do not collect packages here.
-                    continue
+                    if dirpath not in seen_dirs:
+                        # Collect packages first.
+                        seen_dirs.add(dirpath)
+                        pkginit = dirpath.join("__init__.py")
+                        if pkginit.exists():
+                            for x in self._collectfile(pkginit):
+                                yield x
+                                if isinstance(x, Package):
+                                    pkg_roots[str(dirpath)] = x
+                    if str(dirpath) in pkg_roots:
+                        # Do not collect packages here.
+                        continue

-                for x in self._collectfile(path):
-                    key = (type(x), x.fspath)
-                    if key in self._collection_node_cache2:
-                        yield self._collection_node_cache2[key]
-                    else:
-                        self._collection_node_cache2[key] = x
-                        yield x
-        else:
-            assert argpath.check(file=1)
+                    for x in self._collectfile(path):
+                        key = (type(x), x.fspath)
+                        if key in node_cache2:
+                            yield node_cache2[key]
+                        else:
+                            node_cache2[key] = x
+                            yield x
+            else:
+                assert argpath.check(file=1)

-            if argpath in self._collection_node_cache1:
-                col = self._collection_node_cache1[argpath]
-            else:
-                collect_root = self._collection_pkg_roots.get(argpath.dirname, self)
-                col = collect_root._collectfile(argpath, handle_dupes=False)
-                if col:
-                    self._collection_node_cache1[argpath] = col
-            m = self.matchnodes(col, names)
-            # If __init__.py was the only file requested, then the matched node will be
-            # the corresponding Package, and the first yielded item will be the __init__
-            # Module itself, so just use that. If this special case isn't taken, then all
-            # the files in the package will be yielded.
-            if argpath.basename == "__init__.py":
-                assert isinstance(m[0], nodes.Collector)
-                try:
-                    yield next(iter(m[0].collect()))
-                except StopIteration:
-                    # The package collects nothing with only an __init__.py
-                    # file in it, which gets ignored by the default
-                    # "python_files" option.
-                    pass
-                return
-            yield from m
+                if argpath in node_cache1:
+                    col = node_cache1[argpath]
+                else:
+                    collect_root = pkg_roots.get(argpath.dirname, self)
+                    col = collect_root._collectfile(argpath, handle_dupes=False)
+                    if col:
+                        node_cache1[argpath] = col

-    def matchnodes(
-        self, matching: Sequence[Union[nodes.Item, nodes.Collector]], names: List[str],
-    ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
-        self.trace("matchnodes", matching, names)
-        self.trace.root.indent += 1
-        nodes = self._matchnodes(matching, names)
-        num = len(nodes)
-        self.trace("matchnodes finished -> ", num, "nodes")
-        self.trace.root.indent -= 1
-        if num == 0:
-            raise NoMatch(matching, names[:1])
-        return nodes
+                matching = []
+                work = [
+                    (col, names)
+                ]  # type: List[Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]]
+                while work:
+                    self.trace("matchnodes", col, names)
+                    self.trace.root.indent += 1

-    def _matchnodes(
-        self, matching: Sequence[Union[nodes.Item, nodes.Collector]], names: List[str],
-    ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
-        if not matching or not names:
-            return matching
-        name = names[0]
-        assert name
-        nextnames = names[1:]
-        resultnodes = []  # type: List[Union[nodes.Item, nodes.Collector]]
-        for node in matching:
-            if isinstance(node, nodes.Item):
-                if not names:
-                    resultnodes.append(node)
-                continue
-            assert isinstance(node, nodes.Collector)
-            key = (type(node), node.nodeid)
-            if key in self._collection_node_cache3:
-                rep = self._collection_node_cache3[key]
-            else:
-                rep = collect_one_node(node)
-                self._collection_node_cache3[key] = rep
-            if rep.passed:
-                has_matched = False
-                for x in rep.result:
-                    # TODO: Remove parametrized workaround once collection structure contains parametrization.
-                    if x.name == name or x.name.split("[")[0] == name:
-                        resultnodes.extend(self.matchnodes([x], nextnames))
-                        has_matched = True
-                # XXX Accept IDs that don't have "()" for class instances.
-                if not has_matched and len(rep.result) == 1 and x.name == "()":
-                    nextnames.insert(0, name)
-                    resultnodes.extend(self.matchnodes([x], nextnames))
-            else:
-                # Report collection failures here to avoid failing to run some test
-                # specified in the command line because the module could not be
-                # imported (#134).
-                node.ihook.pytest_collectreport(report=rep)
-        return resultnodes
+                    matchnodes, matchnames = work.pop()
+                    for node in matchnodes:
+                        if not matchnames:
+                            matching.append(node)
+                            continue
+                        if not isinstance(node, nodes.Collector):
+                            continue
+                        key = (type(node), node.nodeid)
+                        if key in matchnodes_cache:
+                            rep = matchnodes_cache[key]
+                        else:
+                            rep = collect_one_node(node)
+                            matchnodes_cache[key] = rep
+                        if rep.passed:
+                            submatchnodes = []
+                            for r in rep.result:
+                                # TODO: Remove parametrized workaround once collection structure contains
+                                # parametrization.
+                                if (
+                                    r.name == matchnames[0]
+                                    or r.name.split("[")[0] == matchnames[0]
+                                ):
+                                    submatchnodes.append(r)
+                            if submatchnodes:
+                                work.append((submatchnodes, matchnames[1:]))
+                            # XXX Accept IDs that don't have "()" for class instances.
+                            elif len(rep.result) == 1 and rep.result[0].name == "()":
+                                work.append((rep.result, matchnames))
+                        else:
+                            # Report collection failures here to avoid failing to run some test
+                            # specified in the command line because the module could not be
+                            # imported (#134).
+                            node.ihook.pytest_collectreport(report=rep)
+
+                    self.trace("matchnodes finished -> ", len(matching), "nodes")
+                    self.trace.root.indent -= 1
+
+                if not matching:
+                    report_arg = "::".join((str(argpath), *names))
+                    self._notfound.append((report_arg, col))
+                    continue
+
+                # If __init__.py was the only file requested, then the matched node will be
+                # the corresponding Package, and the first yielded item will be the __init__
+                # Module itself, so just use that. If this special case isn't taken, then all
+                # the files in the package will be yielded.
+                if argpath.basename == "__init__.py":
+                    assert isinstance(matching[0], nodes.Collector)
+                    try:
+                        yield next(iter(matching[0].collect()))
+                    except StopIteration:
+                        # The package collects nothing with only an __init__.py
+                        # file in it, which gets ignored by the default
+                        # "python_files" option.
+                        pass
+                    continue
+
+                yield from matching
+
+            self.trace.root.indent -= 1

     def genitems(
         self, node: Union[nodes.Item, nodes.Collector]
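The recursive ``matchnodes``/``_matchnodes`` pair, which signalled failure by raising ``NoMatch``, is replaced by an explicit worklist inside ``collect()``: pairs of (candidate nodes, remaining names) are pushed and popped until the name chain is exhausted, and an empty ``matching`` list is reported instead of an exception. A standalone sketch of that recursion-to-worklist transformation on a plain tree, using made-up node types rather than pytest's classes:

```python
# worklist_match.py -- recursion turned into an explicit worklist, illustrative only
from typing import Dict, List, Optional, Sequence, Tuple


class Node:
    def __init__(self, name: str, children: Optional[Dict[str, "Node"]] = None) -> None:
        self.name = name
        self.children = children or {}


def match(root: Node, names: Sequence[str]) -> List[Node]:
    """Follow a name chain like ["TestClass", "test_ok"] below *root*."""
    matching = []  # type: List[Node]
    work = [([root], list(names))]  # type: List[Tuple[List[Node], List[str]]]
    while work:
        candidates, remaining = work.pop()
        for node in candidates:
            if not remaining:
                matching.append(node)  # name chain exhausted: a final match
                continue
            child = node.children.get(remaining[0])
            if child is not None:
                work.append(([child], remaining[1:]))
    return matching


if __name__ == "__main__":
    tree = Node(
        "test_mod.py",
        {"TestClass": Node("TestClass", {"test_ok": Node("test_ok")})},
    )
    print([n.name for n in match(tree, ["TestClass", "test_ok"])])  # ['test_ok']
```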
@@ -8,7 +8,6 @@ from typing import Iterable
 from typing import Iterator
 from typing import List
 from typing import Optional
-from typing import Sequence
 from typing import Set
 from typing import Tuple
 from typing import TypeVar
@@ -528,8 +527,6 @@ class FSCollector(Collector):

         super().__init__(name, parent, config, session, nodeid=nodeid, fspath=fspath)

-        self._norecursepatterns = self.config.getini("norecursedirs")
-
     @classmethod
     def from_parent(cls, parent, *, fspath, **kw):
         """The public constructor."""
@@ -543,42 +540,6 @@ class FSCollector(Collector):
         warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2)
         return self.session.isinitpath(path)

-    def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
-        if direntry.name == "__pycache__":
-            return False
-        path = py.path.local(direntry.path)
-        ihook = self.session.gethookproxy(path.dirpath())
-        if ihook.pytest_ignore_collect(path=path, config=self.config):
-            return False
-        for pat in self._norecursepatterns:
-            if path.check(fnmatch=pat):
-                return False
-        return True
-
-    def _collectfile(
-        self, path: py.path.local, handle_dupes: bool = True
-    ) -> Sequence[Collector]:
-        assert (
-            path.isfile()
-        ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
-            path, path.isdir(), path.exists(), path.islink()
-        )
-        ihook = self.session.gethookproxy(path)
-        if not self.session.isinitpath(path):
-            if ihook.pytest_ignore_collect(path=path, config=self.config):
-                return ()
-
-        if handle_dupes:
-            keepduplicates = self.config.getoption("keepduplicates")
-            if not keepduplicates:
-                duplicate_paths = self.config.pluginmanager._duplicatepaths
-                if path in duplicate_paths:
-                    return ()
-                else:
-                    duplicate_paths.add(path)
-
-        return ihook.pytest_collect_file(path=path, parent=self)  # type: ignore[no-any-return]
-

 class File(FSCollector):
     """Base class for collecting tests from a file."""
@@ -184,7 +184,9 @@ def pytest_pyfunc_call(pyfuncitem: "Function") -> Optional[object]:
     return True


-def pytest_collect_file(path: py.path.local, parent) -> Optional["Module"]:
+def pytest_collect_file(
+    path: py.path.local, parent: nodes.Collector
+) -> Optional["Module"]:
     ext = path.ext
     if ext == ".py":
         if not parent.session.isinitpath(path):
@@ -634,6 +636,42 @@ class Package(Module):
         warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2)
         return self.session.isinitpath(path)

+    def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
+        if direntry.name == "__pycache__":
+            return False
+        path = py.path.local(direntry.path)
+        ihook = self.session.gethookproxy(path.dirpath())
+        if ihook.pytest_ignore_collect(path=path, config=self.config):
+            return False
+        norecursepatterns = self.config.getini("norecursedirs")
+        if any(path.check(fnmatch=pat) for pat in norecursepatterns):
+            return False
+        return True
+
+    def _collectfile(
+        self, path: py.path.local, handle_dupes: bool = True
+    ) -> typing.Sequence[nodes.Collector]:
+        assert (
+            path.isfile()
+        ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
+            path, path.isdir(), path.exists(), path.islink()
+        )
+        ihook = self.session.gethookproxy(path)
+        if not self.session.isinitpath(path):
+            if ihook.pytest_ignore_collect(path=path, config=self.config):
+                return ()
+
+        if handle_dupes:
+            keepduplicates = self.config.getoption("keepduplicates")
+            if not keepduplicates:
+                duplicate_paths = self.config.pluginmanager._duplicatepaths
+                if path in duplicate_paths:
+                    return ()
+                else:
+                    duplicate_paths.add(path)
+
+        return ihook.pytest_collect_file(path=path, parent=self)  # type: ignore[no-any-return]
+
     def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]:
         this_path = self.fspath.dirpath()
         init_module = this_path.join("__init__.py")
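``_collectfile`` (now present on both ``Session`` and ``Package``) guards against collecting the same file twice unless the ``keepduplicates`` option is set, by recording every path it has handed out in a shared set. A minimal sketch of that dedup guard with made-up names, not pytest code:

```python
# dedup_sketch.py -- duplicate-path guard, illustrative only
from typing import List, Set


def collect_file(path: str, seen: Set[str], keep_duplicates: bool = False) -> List[str]:
    """Return [path] the first time a path is seen, and [] on repeats."""
    if not keep_duplicates:
        if path in seen:
            return []  # already collected once: skip it
        seen.add(path)
    return [path]


if __name__ == "__main__":
    seen = set()  # type: Set[str]
    args = ["tests/test_a.py", "tests/test_a.py", "tests/test_b.py"]
    collected = [p for arg in args for p in collect_file(arg, seen)]
    print(collected)  # ['tests/test_a.py', 'tests/test_b.py']
```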