Merge pull request #6066 from bluetech/type-annotations-paths

Add type annotations to _pytest.pathlib and _pytest.tmpdir
This commit is contained in:
Ran Benita 2019-10-28 15:20:45 +02:00 committed by GitHub
commit 7f8bf4d9f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 38 deletions

View File

@ -1,7 +1,6 @@
import atexit import atexit
import fnmatch import fnmatch
import itertools import itertools
import operator
import os import os
import shutil import shutil
import sys import sys
@ -13,6 +12,11 @@ from os.path import expandvars
from os.path import isabs from os.path import isabs
from os.path import sep from os.path import sep
from posixpath import sep as posix_sep from posixpath import sep as posix_sep
from typing import Iterable
from typing import Iterator
from typing import Set
from typing import TypeVar
from typing import Union
from _pytest.warning_types import PytestWarning from _pytest.warning_types import PytestWarning
@ -26,10 +30,15 @@ __all__ = ["Path", "PurePath"]
LOCK_TIMEOUT = 60 * 60 * 3 LOCK_TIMEOUT = 60 * 60 * 3
get_lock_path = operator.methodcaller("joinpath", ".lock")
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
def ensure_reset_dir(path): def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
return path.joinpath(".lock")
def ensure_reset_dir(path: Path) -> None:
""" """
ensures the given path is an empty directory ensures the given path is an empty directory
""" """
@ -38,7 +47,7 @@ def ensure_reset_dir(path):
path.mkdir() path.mkdir()
def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool: def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool:
"""Handles known read-only errors during rmtree. """Handles known read-only errors during rmtree.
The returned value is used only by our own tests. The returned value is used only by our own tests.
@ -71,7 +80,7 @@ def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool:
# Chmod + retry. # Chmod + retry.
import stat import stat
def chmod_rw(p: str): def chmod_rw(p: str) -> None:
mode = os.stat(p).st_mode mode = os.stat(p).st_mode
os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
@ -90,7 +99,7 @@ def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool:
return True return True
def rm_rf(path: Path): def rm_rf(path: Path) -> None:
"""Remove the path contents recursively, even if some elements """Remove the path contents recursively, even if some elements
are read-only. are read-only.
""" """
@ -98,7 +107,7 @@ def rm_rf(path: Path):
shutil.rmtree(str(path), onerror=onerror) shutil.rmtree(str(path), onerror=onerror)
def find_prefixed(root, prefix): def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
"""finds all elements in root that begin with the prefix, case insensitive""" """finds all elements in root that begin with the prefix, case insensitive"""
l_prefix = prefix.lower() l_prefix = prefix.lower()
for x in root.iterdir(): for x in root.iterdir():
@ -106,7 +115,7 @@ def find_prefixed(root, prefix):
yield x yield x
def extract_suffixes(iter, prefix): def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
""" """
:param iter: iterator over path names :param iter: iterator over path names
:param prefix: expected prefix of the path names :param prefix: expected prefix of the path names
@ -117,13 +126,13 @@ def extract_suffixes(iter, prefix):
yield p.name[p_len:] yield p.name[p_len:]
def find_suffixes(root, prefix): def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
"""combines find_prefixes and extract_suffixes """combines find_prefixes and extract_suffixes
""" """
return extract_suffixes(find_prefixed(root, prefix), prefix) return extract_suffixes(find_prefixed(root, prefix), prefix)
def parse_num(maybe_num): def parse_num(maybe_num) -> int:
"""parses number path suffixes, returns -1 on error""" """parses number path suffixes, returns -1 on error"""
try: try:
return int(maybe_num) return int(maybe_num)
@ -131,7 +140,9 @@ def parse_num(maybe_num):
return -1 return -1
def _force_symlink(root, target, link_to): def _force_symlink(
root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
"""helper to create the current symlink """helper to create the current symlink
it's full of race conditions that are reasonably ok to ignore it's full of race conditions that are reasonably ok to ignore
@ -151,7 +162,7 @@ def _force_symlink(root, target, link_to):
pass pass
def make_numbered_dir(root, prefix): def make_numbered_dir(root: Path, prefix: str) -> Path:
"""create a directory with an increased number as suffix for the given prefix""" """create a directory with an increased number as suffix for the given prefix"""
for i in range(10): for i in range(10):
# try up to 10 times to create the folder # try up to 10 times to create the folder
@ -172,7 +183,7 @@ def make_numbered_dir(root, prefix):
) )
def create_cleanup_lock(p): def create_cleanup_lock(p: Path) -> Path:
"""crates a lock to prevent premature folder cleanup""" """crates a lock to prevent premature folder cleanup"""
lock_path = get_lock_path(p) lock_path = get_lock_path(p)
try: try:
@ -189,11 +200,11 @@ def create_cleanup_lock(p):
return lock_path return lock_path
def register_cleanup_lock_removal(lock_path, register=atexit.register): def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
"""registers a cleanup function for removing a lock, by default on atexit""" """registers a cleanup function for removing a lock, by default on atexit"""
pid = os.getpid() pid = os.getpid()
def cleanup_on_exit(lock_path=lock_path, original_pid=pid): def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
current_pid = os.getpid() current_pid = os.getpid()
if current_pid != original_pid: if current_pid != original_pid:
# fork # fork
@ -206,7 +217,7 @@ def register_cleanup_lock_removal(lock_path, register=atexit.register):
return register(cleanup_on_exit) return register(cleanup_on_exit)
def maybe_delete_a_numbered_dir(path): def maybe_delete_a_numbered_dir(path: Path) -> None:
"""removes a numbered directory if its lock can be obtained and it does not seem to be in use""" """removes a numbered directory if its lock can be obtained and it does not seem to be in use"""
lock_path = None lock_path = None
try: try:
@ -232,7 +243,7 @@ def maybe_delete_a_numbered_dir(path):
pass pass
def ensure_deletable(path, consider_lock_dead_if_created_before): def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
"""checks if a lock exists and breaks it if its considered dead""" """checks if a lock exists and breaks it if its considered dead"""
if path.is_symlink(): if path.is_symlink():
return False return False
@ -251,13 +262,13 @@ def ensure_deletable(path, consider_lock_dead_if_created_before):
return False return False
def try_cleanup(path, consider_lock_dead_if_created_before): def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
"""tries to cleanup a folder if we can ensure it's deletable""" """tries to cleanup a folder if we can ensure it's deletable"""
if ensure_deletable(path, consider_lock_dead_if_created_before): if ensure_deletable(path, consider_lock_dead_if_created_before):
maybe_delete_a_numbered_dir(path) maybe_delete_a_numbered_dir(path)
def cleanup_candidates(root, prefix, keep): def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
"""lists candidates for numbered directories to be removed - follows py.path""" """lists candidates for numbered directories to be removed - follows py.path"""
max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
max_delete = max_existing - keep max_delete = max_existing - keep
@ -269,7 +280,9 @@ def cleanup_candidates(root, prefix, keep):
yield path yield path
def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before): def cleanup_numbered_dir(
root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
"""cleanup for lock driven numbered directories""" """cleanup for lock driven numbered directories"""
for path in cleanup_candidates(root, prefix, keep): for path in cleanup_candidates(root, prefix, keep):
try_cleanup(path, consider_lock_dead_if_created_before) try_cleanup(path, consider_lock_dead_if_created_before)
@ -277,7 +290,9 @@ def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_befor
try_cleanup(path, consider_lock_dead_if_created_before) try_cleanup(path, consider_lock_dead_if_created_before)
def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout): def make_numbered_dir_with_cleanup(
root: Path, prefix: str, keep: int, lock_timeout: float
) -> Path:
"""creates a numbered dir with a cleanup lock and removes old ones""" """creates a numbered dir with a cleanup lock and removes old ones"""
e = None e = None
for i in range(10): for i in range(10):
@ -311,7 +326,7 @@ def resolve_from_str(input, root):
return root.joinpath(input) return root.joinpath(input)
def fnmatch_ex(pattern, path): def fnmatch_ex(pattern: str, path) -> bool:
"""FNMatcher port from py.path.common which works with PurePath() instances. """FNMatcher port from py.path.common which works with PurePath() instances.
The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
@ -346,6 +361,6 @@ def fnmatch_ex(pattern, path):
return fnmatch.fnmatch(name, pattern) return fnmatch.fnmatch(name, pattern)
def parts(s): def parts(s: str) -> Set[str]:
parts = s.split(sep) parts = s.split(sep)
return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}

