main: inline _collect() into collect()

This removes an unhelpful level of indirection and enables some upcoming
upcoming simplifications.
This commit is contained in:
Ran Benita 2020-08-22 11:15:10 +03:00
parent eec13ba57e
commit d0e8b71404
1 changed files with 91 additions and 92 deletions

View File

@ -637,105 +637,104 @@ class Session(nodes.FSCollector):
return items
def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
for fspath, parts in self._initial_parts:
self.trace("processing argument", (fspath, parts))
from _pytest.python import Package
for argpath, names in self._initial_parts:
self.trace("processing argument", (argpath, names))
self.trace.root.indent += 1
yield from self._collect(fspath, parts)
# Start with a Session root, and delve to argpath item (dir or file)
# and stack all Packages found on the way.
# No point in finding packages when collecting doctests.
if not self.config.getoption("doctestmodules", False):
pm = self.config.pluginmanager
for parent in reversed(argpath.parts()):
if pm._confcutdir and pm._confcutdir.relto(parent):
break
if parent.isdir():
pkginit = parent.join("__init__.py")
if pkginit.isfile():
if pkginit not in self._collection_node_cache1:
col = self._collectfile(pkginit, handle_dupes=False)
if col:
if isinstance(col[0], Package):
self._collection_pkg_roots[str(parent)] = col[0]
self._collection_node_cache1[col[0].fspath] = [
col[0]
]
# If it's a directory argument, recurse and look for any Subpackages.
# Let the Package collector deal with subnodes, don't collect here.
if argpath.check(dir=1):
assert not names, "invalid arg {!r}".format((argpath, names))
seen_dirs = set() # type: Set[py.path.local]
for direntry in visit(str(argpath), self._recurse):
if not direntry.is_file():
continue
path = py.path.local(direntry.path)
dirpath = path.dirpath()
if dirpath not in seen_dirs:
# Collect packages first.
seen_dirs.add(dirpath)
pkginit = dirpath.join("__init__.py")
if pkginit.exists():
for x in self._collectfile(pkginit):
yield x
if isinstance(x, Package):
self._collection_pkg_roots[str(dirpath)] = x
if str(dirpath) in self._collection_pkg_roots:
# Do not collect packages here.
continue
for x in self._collectfile(path):
key = (type(x), x.fspath)
if key in self._collection_node_cache2:
yield self._collection_node_cache2[key]
else:
self._collection_node_cache2[key] = x
yield x
else:
assert argpath.check(file=1)
if argpath in self._collection_node_cache1:
col = self._collection_node_cache1[argpath]
else:
collect_root = self._collection_pkg_roots.get(argpath.dirname, self)
col = collect_root._collectfile(argpath, handle_dupes=False)
if col:
self._collection_node_cache1[argpath] = col
m = self.matchnodes(col, names)
if not m:
report_arg = "::".join((str(argpath), *names))
self._notfound.append((report_arg, col))
continue
# If __init__.py was the only file requested, then the matched node will be
# the corresponding Package, and the first yielded item will be the __init__
# Module itself, so just use that. If this special case isn't taken, then all
# the files in the package will be yielded.
if argpath.basename == "__init__.py":
assert isinstance(m[0], nodes.Collector)
try:
yield next(iter(m[0].collect()))
except StopIteration:
# The package collects nothing with only an __init__.py
# file in it, which gets ignored by the default
# "python_files" option.
pass
continue
yield from m
self.trace.root.indent -= 1
self._collection_node_cache1.clear()
self._collection_node_cache2.clear()
self._collection_matchnodes_cache.clear()
self._collection_pkg_roots.clear()
def _collect(
self, argpath: py.path.local, names: Sequence[str]
) -> Iterator[Union[nodes.Item, nodes.Collector]]:
from _pytest.python import Package
# Start with a Session root, and delve to argpath item (dir or file)
# and stack all Packages found on the way.
# No point in finding packages when collecting doctests.
if not self.config.getoption("doctestmodules", False):
pm = self.config.pluginmanager
for parent in reversed(argpath.parts()):
if pm._confcutdir and pm._confcutdir.relto(parent):
break
if parent.isdir():
pkginit = parent.join("__init__.py")
if pkginit.isfile():
if pkginit not in self._collection_node_cache1:
col = self._collectfile(pkginit, handle_dupes=False)
if col:
if isinstance(col[0], Package):
self._collection_pkg_roots[str(parent)] = col[0]
self._collection_node_cache1[col[0].fspath] = [col[0]]
# If it's a directory argument, recurse and look for any Subpackages.
# Let the Package collector deal with subnodes, don't collect here.
if argpath.check(dir=1):
assert not names, "invalid arg {!r}".format((argpath, names))
seen_dirs = set() # type: Set[py.path.local]
for direntry in visit(str(argpath), self._recurse):
if not direntry.is_file():
continue
path = py.path.local(direntry.path)
dirpath = path.dirpath()
if dirpath not in seen_dirs:
# Collect packages first.
seen_dirs.add(dirpath)
pkginit = dirpath.join("__init__.py")
if pkginit.exists():
for x in self._collectfile(pkginit):
yield x
if isinstance(x, Package):
self._collection_pkg_roots[str(dirpath)] = x
if str(dirpath) in self._collection_pkg_roots:
# Do not collect packages here.
continue
for x in self._collectfile(path):
key = (type(x), x.fspath)
if key in self._collection_node_cache2:
yield self._collection_node_cache2[key]
else:
self._collection_node_cache2[key] = x
yield x
else:
assert argpath.check(file=1)
if argpath in self._collection_node_cache1:
col = self._collection_node_cache1[argpath]
else:
collect_root = self._collection_pkg_roots.get(argpath.dirname, self)
col = collect_root._collectfile(argpath, handle_dupes=False)
if col:
self._collection_node_cache1[argpath] = col
m = self.matchnodes(col, names)
if not m:
report_arg = "::".join((str(argpath), *names))
self._notfound.append((report_arg, col))
return
# If __init__.py was the only file requested, then the matched node will be
# the corresponding Package, and the first yielded item will be the __init__
# Module itself, so just use that. If this special case isn't taken, then all
# the files in the package will be yielded.
if argpath.basename == "__init__.py":
assert isinstance(m[0], nodes.Collector)
try:
yield next(iter(m[0].collect()))
except StopIteration:
# The package collects nothing with only an __init__.py
# file in it, which gets ignored by the default
# "python_files" option.
pass
return
yield from m
def matchnodes(
self,
matching: Sequence[Union[nodes.Item, nodes.Collector]],