Add type annotations to _pytest.pathlib

At least the ones I was sure of.
This commit is contained in:
Ran Benita 2019-10-26 20:07:44 +03:00
parent 1ad4ca6ac1
commit 59a59f371b
1 changed files with 38 additions and 23 deletions

View File

@ -1,7 +1,6 @@
import atexit
import fnmatch
import itertools
import operator
import os
import shutil
import sys
@ -13,6 +12,11 @@ from os.path import expandvars
from os.path import isabs
from os.path import sep
from posixpath import sep as posix_sep
from typing import Iterable
from typing import Iterator
from typing import Set
from typing import TypeVar
from typing import Union
from _pytest.warning_types import PytestWarning
@ -26,10 +30,15 @@ __all__ = ["Path", "PurePath"]
LOCK_TIMEOUT = 60 * 60 * 3
get_lock_path = operator.methodcaller("joinpath", ".lock")
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
def ensure_reset_dir(path):
def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
return path.joinpath(".lock")
def ensure_reset_dir(path: Path) -> None:
"""
ensures the given path is an empty directory
"""
@ -38,7 +47,7 @@ def ensure_reset_dir(path):
path.mkdir()
def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool:
def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool:
"""Handles known read-only errors during rmtree.
The returned value is used only by our own tests.
@ -71,7 +80,7 @@ def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool:
# Chmod + retry.
import stat
def chmod_rw(p: str):
def chmod_rw(p: str) -> None:
mode = os.stat(p).st_mode
os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
@ -90,7 +99,7 @@ def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool:
return True
def rm_rf(path: Path):
def rm_rf(path: Path) -> None:
"""Remove the path contents recursively, even if some elements
are read-only.
"""
@ -98,7 +107,7 @@ def rm_rf(path: Path):
shutil.rmtree(str(path), onerror=onerror)
def find_prefixed(root, prefix):
def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
"""finds all elements in root that begin with the prefix, case insensitive"""
l_prefix = prefix.lower()
for x in root.iterdir():
@ -106,7 +115,7 @@ def find_prefixed(root, prefix):
yield x
def extract_suffixes(iter, prefix):
def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
"""
:param iter: iterator over path names
:param prefix: expected prefix of the path names
@ -117,13 +126,13 @@ def extract_suffixes(iter, prefix):
yield p.name[p_len:]
def find_suffixes(root, prefix):
def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
"""combines find_prefixes and extract_suffixes
"""
return extract_suffixes(find_prefixed(root, prefix), prefix)
def parse_num(maybe_num):
def parse_num(maybe_num) -> int:
"""parses number path suffixes, returns -1 on error"""
try:
return int(maybe_num)
@ -131,7 +140,9 @@ def parse_num(maybe_num):
return -1
def _force_symlink(root, target, link_to):
def _force_symlink(
root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
"""helper to create the current symlink
it's full of race conditions that are reasonably ok to ignore
@ -151,7 +162,7 @@ def _force_symlink(root, target, link_to):
pass
def make_numbered_dir(root, prefix):
def make_numbered_dir(root: Path, prefix: str) -> Path:
"""create a directory with an increased number as suffix for the given prefix"""
for i in range(10):
# try up to 10 times to create the folder
@ -172,7 +183,7 @@ def make_numbered_dir(root, prefix):
)
def create_cleanup_lock(p):
def create_cleanup_lock(p: Path) -> Path:
"""crates a lock to prevent premature folder cleanup"""
lock_path = get_lock_path(p)
try:
@ -189,11 +200,11 @@ def create_cleanup_lock(p):
return lock_path
def register_cleanup_lock_removal(lock_path, register=atexit.register):
def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
"""registers a cleanup function for removing a lock, by default on atexit"""
pid = os.getpid()
def cleanup_on_exit(lock_path=lock_path, original_pid=pid):
def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
current_pid = os.getpid()
if current_pid != original_pid:
# fork
@ -206,7 +217,7 @@ def register_cleanup_lock_removal(lock_path, register=atexit.register):
return register(cleanup_on_exit)
def maybe_delete_a_numbered_dir(path):
def maybe_delete_a_numbered_dir(path: Path) -> None:
"""removes a numbered directory if its lock can be obtained and it does not seem to be in use"""
lock_path = None
try:
@ -232,7 +243,7 @@ def maybe_delete_a_numbered_dir(path):
pass
def ensure_deletable(path, consider_lock_dead_if_created_before):
def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
"""checks if a lock exists and breaks it if its considered dead"""
if path.is_symlink():
return False
@ -251,13 +262,13 @@ def ensure_deletable(path, consider_lock_dead_if_created_before):
return False
def try_cleanup(path, consider_lock_dead_if_created_before):
def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
"""tries to cleanup a folder if we can ensure it's deletable"""
if ensure_deletable(path, consider_lock_dead_if_created_before):
maybe_delete_a_numbered_dir(path)
def cleanup_candidates(root, prefix, keep):
def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
"""lists candidates for numbered directories to be removed - follows py.path"""
max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
max_delete = max_existing - keep
@ -269,7 +280,9 @@ def cleanup_candidates(root, prefix, keep):
yield path
def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
def cleanup_numbered_dir(
root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
"""cleanup for lock driven numbered directories"""
for path in cleanup_candidates(root, prefix, keep):
try_cleanup(path, consider_lock_dead_if_created_before)
@ -277,7 +290,9 @@ def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_befor
try_cleanup(path, consider_lock_dead_if_created_before)
def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
def make_numbered_dir_with_cleanup(
root: Path, prefix: str, keep: int, lock_timeout: float
) -> Path:
"""creates a numbered dir with a cleanup lock and removes old ones"""
e = None
for i in range(10):
@ -311,7 +326,7 @@ def resolve_from_str(input, root):
return root.joinpath(input)
def fnmatch_ex(pattern, path):
def fnmatch_ex(pattern: str, path) -> bool:
"""FNMatcher port from py.path.common which works with PurePath() instances.
The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
@ -346,6 +361,6 @@ def fnmatch_ex(pattern, path):
return fnmatch.fnmatch(name, pattern)
def parts(s):
def parts(s: str) -> Set[str]:
parts = s.split(sep)
return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}