Introduce compat.fspath

This commit is contained in:
Bruno Oliveira 2019-09-21 14:38:39 -03:00
parent f93f284356
commit 6f20b4b014
3 changed files with 45 additions and 33 deletions

View File

@ -28,6 +28,7 @@ from _pytest.assertion import util
from _pytest.assertion.util import ( # noqa: F401 from _pytest.assertion.util import ( # noqa: F401
format_explanation as _format_explanation, format_explanation as _format_explanation,
) )
from _pytest.compat import fspath
from _pytest.pathlib import fnmatch_ex from _pytest.pathlib import fnmatch_ex
from _pytest.pathlib import PurePath from _pytest.pathlib import PurePath
@ -120,7 +121,7 @@ class AssertionRewritingHook(importlib.abc.MetaPathFinder):
write = not sys.dont_write_bytecode write = not sys.dont_write_bytecode
cache_dir = get_cache_dir(fn) cache_dir = get_cache_dir(fn)
if write: if write:
ok = try_mkdir(cache_dir) ok = try_makedirs(cache_dir)
if not ok: if not ok:
write = False write = False
state.trace("read only directory: {}".format(cache_dir)) state.trace("read only directory: {}".format(cache_dir))
@ -259,7 +260,7 @@ def _write_pyc(state, co, source_stat, pyc):
# (C)Python, since these "pycs" should never be seen by builtin # (C)Python, since these "pycs" should never be seen by builtin
# import. However, there's little reason deviate. # import. However, there's little reason deviate.
try: try:
with atomicwrites.atomic_write(str(pyc), mode="wb", overwrite=True) as fp: with atomicwrites.atomic_write(fspath(pyc), mode="wb", overwrite=True) as fp:
fp.write(importlib.util.MAGIC_NUMBER) fp.write(importlib.util.MAGIC_NUMBER)
# as of now, bytecode header expects 32-bit numbers for size and mtime (#4903) # as of now, bytecode header expects 32-bit numbers for size and mtime (#4903)
mtime = int(source_stat.st_mtime) & 0xFFFFFFFF mtime = int(source_stat.st_mtime) & 0xFFFFFFFF
@ -278,7 +279,7 @@ def _write_pyc(state, co, source_stat, pyc):
def _rewrite_test(fn, config): def _rewrite_test(fn, config):
"""read and rewrite *fn* and return the code object.""" """read and rewrite *fn* and return the code object."""
fn = str(fn) fn = fspath(fn)
stat = os.stat(fn) stat = os.stat(fn)
with open(fn, "rb") as f: with open(fn, "rb") as f:
source = f.read() source = f.read()
@ -294,12 +295,12 @@ def _read_pyc(source, pyc, trace=lambda x: None):
Return rewritten code if successful or None if not. Return rewritten code if successful or None if not.
""" """
try: try:
fp = open(str(pyc), "rb") fp = open(fspath(pyc), "rb")
except IOError: except IOError:
return None return None
with fp: with fp:
try: try:
stat_result = os.stat(str(source)) stat_result = os.stat(fspath(source))
mtime = int(stat_result.st_mtime) mtime = int(stat_result.st_mtime)
size = stat_result.st_size size = stat_result.st_size
data = fp.read(12) data = fp.read(12)
@ -751,7 +752,7 @@ class AssertionRewriter(ast.NodeVisitor):
"assertion is always true, perhaps remove parentheses?" "assertion is always true, perhaps remove parentheses?"
), ),
category=None, category=None,
filename=str(self.module_path), filename=fspath(self.module_path),
lineno=assert_.lineno, lineno=assert_.lineno,
) )
@ -874,7 +875,7 @@ warn_explicit(
lineno={lineno}, lineno={lineno},
) )
""".format( """.format(
filename=str(module_path), lineno=lineno filename=fspath(module_path), lineno=lineno
) )
).body ).body
return ast.If(val_is_none, send_warning, []) return ast.If(val_is_none, send_warning, [])
@ -1020,18 +1021,15 @@ warn_explicit(
return res, self.explanation_param(self.pop_format_context(expl_call)) return res, self.explanation_param(self.pop_format_context(expl_call))
def try_mkdir(cache_dir): def try_makedirs(cache_dir) -> bool:
"""Attempts to create the given directory, returns True if successful""" """Attempts to create the given directory and sub-directories exist, returns True if
successful or it already exists"""
try: try:
os.makedirs(str(cache_dir)) os.makedirs(fspath(cache_dir), exist_ok=True)
except FileExistsError: except (FileNotFoundError, NotADirectoryError, FileExistsError):
# Either the pycache directory already exists (the # One of the path components was not a directory:
# common case) or it's blocked by a non-dir node. In the # - we're in a zip file
# latter case, we'll ignore it in _write_pyc. # - it is a file
return True
except (FileNotFoundError, NotADirectoryError):
# One of the path components was not a directory, likely
# because we're in a zip file.
return False return False
except PermissionError: except PermissionError:
return False return False

View File

@ -4,6 +4,7 @@ python version compatibility code
import functools import functools
import inspect import inspect
import io import io
import os
import re import re
import sys import sys
from contextlib import contextmanager from contextlib import contextmanager
@ -41,6 +42,19 @@ def _format_args(func):
REGEX_TYPE = type(re.compile("")) REGEX_TYPE = type(re.compile(""))
if sys.version_info < (3, 6):
def fspath(p):
"""os.fspath replacement, useful to point out when we should replace it by the
real function once we drop py35.
"""
return str(p)
else:
fspath = os.fspath
def is_generator(func): def is_generator(func):
genfunc = inspect.isgeneratorfunction(func) genfunc = inspect.isgeneratorfunction(func)
return genfunc and not iscoroutinefunction(func) return genfunc and not iscoroutinefunction(func)

View File

@ -1554,43 +1554,43 @@ def test_get_assertion_exprs(src, expected):
assert _get_assertion_exprs(src) == expected assert _get_assertion_exprs(src) == expected
def test_try_mkdir(monkeypatch, tmp_path): def test_try_makedirs(monkeypatch, tmp_path):
from _pytest.assertion.rewrite import try_mkdir from _pytest.assertion.rewrite import try_makedirs
p = tmp_path / "foo" p = tmp_path / "foo"
# create # create
assert try_mkdir(str(p)) assert try_makedirs(str(p))
assert p.is_dir() assert p.is_dir()
# already exist # already exist
assert try_mkdir(str(p)) assert try_makedirs(str(p))
# monkeypatch to simulate all error situations # monkeypatch to simulate all error situations
def fake_mkdir(p, mode, *, exc): def fake_mkdir(p, exist_ok=False, *, exc):
assert isinstance(p, str) assert isinstance(p, str)
raise exc raise exc
monkeypatch.setattr(os, "mkdir", partial(fake_mkdir, exc=FileNotFoundError())) monkeypatch.setattr(os, "makedirs", partial(fake_mkdir, exc=FileNotFoundError()))
assert not try_mkdir(str(p)) assert not try_makedirs(str(p))
monkeypatch.setattr(os, "mkdir", partial(fake_mkdir, exc=NotADirectoryError())) monkeypatch.setattr(os, "makedirs", partial(fake_mkdir, exc=NotADirectoryError()))
assert not try_mkdir(str(p)) assert not try_makedirs(str(p))
monkeypatch.setattr(os, "mkdir", partial(fake_mkdir, exc=PermissionError())) monkeypatch.setattr(os, "makedirs", partial(fake_mkdir, exc=PermissionError()))
assert not try_mkdir(str(p)) assert not try_makedirs(str(p))
err = OSError() err = OSError()
err.errno = errno.EROFS err.errno = errno.EROFS
monkeypatch.setattr(os, "mkdir", partial(fake_mkdir, exc=err)) monkeypatch.setattr(os, "makedirs", partial(fake_mkdir, exc=err))
assert not try_mkdir(str(p)) assert not try_makedirs(str(p))
# unhandled OSError should raise # unhandled OSError should raise
err = OSError() err = OSError()
err.errno = errno.ECHILD err.errno = errno.ECHILD
monkeypatch.setattr(os, "mkdir", partial(fake_mkdir, exc=err)) monkeypatch.setattr(os, "makedirs", partial(fake_mkdir, exc=err))
with pytest.raises(OSError) as exc_info: with pytest.raises(OSError) as exc_info:
try_mkdir(str(p)) try_makedirs(str(p))
assert exc_info.value.errno == errno.ECHILD assert exc_info.value.errno == errno.ECHILD