Merge pull request #11026 from bluetech/small-fixes

Small fixes and improvements
This commit is contained in:
Ran Benita 2023-05-24 21:44:03 +03:00 committed by GitHub
commit af124c7f21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 58 deletions

View File

@ -46,6 +46,7 @@ from _pytest.compat import getimfunc
from _pytest.compat import getlocation
from _pytest.compat import is_generator
from _pytest.compat import NOTSET
from _pytest.compat import NotSetType
from _pytest.compat import overload
from _pytest.compat import safe_getattr
from _pytest.config import _PluggyPlugin
@ -112,16 +113,18 @@ def pytest_sessionstart(session: "Session") -> None:
session._fixturemanager = FixtureManager(session)
def get_scope_package(node, fixturedef: "FixtureDef[object]"):
import pytest
def get_scope_package(
node: nodes.Item,
fixturedef: "FixtureDef[object]",
) -> Optional[Union[nodes.Item, nodes.Collector]]:
from _pytest.python import Package
cls = pytest.Package
current = node
current: Optional[Union[nodes.Item, nodes.Collector]] = node
fixture_package_name = "{}/{}".format(fixturedef.baseid, "__init__.py")
while current and (
type(current) is not cls or fixture_package_name != current.nodeid
not isinstance(current, Package) or fixture_package_name != current.nodeid
):
current = current.parent
current = current.parent # type: ignore[assignment]
if current is None:
return node.session
return current
@ -434,7 +437,23 @@ class FixtureRequest:
@property
def node(self):
"""Underlying collection node (depends on current request scope)."""
return self._getscopeitem(self._scope)
scope = self._scope
if scope is Scope.Function:
# This might also be a non-function Item despite its attribute name.
node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem
elif scope is Scope.Package:
# FIXME: _fixturedef is not defined on FixtureRequest (this class),
# but on FixtureRequest (a subclass).
node = get_scope_package(self._pyfuncitem, self._fixturedef) # type: ignore[attr-defined]
else:
node = get_scope_node(self._pyfuncitem, scope)
if node is None and scope is Scope.Class:
# Fallback to function item itself.
node = self._pyfuncitem
assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format(
scope, self._pyfuncitem
)
return node
def _getnextfixturedef(self, argname: str) -> "FixtureDef[Any]":
fixturedefs = self._arg2fixturedefs.get(argname, None)
@ -518,11 +537,7 @@ class FixtureRequest:
"""Add finalizer/teardown function to be called without arguments after
the last test within the requesting test context finished execution."""
# XXX usually this method is shadowed by fixturedef specific ones.
self._addfinalizer(finalizer, scope=self.scope)
def _addfinalizer(self, finalizer: Callable[[], object], scope) -> None:
node = self._getscopeitem(scope)
node.addfinalizer(finalizer)
self.node.addfinalizer(finalizer)
def applymarker(self, marker: Union[str, MarkDecorator]) -> None:
"""Apply a marker to a single test function invocation.
@ -717,28 +732,6 @@ class FixtureRequest:
lines.append("%s:%d: def %s%s" % (p, lineno + 1, factory.__name__, args))
return lines
def _getscopeitem(
self, scope: Union[Scope, "_ScopeName"]
) -> Union[nodes.Item, nodes.Collector]:
if isinstance(scope, str):
scope = Scope(scope)
if scope is Scope.Function:
# This might also be a non-function Item despite its attribute name.
node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem
elif scope is Scope.Package:
# FIXME: _fixturedef is not defined on FixtureRequest (this class),
# but on FixtureRequest (a subclass).
node = get_scope_package(self._pyfuncitem, self._fixturedef) # type: ignore[attr-defined]
else:
node = get_scope_node(self._pyfuncitem, scope)
if node is None and scope is Scope.Class:
# Fallback to function item itself.
node = self._pyfuncitem
assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format(
scope, self._pyfuncitem
)
return node
def __repr__(self) -> str:
return "<FixtureRequest for %r>" % (self.node)
@ -1593,13 +1586,52 @@ class FixtureManager:
# Separate parametrized setups.
items[:] = reorder_items(items)
@overload
def parsefactories(
self, node_or_obj, nodeid=NOTSET, unittest: bool = False
self,
node_or_obj: nodes.Node,
*,
unittest: bool = ...,
) -> None:
raise NotImplementedError()
@overload
def parsefactories( # noqa: F811
self,
node_or_obj: object,
nodeid: Optional[str],
*,
unittest: bool = ...,
) -> None:
raise NotImplementedError()
def parsefactories( # noqa: F811
self,
node_or_obj: Union[nodes.Node, object],
nodeid: Union[str, NotSetType, None] = NOTSET,
*,
unittest: bool = False,
) -> None:
"""Collect fixtures from a collection node or object.
Found fixtures are parsed into `FixtureDef`s and saved.
If `node_or_object` is a collection node (with an underlying Python
object), the node's object is traversed and the node's nodeid is used to
determine the fixtures' visibilty. `nodeid` must not be specified in
this case.
If `node_or_object` is an object (e.g. a plugin), the object is
traversed and the given `nodeid` is used to determine the fixtures'
visibility. `nodeid` must be specified in this case; None and "" mean
total visibility.
"""
if nodeid is not NOTSET:
holderobj = node_or_obj
else:
holderobj = node_or_obj.obj
assert isinstance(node_or_obj, nodes.Node)
holderobj = cast(object, node_or_obj.obj) # type: ignore[attr-defined]
assert isinstance(node_or_obj.nodeid, str)
nodeid = node_or_obj.nodeid
if holderobj in self._holderobjseen:
return

View File

@ -27,6 +27,7 @@ from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
@ -669,30 +670,38 @@ def resolve_package_path(path: Path) -> Optional[Path]:
return result
def scandir(path: Union[str, "os.PathLike[str]"]) -> List["os.DirEntry[str]"]:
"""Scan a directory recursively, in breadth-first order.
The returned entries are sorted.
"""
entries = []
with os.scandir(path) as s:
# Skip entries with symlink loops and other brokenness, so the caller
# doesn't have to deal with it.
for entry in s:
try:
entry.is_file()
except OSError as err:
if _ignore_error(err):
continue
raise
entries.append(entry)
entries.sort(key=lambda entry: entry.name)
return entries
def visit(
path: Union[str, "os.PathLike[str]"], recurse: Callable[["os.DirEntry[str]"], bool]
) -> Iterator["os.DirEntry[str]"]:
"""Walk a directory recursively, in breadth-first order.
The `recurse` predicate determines whether a directory is recursed.
Entries at each directory level are sorted.
"""
# Skip entries with symlink loops and other brokenness, so the caller doesn't
# have to deal with it.
entries = []
for entry in os.scandir(path):
try:
entry.is_file()
except OSError as err:
if _ignore_error(err):
continue
raise
entries.append(entry)
entries.sort(key=lambda entry: entry.name)
entries = scandir(path)
yield from entries
for entry in entries:
if entry.is_dir() and recurse(entry):
yield from visit(entry.path, recurse)

View File

@ -667,7 +667,7 @@ class Package(Module):
config=None,
session=None,
nodeid=None,
path=Optional[Path],
path: Optional[Path] = None,
) -> None:
# NOTE: Could be just the following, but kept as-is for compat.
# nodes.FSCollector.__init__(self, fspath, parent=parent)
@ -745,11 +745,11 @@ class Package(Module):
def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]:
this_path = self.path.parent
init_module = this_path / "__init__.py"
if init_module.is_file() and path_matches_patterns(
init_module, self.config.getini("python_files")
):
yield Module.from_parent(self, path=init_module)
# Always collect the __init__ first.
if path_matches_patterns(self.path, self.config.getini("python_files")):
yield Module.from_parent(self, path=self.path)
pkg_prefixes: Set[Path] = set()
for direntry in visit(str(this_path), recurse=self._recurse):
path = Path(direntry.path)