Merge pull request #8516 from bluetech/mktmp

Fix minor temporary directory security issue
This commit is contained in:
Ran Benita 2021-04-04 00:33:53 +03:00 committed by GitHub
commit 07c6b3fb21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 24 deletions

10
changelog/8414.bugfix.rst Normal file
View File

@ -0,0 +1,10 @@
pytest used to create directories under ``/tmp`` with world-readable
permissions. This means that any user in the system was able to read
information written by tests in temporary directories (such as those created by
the ``tmp_path``/``tmpdir`` fixture). Now the directories are created with
private permissions.
pytest used silenty use a pre-existing ``/tmp/pytest-of-<username>`` directory,
even if owned by another user. This means another user could pre-create such a
directory and gain control of another user's temporary directory. Now such a
condition results in an error.

View File

@ -62,13 +62,6 @@ def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
return path.joinpath(".lock") return path.joinpath(".lock")
def ensure_reset_dir(path: Path) -> None:
"""Ensure the given path is an empty directory."""
if path.exists():
rm_rf(path)
path.mkdir()
def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool: def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool:
"""Handle known read-only errors during rmtree. """Handle known read-only errors during rmtree.
@ -212,7 +205,7 @@ def _force_symlink(
pass pass
def make_numbered_dir(root: Path, prefix: str) -> Path: def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
"""Create a directory with an increased number as suffix for the given prefix.""" """Create a directory with an increased number as suffix for the given prefix."""
for i in range(10): for i in range(10):
# try up to 10 times to create the folder # try up to 10 times to create the folder
@ -220,7 +213,7 @@ def make_numbered_dir(root: Path, prefix: str) -> Path:
new_number = max_existing + 1 new_number = max_existing + 1
new_path = root.joinpath(f"{prefix}{new_number}") new_path = root.joinpath(f"{prefix}{new_number}")
try: try:
new_path.mkdir() new_path.mkdir(mode=mode)
except Exception: except Exception:
pass pass
else: else:
@ -352,13 +345,17 @@ def cleanup_numbered_dir(
def make_numbered_dir_with_cleanup( def make_numbered_dir_with_cleanup(
root: Path, prefix: str, keep: int, lock_timeout: float root: Path,
prefix: str,
keep: int,
lock_timeout: float,
mode: int,
) -> Path: ) -> Path:
"""Create a numbered dir with a cleanup lock and remove old ones.""" """Create a numbered dir with a cleanup lock and remove old ones."""
e = None e = None
for i in range(10): for i in range(10):
try: try:
p = make_numbered_dir(root, prefix) p = make_numbered_dir(root, prefix, mode)
lock_path = create_cleanup_lock(p) lock_path = create_cleanup_lock(p)
register_cleanup_lock_removal(lock_path) register_cleanup_lock_removal(lock_path)
except Exception as exc: except Exception as exc:

View File

@ -1456,7 +1456,7 @@ class Pytester:
:py:class:`Pytester.TimeoutExpired`. :py:class:`Pytester.TimeoutExpired`.
""" """
__tracebackhide__ = True __tracebackhide__ = True
p = make_numbered_dir(root=self.path, prefix="runpytest-") p = make_numbered_dir(root=self.path, prefix="runpytest-", mode=0o700)
args = ("--basetemp=%s" % p,) + args args = ("--basetemp=%s" % p,) + args
plugins = [x for x in self.plugins if isinstance(x, str)] plugins = [x for x in self.plugins if isinstance(x, str)]
if plugins: if plugins:
@ -1475,7 +1475,7 @@ class Pytester:
The pexpect child is returned. The pexpect child is returned.
""" """
basetemp = self.path / "temp-pexpect" basetemp = self.path / "temp-pexpect"
basetemp.mkdir() basetemp.mkdir(mode=0o700)
invoke = " ".join(map(str, self._getpytestargs())) invoke = " ".join(map(str, self._getpytestargs()))
cmd = f"{invoke} --basetemp={basetemp} {string}" cmd = f"{invoke} --basetemp={basetemp} {string}"
return self.spawn(cmd, expect_timeout=expect_timeout) return self.spawn(cmd, expect_timeout=expect_timeout)

View File

@ -7,10 +7,10 @@ from typing import Optional
import attr import attr
from .pathlib import ensure_reset_dir
from .pathlib import LOCK_TIMEOUT from .pathlib import LOCK_TIMEOUT
from .pathlib import make_numbered_dir from .pathlib import make_numbered_dir
from .pathlib import make_numbered_dir_with_cleanup from .pathlib import make_numbered_dir_with_cleanup
from .pathlib import rm_rf
from _pytest.compat import final from _pytest.compat import final
from _pytest.compat import LEGACY_PATH from _pytest.compat import LEGACY_PATH
from _pytest.compat import legacy_path from _pytest.compat import legacy_path
@ -94,20 +94,22 @@ class TempPathFactory:
basename = self._ensure_relative_to_basetemp(basename) basename = self._ensure_relative_to_basetemp(basename)
if not numbered: if not numbered:
p = self.getbasetemp().joinpath(basename) p = self.getbasetemp().joinpath(basename)
p.mkdir() p.mkdir(mode=0o700)
else: else:
p = make_numbered_dir(root=self.getbasetemp(), prefix=basename) p = make_numbered_dir(root=self.getbasetemp(), prefix=basename, mode=0o700)
self._trace("mktemp", p) self._trace("mktemp", p)
return p return p
def getbasetemp(self) -> Path: def getbasetemp(self) -> Path:
"""Return base temporary directory.""" """Return the base temporary directory, creating it if needed."""
if self._basetemp is not None: if self._basetemp is not None:
return self._basetemp return self._basetemp
if self._given_basetemp is not None: if self._given_basetemp is not None:
basetemp = self._given_basetemp basetemp = self._given_basetemp
ensure_reset_dir(basetemp) if basetemp.exists():
rm_rf(basetemp)
basetemp.mkdir(mode=0o700)
basetemp = basetemp.resolve() basetemp = basetemp.resolve()
else: else:
from_env = os.environ.get("PYTEST_DEBUG_TEMPROOT") from_env = os.environ.get("PYTEST_DEBUG_TEMPROOT")
@ -117,18 +119,41 @@ class TempPathFactory:
# make_numbered_dir() call # make_numbered_dir() call
rootdir = temproot.joinpath(f"pytest-of-{user}") rootdir = temproot.joinpath(f"pytest-of-{user}")
try: try:
rootdir.mkdir(exist_ok=True) rootdir.mkdir(mode=0o700, exist_ok=True)
except OSError: except OSError:
# getuser() likely returned illegal characters for the platform, use unknown back off mechanism # getuser() likely returned illegal characters for the platform, use unknown back off mechanism
rootdir = temproot.joinpath("pytest-of-unknown") rootdir = temproot.joinpath("pytest-of-unknown")
rootdir.mkdir(exist_ok=True) rootdir.mkdir(mode=0o700, exist_ok=True)
# Because we use exist_ok=True with a predictable name, make sure
# we are the owners, to prevent any funny business (on unix, where
# temproot is usually shared).
# Also, to keep things private, fixup any world-readable temp
# rootdir's permissions. Historically 0o755 was used, so we can't
# just error out on this, at least for a while.
if hasattr(os, "getuid"):
rootdir_stat = rootdir.stat()
uid = os.getuid()
# getuid shouldn't fail, but cpython defines such a case.
# Let's hope for the best.
if uid != -1:
if rootdir_stat.st_uid != uid:
raise OSError(
f"The temporary directory {rootdir} is not owned by the current user. "
"Fix this and try again."
)
if (rootdir_stat.st_mode & 0o077) != 0:
os.chmod(rootdir, rootdir_stat.st_mode & ~0o077)
basetemp = make_numbered_dir_with_cleanup( basetemp = make_numbered_dir_with_cleanup(
prefix="pytest-", root=rootdir, keep=3, lock_timeout=LOCK_TIMEOUT prefix="pytest-",
root=rootdir,
keep=3,
lock_timeout=LOCK_TIMEOUT,
mode=0o700,
) )
assert basetemp is not None, basetemp assert basetemp is not None, basetemp
self._basetemp = t = basetemp self._basetemp = basetemp
self._trace("new basetemp", t) self._trace("new basetemp", basetemp)
return t return basetemp
@final @final

View File

@ -454,3 +454,44 @@ def test_tmp_path_factory_handles_invalid_dir_characters(
monkeypatch.setattr(tmp_path_factory, "_given_basetemp", None) monkeypatch.setattr(tmp_path_factory, "_given_basetemp", None)
p = tmp_path_factory.getbasetemp() p = tmp_path_factory.getbasetemp()
assert "pytest-of-unknown" in str(p) assert "pytest-of-unknown" in str(p)
@pytest.mark.skipif(not hasattr(os, "getuid"), reason="checks unix permissions")
def test_tmp_path_factory_create_directory_with_safe_permissions(
tmp_path: Path, monkeypatch: MonkeyPatch
) -> None:
"""Verify that pytest creates directories under /tmp with private permissions."""
# Use the test's tmp_path as the system temproot (/tmp).
monkeypatch.setenv("PYTEST_DEBUG_TEMPROOT", str(tmp_path))
tmp_factory = TempPathFactory(None, lambda *args: None, _ispytest=True)
basetemp = tmp_factory.getbasetemp()
# No world-readable permissions.
assert (basetemp.stat().st_mode & 0o077) == 0
# Parent too (pytest-of-foo).
assert (basetemp.parent.stat().st_mode & 0o077) == 0
@pytest.mark.skipif(not hasattr(os, "getuid"), reason="checks unix permissions")
def test_tmp_path_factory_fixes_up_world_readable_permissions(
tmp_path: Path, monkeypatch: MonkeyPatch
) -> None:
"""Verify that if a /tmp/pytest-of-foo directory already exists with
world-readable permissions, it is fixed.
pytest used to mkdir with such permissions, that's why we fix it up.
"""
# Use the test's tmp_path as the system temproot (/tmp).
monkeypatch.setenv("PYTEST_DEBUG_TEMPROOT", str(tmp_path))
tmp_factory = TempPathFactory(None, lambda *args: None, _ispytest=True)
basetemp = tmp_factory.getbasetemp()
# Before - simulate bad perms.
os.chmod(basetemp.parent, 0o777)
assert (basetemp.parent.stat().st_mode & 0o077) != 0
tmp_factory = TempPathFactory(None, lambda *args: None, _ispytest=True)
basetemp = tmp_factory.getbasetemp()
# After - fixed.
assert (basetemp.parent.stat().st_mode & 0o077) == 0