View File

@ -2,6 +2,7 @@
import os import os
import re import re
import tempfile import tempfile
from typing import Optional
import attr import attr
import py import py
@ -12,6 +13,7 @@ from .pathlib import LOCK_TIMEOUT
from .pathlib import make_numbered_dir from .pathlib import make_numbered_dir
from .pathlib import make_numbered_dir_with_cleanup from .pathlib import make_numbered_dir_with_cleanup
from .pathlib import Path from .pathlib import Path
from _pytest.fixtures import FixtureRequest
from _pytest.monkeypatch import MonkeyPatch from _pytest.monkeypatch import MonkeyPatch
@ -22,19 +24,20 @@ class TempPathFactory:
The base directory can be configured using the ``--basetemp`` option.""" The base directory can be configured using the ``--basetemp`` option."""
_given_basetemp = attr.ib( _given_basetemp = attr.ib(
type=Path,
# using os.path.abspath() to get absolute path instead of resolve() as it # using os.path.abspath() to get absolute path instead of resolve() as it
# does not work the same in all platforms (see #4427) # does not work the same in all platforms (see #4427)
# Path.absolute() exists, but it is not public (see https://bugs.python.org/issue25012) # Path.absolute() exists, but it is not public (see https://bugs.python.org/issue25012)
# Ignore type because of https://github.com/python/mypy/issues/6172. # Ignore type because of https://github.com/python/mypy/issues/6172.
converter=attr.converters.optional( converter=attr.converters.optional(
lambda p: Path(os.path.abspath(str(p))) # type: ignore lambda p: Path(os.path.abspath(str(p))) # type: ignore
) ),
) )
_trace = attr.ib() _trace = attr.ib()
_basetemp = attr.ib(default=None) _basetemp = attr.ib(type=Optional[Path], default=None)
@classmethod @classmethod
def from_config(cls, config): def from_config(cls, config) -> "TempPathFactory":
""" """
:param config: a pytest configuration :param config: a pytest configuration
""" """
@ -42,7 +45,7 @@ class TempPathFactory:
given_basetemp=config.option.basetemp, trace=config.trace.get("tmpdir") given_basetemp=config.option.basetemp, trace=config.trace.get("tmpdir")
) )
def mktemp(self, basename, numbered=True): def mktemp(self, basename: str, numbered: bool = True) -> Path:
"""makes a temporary directory managed by the factory""" """makes a temporary directory managed by the factory"""
if not numbered: if not numbered:
p = self.getbasetemp().joinpath(basename) p = self.getbasetemp().joinpath(basename)
@ -52,7 +55,7 @@ class TempPathFactory:
self._trace("mktemp", p) self._trace("mktemp", p)
return p return p
def getbasetemp(self): def getbasetemp(self) -> Path:
""" return base temporary directory. """ """ return base temporary directory. """
if self._basetemp is not None: if self._basetemp is not None:
return self._basetemp return self._basetemp
@ -85,9 +88,9 @@ class TempdirFactory:
:class:``py.path.local`` for :class:``TempPathFactory`` :class:``py.path.local`` for :class:``TempPathFactory``
""" """
_tmppath_factory = attr.ib() _tmppath_factory = attr.ib(type=TempPathFactory)
def mktemp(self, basename, numbered=True): def mktemp(self, basename: str, numbered: bool = True):
"""Create a subdirectory of the base temporary directory and return it. """Create a subdirectory of the base temporary directory and return it.
If ``numbered``, ensure the directory is unique by adding a number If ``numbered``, ensure the directory is unique by adding a number
prefix greater than any existing one. prefix greater than any existing one.
@ -99,7 +102,7 @@ class TempdirFactory:
return py.path.local(self._tmppath_factory.getbasetemp().resolve()) return py.path.local(self._tmppath_factory.getbasetemp().resolve())
def get_user(): def get_user() -> Optional[str]:
"""Return the current user name, or None if getuser() does not work """Return the current user name, or None if getuser() does not work
in the current environment (see #1010). in the current environment (see #1010).
""" """
@ -111,7 +114,7 @@ def get_user():
return None return None
def pytest_configure(config): def pytest_configure(config) -> None:
"""Create a TempdirFactory and attach it to the config object. """Create a TempdirFactory and attach it to the config object.
This is to comply with existing plugins which expect the handler to be This is to comply with existing plugins which expect the handler to be
@ -127,20 +130,22 @@ def pytest_configure(config):
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def tmpdir_factory(request): def tmpdir_factory(request: FixtureRequest) -> TempdirFactory:
"""Return a :class:`_pytest.tmpdir.TempdirFactory` instance for the test session. """Return a :class:`_pytest.tmpdir.TempdirFactory` instance for the test session.
""" """
return request.config._tmpdirhandler # Set dynamically by pytest_configure() above.
return request.config._tmpdirhandler # type: ignore
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def tmp_path_factory(request): def tmp_path_factory(request: FixtureRequest) -> TempPathFactory:
"""Return a :class:`_pytest.tmpdir.TempPathFactory` instance for the test session. """Return a :class:`_pytest.tmpdir.TempPathFactory` instance for the test session.
""" """
return request.config._tmp_path_factory # Set dynamically by pytest_configure() above.
return request.config._tmp_path_factory # type: ignore
def _mk_tmp(request, factory): def _mk_tmp(request: FixtureRequest, factory: TempPathFactory) -> Path:
name = request.node.name name = request.node.name
name = re.sub(r"[\W]", "_", name) name = re.sub(r"[\W]", "_", name)
MAXVAL = 30 MAXVAL = 30
@ -162,7 +167,7 @@ def tmpdir(tmp_path):
@pytest.fixture @pytest.fixture
def tmp_path(request, tmp_path_factory): def tmp_path(request: FixtureRequest, tmp_path_factory: TempPathFactory) -> Path:
"""Return a temporary directory path object """Return a temporary directory path object
which is unique to each test function invocation, which is unique to each test function invocation,
created as a sub directory of the base temporary created as a sub directory of the base temporary