simplifying errno error class creation and introduce a py.error.checked_call helper

that creates a proper errno-specific exception instead of OSErrors.  use it from
py.path.local.

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-08-25 09:38:19 +02:00
parent 58a9e71e81
commit 739edc26b4
6 changed files with 114 additions and 135 deletions

View File

@ -164,7 +164,7 @@ initpkg(__name__,
'io.TerminalWriter' : ('./io/terminalwriter.py', 'TerminalWriter'),
# error module, defining all errno's as Classes
'error' : ('./misc/error.py', 'error'),
'error' : ('./error.py', 'error'),
# small and mean xml/html generation
'xml.__doc__' : ('./xmlobj/__init__.py', '__doc__'),

80
py/error.py Normal file
View File

@ -0,0 +1,80 @@
import sys, os, errno
class Error(EnvironmentError):
def __repr__(self):
return "%s.%s %r: %s " %(self.__class__.__module__,
self.__class__.__name__,
self.__class__.__doc__,
" ".join(map(str, self.args)),
#repr(self.args)
)
def __str__(self):
s = "[%s]: %s" %(self.__class__.__doc__,
" ".join(map(str, self.args)),
)
return s
_winerrnomap = {
2: errno.ENOENT,
3: errno.ENOENT,
17: errno.EEXIST,
22: errno.ENOTDIR,
267: errno.ENOTDIR,
5: errno.EACCES, # anything better?
}
class ErrorMaker(object):
""" lazily provides Exception classes for each possible POSIX errno
(as defined per the 'errno' module). All such instances
subclass EnvironmentError.
"""
Error = Error
_errno2class = {}
def __getattr__(self, name):
eno = getattr(errno, name)
cls = self._geterrnoclass(eno)
setattr(self, name, cls)
return cls
def _geterrnoclass(self, eno):
try:
return self._errno2class[eno]
except KeyError:
clsname = errno.errorcode.get(eno, "UnknownErrno%d" %(eno,))
errorcls = type(Error)(clsname, (Error,),
{'__module__':'py.error',
'__doc__': os.strerror(eno)})
self._errno2class[eno] = errorcls
return errorcls
def checked_call(self, func, *args):
""" call a function and raise an errno-exception if applicable. """
__tracebackhide__ = True
try:
return func(*args)
except self.Error:
raise
except EnvironmentError:
cls, value, tb = sys.exc_info()
if not hasattr(value, 'errno'):
raise
__tracebackhide__ = False
errno = value.errno
try:
if not isinstance(value, WindowsError):
raise NameError
except NameError:
# we are not on Windows, or we got a proper OSError
cls = self._geterrnoclass(errno)
else:
try:
cls = self._geterrnoclass(_winerrnomap[eno])
except KeyError:
raise value
raise cls("%s%r" % (func.__name__, args))
__tracebackhide__ = True
error = ErrorMaker()

View File

@ -1,79 +0,0 @@
import py
import errno
class Error(EnvironmentError):
__module__ = 'py.error'
def __repr__(self):
return "%s.%s %r: %s " %(self.__class__.__module__,
self.__class__.__name__,
self.__class__.__doc__,
" ".join(map(str, self.args)),
#repr(self.args)
)
def __str__(self):
return "[%s]: %s" %(self.__class__.__doc__,
" ".join(map(str, self.args)),
)
_winerrnomap = {
2: errno.ENOENT,
3: errno.ENOENT,
17: errno.EEXIST,
22: errno.ENOTDIR,
267: errno.ENOTDIR,
5: errno.EACCES, # anything better?
}
# note: 'py.std' may not be imported yet at all, because
# the 'error' module in this file is imported very early.
# This is dependent on dict order.
ModuleType = type(py)
class py_error(ModuleType):
""" py.error lazily provides higher level Exception classes
for each possible POSIX errno (as defined per
the 'errno' module. All such Exceptions derive
from py.error.Error, which itself is a subclass
of EnvironmentError.
"""
Error = Error
def _getwinerrnoclass(cls, eno):
return cls._geterrnoclass(_winerrnomap[eno])
_getwinerrnoclass = classmethod(_getwinerrnoclass)
def _geterrnoclass(eno, _errno2class = {}):
try:
return _errno2class[eno]
except KeyError:
clsname = py.std.errno.errorcode.get(eno, "UnknownErrno%d" %(eno,))
cls = type(Error)(clsname, (Error,),
{'__module__':'py.error',
'__doc__': py.std.os.strerror(eno)})
_errno2class[eno] = cls
return cls
_geterrnoclass = staticmethod(_geterrnoclass)
def __getattr__(self, name):
eno = getattr(py.std.errno, name)
cls = self._geterrnoclass(eno)
setattr(self, name, cls)
return cls
def getdict(self, done=[]):
try:
return done[0]
except IndexError:
for name in py.std.errno.errorcode.values():
hasattr(self, name) # force attribute to be loaded, ignore errors
dictdescr = ModuleType.__dict__['__dict__']
done.append(dictdescr.__get__(self))
return done[0]
__dict__ = property(getdict)
del getdict
error = py_error('py.error', py_error.__doc__)

View File

@ -18,3 +18,9 @@ def test_unknown_error():
cls2 = py.error._geterrnoclass(num)
assert cls is cls2
def test_error_conversion_ENOTDIR(testdir):
p = testdir.makepyfile("")
excinfo = py.test.raises(py.error.Error, py.error.checked_call, p.listdir)
assert isinstance(excinfo.value, EnvironmentError)
assert isinstance(excinfo.value, py.error.Error)
assert "ENOTDIR" in repr(excinfo.value)

View File

@ -75,7 +75,7 @@ class Checkers:
return False
return True
class _dummyclass:
class NeverRaised(Exception):
pass
class PathBase(object):
@ -136,7 +136,7 @@ newline will be removed from the end of each line. """
f = self.open('rb')
try:
from cPickle import load
return self._callex(load, f)
return py.error.checked_call(load, f)
finally:
f.close()
@ -253,7 +253,7 @@ newline will be removed from the end of each line. """
except AttributeError:
return cmp(str(self), str(other)) # self.path, other.path)
def visit(self, fil=None, rec=None, ignore=_dummyclass):
def visit(self, fil=None, rec=None, ignore=NeverRaised):
""" yields all paths below the current one
fil is a filter (glob pattern or callable), if not matching the
@ -286,34 +286,6 @@ newline will be removed from the end of each line. """
if p.check(dir=1) and (rec is None or rec(p)):
reclist.append(p)
def _callex(self, func, *args):
""" call a function and raise errno-exception if applicable. """
__tracebackhide__ = True
try:
return func(*args)
except py.error.Error:
raise
except EnvironmentError:
cls, value, tb = sys.exc_info()
if not hasattr(value, 'errno'):
raise
__tracebackhide__ = False
errno = value.errno
try:
if not isinstance(value, WindowsError):
raise NameError
except NameError:
# we are not on Windows, or we got a proper OSError
cls = py.error._geterrnoclass(errno)
else:
try:
cls = py.error._getwinerrnoclass(errno)
except KeyError:
raise cls(value) # tb?
value = cls("%s%r" % (func.__name__, args))
__tracebackhide__ = True
raise cls(value)
class FNMatcher:
def __init__(self, pattern):
self.pattern = pattern

View File

@ -20,7 +20,7 @@ class Stat(object):
if iswin32:
raise NotImplementedError("XXX win32")
import pwd
entry = self.path._callex(pwd.getpwuid, self.uid)
entry = py.error.checked_call(pwd.getpwuid, self.uid)
return entry[0]
owner = property(owner, None, None, "owner of path")
@ -29,7 +29,7 @@ class Stat(object):
if iswin32:
raise NotImplementedError("XXX win32")
import grp
entry = self.path._callex(grp.getgrgid, self.gid)
entry = py.error.checked_call(grp.getgrgid, self.gid)
return entry[0]
group = property(group)
@ -45,21 +45,21 @@ class PosixPath(common.PathBase):
if rec:
for x in self.visit(rec=lambda x: x.check(link=0)):
if x.check(link=0):
self._callex(os.chown, str(x), uid, gid)
self._callex(os.chown, str(self), uid, gid)
py.error.checked_call(os.chown, str(x), uid, gid)
py.error.checked_call(os.chown, str(self), uid, gid)
def readlink(self):
""" return value of a symbolic link. """
return self._callex(os.readlink, self.strpath)
return py.error.checked_call(os.readlink, self.strpath)
def mklinkto(self, oldname):
""" posix style hard link to another name. """
self._callex(os.link, str(oldname), str(self))
py.error.checked_call(os.link, str(oldname), str(self))
def mksymlinkto(self, value, absolute=1):
""" create a symbolic link with the given value (pointing to another name). """
if absolute:
self._callex(os.symlink, str(value), self.strpath)
py.error.checked_call(os.symlink, str(value), self.strpath)
else:
base = self.common(value)
# with posix local paths '/' is always a common base
@ -67,7 +67,7 @@ class PosixPath(common.PathBase):
reldest = self.relto(base)
n = reldest.count(self.sep)
target = self.sep.join(('..', )*n + (relsource, ))
self._callex(os.symlink, target, self.strpath)
py.error.checked_call(os.symlink, target, self.strpath)
def samefile(self, other):
""" return True if other refers to the same stat object as self. """
@ -151,13 +151,13 @@ class LocalPath(FSBase):
# force remove of readonly files on windows
if iswin32:
self.chmod(448, rec=1) # octcal 0700
self._callex(py.std.shutil.rmtree, self.strpath)
py.error.checked_call(py.std.shutil.rmtree, self.strpath)
else:
self._callex(os.rmdir, self.strpath)
py.error.checked_call(os.rmdir, self.strpath)
else:
if iswin32:
self.chmod(448) # octcal 0700
self._callex(os.remove, self.strpath)
py.error.checked_call(os.remove, self.strpath)
def computehash(self, hashtype="md5", chunksize=524288):
""" return hexdigest of hashvalue for this file. """
@ -293,7 +293,7 @@ class LocalPath(FSBase):
def open(self, mode='r'):
""" return an opened file with the given mode. """
return self._callex(open, self.strpath, mode)
return py.error.checked_call(open, self.strpath, mode)
def listdir(self, fil=None, sort=None):
""" list directory contents, possibly filter by the given fil func
@ -302,7 +302,7 @@ class LocalPath(FSBase):
if isinstance(fil, str):
fil = common.FNMatcher(fil)
res = []
for name in self._callex(os.listdir, self.strpath):
for name in py.error.checked_call(os.listdir, self.strpath):
childurl = self.join(name)
if fil is None or fil(childurl):
res.append(childurl)
@ -344,20 +344,20 @@ class LocalPath(FSBase):
def rename(self, target):
""" rename this path to target. """
return self._callex(os.rename, str(self), str(target))
return py.error.checked_call(os.rename, str(self), str(target))
def dump(self, obj, bin=1):
""" pickle object into path location"""
f = self.open('wb')
try:
self._callex(py.std.cPickle.dump, obj, f, bin)
py.error.checked_call(py.std.cPickle.dump, obj, f, bin)
finally:
f.close()
def mkdir(self, *args):
""" create & return the directory joined with args. """
p = self.join(*args)
self._callex(os.mkdir, str(p))
py.error.checked_call(os.mkdir, str(p))
return p
def write(self, content, mode='wb'):
@ -401,11 +401,11 @@ class LocalPath(FSBase):
def stat(self):
""" Return an os.stat() tuple. """
return Stat(self, self._callex(os.stat, self.strpath))
return Stat(self, py.error.checked_call(os.stat, self.strpath))
def lstat(self):
""" Return an os.lstat() tuple. """
return Stat(self, self._callex(os.lstat, self.strpath))
return Stat(self, py.error.checked_call(os.lstat, self.strpath))
def setmtime(self, mtime=None):
""" set modification time for the given path. if 'mtime' is None
@ -414,16 +414,16 @@ class LocalPath(FSBase):
Note that the resolution for 'mtime' is platform dependent.
"""
if mtime is None:
return self._callex(os.utime, self.strpath, mtime)
return py.error.checked_call(os.utime, self.strpath, mtime)
try:
return self._callex(os.utime, self.strpath, (-1, mtime))
return py.error.checked_call(os.utime, self.strpath, (-1, mtime))
except py.error.EINVAL:
return self._callex(os.utime, self.strpath, (self.atime(), mtime))
return py.error.checked_call(os.utime, self.strpath, (self.atime(), mtime))
def chdir(self):
""" change directory to self and return old current directory """
old = self.__class__()
self._callex(os.chdir, self.strpath)
py.error.checked_call(os.chdir, self.strpath)
return old
def realpath(self):
@ -476,8 +476,8 @@ class LocalPath(FSBase):
raise TypeError("mode %r must be an integer" % (mode,))
if rec:
for x in self.visit(rec=rec):
self._callex(os.chmod, str(x), mode)
self._callex(os.chmod, str(self), mode)
py.error.checked_call(os.chmod, str(x), mode)
py.error.checked_call(os.chmod, str(self), mode)
def pyimport(self, modname=None, ensuresyspath=True):
""" return path as an imported python module.