Improve rm_rf to handle only known functions
Warnings are emitted if we cannot safely remove paths. Fix #5626
This commit is contained in:
parent
dc6e7b9fcf
commit
9064eea216
|
@ -8,6 +8,7 @@ import shutil
|
|||
import sys
|
||||
import uuid
|
||||
import warnings
|
||||
from functools import partial
|
||||
from os.path import expanduser
|
||||
from os.path import expandvars
|
||||
from os.path import isabs
|
||||
|
@ -38,38 +39,49 @@ def ensure_reset_dir(path):
|
|||
path.mkdir()
|
||||
|
||||
|
||||
def rm_rf(path):
|
||||
def on_rm_rf_error(func, path: str, exc, *, start_path):
|
||||
"""Handles known read-only errors during rmtree."""
|
||||
excvalue = exc[1]
|
||||
|
||||
if not isinstance(excvalue, PermissionError):
|
||||
warnings.warn(
|
||||
PytestWarning("(rm_rf) error removing {}: {}".format(path, excvalue))
|
||||
)
|
||||
return
|
||||
|
||||
if func not in (os.rmdir, os.remove, os.unlink):
|
||||
warnings.warn(
|
||||
PytestWarning("(rm_rf) error removing {}: {}".format(path, excvalue))
|
||||
)
|
||||
return
|
||||
|
||||
# Chmod + retry.
|
||||
import stat
|
||||
|
||||
def chmod_rw(p: str):
|
||||
mode = os.stat(p).st_mode
|
||||
os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
|
||||
|
||||
# For files, we need to recursively go upwards in the directories to
|
||||
# ensure they all are also writable.
|
||||
p = Path(path)
|
||||
if p.is_file():
|
||||
for parent in p.parents:
|
||||
chmod_rw(str(parent))
|
||||
# stop when we reach the original path passed to rm_rf
|
||||
if parent == start_path:
|
||||
break
|
||||
chmod_rw(str(path))
|
||||
|
||||
func(path)
|
||||
|
||||
|
||||
def rm_rf(path: Path):
|
||||
"""Remove the path contents recursively, even if some elements
|
||||
are read-only.
|
||||
"""
|
||||
|
||||
def chmod_w(p):
|
||||
import stat
|
||||
|
||||
mode = os.stat(str(p)).st_mode
|
||||
os.chmod(str(p), mode | stat.S_IWRITE)
|
||||
|
||||
def force_writable_and_retry(function, p, excinfo):
|
||||
p = Path(p)
|
||||
|
||||
# for files, we need to recursively go upwards
|
||||
# in the directories to ensure they all are also
|
||||
# writable
|
||||
if p.is_file():
|
||||
for parent in p.parents:
|
||||
chmod_w(parent)
|
||||
# stop when we reach the original path passed to rm_rf
|
||||
if parent == path:
|
||||
break
|
||||
|
||||
chmod_w(p)
|
||||
try:
|
||||
# retry the function that failed
|
||||
function(str(p))
|
||||
except Exception as e:
|
||||
warnings.warn(PytestWarning("(rm_rf) error removing {}: {}".format(p, e)))
|
||||
|
||||
shutil.rmtree(str(path), onerror=force_writable_and_retry)
|
||||
onerror = partial(on_rm_rf_error, start_path=path)
|
||||
shutil.rmtree(str(path), onerror=onerror)
|
||||
|
||||
|
||||
def find_prefixed(root, prefix):
|
||||
|
|
|
@ -312,7 +312,20 @@ class TestNumberedDir:
|
|||
p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1
|
||||
)
|
||||
|
||||
def test_rmtree(self, tmp_path):
|
||||
def test_cleanup_ignores_symlink(self, tmp_path):
|
||||
the_symlink = tmp_path / (self.PREFIX + "current")
|
||||
attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5"))
|
||||
self._do_cleanup(tmp_path)
|
||||
|
||||
def test_removal_accepts_lock(self, tmp_path):
|
||||
folder = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
|
||||
pathlib.create_cleanup_lock(folder)
|
||||
pathlib.maybe_delete_a_numbered_dir(folder)
|
||||
assert folder.is_dir()
|
||||
|
||||
|
||||
class TestRmRf:
|
||||
def test_rm_rf(self, tmp_path):
|
||||
from _pytest.pathlib import rm_rf
|
||||
|
||||
adir = tmp_path / "adir"
|
||||
|
@ -328,7 +341,7 @@ class TestNumberedDir:
|
|||
rm_rf(adir)
|
||||
assert not adir.exists()
|
||||
|
||||
def test_rmtree_with_read_only_file(self, tmp_path):
|
||||
def test_rm_rf_with_read_only_file(self, tmp_path):
|
||||
"""Ensure rm_rf can remove directories with read-only files in them (#5524)"""
|
||||
from _pytest.pathlib import rm_rf
|
||||
|
||||
|
@ -337,14 +350,17 @@ class TestNumberedDir:
|
|||
|
||||
fn.touch()
|
||||
|
||||
mode = os.stat(str(fn)).st_mode
|
||||
os.chmod(str(fn), mode & ~stat.S_IWRITE)
|
||||
self.chmod_r(fn)
|
||||
|
||||
rm_rf(fn.parent)
|
||||
|
||||
assert not fn.parent.is_dir()
|
||||
|
||||
def test_rmtree_with_read_only_directory(self, tmp_path):
|
||||
def chmod_r(self, path):
|
||||
mode = os.stat(str(path)).st_mode
|
||||
os.chmod(str(path), mode & ~stat.S_IWRITE)
|
||||
|
||||
def test_rm_rf_with_read_only_directory(self, tmp_path):
|
||||
"""Ensure rm_rf can remove read-only directories (#5524)"""
|
||||
from _pytest.pathlib import rm_rf
|
||||
|
||||
|
@ -352,23 +368,37 @@ class TestNumberedDir:
|
|||
adir.mkdir()
|
||||
|
||||
(adir / "foo.txt").touch()
|
||||
mode = os.stat(str(adir)).st_mode
|
||||
os.chmod(str(adir), mode & ~stat.S_IWRITE)
|
||||
self.chmod_r(adir)
|
||||
|
||||
rm_rf(adir)
|
||||
|
||||
assert not adir.is_dir()
|
||||
|
||||
def test_cleanup_ignores_symlink(self, tmp_path):
|
||||
the_symlink = tmp_path / (self.PREFIX + "current")
|
||||
attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5"))
|
||||
self._do_cleanup(tmp_path)
|
||||
def test_on_rm_rf_error(self, tmp_path):
|
||||
from _pytest.pathlib import on_rm_rf_error
|
||||
|
||||
def test_removal_accepts_lock(self, tmp_path):
|
||||
folder = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
|
||||
pathlib.create_cleanup_lock(folder)
|
||||
pathlib.maybe_delete_a_numbered_dir(folder)
|
||||
assert folder.is_dir()
|
||||
adir = tmp_path / "dir"
|
||||
adir.mkdir()
|
||||
|
||||
fn = adir / "foo.txt"
|
||||
fn.touch()
|
||||
self.chmod_r(fn)
|
||||
|
||||
# unknown exception
|
||||
with pytest.warns(pytest.PytestWarning):
|
||||
exc_info = (None, RuntimeError(), None)
|
||||
on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path)
|
||||
assert fn.is_file()
|
||||
|
||||
# unknown function
|
||||
with pytest.warns(pytest.PytestWarning):
|
||||
exc_info = (None, PermissionError(), None)
|
||||
on_rm_rf_error(None, str(fn), exc_info, start_path=tmp_path)
|
||||
assert fn.is_file()
|
||||
|
||||
exc_info = (None, PermissionError(), None)
|
||||
on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path)
|
||||
assert not fn.is_file()
|
||||
|
||||
|
||||
def attempt_symlink_to(path, to_path):
|
||||
|
|
Loading…
Reference in New Issue