Merge pull request #3988 from RonnyPfannschmidt/tmpdir-port-pathlib
Tmpdir port pathlib
This commit is contained in:
commit
933de16fe4
|
@ -20,7 +20,9 @@ repos:
|
|||
- id: check-yaml
|
||||
- id: debug-statements
|
||||
exclude: _pytest/debugging.py
|
||||
language_version: python3
|
||||
- id: flake8
|
||||
language_version: python3
|
||||
- repo: https://github.com/asottile/pyupgrade
|
||||
rev: v1.8.0
|
||||
hooks:
|
||||
|
@ -41,6 +43,6 @@ repos:
|
|||
- id: changelogs-rst
|
||||
name: changelog filenames
|
||||
language: fail
|
||||
entry: 'changelog files must be named ####.(feature|bugfix|doc|removal|vendor|trivial).rst'
|
||||
entry: 'changelog files must be named ####.(feature|bugfix|doc|deprecation|removal|vendor|trivial).rst'
|
||||
exclude: changelog/(\d+\.(feature|bugfix|doc|deprecation|removal|vendor|trivial).rst|README.rst|_template.rst)
|
||||
files: ^changelog/
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Introduce ``tmp_path`` as a fixture providing a Path object.
|
|
@ -0,0 +1 @@
|
|||
Add a Deprecation warning for pytest.ensuretemp as it was deprecated since a while.
|
|
@ -0,0 +1 @@
|
|||
Port the implementation of tmpdir to pathlib.
|
|
@ -5,6 +5,55 @@
|
|||
Temporary directories and files
|
||||
================================================
|
||||
|
||||
The ``tmp_path`` fixture
|
||||
------------------------
|
||||
|
||||
.. versionadded:: 3.9
|
||||
|
||||
|
||||
You can use the ``tmpdir`` fixture which will
|
||||
provide a temporary directory unique to the test invocation,
|
||||
created in the `base temporary directory`_.
|
||||
|
||||
``tmpdir`` is a ``pathlib/pathlib2.Path`` object. Here is an example test usage:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# content of test_tmp_path.py
|
||||
import os
|
||||
|
||||
CONTENT = u"content"
|
||||
|
||||
|
||||
def test_create_file(tmp_path):
|
||||
d = tmp_path / "sub"
|
||||
d.mkdir()
|
||||
p = d / "hello.txt"
|
||||
p.write_text(CONTENT)
|
||||
assert p.read_text() == CONTENT
|
||||
assert len(tmpdir.listdir()) == 1
|
||||
assert 0
|
||||
|
||||
Running this would result in a passed test except for the last
|
||||
``assert 0`` line which we use to look at values::
|
||||
|
||||
$ pytest test_tmp_path.py
|
||||
... #fill fom regendoc
|
||||
|
||||
|
||||
|
||||
The ``tmp_path_factory`` fixture
|
||||
--------------------------------
|
||||
|
||||
.. versionadded:: 3.9
|
||||
|
||||
|
||||
The ``tmp_path_facotry`` is a session-scoped fixture which can be used
|
||||
to create arbitrary temporary directories from any other fixture or test.
|
||||
|
||||
its intended to replace ``tmpdir_factory`` and returns :class:`pathlib.Path` instances.
|
||||
|
||||
|
||||
The 'tmpdir' fixture
|
||||
--------------------
|
||||
|
||||
|
|
|
@ -17,8 +17,9 @@ import atomicwrites
|
|||
import py
|
||||
|
||||
from _pytest.assertion import util
|
||||
from _pytest.compat import PurePath, spec_from_file_location
|
||||
from _pytest.paths import fnmatch_ex
|
||||
from _pytest.pathlib import PurePath
|
||||
from _pytest.compat import spec_from_file_location
|
||||
from _pytest.pathlib import fnmatch_ex
|
||||
|
||||
# pytest caches rewritten pycs in __pycache__.
|
||||
if hasattr(imp, "get_tag"):
|
||||
|
|
|
@ -13,10 +13,9 @@ import attr
|
|||
|
||||
import pytest
|
||||
import json
|
||||
import shutil
|
||||
|
||||
from . import paths
|
||||
from .compat import _PY2 as PY2, Path
|
||||
from .compat import _PY2 as PY2
|
||||
from .pathlib import Path, resolve_from_str, rmtree
|
||||
|
||||
README_CONTENT = u"""\
|
||||
# pytest cache directory #
|
||||
|
@ -39,13 +38,13 @@ class Cache(object):
|
|||
def for_config(cls, config):
|
||||
cachedir = cls.cache_dir_from_config(config)
|
||||
if config.getoption("cacheclear") and cachedir.exists():
|
||||
shutil.rmtree(str(cachedir))
|
||||
rmtree(cachedir, force=True)
|
||||
cachedir.mkdir()
|
||||
return cls(cachedir, config)
|
||||
|
||||
@staticmethod
|
||||
def cache_dir_from_config(config):
|
||||
return paths.resolve_from_str(config.getini("cache_dir"), config.rootdir)
|
||||
return resolve_from_str(config.getini("cache_dir"), config.rootdir)
|
||||
|
||||
def warn(self, fmt, **args):
|
||||
from _pytest.warnings import _issue_config_warning
|
||||
|
|
|
@ -23,8 +23,6 @@ except ImportError: # pragma: no cover
|
|||
# Only available in Python 3.4+ or as a backport
|
||||
enum = None
|
||||
|
||||
__all__ = ["Path", "PurePath"]
|
||||
|
||||
_PY3 = sys.version_info > (3, 0)
|
||||
_PY2 = not _PY3
|
||||
|
||||
|
@ -41,11 +39,6 @@ PY35 = sys.version_info[:2] >= (3, 5)
|
|||
PY36 = sys.version_info[:2] >= (3, 6)
|
||||
MODULE_NOT_FOUND_ERROR = "ModuleNotFoundError" if PY36 else "ImportError"
|
||||
|
||||
if PY36:
|
||||
from pathlib import Path, PurePath
|
||||
else:
|
||||
from pathlib2 import Path, PurePath
|
||||
|
||||
|
||||
if _PY3:
|
||||
from collections.abc import MutableMapping as MappingMixin
|
||||
|
|
|
@ -109,3 +109,8 @@ PYTEST_PLUGINS_FROM_NON_TOP_LEVEL_CONFTEST = RemovedInPytest4Warning(
|
|||
PYTEST_NAMESPACE = RemovedInPytest4Warning(
|
||||
"pytest_namespace is deprecated and will be removed soon"
|
||||
)
|
||||
|
||||
PYTEST_ENSURETEMP = RemovedInPytest4Warning(
|
||||
"pytest/tmpdir_factory.ensuretemp is deprecated, \n"
|
||||
"please use the tmp_path fixture or tmp_path_factory.mktemp"
|
||||
)
|
||||
|
|
|
@ -156,7 +156,10 @@ def pytest_addoption(parser):
|
|||
dest="basetemp",
|
||||
default=None,
|
||||
metavar="dir",
|
||||
help="base temporary directory for this test run.",
|
||||
help=(
|
||||
"base temporary directory for this test run."
|
||||
"(warning: this directory is removed if it exists)"
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,281 @@
|
|||
|
||||
import os
|
||||
import errno
|
||||
import atexit
|
||||
import operator
|
||||
import six
|
||||
import sys
|
||||
from functools import reduce
|
||||
import uuid
|
||||
from six.moves import map
|
||||
import itertools
|
||||
import shutil
|
||||
from os.path import expanduser, expandvars, isabs, sep
|
||||
from posixpath import sep as posix_sep
|
||||
import fnmatch
|
||||
import stat
|
||||
|
||||
from .compat import PY36
|
||||
|
||||
|
||||
if PY36:
|
||||
from pathlib import Path, PurePath
|
||||
else:
|
||||
from pathlib2 import Path, PurePath
|
||||
|
||||
__all__ = ["Path", "PurePath"]
|
||||
|
||||
|
||||
LOCK_TIMEOUT = 60 * 60 * 3
|
||||
|
||||
get_lock_path = operator.methodcaller("joinpath", ".lock")
|
||||
|
||||
|
||||
def ensure_reset_dir(path):
|
||||
"""
|
||||
ensures the given path is a empty directory
|
||||
"""
|
||||
if path.exists():
|
||||
rmtree(path, force=True)
|
||||
path.mkdir()
|
||||
|
||||
|
||||
def _shutil_rmtree_remove_writable(func, fspath, _):
|
||||
"Clear the readonly bit and reattempt the removal"
|
||||
os.chmod(fspath, stat.S_IWRITE)
|
||||
func(fspath)
|
||||
|
||||
|
||||
def rmtree(path, force=False):
|
||||
if force:
|
||||
# ignore_errors leaves dead folders around
|
||||
# python needs a rm -rf as a followup
|
||||
# the trick with _shutil_rmtree_remove_writable is unreliable
|
||||
shutil.rmtree(str(path), ignore_errors=True)
|
||||
else:
|
||||
shutil.rmtree(str(path))
|
||||
|
||||
|
||||
def find_prefixed(root, prefix):
|
||||
"""finds all elements in root that begin with the prefix, case insensitive"""
|
||||
l_prefix = prefix.lower()
|
||||
for x in root.iterdir():
|
||||
if x.name.lower().startswith(l_prefix):
|
||||
yield x
|
||||
|
||||
|
||||
def extract_suffixes(iter, prefix):
|
||||
"""
|
||||
:param iter: iterator over path names
|
||||
:param prefix: expected prefix of the path names
|
||||
:returns: the parts of the paths following the prefix
|
||||
"""
|
||||
p_len = len(prefix)
|
||||
for p in iter:
|
||||
yield p.name[p_len:]
|
||||
|
||||
|
||||
def find_suffixes(root, prefix):
|
||||
"""combines find_prefixes and extract_suffixes
|
||||
"""
|
||||
return extract_suffixes(find_prefixed(root, prefix), prefix)
|
||||
|
||||
|
||||
def parse_num(maybe_num):
|
||||
"""parses number path suffixes, returns -1 on error"""
|
||||
try:
|
||||
return int(maybe_num)
|
||||
except ValueError:
|
||||
return -1
|
||||
|
||||
|
||||
if six.PY2:
|
||||
|
||||
def _max(iterable, default):
|
||||
"""needed due to python2.7 lacking the default argument for max"""
|
||||
return reduce(max, iterable, default)
|
||||
|
||||
|
||||
else:
|
||||
_max = max
|
||||
|
||||
|
||||
def make_numbered_dir(root, prefix):
|
||||
"""create a directory with a increased number as suffix for the given prefix"""
|
||||
for i in range(10):
|
||||
# try up to 10 times to create the folder
|
||||
max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
|
||||
new_number = max_existing + 1
|
||||
new_path = root.joinpath("{}{}".format(prefix, new_number))
|
||||
try:
|
||||
new_path.mkdir()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
return new_path
|
||||
else:
|
||||
raise EnvironmentError(
|
||||
"could not create numbered dir with prefix "
|
||||
"{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
|
||||
)
|
||||
|
||||
|
||||
def create_cleanup_lock(p):
|
||||
"""crates a lock to prevent premature folder cleanup"""
|
||||
lock_path = get_lock_path(p)
|
||||
try:
|
||||
fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EEXIST:
|
||||
six.raise_from(
|
||||
EnvironmentError("cannot create lockfile in {path}".format(path=p)), e
|
||||
)
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
pid = os.getpid()
|
||||
spid = str(pid)
|
||||
if not isinstance(spid, six.binary_type):
|
||||
spid = spid.encode("ascii")
|
||||
os.write(fd, spid)
|
||||
os.close(fd)
|
||||
if not lock_path.is_file():
|
||||
raise EnvironmentError("lock path got renamed after sucessfull creation")
|
||||
return lock_path
|
||||
|
||||
|
||||
def register_cleanup_lock_removal(lock_path, register=atexit.register):
|
||||
"""registers a cleanup function for removing a lock, by default on atexit"""
|
||||
pid = os.getpid()
|
||||
|
||||
def cleanup_on_exit(lock_path=lock_path, original_pid=pid):
|
||||
current_pid = os.getpid()
|
||||
if current_pid != original_pid:
|
||||
# fork
|
||||
return
|
||||
try:
|
||||
lock_path.unlink()
|
||||
except (OSError, IOError):
|
||||
pass
|
||||
|
||||
return register(cleanup_on_exit)
|
||||
|
||||
|
||||
def delete_a_numbered_dir(path):
|
||||
"""removes a numbered directory"""
|
||||
create_cleanup_lock(path)
|
||||
parent = path.parent
|
||||
|
||||
garbage = parent.joinpath("garbage-{}".format(uuid.uuid4()))
|
||||
path.rename(garbage)
|
||||
rmtree(garbage, force=True)
|
||||
|
||||
|
||||
def ensure_deletable(path, consider_lock_dead_if_created_before):
|
||||
"""checks if a lock exists and breaks it if its considered dead"""
|
||||
lock = get_lock_path(path)
|
||||
if not lock.exists():
|
||||
return True
|
||||
try:
|
||||
lock_time = lock.stat().st_mtime
|
||||
except Exception:
|
||||
return False
|
||||
else:
|
||||
if lock_time < consider_lock_dead_if_created_before:
|
||||
lock.unlink()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def try_cleanup(path, consider_lock_dead_if_created_before):
|
||||
"""tries to cleanup a folder if we can ensure its deletable"""
|
||||
if ensure_deletable(path, consider_lock_dead_if_created_before):
|
||||
delete_a_numbered_dir(path)
|
||||
|
||||
|
||||
def cleanup_candidates(root, prefix, keep):
|
||||
"""lists candidates for numbered directories to be removed - follows py.path"""
|
||||
max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
|
||||
max_delete = max_existing - keep
|
||||
paths = find_prefixed(root, prefix)
|
||||
paths, paths2 = itertools.tee(paths)
|
||||
numbers = map(parse_num, extract_suffixes(paths2, prefix))
|
||||
for path, number in zip(paths, numbers):
|
||||
if number <= max_delete:
|
||||
yield path
|
||||
|
||||
|
||||
def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
|
||||
"""cleanup for lock driven numbered directories"""
|
||||
for path in cleanup_candidates(root, prefix, keep):
|
||||
try_cleanup(path, consider_lock_dead_if_created_before)
|
||||
for path in root.glob("garbage-*"):
|
||||
try_cleanup(path, consider_lock_dead_if_created_before)
|
||||
|
||||
|
||||
def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
|
||||
"""creates a numbered dir with a cleanup lock and removes old ones"""
|
||||
e = None
|
||||
for i in range(10):
|
||||
try:
|
||||
p = make_numbered_dir(root, prefix)
|
||||
lock_path = create_cleanup_lock(p)
|
||||
register_cleanup_lock_removal(lock_path)
|
||||
except Exception as e:
|
||||
pass
|
||||
else:
|
||||
consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
|
||||
cleanup_numbered_dir(
|
||||
root=root,
|
||||
prefix=prefix,
|
||||
keep=keep,
|
||||
consider_lock_dead_if_created_before=consider_lock_dead_if_created_before,
|
||||
)
|
||||
return p
|
||||
assert e is not None
|
||||
raise e
|
||||
|
||||
|
||||
def resolve_from_str(input, root):
|
||||
assert not isinstance(input, Path), "would break on py2"
|
||||
root = Path(root)
|
||||
input = expanduser(input)
|
||||
input = expandvars(input)
|
||||
if isabs(input):
|
||||
return Path(input)
|
||||
else:
|
||||
return root.joinpath(input)
|
||||
|
||||
|
||||
def fnmatch_ex(pattern, path):
|
||||
"""FNMatcher port from py.path.common which works with PurePath() instances.
|
||||
|
||||
The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
|
||||
for each part of the path, while this algorithm uses the whole path instead.
|
||||
|
||||
For example:
|
||||
"tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with
|
||||
PurePath.match().
|
||||
|
||||
This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according
|
||||
this logic.
|
||||
|
||||
References:
|
||||
* https://bugs.python.org/issue29249
|
||||
* https://bugs.python.org/issue34731
|
||||
"""
|
||||
path = PurePath(path)
|
||||
iswin32 = sys.platform.startswith("win")
|
||||
|
||||
if iswin32 and sep not in pattern and posix_sep in pattern:
|
||||
# Running on Windows, the pattern has no Windows path separators,
|
||||
# and the pattern has one or more Posix path separators. Replace
|
||||
# the Posix path separators with the Windows path separator.
|
||||
pattern = pattern.replace(posix_sep, sep)
|
||||
|
||||
if sep not in pattern:
|
||||
name = path.name
|
||||
else:
|
||||
name = six.text_type(path)
|
||||
return fnmatch.fnmatch(name, pattern)
|
|
@ -1,52 +0,0 @@
|
|||
from os.path import expanduser, expandvars, isabs, sep
|
||||
from posixpath import sep as posix_sep
|
||||
import fnmatch
|
||||
import sys
|
||||
|
||||
import six
|
||||
|
||||
from .compat import Path, PurePath
|
||||
|
||||
|
||||
def resolve_from_str(input, root):
|
||||
assert not isinstance(input, Path), "would break on py2"
|
||||
root = Path(root)
|
||||
input = expanduser(input)
|
||||
input = expandvars(input)
|
||||
if isabs(input):
|
||||
return Path(input)
|
||||
else:
|
||||
return root.joinpath(input)
|
||||
|
||||
|
||||
def fnmatch_ex(pattern, path):
|
||||
"""FNMatcher port from py.path.common which works with PurePath() instances.
|
||||
|
||||
The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
|
||||
for each part of the path, while this algorithm uses the whole path instead.
|
||||
|
||||
For example:
|
||||
"tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with
|
||||
PurePath.match().
|
||||
|
||||
This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according
|
||||
this logic.
|
||||
|
||||
References:
|
||||
* https://bugs.python.org/issue29249
|
||||
* https://bugs.python.org/issue34731
|
||||
"""
|
||||
path = PurePath(path)
|
||||
iswin32 = sys.platform.startswith("win")
|
||||
|
||||
if iswin32 and sep not in pattern and posix_sep in pattern:
|
||||
# Running on Windows, the pattern has no Windows path separators,
|
||||
# and the pattern has one or more Posix path separators. Replace
|
||||
# the Posix path separators with the Windows path separator.
|
||||
pattern = pattern.replace(posix_sep, sep)
|
||||
|
||||
if sep not in pattern:
|
||||
name = path.name
|
||||
else:
|
||||
name = six.text_type(path)
|
||||
return fnmatch.fnmatch(name, pattern)
|
|
@ -17,13 +17,14 @@ from weakref import WeakKeyDictionary
|
|||
|
||||
from _pytest.capture import MultiCapture, SysCapture
|
||||
from _pytest._code import Source
|
||||
import py
|
||||
import pytest
|
||||
from _pytest.main import Session, EXIT_OK
|
||||
from _pytest.assertion.rewrite import AssertionRewritingHook
|
||||
from _pytest.compat import Path
|
||||
from _pytest.pathlib import Path
|
||||
from _pytest.compat import safe_str
|
||||
|
||||
import py
|
||||
import pytest
|
||||
|
||||
IGNORE_PAM = [ # filenames added when obtaining details about the current user
|
||||
u"/var/lib/sss/mc/passwd"
|
||||
]
|
||||
|
@ -495,6 +496,8 @@ class Testdir(object):
|
|||
self._mod_collections = WeakKeyDictionary()
|
||||
name = request.function.__name__
|
||||
self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
|
||||
self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
|
||||
os.environ["PYTEST_DEBUG_TEMPROOT"] = str(self.test_tmproot)
|
||||
self.plugins = []
|
||||
self._cwd_snapshot = CwdSnapshot()
|
||||
self._sys_path_snapshot = SysPathsSnapshot()
|
||||
|
@ -521,6 +524,7 @@ class Testdir(object):
|
|||
self._sys_modules_snapshot.restore()
|
||||
self._sys_path_snapshot.restore()
|
||||
self._cwd_snapshot.restore()
|
||||
os.environ.pop("PYTEST_DEBUG_TEMPROOT", None)
|
||||
|
||||
def __take_sys_modules_snapshot(self):
|
||||
# some zope modules used by twisted-related tests keep internal state
|
||||
|
|
|
@ -1,22 +1,86 @@
|
|||
""" support for providing temporary directories to test functions. """
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
import os
|
||||
import re
|
||||
|
||||
import pytest
|
||||
import py
|
||||
from _pytest.monkeypatch import MonkeyPatch
|
||||
import attr
|
||||
import tempfile
|
||||
import warnings
|
||||
|
||||
from .pathlib import (
|
||||
Path,
|
||||
make_numbered_dir,
|
||||
make_numbered_dir_with_cleanup,
|
||||
ensure_reset_dir,
|
||||
LOCK_TIMEOUT,
|
||||
)
|
||||
|
||||
|
||||
class TempdirFactory(object):
|
||||
@attr.s
|
||||
class TempPathFactory(object):
|
||||
"""Factory for temporary directories under the common base temp directory.
|
||||
|
||||
The base directory can be configured using the ``--basetemp`` option.
|
||||
The base directory can be configured using the ``--basetemp`` option."""
|
||||
|
||||
_given_basetemp = attr.ib()
|
||||
_trace = attr.ib()
|
||||
_basetemp = attr.ib(default=None)
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, config):
|
||||
"""
|
||||
:param config: a pytest configuration
|
||||
"""
|
||||
return cls(
|
||||
given_basetemp=config.option.basetemp, trace=config.trace.get("tmpdir")
|
||||
)
|
||||
|
||||
def mktemp(self, basename, numbered=True):
|
||||
"""makes a temporary directory managed by the factory"""
|
||||
if not numbered:
|
||||
p = self.getbasetemp().joinpath(basename)
|
||||
p.mkdir()
|
||||
else:
|
||||
p = make_numbered_dir(root=self.getbasetemp(), prefix=basename)
|
||||
self._trace("mktemp", p)
|
||||
return p
|
||||
|
||||
def getbasetemp(self):
|
||||
""" return base temporary directory. """
|
||||
if self._basetemp is None:
|
||||
if self._given_basetemp is not None:
|
||||
basetemp = Path(self._given_basetemp)
|
||||
ensure_reset_dir(basetemp)
|
||||
else:
|
||||
from_env = os.environ.get("PYTEST_DEBUG_TEMPROOT")
|
||||
temproot = Path(from_env or tempfile.gettempdir())
|
||||
user = get_user() or "unknown"
|
||||
# use a sub-directory in the temproot to speed-up
|
||||
# make_numbered_dir() call
|
||||
rootdir = temproot.joinpath("pytest-of-{}".format(user))
|
||||
rootdir.mkdir(exist_ok=True)
|
||||
basetemp = make_numbered_dir_with_cleanup(
|
||||
prefix="pytest-", root=rootdir, keep=3, lock_timeout=LOCK_TIMEOUT
|
||||
)
|
||||
assert basetemp is not None
|
||||
self._basetemp = t = basetemp
|
||||
self._trace("new basetemp", t)
|
||||
return t
|
||||
else:
|
||||
return self._basetemp
|
||||
|
||||
|
||||
@attr.s
|
||||
class TempdirFactory(object):
|
||||
"""
|
||||
backward comptibility wrapper that implements
|
||||
:class:``py.path.local`` for :class:``TempPathFactory``
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.trace = config.trace.get("tmpdir")
|
||||
_tmppath_factory = attr.ib()
|
||||
|
||||
def ensuretemp(self, string, dir=1):
|
||||
""" (deprecated) return temporary directory path with
|
||||
|
@ -26,6 +90,9 @@ class TempdirFactory(object):
|
|||
and is guaranteed to be empty.
|
||||
"""
|
||||
# py.log._apiwarn(">1.1", "use tmpdir function argument")
|
||||
from .deprecated import PYTEST_ENSURETEMP
|
||||
|
||||
warnings.warn(PYTEST_ENSURETEMP, stacklevel=2)
|
||||
return self.getbasetemp().ensure(string, dir=dir)
|
||||
|
||||
def mktemp(self, basename, numbered=True):
|
||||
|
@ -33,46 +100,11 @@ class TempdirFactory(object):
|
|||
If ``numbered``, ensure the directory is unique by adding a number
|
||||
prefix greater than any existing one.
|
||||
"""
|
||||
basetemp = self.getbasetemp()
|
||||
if not numbered:
|
||||
p = basetemp.mkdir(basename)
|
||||
else:
|
||||
p = py.path.local.make_numbered_dir(
|
||||
prefix=basename, keep=0, rootdir=basetemp, lock_timeout=None
|
||||
)
|
||||
self.trace("mktemp", p)
|
||||
return p
|
||||
return py.path.local(self._tmppath_factory.mktemp(basename, numbered).resolve())
|
||||
|
||||
def getbasetemp(self):
|
||||
""" return base temporary directory. """
|
||||
try:
|
||||
return self._basetemp
|
||||
except AttributeError:
|
||||
basetemp = self.config.option.basetemp
|
||||
if basetemp:
|
||||
basetemp = py.path.local(basetemp)
|
||||
if basetemp.check():
|
||||
basetemp.remove()
|
||||
basetemp.mkdir()
|
||||
else:
|
||||
temproot = py.path.local.get_temproot()
|
||||
user = get_user()
|
||||
if user:
|
||||
# use a sub-directory in the temproot to speed-up
|
||||
# make_numbered_dir() call
|
||||
rootdir = temproot.join("pytest-of-%s" % user)
|
||||
else:
|
||||
rootdir = temproot
|
||||
rootdir.ensure(dir=1)
|
||||
basetemp = py.path.local.make_numbered_dir(
|
||||
prefix="pytest-", rootdir=rootdir
|
||||
)
|
||||
self._basetemp = t = basetemp.realpath()
|
||||
self.trace("new basetemp", t)
|
||||
return t
|
||||
|
||||
def finish(self):
|
||||
self.trace("finish")
|
||||
"""backward compat wrapper for ``_tmppath_factory.getbasetemp``"""
|
||||
return py.path.local(self._tmppath_factory.getbasetemp().resolve())
|
||||
|
||||
|
||||
def get_user():
|
||||
|
@ -87,10 +119,6 @@ def get_user():
|
|||
return None
|
||||
|
||||
|
||||
# backward compatibility
|
||||
TempdirHandler = TempdirFactory
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
"""Create a TempdirFactory and attach it to the config object.
|
||||
|
||||
|
@ -99,19 +127,36 @@ def pytest_configure(config):
|
|||
to the tmpdir_factory session fixture.
|
||||
"""
|
||||
mp = MonkeyPatch()
|
||||
t = TempdirFactory(config)
|
||||
config._cleanup.extend([mp.undo, t.finish])
|
||||
tmppath_handler = TempPathFactory.from_config(config)
|
||||
t = TempdirFactory(tmppath_handler)
|
||||
config._cleanup.append(mp.undo)
|
||||
mp.setattr(config, "_tmp_path_factory", tmppath_handler, raising=False)
|
||||
mp.setattr(config, "_tmpdirhandler", t, raising=False)
|
||||
mp.setattr(pytest, "ensuretemp", t.ensuretemp, raising=False)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def tmpdir_factory(request):
|
||||
"""Return a TempdirFactory instance for the test session.
|
||||
"""Return a :class:`_pytest.tmpdir.TempdirFactory` instance for the test session.
|
||||
"""
|
||||
return request.config._tmpdirhandler
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def tmp_path_factory(request):
|
||||
"""Return a :class:`_pytest.tmpdir.TempPathFactory` instance for the test session.
|
||||
"""
|
||||
return request.config._tmp_path_factory
|
||||
|
||||
|
||||
def _mk_tmp(request, factory):
|
||||
name = request.node.name
|
||||
name = re.sub(r"[\W]", "_", name)
|
||||
MAXVAL = 30
|
||||
name = name[:MAXVAL]
|
||||
return factory.mktemp(name, numbered=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmpdir(request, tmpdir_factory):
|
||||
"""Return a temporary directory path object
|
||||
|
@ -122,10 +167,20 @@ def tmpdir(request, tmpdir_factory):
|
|||
|
||||
.. _`py.path.local`: https://py.readthedocs.io/en/latest/path.html
|
||||
"""
|
||||
name = request.node.name
|
||||
name = re.sub(r"[\W]", "_", name)
|
||||
MAXVAL = 30
|
||||
if len(name) > MAXVAL:
|
||||
name = name[:MAXVAL]
|
||||
x = tmpdir_factory.mktemp(name, numbered=True)
|
||||
return x
|
||||
return _mk_tmp(request, tmpdir_factory)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_path(request, tmp_path_factory):
|
||||
"""Return a temporary directory path object
|
||||
which is unique to each test function invocation,
|
||||
created as a sub directory of the base temporary
|
||||
directory. The returned object is a :class:`pathlib.Path`
|
||||
object.
|
||||
|
||||
.. note::
|
||||
|
||||
in python < 3.6 this is a pathlib2.Path
|
||||
"""
|
||||
|
||||
return _mk_tmp(request, tmp_path_factory)
|
||||
|
|
|
@ -6,7 +6,7 @@ import pytest
|
|||
from _pytest.pytester import get_public_names
|
||||
from _pytest.fixtures import FixtureLookupError, FixtureRequest
|
||||
from _pytest import fixtures
|
||||
from _pytest.compat import Path
|
||||
from _pytest.pathlib import Path
|
||||
|
||||
|
||||
def test_getfuncargnames():
|
||||
|
|
|
@ -4,7 +4,7 @@ import py
|
|||
|
||||
import pytest
|
||||
|
||||
from _pytest.paths import fnmatch_ex
|
||||
from _pytest.pathlib import fnmatch_ex
|
||||
|
||||
|
||||
class TestPort:
|
||||
|
|
|
@ -19,11 +19,11 @@ def test_ensuretemp(recwarn):
|
|||
|
||||
class TestTempdirHandler(object):
|
||||
def test_mktemp(self, testdir):
|
||||
from _pytest.tmpdir import TempdirFactory
|
||||
from _pytest.tmpdir import TempdirFactory, TempPathFactory
|
||||
|
||||
config = testdir.parseconfig()
|
||||
config.option.basetemp = testdir.mkdir("hello")
|
||||
t = TempdirFactory(config)
|
||||
t = TempdirFactory(TempPathFactory.from_config(config))
|
||||
tmp = t.mktemp("world")
|
||||
assert tmp.relto(t.getbasetemp()) == "world0"
|
||||
tmp = t.mktemp("this")
|
||||
|
@ -111,7 +111,7 @@ def test_tmpdir_factory(testdir):
|
|||
def session_dir(tmpdir_factory):
|
||||
return tmpdir_factory.mktemp('data', numbered=False)
|
||||
def test_some(session_dir):
|
||||
session_dir.isdir()
|
||||
assert session_dir.isdir()
|
||||
"""
|
||||
)
|
||||
reprec = testdir.inline_run()
|
||||
|
@ -184,3 +184,94 @@ def test_get_user(monkeypatch):
|
|||
monkeypatch.delenv("USER", raising=False)
|
||||
monkeypatch.delenv("USERNAME", raising=False)
|
||||
assert get_user() is None
|
||||
|
||||
|
||||
class TestNumberedDir(object):
|
||||
PREFIX = "fun-"
|
||||
|
||||
def test_make(self, tmp_path):
|
||||
from _pytest.pathlib import make_numbered_dir
|
||||
|
||||
for i in range(10):
|
||||
d = make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
|
||||
assert d.name.startswith(self.PREFIX)
|
||||
assert d.name.endswith(str(i))
|
||||
|
||||
def test_cleanup_lock_create(self, tmp_path):
|
||||
d = tmp_path.joinpath("test")
|
||||
d.mkdir()
|
||||
from _pytest.pathlib import create_cleanup_lock
|
||||
|
||||
lockfile = create_cleanup_lock(d)
|
||||
with pytest.raises(EnvironmentError, match="cannot create lockfile in .*"):
|
||||
create_cleanup_lock(d)
|
||||
|
||||
lockfile.unlink()
|
||||
|
||||
def test_lock_register_cleanup_removal(self, tmp_path):
|
||||
from _pytest.pathlib import create_cleanup_lock, register_cleanup_lock_removal
|
||||
|
||||
lock = create_cleanup_lock(tmp_path)
|
||||
|
||||
registry = []
|
||||
register_cleanup_lock_removal(lock, register=registry.append)
|
||||
|
||||
cleanup_func, = registry
|
||||
|
||||
assert lock.is_file()
|
||||
|
||||
cleanup_func(original_pid="intentionally_different")
|
||||
|
||||
assert lock.is_file()
|
||||
|
||||
cleanup_func()
|
||||
|
||||
assert not lock.exists()
|
||||
|
||||
cleanup_func()
|
||||
|
||||
assert not lock.exists()
|
||||
|
||||
def test_cleanup_keep(self, tmp_path):
|
||||
self.test_make(tmp_path)
|
||||
from _pytest.pathlib import cleanup_numbered_dir
|
||||
|
||||
cleanup_numbered_dir(
|
||||
root=tmp_path,
|
||||
prefix=self.PREFIX,
|
||||
keep=2,
|
||||
consider_lock_dead_if_created_before=0,
|
||||
)
|
||||
a, b = tmp_path.iterdir()
|
||||
print(a, b)
|
||||
|
||||
def test_cleanup_locked(self, tmp_path):
|
||||
|
||||
from _pytest import pathlib
|
||||
|
||||
p = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
|
||||
|
||||
pathlib.create_cleanup_lock(p)
|
||||
|
||||
assert not pathlib.ensure_deletable(
|
||||
p, consider_lock_dead_if_created_before=p.stat().st_mtime - 1
|
||||
)
|
||||
assert pathlib.ensure_deletable(
|
||||
p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1
|
||||
)
|
||||
|
||||
def test_rmtree(self, tmp_path):
|
||||
from _pytest.pathlib import rmtree
|
||||
|
||||
adir = tmp_path / "adir"
|
||||
adir.mkdir()
|
||||
rmtree(adir)
|
||||
|
||||
assert not adir.exists()
|
||||
|
||||
adir.mkdir()
|
||||
afile = adir / "afile"
|
||||
afile.write_bytes(b"aa")
|
||||
|
||||
rmtree(adir, force=True)
|
||||
assert not adir.exists()
|
||||
|
|
Loading…
Reference in New Issue