Refactor internal scope handling by introducing Scope enum

PR #8913
This commit is contained in:
Bruno Oliveira 2021-08-01 06:11:56 -03:00 committed by GitHub
parent ef5d81ad5c
commit a83b359cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 298 additions and 173 deletions

View File

@ -0,0 +1 @@
The private ``CallSpec2._arg2scopenum`` attribute has been removed after an internal refactoring.

View File

@ -5,6 +5,7 @@ import sys
import warnings import warnings
from collections import defaultdict from collections import defaultdict
from collections import deque from collections import deque
from contextlib import suppress
from pathlib import Path from pathlib import Path
from types import TracebackType from types import TracebackType
from typing import Any from typing import Any
@ -62,20 +63,21 @@ from _pytest.outcomes import fail
from _pytest.outcomes import TEST_OUTCOME from _pytest.outcomes import TEST_OUTCOME
from _pytest.pathlib import absolutepath from _pytest.pathlib import absolutepath
from _pytest.pathlib import bestrelpath from _pytest.pathlib import bestrelpath
from _pytest.scope import HIGH_SCOPES
from _pytest.scope import Scope
from _pytest.stash import StashKey from _pytest.stash import StashKey
if TYPE_CHECKING: if TYPE_CHECKING:
from typing import Deque from typing import Deque
from typing import NoReturn from typing import NoReturn
from typing_extensions import Literal
from _pytest.scope import _ScopeName
from _pytest.main import Session from _pytest.main import Session
from _pytest.python import CallSpec2 from _pytest.python import CallSpec2
from _pytest.python import Function from _pytest.python import Function
from _pytest.python import Metafunc from _pytest.python import Metafunc
_Scope = Literal["session", "package", "module", "class", "function"]
# The value of the fixture -- return/yield of the fixture function (type variable). # The value of the fixture -- return/yield of the fixture function (type variable).
FixtureValue = TypeVar("FixtureValue") FixtureValue = TypeVar("FixtureValue")
@ -104,10 +106,10 @@ _FixtureCachedResult = Union[
] ]
@attr.s(frozen=True) @attr.s(frozen=True, auto_attribs=True)
class PseudoFixtureDef(Generic[FixtureValue]): class PseudoFixtureDef(Generic[FixtureValue]):
cached_result = attr.ib(type="_FixtureCachedResult[FixtureValue]") cached_result: "_FixtureCachedResult[FixtureValue]"
scope = attr.ib(type="_Scope") _scope: Scope
def pytest_sessionstart(session: "Session") -> None: def pytest_sessionstart(session: "Session") -> None:
@ -130,19 +132,19 @@ def get_scope_package(node, fixturedef: "FixtureDef[object]"):
def get_scope_node( def get_scope_node(
node: nodes.Node, scope: "_Scope" node: nodes.Node, scope: Scope
) -> Optional[Union[nodes.Item, nodes.Collector]]: ) -> Optional[Union[nodes.Item, nodes.Collector]]:
import _pytest.python import _pytest.python
if scope == "function": if scope is Scope.Function:
return node.getparent(nodes.Item) return node.getparent(nodes.Item)
elif scope == "class": elif scope is Scope.Class:
return node.getparent(_pytest.python.Class) return node.getparent(_pytest.python.Class)
elif scope == "module": elif scope is Scope.Module:
return node.getparent(_pytest.python.Module) return node.getparent(_pytest.python.Module)
elif scope == "package": elif scope is Scope.Package:
return node.getparent(_pytest.python.Package) return node.getparent(_pytest.python.Package)
elif scope == "session": elif scope is Scope.Session:
return node.getparent(_pytest.main.Session) return node.getparent(_pytest.main.Session)
else: else:
assert_never(scope) assert_never(scope)
@ -166,7 +168,7 @@ def add_funcarg_pseudo_fixture_def(
return return
# Collect funcargs of all callspecs into a list of values. # Collect funcargs of all callspecs into a list of values.
arg2params: Dict[str, List[object]] = {} arg2params: Dict[str, List[object]] = {}
arg2scope: Dict[str, _Scope] = {} arg2scope: Dict[str, Scope] = {}
for callspec in metafunc._calls: for callspec in metafunc._calls:
for argname, argvalue in callspec.funcargs.items(): for argname, argvalue in callspec.funcargs.items():
assert argname not in callspec.params assert argname not in callspec.params
@ -175,8 +177,8 @@ def add_funcarg_pseudo_fixture_def(
callspec.indices[argname] = len(arg2params_list) callspec.indices[argname] = len(arg2params_list)
arg2params_list.append(argvalue) arg2params_list.append(argvalue)
if argname not in arg2scope: if argname not in arg2scope:
scopenum = callspec._arg2scopenum.get(argname, scopenum_function) scope = callspec._arg2scope.get(argname, Scope.Function)
arg2scope[argname] = scopes[scopenum] arg2scope[argname] = scope
callspec.funcargs.clear() callspec.funcargs.clear()
# Register artificial FixtureDef's so that later at test execution # Register artificial FixtureDef's so that later at test execution
@ -189,10 +191,12 @@ def add_funcarg_pseudo_fixture_def(
# node related to the scope. # node related to the scope.
scope = arg2scope[argname] scope = arg2scope[argname]
node = None node = None
if scope != "function": if scope is not Scope.Function:
node = get_scope_node(collector, scope) node = get_scope_node(collector, scope)
if node is None: if node is None:
assert scope == "class" and isinstance(collector, _pytest.python.Module) assert scope is Scope.Class and isinstance(
collector, _pytest.python.Module
)
# Use module-level collector for class-scope (for now). # Use module-level collector for class-scope (for now).
node = collector node = collector
if node is None: if node is None:
@ -238,10 +242,10 @@ def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
_Key = Tuple[object, ...] _Key = Tuple[object, ...]
def get_parametrized_fixture_keys(item: nodes.Item, scopenum: int) -> Iterator[_Key]: def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_Key]:
"""Return list of keys for all parametrized arguments which match """Return list of keys for all parametrized arguments which match
the specified scope.""" the specified scope."""
assert scopenum < scopenum_function # function assert scope is not Scope.Function
try: try:
callspec = item.callspec # type: ignore[attr-defined] callspec = item.callspec # type: ignore[attr-defined]
except AttributeError: except AttributeError:
@ -252,67 +256,71 @@ def get_parametrized_fixture_keys(item: nodes.Item, scopenum: int) -> Iterator[_
# sort this so that different calls to # sort this so that different calls to
# get_parametrized_fixture_keys will be deterministic. # get_parametrized_fixture_keys will be deterministic.
for argname, param_index in sorted(cs.indices.items()): for argname, param_index in sorted(cs.indices.items()):
if cs._arg2scopenum[argname] != scopenum: if cs._arg2scope[argname] != scope:
continue continue
if scopenum == 0: # session if scope is Scope.Session:
key: _Key = (argname, param_index) key: _Key = (argname, param_index)
elif scopenum == 1: # package elif scope is Scope.Package:
key = (argname, param_index, item.path.parent) key = (argname, param_index, item.path.parent)
elif scopenum == 2: # module elif scope is Scope.Module:
key = (argname, param_index, item.path) key = (argname, param_index, item.path)
elif scopenum == 3: # class elif scope is Scope.Class:
item_cls = item.cls # type: ignore[attr-defined] item_cls = item.cls # type: ignore[attr-defined]
key = (argname, param_index, item.path, item_cls) key = (argname, param_index, item.path, item_cls)
else:
assert_never(scope)
yield key yield key
# Algorithm for sorting on a per-parametrized resource setup basis. # Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for scopenum==0 (session) first and performs sorting # It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope" # down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns. # setups and teardowns.
def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]: def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
argkeys_cache: Dict[int, Dict[nodes.Item, Dict[_Key, None]]] = {} argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]] = {}
items_by_argkey: Dict[int, Dict[_Key, Deque[nodes.Item]]] = {} items_by_argkey: Dict[Scope, Dict[_Key, Deque[nodes.Item]]] = {}
for scopenum in range(scopenum_function): for scope in HIGH_SCOPES:
d: Dict[nodes.Item, Dict[_Key, None]] = {} d: Dict[nodes.Item, Dict[_Key, None]] = {}
argkeys_cache[scopenum] = d argkeys_cache[scope] = d
item_d: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque) item_d: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque)
items_by_argkey[scopenum] = item_d items_by_argkey[scope] = item_d
for item in items: for item in items:
keys = dict.fromkeys(get_parametrized_fixture_keys(item, scopenum), None) keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None)
if keys: if keys:
d[item] = keys d[item] = keys
for key in keys: for key in keys:
item_d[key].append(item) item_d[key].append(item)
items_dict = dict.fromkeys(items, None) items_dict = dict.fromkeys(items, None)
return list(reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, 0)) return list(
reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session)
)
def fix_cache_order( def fix_cache_order(
item: nodes.Item, item: nodes.Item,
argkeys_cache: Dict[int, Dict[nodes.Item, Dict[_Key, None]]], argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
items_by_argkey: Dict[int, Dict[_Key, "Deque[nodes.Item]"]], items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
) -> None: ) -> None:
for scopenum in range(scopenum_function): for scope in HIGH_SCOPES:
for key in argkeys_cache[scopenum].get(item, []): for key in argkeys_cache[scope].get(item, []):
items_by_argkey[scopenum][key].appendleft(item) items_by_argkey[scope][key].appendleft(item)
def reorder_items_atscope( def reorder_items_atscope(
items: Dict[nodes.Item, None], items: Dict[nodes.Item, None],
argkeys_cache: Dict[int, Dict[nodes.Item, Dict[_Key, None]]], argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
items_by_argkey: Dict[int, Dict[_Key, "Deque[nodes.Item]"]], items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
scopenum: int, scope: Scope,
) -> Dict[nodes.Item, None]: ) -> Dict[nodes.Item, None]:
if scopenum >= scopenum_function or len(items) < 3: if scope is Scope.Function or len(items) < 3:
return items return items
ignore: Set[Optional[_Key]] = set() ignore: Set[Optional[_Key]] = set()
items_deque = deque(items) items_deque = deque(items)
items_done: Dict[nodes.Item, None] = {} items_done: Dict[nodes.Item, None] = {}
scoped_items_by_argkey = items_by_argkey[scopenum] scoped_items_by_argkey = items_by_argkey[scope]
scoped_argkeys_cache = argkeys_cache[scopenum] scoped_argkeys_cache = argkeys_cache[scope]
while items_deque: while items_deque:
no_argkey_group: Dict[nodes.Item, None] = {} no_argkey_group: Dict[nodes.Item, None] = {}
slicing_argkey = None slicing_argkey = None
@ -338,7 +346,7 @@ def reorder_items_atscope(
break break
if no_argkey_group: if no_argkey_group:
no_argkey_group = reorder_items_atscope( no_argkey_group = reorder_items_atscope(
no_argkey_group, argkeys_cache, items_by_argkey, scopenum + 1 no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower()
) )
for item in no_argkey_group: for item in no_argkey_group:
items_done[item] = None items_done[item] = None
@ -437,14 +445,18 @@ class FixtureRequest:
self._pyfuncitem = pyfuncitem self._pyfuncitem = pyfuncitem
#: Fixture for which this request is being performed. #: Fixture for which this request is being performed.
self.fixturename: Optional[str] = None self.fixturename: Optional[str] = None
#: Scope string, one of "function", "class", "module", "session". self._scope = Scope.Function
self.scope: _Scope = "function"
self._fixture_defs: Dict[str, FixtureDef[Any]] = {} self._fixture_defs: Dict[str, FixtureDef[Any]] = {}
fixtureinfo: FuncFixtureInfo = pyfuncitem._fixtureinfo fixtureinfo: FuncFixtureInfo = pyfuncitem._fixtureinfo
self._arg2fixturedefs = fixtureinfo.name2fixturedefs.copy() self._arg2fixturedefs = fixtureinfo.name2fixturedefs.copy()
self._arg2index: Dict[str, int] = {} self._arg2index: Dict[str, int] = {}
self._fixturemanager: FixtureManager = pyfuncitem.session._fixturemanager self._fixturemanager: FixtureManager = pyfuncitem.session._fixturemanager
@property
def scope(self) -> "_ScopeName":
"""Scope string, one of "function", "class", "module", "package", "session"."""
return self._scope.value
@property @property
def fixturenames(self) -> List[str]: def fixturenames(self) -> List[str]:
"""Names of all active fixtures in this request.""" """Names of all active fixtures in this request."""
@ -455,7 +467,7 @@ class FixtureRequest:
@property @property
def node(self): def node(self):
"""Underlying collection node (depends on current request scope).""" """Underlying collection node (depends on current request scope)."""
return self._getscopeitem(self.scope) return self._getscopeitem(self._scope)
def _getnextfixturedef(self, argname: str) -> "FixtureDef[Any]": def _getnextfixturedef(self, argname: str) -> "FixtureDef[Any]":
fixturedefs = self._arg2fixturedefs.get(argname, None) fixturedefs = self._arg2fixturedefs.get(argname, None)
@ -598,8 +610,7 @@ class FixtureRequest:
except FixtureLookupError: except FixtureLookupError:
if argname == "request": if argname == "request":
cached_result = (self, [0], None) cached_result = (self, [0], None)
scope: _Scope = "function" return PseudoFixtureDef(cached_result, Scope.Function)
return PseudoFixtureDef(cached_result, scope)
raise raise
# Remove indent to prevent the python3 exception # Remove indent to prevent the python3 exception
# from leaking into the call. # from leaking into the call.
@ -628,7 +639,7 @@ class FixtureRequest:
# (latter managed by fixturedef) # (latter managed by fixturedef)
argname = fixturedef.argname argname = fixturedef.argname
funcitem = self._pyfuncitem funcitem = self._pyfuncitem
scope = fixturedef.scope scope = fixturedef._scope
try: try:
param = funcitem.callspec.getparam(argname) param = funcitem.callspec.getparam(argname)
except (AttributeError, ValueError): except (AttributeError, ValueError):
@ -675,16 +686,15 @@ class FixtureRequest:
param_index = funcitem.callspec.indices[argname] param_index = funcitem.callspec.indices[argname]
# If a parametrize invocation set a scope it will override # If a parametrize invocation set a scope it will override
# the static scope defined with the fixture function. # the static scope defined with the fixture function.
paramscopenum = funcitem.callspec._arg2scopenum.get(argname) with suppress(KeyError):
if paramscopenum is not None: scope = funcitem.callspec._arg2scope[argname]
scope = scopes[paramscopenum]
subrequest = SubRequest( subrequest = SubRequest(
self, scope, param, param_index, fixturedef, _ispytest=True self, scope, param, param_index, fixturedef, _ispytest=True
) )
# Check if a higher-level scoped fixture accesses a lower level one. # Check if a higher-level scoped fixture accesses a lower level one.
subrequest._check_scope(argname, self.scope, scope) subrequest._check_scope(argname, self._scope, scope)
try: try:
# Call the fixture function. # Call the fixture function.
fixturedef.execute(request=subrequest) fixturedef.execute(request=subrequest)
@ -700,19 +710,18 @@ class FixtureRequest:
def _check_scope( def _check_scope(
self, self,
argname: str, argname: str,
invoking_scope: "_Scope", invoking_scope: Scope,
requested_scope: "_Scope", requested_scope: Scope,
) -> None: ) -> None:
if argname == "request": if argname == "request":
return return
if scopemismatch(invoking_scope, requested_scope): if invoking_scope > requested_scope:
# Try to report something helpful. # Try to report something helpful.
lines = self._factorytraceback() text = "\n".join(self._factorytraceback())
fail( fail(
"ScopeMismatch: You tried to access the %r scoped " f"ScopeMismatch: You tried to access the {requested_scope.value} scoped "
"fixture %r with a %r scoped request object, " f"fixture {argname} with a {invoking_scope.value} scoped request object, "
"involved factories\n%s" f"involved factories:\n{text}",
% ((requested_scope, argname, invoking_scope, "\n".join(lines))),
pytrace=False, pytrace=False,
) )
@ -730,17 +739,21 @@ class FixtureRequest:
lines.append("%s:%d: def %s%s" % (p, lineno + 1, factory.__name__, args)) lines.append("%s:%d: def %s%s" % (p, lineno + 1, factory.__name__, args))
return lines return lines
def _getscopeitem(self, scope: "_Scope") -> Union[nodes.Item, nodes.Collector]: def _getscopeitem(
if scope == "function": self, scope: Union[Scope, "_ScopeName"]
) -> Union[nodes.Item, nodes.Collector]:
if isinstance(scope, str):
scope = Scope(scope)
if scope is Scope.Function:
# This might also be a non-function Item despite its attribute name. # This might also be a non-function Item despite its attribute name.
node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem
elif scope == "package": elif scope is Scope.Package:
# FIXME: _fixturedef is not defined on FixtureRequest (this class), # FIXME: _fixturedef is not defined on FixtureRequest (this class),
# but on FixtureRequest (a subclass). # but on FixtureRequest (a subclass).
node = get_scope_package(self._pyfuncitem, self._fixturedef) # type: ignore[attr-defined] node = get_scope_package(self._pyfuncitem, self._fixturedef) # type: ignore[attr-defined]
else: else:
node = get_scope_node(self._pyfuncitem, scope) node = get_scope_node(self._pyfuncitem, scope)
if node is None and scope == "class": if node is None and scope is Scope.Class:
# Fallback to function item itself. # Fallback to function item itself.
node = self._pyfuncitem node = self._pyfuncitem
assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format( assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format(
@ -759,7 +772,7 @@ class SubRequest(FixtureRequest):
def __init__( def __init__(
self, self,
request: "FixtureRequest", request: "FixtureRequest",
scope: "_Scope", scope: Scope,
param: Any, param: Any,
param_index: int, param_index: int,
fixturedef: "FixtureDef[object]", fixturedef: "FixtureDef[object]",
@ -772,7 +785,7 @@ class SubRequest(FixtureRequest):
if param is not NOTSET: if param is not NOTSET:
self.param = param self.param = param
self.param_index = param_index self.param_index = param_index
self.scope = scope self._scope = scope
self._fixturedef = fixturedef self._fixturedef = fixturedef
self._pyfuncitem = request._pyfuncitem self._pyfuncitem = request._pyfuncitem
self._fixture_defs = request._fixture_defs self._fixture_defs = request._fixture_defs
@ -801,29 +814,6 @@ class SubRequest(FixtureRequest):
super()._schedule_finalizers(fixturedef, subrequest) super()._schedule_finalizers(fixturedef, subrequest)
scopes: List["_Scope"] = ["session", "package", "module", "class", "function"]
scopenum_function = scopes.index("function")
def scopemismatch(currentscope: "_Scope", newscope: "_Scope") -> bool:
return scopes.index(newscope) > scopes.index(currentscope)
def scope2index(scope: str, descr: str, where: Optional[str] = None) -> int:
"""Look up the index of ``scope`` and raise a descriptive value error
if not defined."""
strscopes: Sequence[str] = scopes
try:
return strscopes.index(scope)
except ValueError:
fail(
"{} {}got an unexpected scope value '{}'".format(
descr, f"from {where} " if where else "", scope
),
pytrace=False,
)
@final @final
class FixtureLookupError(LookupError): class FixtureLookupError(LookupError):
"""Could not return a requested fixture (missing or invalid).""" """Could not return a requested fixture (missing or invalid)."""
@ -955,10 +945,10 @@ def _teardown_yield_fixture(fixturefunc, it) -> None:
def _eval_scope_callable( def _eval_scope_callable(
scope_callable: "Callable[[str, Config], _Scope]", scope_callable: "Callable[[str, Config], _ScopeName]",
fixture_name: str, fixture_name: str,
config: Config, config: Config,
) -> "_Scope": ) -> "_ScopeName":
try: try:
# Type ignored because there is no typing mechanism to specify # Type ignored because there is no typing mechanism to specify
# keyword arguments, currently. # keyword arguments, currently.
@ -989,7 +979,7 @@ class FixtureDef(Generic[FixtureValue]):
baseid: Optional[str], baseid: Optional[str],
argname: str, argname: str,
func: "_FixtureFunc[FixtureValue]", func: "_FixtureFunc[FixtureValue]",
scope: "Union[_Scope, Callable[[str, Config], _Scope]]", scope: Union[Scope, "_ScopeName", Callable[[str, Config], "_ScopeName"], None],
params: Optional[Sequence[object]], params: Optional[Sequence[object]],
unittest: bool = False, unittest: bool = False,
ids: Optional[ ids: Optional[
@ -1004,17 +994,16 @@ class FixtureDef(Generic[FixtureValue]):
self.has_location = baseid is not None self.has_location = baseid is not None
self.func = func self.func = func
self.argname = argname self.argname = argname
if callable(scope): if scope is None:
scope_ = _eval_scope_callable(scope, argname, fixturemanager.config) scope = Scope.Function
else: elif callable(scope):
scope_ = scope scope = _eval_scope_callable(scope, argname, fixturemanager.config)
self.scopenum = scope2index(
# TODO: Check if the `or` here is really necessary. if isinstance(scope, str):
scope_ or "function", # type: ignore[unreachable] scope = Scope.from_user(
descr=f"Fixture '{func.__name__}'", scope, descr=f"Fixture '{func.__name__}'", where=baseid
where=baseid, )
) self._scope = scope
self.scope = scope_
self.params: Optional[Sequence[object]] = params self.params: Optional[Sequence[object]] = params
self.argnames: Tuple[str, ...] = getfuncargnames( self.argnames: Tuple[str, ...] = getfuncargnames(
func, name=argname, is_method=unittest func, name=argname, is_method=unittest
@ -1024,6 +1013,11 @@ class FixtureDef(Generic[FixtureValue]):
self.cached_result: Optional[_FixtureCachedResult[FixtureValue]] = None self.cached_result: Optional[_FixtureCachedResult[FixtureValue]] = None
self._finalizers: List[Callable[[], object]] = [] self._finalizers: List[Callable[[], object]] = []
@property
def scope(self) -> "_ScopeName":
"""Scope string, one of "function", "class", "module", "package", "session"."""
return self._scope.value
def addfinalizer(self, finalizer: Callable[[], object]) -> None: def addfinalizer(self, finalizer: Callable[[], object]) -> None:
self._finalizers.append(finalizer) self._finalizers.append(finalizer)
@ -1126,7 +1120,7 @@ def pytest_fixture_setup(
fixdef = request._get_active_fixturedef(argname) fixdef = request._get_active_fixturedef(argname)
assert fixdef.cached_result is not None assert fixdef.cached_result is not None
result, arg_cache_key, exc = fixdef.cached_result result, arg_cache_key, exc = fixdef.cached_result
request._check_scope(argname, request.scope, fixdef.scope) request._check_scope(argname, request._scope, fixdef._scope)
kwargs[argname] = result kwargs[argname] = result
fixturefunc = resolve_fixture_function(fixturedef, request) fixturefunc = resolve_fixture_function(fixturedef, request)
@ -1195,18 +1189,17 @@ def wrap_function_to_error_out_if_called_directly(
@final @final
@attr.s(frozen=True) @attr.s(frozen=True)
class FixtureFunctionMarker: class FixtureFunctionMarker:
scope = attr.ib(type="Union[_Scope, Callable[[str, Config], _Scope]]") scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = attr.ib()
params = attr.ib(type=Optional[Tuple[object, ...]], converter=_params_converter) params: Optional[Tuple[object, ...]] = attr.ib(converter=_params_converter)
autouse = attr.ib(type=bool, default=False) autouse: bool = attr.ib(default=False)
ids = attr.ib( ids: Union[
type=Union[ Tuple[Union[None, str, float, int, bool], ...],
Tuple[Union[None, str, float, int, bool], ...], Callable[[Any], Optional[object]],
Callable[[Any], Optional[object]], ] = attr.ib(
],
default=None, default=None,
converter=_ensure_immutable_ids, converter=_ensure_immutable_ids,
) )
name = attr.ib(type=Optional[str], default=None) name: Optional[str] = attr.ib(default=None)
def __call__(self, function: FixtureFunction) -> FixtureFunction: def __call__(self, function: FixtureFunction) -> FixtureFunction:
if inspect.isclass(function): if inspect.isclass(function):
@ -1238,7 +1231,7 @@ class FixtureFunctionMarker:
def fixture( def fixture(
fixture_function: FixtureFunction, fixture_function: FixtureFunction,
*, *,
scope: "Union[_Scope, Callable[[str, Config], _Scope]]" = ..., scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ...,
params: Optional[Iterable[object]] = ..., params: Optional[Iterable[object]] = ...,
autouse: bool = ..., autouse: bool = ...,
ids: Optional[ ids: Optional[
@ -1256,7 +1249,7 @@ def fixture(
def fixture( def fixture(
fixture_function: None = ..., fixture_function: None = ...,
*, *,
scope: "Union[_Scope, Callable[[str, Config], _Scope]]" = ..., scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ...,
params: Optional[Iterable[object]] = ..., params: Optional[Iterable[object]] = ...,
autouse: bool = ..., autouse: bool = ...,
ids: Optional[ ids: Optional[
@ -1273,7 +1266,7 @@ def fixture(
def fixture( def fixture(
fixture_function: Optional[FixtureFunction] = None, fixture_function: Optional[FixtureFunction] = None,
*, *,
scope: "Union[_Scope, Callable[[str, Config], _Scope]]" = "function", scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = "function",
params: Optional[Iterable[object]] = None, params: Optional[Iterable[object]] = None,
autouse: bool = False, autouse: bool = False,
ids: Optional[ ids: Optional[
@ -1552,15 +1545,15 @@ class FixtureManager:
arg2fixturedefs[argname] = fixturedefs arg2fixturedefs[argname] = fixturedefs
merge(fixturedefs[-1].argnames) merge(fixturedefs[-1].argnames)
def sort_by_scope(arg_name: str) -> int: def sort_by_scope(arg_name: str) -> Scope:
try: try:
fixturedefs = arg2fixturedefs[arg_name] fixturedefs = arg2fixturedefs[arg_name]
except KeyError: except KeyError:
return scopes.index("function") return Scope.Function
else: else:
return fixturedefs[-1].scopenum return fixturedefs[-1]._scope
fixturenames_closure.sort(key=sort_by_scope) fixturenames_closure.sort(key=sort_by_scope, reverse=True)
return initialnames, fixturenames_closure, arg2fixturedefs return initialnames, fixturenames_closure, arg2fixturedefs
def pytest_generate_tests(self, metafunc: "Metafunc") -> None: def pytest_generate_tests(self, metafunc: "Metafunc") -> None:

View File

@ -400,7 +400,7 @@ def store_mark(obj, mark: Mark) -> None:
# Typing for builtin pytest marks. This is cheating; it gives builtin marks # Typing for builtin pytest marks. This is cheating; it gives builtin marks
# special privilege, and breaks modularity. But practicality beats purity... # special privilege, and breaks modularity. But practicality beats purity...
if TYPE_CHECKING: if TYPE_CHECKING:
from _pytest.fixtures import _Scope from _pytest.scope import _ScopeName
class _SkipMarkDecorator(MarkDecorator): class _SkipMarkDecorator(MarkDecorator):
@overload # type: ignore[override,misc] @overload # type: ignore[override,misc]
@ -450,7 +450,7 @@ if TYPE_CHECKING:
Callable[[Any], Optional[object]], Callable[[Any], Optional[object]],
] ]
] = ..., ] = ...,
scope: Optional[_Scope] = ..., scope: Optional[_ScopeName] = ...,
) -> MarkDecorator: ) -> MarkDecorator:
... ...

View File

@ -72,12 +72,13 @@ from _pytest.pathlib import import_path
from _pytest.pathlib import ImportPathMismatchError from _pytest.pathlib import ImportPathMismatchError
from _pytest.pathlib import parts from _pytest.pathlib import parts
from _pytest.pathlib import visit from _pytest.pathlib import visit
from _pytest.scope import Scope
from _pytest.warning_types import PytestCollectionWarning from _pytest.warning_types import PytestCollectionWarning
from _pytest.warning_types import PytestUnhandledCoroutineWarning from _pytest.warning_types import PytestUnhandledCoroutineWarning
if TYPE_CHECKING: if TYPE_CHECKING:
from typing_extensions import Literal from typing_extensions import Literal
from _pytest.fixtures import _Scope from _pytest.scope import _ScopeName
def pytest_addoption(parser: Parser) -> None: def pytest_addoption(parser: Parser) -> None:
@ -896,7 +897,7 @@ class CallSpec2:
self._idlist: List[str] = [] self._idlist: List[str] = []
self.params: Dict[str, object] = {} self.params: Dict[str, object] = {}
# Used for sorting parametrized resources. # Used for sorting parametrized resources.
self._arg2scopenum: Dict[str, int] = {} self._arg2scope: Dict[str, Scope] = {}
self.marks: List[Mark] = [] self.marks: List[Mark] = []
self.indices: Dict[str, int] = {} self.indices: Dict[str, int] = {}
@ -906,7 +907,7 @@ class CallSpec2:
cs.params.update(self.params) cs.params.update(self.params)
cs.marks.extend(self.marks) cs.marks.extend(self.marks)
cs.indices.update(self.indices) cs.indices.update(self.indices)
cs._arg2scopenum.update(self._arg2scopenum) cs._arg2scope.update(self._arg2scope)
cs._idlist = list(self._idlist) cs._idlist = list(self._idlist)
return cs return cs
@ -927,7 +928,7 @@ class CallSpec2:
valset: Iterable[object], valset: Iterable[object],
id: str, id: str,
marks: Iterable[Union[Mark, MarkDecorator]], marks: Iterable[Union[Mark, MarkDecorator]],
scopenum: int, scope: Scope,
param_index: int, param_index: int,
) -> None: ) -> None:
for arg, val in zip(argnames, valset): for arg, val in zip(argnames, valset):
@ -941,7 +942,7 @@ class CallSpec2:
else: # pragma: no cover else: # pragma: no cover
assert False, f"Unhandled valtype for arg: {valtype_for_arg}" assert False, f"Unhandled valtype for arg: {valtype_for_arg}"
self.indices[arg] = param_index self.indices[arg] = param_index
self._arg2scopenum[arg] = scopenum self._arg2scope[arg] = scope
self._idlist.append(id) self._idlist.append(id)
self.marks.extend(normalize_mark_list(marks)) self.marks.extend(normalize_mark_list(marks))
@ -999,7 +1000,7 @@ class Metafunc:
Callable[[Any], Optional[object]], Callable[[Any], Optional[object]],
] ]
] = None, ] = None,
scope: "Optional[_Scope]" = None, scope: "Optional[_ScopeName]" = None,
*, *,
_param_mark: Optional[Mark] = None, _param_mark: Optional[Mark] = None,
) -> None: ) -> None:
@ -1055,8 +1056,6 @@ class Metafunc:
It will also override any fixture-function defined scope, allowing It will also override any fixture-function defined scope, allowing
to set a dynamic scope using test context or configuration. to set a dynamic scope using test context or configuration.
""" """
from _pytest.fixtures import scope2index
argnames, parameters = ParameterSet._for_parametrize( argnames, parameters = ParameterSet._for_parametrize(
argnames, argnames,
argvalues, argvalues,
@ -1072,8 +1071,12 @@ class Metafunc:
pytrace=False, pytrace=False,
) )
if scope is None: if scope is not None:
scope = _find_parametrized_scope(argnames, self._arg2fixturedefs, indirect) scope_ = Scope.from_user(
scope, descr=f"parametrize() call in {self.function.__name__}"
)
else:
scope_ = _find_parametrized_scope(argnames, self._arg2fixturedefs, indirect)
self._validate_if_using_arg_names(argnames, indirect) self._validate_if_using_arg_names(argnames, indirect)
@ -1093,10 +1096,6 @@ class Metafunc:
if _param_mark and _param_mark._param_ids_from and generated_ids is None: if _param_mark and _param_mark._param_ids_from and generated_ids is None:
object.__setattr__(_param_mark._param_ids_from, "_param_ids_generated", ids) object.__setattr__(_param_mark._param_ids_from, "_param_ids_generated", ids)
scopenum = scope2index(
scope, descr=f"parametrize() call in {self.function.__name__}"
)
# Create the new calls: if we are parametrize() multiple times (by applying the decorator # Create the new calls: if we are parametrize() multiple times (by applying the decorator
# more than once) then we accumulate those calls generating the cartesian product # more than once) then we accumulate those calls generating the cartesian product
# of all calls. # of all calls.
@ -1110,7 +1109,7 @@ class Metafunc:
param_set.values, param_set.values,
param_id, param_id,
param_set.marks, param_set.marks,
scopenum, scope_,
param_index, param_index,
) )
newcalls.append(newcallspec) newcalls.append(newcallspec)
@ -1263,7 +1262,7 @@ def _find_parametrized_scope(
argnames: Sequence[str], argnames: Sequence[str],
arg2fixturedefs: Mapping[str, Sequence[fixtures.FixtureDef[object]]], arg2fixturedefs: Mapping[str, Sequence[fixtures.FixtureDef[object]]],
indirect: Union[bool, Sequence[str]], indirect: Union[bool, Sequence[str]],
) -> "fixtures._Scope": ) -> Scope:
"""Find the most appropriate scope for a parametrized call based on its arguments. """Find the most appropriate scope for a parametrized call based on its arguments.
When there's at least one direct argument, always use "function" scope. When there's at least one direct argument, always use "function" scope.
@ -1281,17 +1280,14 @@ def _find_parametrized_scope(
if all_arguments_are_fixtures: if all_arguments_are_fixtures:
fixturedefs = arg2fixturedefs or {} fixturedefs = arg2fixturedefs or {}
used_scopes = [ used_scopes = [
fixturedef[0].scope fixturedef[0]._scope
for name, fixturedef in fixturedefs.items() for name, fixturedef in fixturedefs.items()
if name in argnames if name in argnames
] ]
if used_scopes: # Takes the most narrow scope from used fixtures.
# Takes the most narrow scope from used fixtures. return min(used_scopes, default=Scope.Function)
for scope in reversed(fixtures.scopes):
if scope in used_scopes:
return scope
return "function" return Scope.Function
def _ascii_escaped_by_config(val: Union[str, bytes], config: Optional[Config]) -> str: def _ascii_escaped_by_config(val: Union[str, bytes], config: Optional[Config]) -> str:

89
src/_pytest/scope.py Normal file
View File

@ -0,0 +1,89 @@
"""
Scope definition and related utilities.
Those are defined here, instead of in the 'fixtures' module because
their use is spread across many other pytest modules, and centralizing it in 'fixtures'
would cause circular references.
Also this makes the module light to import, as it should.
"""
from enum import Enum
from functools import total_ordering
from typing import Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing_extensions import Literal
_ScopeName = Literal["session", "package", "module", "class", "function"]
@total_ordering
class Scope(Enum):
"""
Represents one of the possible fixture scopes in pytest.
Scopes are ordered from lower to higher, that is:
->>> higher ->>>
Function < Class < Module < Package < Session
<<<- lower <<<-
"""
# Scopes need to be listed from lower to higher.
Function: "_ScopeName" = "function"
Class: "_ScopeName" = "class"
Module: "_ScopeName" = "module"
Package: "_ScopeName" = "package"
Session: "_ScopeName" = "session"
def next_lower(self) -> "Scope":
"""Return the next lower scope."""
index = _SCOPE_INDICES[self]
if index == 0:
raise ValueError(f"{self} is the lower-most scope")
return _ALL_SCOPES[index - 1]
def next_higher(self) -> "Scope":
"""Return the next higher scope."""
index = _SCOPE_INDICES[self]
if index == len(_SCOPE_INDICES) - 1:
raise ValueError(f"{self} is the upper-most scope")
return _ALL_SCOPES[index + 1]
def __lt__(self, other: "Scope") -> bool:
self_index = _SCOPE_INDICES[self]
other_index = _SCOPE_INDICES[other]
return self_index < other_index
@classmethod
def from_user(
cls, scope_name: "_ScopeName", descr: str, where: Optional[str] = None
) -> "Scope":
"""
Given a scope name from the user, return the equivalent Scope enum. Should be used
whenever we want to convert a user provided scope name to its enum object.
If the scope name is invalid, construct a user friendly message and call pytest.fail.
"""
from _pytest.outcomes import fail
try:
return Scope(scope_name)
except ValueError:
fail(
"{} {}got an unexpected scope value '{}'".format(
descr, f"from {where} " if where else "", scope_name
),
pytrace=False,
)
_ALL_SCOPES = list(Scope)
_SCOPE_INDICES = {scope: index for index, scope in enumerate(_ALL_SCOPES)}
# Ordered list of scopes which can contain many tests (in practice all except Function).
HIGH_SCOPES = [x for x in Scope if x is not Scope.Function]

View File

@ -9,6 +9,7 @@ from _pytest.config import ExitCode
from _pytest.config.argparsing import Parser from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureDef from _pytest.fixtures import FixtureDef
from _pytest.fixtures import SubRequest from _pytest.fixtures import SubRequest
from _pytest.scope import Scope
def pytest_addoption(parser: Parser) -> None: def pytest_addoption(parser: Parser) -> None:
@ -64,7 +65,9 @@ def _show_fixture_action(fixturedef: FixtureDef[object], msg: str) -> None:
tw = config.get_terminal_writer() tw = config.get_terminal_writer()
tw.line() tw.line()
tw.write(" " * 2 * fixturedef.scopenum) # Use smaller indentation the higher the scope: Session = 0, Package = 1, etc.
scope_indent = list(reversed(Scope)).index(fixturedef._scope)
tw.write(" " * 2 * scope_indent)
tw.write( tw.write(
"{step} {scope} {fixture}".format( "{step} {scope} {fixture}".format(
step=msg.ljust(8), # align the output to TEARDOWN step=msg.ljust(8), # align the output to TEARDOWN

View File

@ -29,13 +29,12 @@ from _pytest.python import Class
from _pytest.python import Function from _pytest.python import Function
from _pytest.python import PyCollector from _pytest.python import PyCollector
from _pytest.runner import CallInfo from _pytest.runner import CallInfo
from _pytest.scope import Scope
if TYPE_CHECKING: if TYPE_CHECKING:
import unittest import unittest
import twisted.trial.unittest import twisted.trial.unittest
from _pytest.fixtures import _Scope
_SysExcInfoType = Union[ _SysExcInfoType = Union[
Tuple[Type[BaseException], BaseException, types.TracebackType], Tuple[Type[BaseException], BaseException, types.TracebackType],
Tuple[None, None, None], Tuple[None, None, None],
@ -102,7 +101,7 @@ class UnitTestCase(Class):
"setUpClass", "setUpClass",
"tearDownClass", "tearDownClass",
"doClassCleanups", "doClassCleanups",
scope="class", scope=Scope.Class,
pass_self=False, pass_self=False,
) )
if class_fixture: if class_fixture:
@ -113,7 +112,7 @@ class UnitTestCase(Class):
"setup_method", "setup_method",
"teardown_method", "teardown_method",
None, None,
scope="function", scope=Scope.Function,
pass_self=True, pass_self=True,
) )
if method_fixture: if method_fixture:
@ -125,7 +124,7 @@ def _make_xunit_fixture(
setup_name: str, setup_name: str,
teardown_name: str, teardown_name: str,
cleanup_name: Optional[str], cleanup_name: Optional[str],
scope: "_Scope", scope: Scope,
pass_self: bool, pass_self: bool,
): ):
setup = getattr(obj, setup_name, None) setup = getattr(obj, setup_name, None)
@ -141,7 +140,7 @@ def _make_xunit_fixture(
pass pass
@pytest.fixture( @pytest.fixture(
scope=scope, scope=scope.value,
autouse=True, autouse=True,
# Use a unique name to speed up lookup. # Use a unique name to speed up lookup.
name=f"_unittest_{setup_name}_fixture_{obj.__qualname__}", name=f"_unittest_{setup_name}_fixture_{obj.__qualname__}",

View File

@ -26,6 +26,7 @@ from _pytest.outcomes import fail
from _pytest.pytester import Pytester from _pytest.pytester import Pytester
from _pytest.python import _idval from _pytest.python import _idval
from _pytest.python import idmaker from _pytest.python import idmaker
from _pytest.scope import Scope
class TestMetafunc: class TestMetafunc:
@ -142,16 +143,16 @@ class TestMetafunc:
@attr.s @attr.s
class DummyFixtureDef: class DummyFixtureDef:
scope = attr.ib() _scope = attr.ib()
fixtures_defs = cast( fixtures_defs = cast(
Dict[str, Sequence[fixtures.FixtureDef[object]]], Dict[str, Sequence[fixtures.FixtureDef[object]]],
dict( dict(
session_fix=[DummyFixtureDef("session")], session_fix=[DummyFixtureDef(Scope.Session)],
package_fix=[DummyFixtureDef("package")], package_fix=[DummyFixtureDef(Scope.Package)],
module_fix=[DummyFixtureDef("module")], module_fix=[DummyFixtureDef(Scope.Module)],
class_fix=[DummyFixtureDef("class")], class_fix=[DummyFixtureDef(Scope.Class)],
func_fix=[DummyFixtureDef("function")], func_fix=[DummyFixtureDef(Scope.Function)],
), ),
) )
@ -160,29 +161,33 @@ class TestMetafunc:
def find_scope(argnames, indirect): def find_scope(argnames, indirect):
return _find_parametrized_scope(argnames, fixtures_defs, indirect=indirect) return _find_parametrized_scope(argnames, fixtures_defs, indirect=indirect)
assert find_scope(["func_fix"], indirect=True) == "function" assert find_scope(["func_fix"], indirect=True) == Scope.Function
assert find_scope(["class_fix"], indirect=True) == "class" assert find_scope(["class_fix"], indirect=True) == Scope.Class
assert find_scope(["module_fix"], indirect=True) == "module" assert find_scope(["module_fix"], indirect=True) == Scope.Module
assert find_scope(["package_fix"], indirect=True) == "package" assert find_scope(["package_fix"], indirect=True) == Scope.Package
assert find_scope(["session_fix"], indirect=True) == "session" assert find_scope(["session_fix"], indirect=True) == Scope.Session
assert find_scope(["class_fix", "func_fix"], indirect=True) == "function" assert find_scope(["class_fix", "func_fix"], indirect=True) == Scope.Function
assert find_scope(["func_fix", "session_fix"], indirect=True) == "function" assert find_scope(["func_fix", "session_fix"], indirect=True) == Scope.Function
assert find_scope(["session_fix", "class_fix"], indirect=True) == "class" assert find_scope(["session_fix", "class_fix"], indirect=True) == Scope.Class
assert find_scope(["package_fix", "session_fix"], indirect=True) == "package" assert (
assert find_scope(["module_fix", "session_fix"], indirect=True) == "module" find_scope(["package_fix", "session_fix"], indirect=True) == Scope.Package
)
assert find_scope(["module_fix", "session_fix"], indirect=True) == Scope.Module
# when indirect is False or is not for all scopes, always use function # when indirect is False or is not for all scopes, always use function
assert find_scope(["session_fix", "module_fix"], indirect=False) == "function" assert (
find_scope(["session_fix", "module_fix"], indirect=False) == Scope.Function
)
assert ( assert (
find_scope(["session_fix", "module_fix"], indirect=["module_fix"]) find_scope(["session_fix", "module_fix"], indirect=["module_fix"])
== "function" == Scope.Function
) )
assert ( assert (
find_scope( find_scope(
["session_fix", "module_fix"], indirect=["session_fix", "module_fix"] ["session_fix", "module_fix"], indirect=["session_fix", "module_fix"]
) )
== "module" == Scope.Module
) )
def test_parametrize_and_id(self) -> None: def test_parametrize_and_id(self) -> None:

39
testing/test_scope.py Normal file
View File

@ -0,0 +1,39 @@
import re
import pytest
from _pytest.scope import Scope
def test_ordering() -> None:
assert Scope.Session > Scope.Package
assert Scope.Package > Scope.Module
assert Scope.Module > Scope.Class
assert Scope.Class > Scope.Function
def test_next_lower() -> None:
assert Scope.Session.next_lower() is Scope.Package
assert Scope.Package.next_lower() is Scope.Module
assert Scope.Module.next_lower() is Scope.Class
assert Scope.Class.next_lower() is Scope.Function
with pytest.raises(ValueError, match="Function is the lower-most scope"):
Scope.Function.next_lower()
def test_next_higher() -> None:
assert Scope.Function.next_higher() is Scope.Class
assert Scope.Class.next_higher() is Scope.Module
assert Scope.Module.next_higher() is Scope.Package
assert Scope.Package.next_higher() is Scope.Session
with pytest.raises(ValueError, match="Session is the upper-most scope"):
Scope.Session.next_higher()
def test_from_user() -> None:
assert Scope.from_user("module", "for parametrize", "some::id") is Scope.Module
expected_msg = "for parametrize from some::id got an unexpected scope value 'foo'"
with pytest.raises(pytest.fail.Exception, match=re.escape(expected_msg)):
Scope.from_user("foo", "for parametrize", "some::id") # type:ignore[arg-type]