consolidate svn path implementations and tests into files named after the package namespaces.

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-08-20 20:35:35 +02:00
parent f3fcb5e6d3
commit 5118821c10
14 changed files with 515 additions and 542 deletions

View File

@ -104,10 +104,10 @@ initpkg(__name__,
# path implementation
'path.__doc__' : ('./path/__init__.py', '__doc__'),
'path.svnwc' : ('./path/svn/wccommand.py', 'SvnWCCommandPath'),
'path.svnurl' : ('./path/svn/urlcommand.py', 'SvnCommandPath'),
'path.svnwc' : ('./path/svnwc.py', 'SvnWCCommandPath'),
'path.svnurl' : ('./path/svnurl.py', 'SvnCommandPath'),
'path.local' : ('./path/local.py', 'LocalPath'),
'path.SvnAuth' : ('./path/svn/svncommon.py', 'SvnAuth'),
'path.SvnAuth' : ('./path/svnwc.py', 'SvnAuth'),
# some nice slightly magic APIs
'magic.__doc__' : ('./magic/__init__.py', '__doc__'),

View File

@ -1 +0,0 @@
#

View File

@ -1,69 +0,0 @@
"""
# generic cache mechanism for subversion-related structures
# XXX make mt-safe
"""
import time
proplist = {}
info = {}
entries = {}
prop = {}
#-----------------------------------------------------------
# Caching latest repository revision and repo-paths
# (getting them is slow with the current implementations)
#
# XXX make mt-safe
#-----------------------------------------------------------
class RepoEntry:
def __init__(self, url, rev, timestamp):
self.url = url
self.rev = rev
self.timestamp = timestamp
def __str__(self):
return "repo: %s;%s %s" %(self.url, self.rev, self.timestamp)
class RepoCache:
""" The Repocache manages discovered repository paths
and their revisions. If inside a timeout the cache
will even return the revision of the root.
"""
timeout = 20 # seconds after which we forget that we know the last revision
def __init__(self):
self.repos = []
def clear(self):
self.repos = []
def put(self, url, rev, timestamp=None):
if rev is None:
return
if timestamp is None:
timestamp = time.time()
for entry in self.repos:
if url == entry.url:
entry.timestamp = timestamp
entry.rev = rev
#print "set repo", entry
break
else:
entry = RepoEntry(url, rev, timestamp)
self.repos.append(entry)
#print "appended repo", entry
def get(self, url):
now = time.time()
for entry in self.repos:
if url.startswith(entry.url):
if now < entry.timestamp + self.timeout:
#print "returning immediate Etrny", entry
return entry.url, entry.rev
return entry.url, -1
return url, -1
repositories = RepoCache()

View File

@ -1,368 +0,0 @@
"""
module with a base subversion path object.
"""
import os, sys, time, re, string
import py
from py.__.path import common
ALLOWED_CHARS = "_ -/\\=$.~+" #add characters as necessary when tested
if sys.platform == "win32":
ALLOWED_CHARS += ":"
ALLOWED_CHARS_HOST = ALLOWED_CHARS + '@:'
def _getsvnversion(ver=[]):
try:
return ver[0]
except IndexError:
v = py.process.cmdexec("svn -q --version")
v.strip()
v = '.'.join(v.split('.')[:2])
ver.append(v)
return v
def _escape_helper(text):
text = str(text)
if py.std.sys.platform != 'win32':
text = str(text).replace('$', '\\$')
return text
def _check_for_bad_chars(text, allowed_chars=ALLOWED_CHARS):
for c in str(text):
if c.isalnum():
continue
if c in allowed_chars:
continue
return True
return False
def checkbadchars(url):
# (hpk) not quite sure about the exact purpose, guido w.?
proto, uri = url.split("://", 1)
if proto != "file":
host, uripath = uri.split('/', 1)
# only check for bad chars in the non-protocol parts
if (_check_for_bad_chars(host, ALLOWED_CHARS_HOST) \
or _check_for_bad_chars(uripath, ALLOWED_CHARS)):
raise ValueError("bad char in %r" % (url, ))
#_______________________________________________________________
class SvnPathBase(common.PathBase):
""" Base implementation for SvnPath implementations. """
sep = '/'
def _geturl(self):
return self.strpath
url = property(_geturl, None, None, "url of this svn-path.")
def __str__(self):
""" return a string representation (including rev-number) """
return self.strpath
def __hash__(self):
return hash(self.strpath)
def new(self, **kw):
""" create a modified version of this path. A 'rev' argument
indicates a new revision.
the following keyword arguments modify various path parts:
http://host.com/repo/path/file.ext
|-----------------------| dirname
|------| basename
|--| purebasename
|--| ext
"""
obj = object.__new__(self.__class__)
obj.rev = kw.get('rev', self.rev)
obj.auth = kw.get('auth', self.auth)
dirname, basename, purebasename, ext = self._getbyspec(
"dirname,basename,purebasename,ext")
if 'basename' in kw:
if 'purebasename' in kw or 'ext' in kw:
raise ValueError("invalid specification %r" % kw)
else:
pb = kw.setdefault('purebasename', purebasename)
ext = kw.setdefault('ext', ext)
if ext and not ext.startswith('.'):
ext = '.' + ext
kw['basename'] = pb + ext
kw.setdefault('dirname', dirname)
kw.setdefault('sep', self.sep)
if kw['basename']:
obj.strpath = "%(dirname)s%(sep)s%(basename)s" % kw
else:
obj.strpath = "%(dirname)s" % kw
return obj
def _getbyspec(self, spec):
""" get specified parts of the path. 'arg' is a string
with comma separated path parts. The parts are returned
in exactly the order of the specification.
you may specify the following parts:
http://host.com/repo/path/file.ext
|-----------------------| dirname
|------| basename
|--| purebasename
|--| ext
"""
res = []
parts = self.strpath.split(self.sep)
for name in spec.split(','):
name = name.strip()
if name == 'dirname':
res.append(self.sep.join(parts[:-1]))
elif name == 'basename':
res.append(parts[-1])
else:
basename = parts[-1]
i = basename.rfind('.')
if i == -1:
purebasename, ext = basename, ''
else:
purebasename, ext = basename[:i], basename[i:]
if name == 'purebasename':
res.append(purebasename)
elif name == 'ext':
res.append(ext)
else:
raise NameError, "Don't know part %r" % name
return res
def __eq__(self, other):
""" return true if path and rev attributes each match """
return (str(self) == str(other) and
(self.rev == other.rev or self.rev == other.rev))
def __ne__(self, other):
return not self == other
def join(self, *args):
""" return a new Path (with the same revision) which is composed
of the self Path followed by 'args' path components.
"""
if not args:
return self
args = tuple([arg.strip(self.sep) for arg in args])
parts = (self.strpath, ) + args
newpath = self.__class__(self.sep.join(parts), self.rev, self.auth)
return newpath
def propget(self, name):
""" return the content of the given property. """
value = self._propget(name)
return value
def proplist(self):
""" list all property names. """
content = self._proplist()
return content
def listdir(self, fil=None, sort=None):
""" list directory contents, possibly filter by the given fil func
and possibly sorted.
"""
if isinstance(fil, str):
fil = common.FNMatcher(fil)
nameinfo_seq = self._listdir_nameinfo()
if len(nameinfo_seq) == 1:
name, info = nameinfo_seq[0]
if name == self.basename and info.kind == 'file':
#if not self.check(dir=1):
raise py.error.ENOTDIR(self)
paths = self._make_path_tuple(nameinfo_seq)
if fil or sort:
paths = filter(fil, paths)
paths = isinstance(paths, list) and paths or list(paths)
if callable(sort):
paths.sort(sort)
elif sort:
paths.sort()
return paths
def info(self):
""" return an Info structure with svn-provided information. """
parent = self.dirpath()
nameinfo_seq = parent._listdir_nameinfo()
bn = self.basename
for name, info in nameinfo_seq:
if name == bn:
return info
raise py.error.ENOENT(self)
def size(self):
""" Return the size of the file content of the Path. """
return self.info().size
def mtime(self):
""" Return the last modification time of the file. """
return self.info().mtime
# shared help methods
def _escape(self, cmd):
return _escape_helper(cmd)
def _make_path_tuple(self, nameinfo_seq):
""" return a tuple of paths from a nameinfo-tuple sequence.
"""
#assert self.rev is not None, "revision of %s should not be None here" % self
res = []
for name, info in nameinfo_seq:
child = self.join(name)
res.append(child)
return tuple(res)
def _childmaxrev(self):
""" return maximum revision number of childs (or self.rev if no childs) """
rev = self.rev
for name, info in self._listdir_nameinfo():
rev = max(rev, info.created_rev)
return rev
#def _getlatestrevision(self):
# """ return latest repo-revision for this path. """
# url = self.strpath
# path = self.__class__(url, None)
#
# # we need a long walk to find the root-repo and revision
# while 1:
# try:
# rev = max(rev, path._childmaxrev())
# previous = path
# path = path.dirpath()
# except (IOError, process.cmdexec.Error):
# break
# if rev is None:
# raise IOError, "could not determine newest repo revision for %s" % self
# return rev
class Checkers(common.Checkers):
def dir(self):
try:
return self.path.info().kind == 'dir'
except py.error.Error:
return self._listdirworks()
def _listdirworks(self):
try:
self.path.listdir()
except py.error.ENOENT:
return False
else:
return True
def file(self):
try:
return self.path.info().kind == 'file'
except py.error.ENOENT:
return False
def exists(self):
try:
return self.path.info()
except py.error.ENOENT:
return self._listdirworks()
def parse_apr_time(timestr):
i = timestr.rfind('.')
if i == -1:
raise ValueError, "could not parse %s" % timestr
timestr = timestr[:i]
parsedtime = time.strptime(timestr, "%Y-%m-%dT%H:%M:%S")
return time.mktime(parsedtime)
class PropListDict(dict):
""" a Dictionary which fetches values (InfoSvnCommand instances) lazily"""
def __init__(self, path, keynames):
dict.__init__(self, [(x, None) for x in keynames])
self.path = path
def __getitem__(self, key):
value = dict.__getitem__(self, key)
if value is None:
value = self.path.propget(key)
dict.__setitem__(self, key, value)
return value
def fixlocale():
if sys.platform != 'win32':
return 'LC_ALL=C '
return ''
# some nasty chunk of code to solve path and url conversion and quoting issues
ILLEGAL_CHARS = '* | \ / : < > ? \t \n \x0b \x0c \r'.split(' ')
if os.sep in ILLEGAL_CHARS:
ILLEGAL_CHARS.remove(os.sep)
ISWINDOWS = sys.platform == 'win32'
_reg_allow_disk = re.compile(r'^([a-z]\:\\)?[^:]+$', re.I)
def _check_path(path):
illegal = ILLEGAL_CHARS[:]
sp = path.strpath
if ISWINDOWS:
illegal.remove(':')
if not _reg_allow_disk.match(sp):
raise ValueError('path may not contain a colon (:)')
for char in sp:
if char not in string.printable or char in illegal:
raise ValueError('illegal character %r in path' % (char,))
def path_to_fspath(path, addat=True):
_check_path(path)
sp = path.strpath
if addat and path.rev != -1:
sp = '%s@%s' % (sp, path.rev)
elif addat:
sp = '%s@HEAD' % (sp,)
return sp
def url_from_path(path):
fspath = path_to_fspath(path, False)
quote = py.std.urllib.quote
if ISWINDOWS:
match = _reg_allow_disk.match(fspath)
fspath = fspath.replace('\\', '/')
if match.group(1):
fspath = '/%s%s' % (match.group(1).replace('\\', '/'),
quote(fspath[len(match.group(1)):]))
else:
fspath = quote(fspath)
else:
fspath = quote(fspath)
if path.rev != -1:
fspath = '%s@%s' % (fspath, path.rev)
else:
fspath = '%s@HEAD' % (fspath,)
return 'file://%s' % (fspath,)
class SvnAuth(object):
""" container for auth information for Subversion """
def __init__(self, username, password, cache_auth=True, interactive=True):
self.username = username
self.password = password
self.cache_auth = cache_auth
self.interactive = interactive
def makecmdoptions(self):
uname = self.username.replace('"', '\\"')
passwd = self.password.replace('"', '\\"')
ret = []
if uname:
ret.append('--username="%s"' % (uname,))
if passwd:
ret.append('--password="%s"' % (passwd,))
if not self.cache_auth:
ret.append('--no-auth-cache')
if not self.interactive:
ret.append('--non-interactive')
return ' '.join(ret)
def __str__(self):
return "<SvnAuth username=%s ...>" %(self.username,)

View File

@ -1 +0,0 @@
#

View File

@ -1,25 +0,0 @@
import py
from py.__.path.svn.testing.svntestbase import make_test_repo, getsvnbin
class TestMakeRepo(object):
def setup_class(cls):
getsvnbin()
cls.repo = make_test_repo()
cls.wc = py.path.svnwc(py.test.ensuretemp("test-wc").join("wc"))
def test_empty_checkout(self):
self.wc.checkout(self.repo)
assert len(self.wc.listdir()) == 0
def test_commit(self):
self.wc.checkout(self.repo)
p = self.wc.join("a_file")
p.write("test file")
p.add()
rev = self.wc.commit("some test")
assert p.info().rev == 1
assert rev == 1
rev = self.wc.commit()
assert rev is None

View File

@ -1,16 +1,14 @@
"""
module defining a subversion path object based on the external
command 'svn'. This modules aims to work with svn 1.3 and higher
but might also interact well with earlier versions.
"""
import os, sys, time, re, calendar
import os, sys, time, re
import py
from py import path, process
from py.__.path import common
from py.__.path.svn import svncommon
from py.__.path import svnwc as svncommon
from py.__.misc.cache import BuildcostAccessCache, AgingCache
DEBUG=False
@ -253,10 +251,10 @@ rev_end is the last revision (defaulting to HEAD).
if verbose is True, then the LogEntry instances also know which files changed.
"""
assert self.check() #make it simpler for the pipe
rev_start = rev_start is None and _Head or rev_start
rev_end = rev_end is None and _Head or rev_end
rev_start = rev_start is None and "HEAD" or rev_start
rev_end = rev_end is None and "HEAD" or rev_end
if rev_start is _Head and rev_end == 1:
if rev_start == "HEAD" and rev_end == 1:
rev_opt = ""
else:
rev_opt = "-r %s:%s" % (rev_start, rev_end)
@ -268,7 +266,7 @@ if verbose is True, then the LogEntry instances also know which files changed.
result = []
for logentry in filter(None, tree.firstChild.childNodes):
if logentry.nodeType == logentry.ELEMENT_NODE:
result.append(LogEntry(logentry))
result.append(svncommon.LogEntry(logentry))
return result
#01234567890123456789012345678901234567890123467
@ -313,6 +311,7 @@ def parse_time_with_missing_year(timestr):
the svn output doesn't show the year makes the 'timestr'
ambigous.
"""
import calendar
t_now = time.gmtime()
tparts = timestr.split()
@ -341,31 +340,3 @@ class PathEntry:
if self.copyfrom_path:
self.copyfrom_rev = int(ppart.getAttribute('copyfrom-rev'))
class LogEntry:
def __init__(self, logentry):
self.rev = int(logentry.getAttribute('revision'))
for lpart in filter(None, logentry.childNodes):
if lpart.nodeType == lpart.ELEMENT_NODE:
if lpart.nodeName == u'author':
self.author = lpart.firstChild.nodeValue.encode('UTF-8')
elif lpart.nodeName == u'msg':
if lpart.firstChild:
self.msg = lpart.firstChild.nodeValue.encode('UTF-8')
else:
self.msg = ''
elif lpart.nodeName == u'date':
#2003-07-29T20:05:11.598637Z
timestr = lpart.firstChild.nodeValue.encode('UTF-8')
self.date = svncommon.parse_apr_time(timestr)
elif lpart.nodeName == u'paths':
self.strpaths = []
for ppart in filter(None, lpart.childNodes):
if ppart.nodeType == ppart.ELEMENT_NODE:
self.strpaths.append(PathEntry(ppart))
def __repr__(self):
return '<Logentry rev=%d author=%s date=%s>' % (
self.rev, self.author, self.date)
_Head = "HEAD"

View File

@ -1,20 +1,442 @@
"""
svn-Command based Implementation of a Subversion WorkingCopy Path.
SvnWCCommandPath is the main class.
SvnWC is an alias to this class.
"""
import os, sys, time, re, calendar
import py
from py.__.path import common
from py.__.path.svn import cache
from py.__.path.svn import svncommon
DEBUG = 0
#-----------------------------------------------------------
# Caching latest repository revision and repo-paths
# (getting them is slow with the current implementations)
#
# XXX make mt-safe
#-----------------------------------------------------------
class cache:
proplist = {}
info = {}
entries = {}
prop = {}
class RepoEntry:
def __init__(self, url, rev, timestamp):
self.url = url
self.rev = rev
self.timestamp = timestamp
def __str__(self):
return "repo: %s;%s %s" %(self.url, self.rev, self.timestamp)
class RepoCache:
""" The Repocache manages discovered repository paths
and their revisions. If inside a timeout the cache
will even return the revision of the root.
"""
timeout = 20 # seconds after which we forget that we know the last revision
def __init__(self):
self.repos = []
def clear(self):
self.repos = []
def put(self, url, rev, timestamp=None):
if rev is None:
return
if timestamp is None:
timestamp = time.time()
for entry in self.repos:
if url == entry.url:
entry.timestamp = timestamp
entry.rev = rev
#print "set repo", entry
break
else:
entry = RepoEntry(url, rev, timestamp)
self.repos.append(entry)
#print "appended repo", entry
def get(self, url):
now = time.time()
for entry in self.repos:
if url.startswith(entry.url):
if now < entry.timestamp + self.timeout:
#print "returning immediate Etrny", entry
return entry.url, entry.rev
return entry.url, -1
return url, -1
repositories = RepoCache()
# svn support code
ALLOWED_CHARS = "_ -/\\=$.~+" #add characters as necessary when tested
if sys.platform == "win32":
ALLOWED_CHARS += ":"
ALLOWED_CHARS_HOST = ALLOWED_CHARS + '@:'
def _getsvnversion(ver=[]):
try:
return ver[0]
except IndexError:
v = py.process.cmdexec("svn -q --version")
v.strip()
v = '.'.join(v.split('.')[:2])
ver.append(v)
return v
def _escape_helper(text):
text = str(text)
if py.std.sys.platform != 'win32':
text = str(text).replace('$', '\\$')
return text
def _check_for_bad_chars(text, allowed_chars=ALLOWED_CHARS):
for c in str(text):
if c.isalnum():
continue
if c in allowed_chars:
continue
return True
return False
def checkbadchars(url):
# (hpk) not quite sure about the exact purpose, guido w.?
proto, uri = url.split("://", 1)
if proto != "file":
host, uripath = uri.split('/', 1)
# only check for bad chars in the non-protocol parts
if (_check_for_bad_chars(host, ALLOWED_CHARS_HOST) \
or _check_for_bad_chars(uripath, ALLOWED_CHARS)):
raise ValueError("bad char in %r" % (url, ))
#_______________________________________________________________
class SvnPathBase(common.PathBase):
""" Base implementation for SvnPath implementations. """
sep = '/'
def _geturl(self):
return self.strpath
url = property(_geturl, None, None, "url of this svn-path.")
def __str__(self):
""" return a string representation (including rev-number) """
return self.strpath
def __hash__(self):
return hash(self.strpath)
def new(self, **kw):
""" create a modified version of this path. A 'rev' argument
indicates a new revision.
the following keyword arguments modify various path parts:
http://host.com/repo/path/file.ext
|-----------------------| dirname
|------| basename
|--| purebasename
|--| ext
"""
obj = object.__new__(self.__class__)
obj.rev = kw.get('rev', self.rev)
obj.auth = kw.get('auth', self.auth)
dirname, basename, purebasename, ext = self._getbyspec(
"dirname,basename,purebasename,ext")
if 'basename' in kw:
if 'purebasename' in kw or 'ext' in kw:
raise ValueError("invalid specification %r" % kw)
else:
pb = kw.setdefault('purebasename', purebasename)
ext = kw.setdefault('ext', ext)
if ext and not ext.startswith('.'):
ext = '.' + ext
kw['basename'] = pb + ext
kw.setdefault('dirname', dirname)
kw.setdefault('sep', self.sep)
if kw['basename']:
obj.strpath = "%(dirname)s%(sep)s%(basename)s" % kw
else:
obj.strpath = "%(dirname)s" % kw
return obj
def _getbyspec(self, spec):
""" get specified parts of the path. 'arg' is a string
with comma separated path parts. The parts are returned
in exactly the order of the specification.
you may specify the following parts:
http://host.com/repo/path/file.ext
|-----------------------| dirname
|------| basename
|--| purebasename
|--| ext
"""
res = []
parts = self.strpath.split(self.sep)
for name in spec.split(','):
name = name.strip()
if name == 'dirname':
res.append(self.sep.join(parts[:-1]))
elif name == 'basename':
res.append(parts[-1])
else:
basename = parts[-1]
i = basename.rfind('.')
if i == -1:
purebasename, ext = basename, ''
else:
purebasename, ext = basename[:i], basename[i:]
if name == 'purebasename':
res.append(purebasename)
elif name == 'ext':
res.append(ext)
else:
raise NameError, "Don't know part %r" % name
return res
def __eq__(self, other):
""" return true if path and rev attributes each match """
return (str(self) == str(other) and
(self.rev == other.rev or self.rev == other.rev))
def __ne__(self, other):
return not self == other
def join(self, *args):
""" return a new Path (with the same revision) which is composed
of the self Path followed by 'args' path components.
"""
if not args:
return self
args = tuple([arg.strip(self.sep) for arg in args])
parts = (self.strpath, ) + args
newpath = self.__class__(self.sep.join(parts), self.rev, self.auth)
return newpath
def propget(self, name):
""" return the content of the given property. """
value = self._propget(name)
return value
def proplist(self):
""" list all property names. """
content = self._proplist()
return content
def listdir(self, fil=None, sort=None):
""" list directory contents, possibly filter by the given fil func
and possibly sorted.
"""
if isinstance(fil, str):
fil = common.FNMatcher(fil)
nameinfo_seq = self._listdir_nameinfo()
if len(nameinfo_seq) == 1:
name, info = nameinfo_seq[0]
if name == self.basename and info.kind == 'file':
#if not self.check(dir=1):
raise py.error.ENOTDIR(self)
paths = self._make_path_tuple(nameinfo_seq)
if fil or sort:
paths = filter(fil, paths)
paths = isinstance(paths, list) and paths or list(paths)
if callable(sort):
paths.sort(sort)
elif sort:
paths.sort()
return paths
def info(self):
""" return an Info structure with svn-provided information. """
parent = self.dirpath()
nameinfo_seq = parent._listdir_nameinfo()
bn = self.basename
for name, info in nameinfo_seq:
if name == bn:
return info
raise py.error.ENOENT(self)
def size(self):
""" Return the size of the file content of the Path. """
return self.info().size
def mtime(self):
""" Return the last modification time of the file. """
return self.info().mtime
# shared help methods
def _escape(self, cmd):
return _escape_helper(cmd)
def _make_path_tuple(self, nameinfo_seq):
""" return a tuple of paths from a nameinfo-tuple sequence.
"""
#assert self.rev is not None, "revision of %s should not be None here" % self
res = []
for name, info in nameinfo_seq:
child = self.join(name)
res.append(child)
return tuple(res)
def _childmaxrev(self):
""" return maximum revision number of childs (or self.rev if no childs) """
rev = self.rev
for name, info in self._listdir_nameinfo():
rev = max(rev, info.created_rev)
return rev
#def _getlatestrevision(self):
# """ return latest repo-revision for this path. """
# url = self.strpath
# path = self.__class__(url, None)
#
# # we need a long walk to find the root-repo and revision
# while 1:
# try:
# rev = max(rev, path._childmaxrev())
# previous = path
# path = path.dirpath()
# except (IOError, process.cmdexec.Error):
# break
# if rev is None:
# raise IOError, "could not determine newest repo revision for %s" % self
# return rev
class Checkers(common.Checkers):
def dir(self):
try:
return self.path.info().kind == 'dir'
except py.error.Error:
return self._listdirworks()
def _listdirworks(self):
try:
self.path.listdir()
except py.error.ENOENT:
return False
else:
return True
def file(self):
try:
return self.path.info().kind == 'file'
except py.error.ENOENT:
return False
def exists(self):
try:
return self.path.info()
except py.error.ENOENT:
return self._listdirworks()
def parse_apr_time(timestr):
i = timestr.rfind('.')
if i == -1:
raise ValueError, "could not parse %s" % timestr
timestr = timestr[:i]
parsedtime = time.strptime(timestr, "%Y-%m-%dT%H:%M:%S")
return time.mktime(parsedtime)
class PropListDict(dict):
""" a Dictionary which fetches values (InfoSvnCommand instances) lazily"""
def __init__(self, path, keynames):
dict.__init__(self, [(x, None) for x in keynames])
self.path = path
def __getitem__(self, key):
value = dict.__getitem__(self, key)
if value is None:
value = self.path.propget(key)
dict.__setitem__(self, key, value)
return value
def fixlocale():
if sys.platform != 'win32':
return 'LC_ALL=C '
return ''
# some nasty chunk of code to solve path and url conversion and quoting issues
ILLEGAL_CHARS = '* | \ / : < > ? \t \n \x0b \x0c \r'.split(' ')
if os.sep in ILLEGAL_CHARS:
ILLEGAL_CHARS.remove(os.sep)
ISWINDOWS = sys.platform == 'win32'
_reg_allow_disk = re.compile(r'^([a-z]\:\\)?[^:]+$', re.I)
def _check_path(path):
illegal = ILLEGAL_CHARS[:]
sp = path.strpath
if ISWINDOWS:
illegal.remove(':')
if not _reg_allow_disk.match(sp):
raise ValueError('path may not contain a colon (:)')
for char in sp:
if char not in string.printable or char in illegal:
raise ValueError('illegal character %r in path' % (char,))
def path_to_fspath(path, addat=True):
_check_path(path)
sp = path.strpath
if addat and path.rev != -1:
sp = '%s@%s' % (sp, path.rev)
elif addat:
sp = '%s@HEAD' % (sp,)
return sp
def url_from_path(path):
fspath = path_to_fspath(path, False)
quote = py.std.urllib.quote
if ISWINDOWS:
match = _reg_allow_disk.match(fspath)
fspath = fspath.replace('\\', '/')
if match.group(1):
fspath = '/%s%s' % (match.group(1).replace('\\', '/'),
quote(fspath[len(match.group(1)):]))
else:
fspath = quote(fspath)
else:
fspath = quote(fspath)
if path.rev != -1:
fspath = '%s@%s' % (fspath, path.rev)
else:
fspath = '%s@HEAD' % (fspath,)
return 'file://%s' % (fspath,)
class SvnAuth(object):
""" container for auth information for Subversion """
def __init__(self, username, password, cache_auth=True, interactive=True):
self.username = username
self.password = password
self.cache_auth = cache_auth
self.interactive = interactive
def makecmdoptions(self):
uname = self.username.replace('"', '\\"')
passwd = self.password.replace('"', '\\"')
ret = []
if uname:
ret.append('--username="%s"' % (uname,))
if passwd:
ret.append('--password="%s"' % (passwd,))
if not self.cache_auth:
ret.append('--no-auth-cache')
if not self.interactive:
ret.append('--non-interactive')
return ' '.join(ret)
def __str__(self):
return "<SvnAuth username=%s ...>" %(self.username,)
rex_blame = re.compile(r'\s*(\d+)\s*(\S+) (.*)')
@ -31,8 +453,8 @@ class SvnWCCommandPath(common.PathBase):
if wcpath.__class__ == cls:
return wcpath
wcpath = wcpath.localpath
if svncommon._check_for_bad_chars(str(wcpath),
svncommon.ALLOWED_CHARS):
if _check_for_bad_chars(str(wcpath),
ALLOWED_CHARS):
raise ValueError("bad char in wcpath %s" % (wcpath, ))
self.localpath = py.path.local(wcpath)
self.auth = auth
@ -53,7 +475,7 @@ class SvnWCCommandPath(common.PathBase):
url = property(_geturl, None, None, "url of this WC item")
def _escape(self, cmd):
return svncommon._escape_helper(cmd)
return _escape_helper(cmd)
def dump(self, obj):
""" pickle object into path location"""
@ -86,9 +508,7 @@ class SvnWCCommandPath(common.PathBase):
l.extend(args)
l.append('"%s"' % self._escape(self.strpath))
# try fixing the locale because we can't otherwise parse
string = svncommon.fixlocale() + " ".join(l)
if DEBUG:
print "execing", string
string = fixlocale() + " ".join(l)
try:
try:
key = 'LC_MESSAGES'
@ -122,10 +542,10 @@ class SvnWCCommandPath(common.PathBase):
url = self.url
if rev is None or rev == -1:
if (py.std.sys.platform != 'win32' and
svncommon._getsvnversion() == '1.3'):
_getsvnversion() == '1.3'):
url += "@HEAD"
else:
if svncommon._getsvnversion() == '1.3':
if _getsvnversion() == '1.3':
url += "@%d" % rev
else:
args.append('-r' + str(rev))
@ -280,7 +700,8 @@ class SvnWCCommandPath(common.PathBase):
def blame(self):
""" return a list of tuples of three elements:
(revision, commiter, line)"""
(revision, commiter, line)
"""
out = self._svn('blame')
result = []
blamelines = out.splitlines()
@ -298,7 +719,6 @@ class SvnWCCommandPath(common.PathBase):
_rex_commit = re.compile(r'.*Committed revision (\d+)\.$', re.DOTALL)
def commit(self, msg='', rec=1):
""" commit with support for non-recursive commits """
from py.__.path.svn import cache
# XXX i guess escaping should be done better here?!?
cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),)
if not rec:
@ -343,7 +763,7 @@ If rec is True, then return a dictionary mapping sub-paths to such mappings.
res = self._svn('proplist')
lines = res.split('\n')
lines = map(str.strip, lines[1:])
return svncommon.PropListDict(self, lines)
return PropListDict(self, lines)
def revert(self, rec=0):
""" revert the local changes of this path. if rec is True, do so
@ -466,17 +886,15 @@ rev_start is the starting revision (defaulting to the first one).
rev_end is the last revision (defaulting to HEAD).
if verbose is True, then the LogEntry instances also know which files changed.
"""
from py.__.path.svn.urlcommand import _Head, LogEntry
assert self.check() # make it simpler for the pipe
rev_start = rev_start is None and _Head or rev_start
rev_end = rev_end is None and _Head or rev_end
if rev_start is _Head and rev_end == 1:
rev_start = rev_start is None and "HEAD" or rev_start
rev_end = rev_end is None and "HEAD" or rev_end
if rev_start == "HEAD" and rev_end == 1:
rev_opt = ""
else:
rev_opt = "-r %s:%s" % (rev_start, rev_end)
verbose_opt = verbose and "-v" or ""
locale_env = svncommon.fixlocale()
locale_env = fixlocale()
# some blather on stderr
auth_opt = self._makeauthoptions()
stdin, stdout, stderr = os.popen3(locale_env +
@ -807,7 +1225,7 @@ def make_recursive_propdict(wcroot,
propname = lines.pop(0).strip()
propnames.append(propname)
assert propnames, "must have found properties!"
pdict[wcpath] = svncommon.PropListDict(wcpath, propnames)
pdict[wcpath] = PropListDict(wcpath, propnames)
return pdict
def error_enhance((cls, error, tb)):
@ -820,3 +1238,30 @@ def importxml(cache=[]):
from xml.parsers.expat import ExpatError
cache.extend([minidom, ExpatError])
return cache
class LogEntry:
def __init__(self, logentry):
self.rev = int(logentry.getAttribute('revision'))
for lpart in filter(None, logentry.childNodes):
if lpart.nodeType == lpart.ELEMENT_NODE:
if lpart.nodeName == u'author':
self.author = lpart.firstChild.nodeValue.encode('UTF-8')
elif lpart.nodeName == u'msg':
if lpart.firstChild:
self.msg = lpart.firstChild.nodeValue.encode('UTF-8')
else:
self.msg = ''
elif lpart.nodeName == u'date':
#2003-07-29T20:05:11.598637Z
timestr = lpart.firstChild.nodeValue.encode('UTF-8')
self.date = parse_apr_time(timestr)
elif lpart.nodeName == u'paths':
self.strpaths = []
for ppart in filter(None, lpart.childNodes):
if ppart.nodeType == ppart.ELEMENT_NODE:
self.strpaths.append(PathEntry(ppart))
def __repr__(self):
return '<Logentry rev=%d author=%s date=%s>' % (
self.rev, self.author, self.date)

View File

@ -2,7 +2,7 @@ import sys
import py
from py import path, test, process
from py.__.path.testing.fscommon import CommonFSTests, setuptestfs
from py.__.path.svn import cache, svncommon
from py.__.path import svnwc as svncommon
mypath = py.magic.autopath()
repodump = mypath.dirpath('repotest.dump')
@ -66,6 +66,7 @@ def restore_repowc((savedrepo, savedwc)):
# create an empty repository for testing purposes and return the url to it
def make_test_repo(name="test-repository"):
getsvnbin()
repo = py.test.ensuretemp(name)
try:
py.process.cmdexec('svnadmin create %s' % repo)
@ -149,14 +150,14 @@ class CommonCommandAndBindingTests(CommonSvnTests):
# the following tests are easier if we have a path class
def test_repocache_simple(self):
repocache = cache.RepoCache()
repocache = svncommon.RepoCache()
repocache.put(self.root.strpath, 42)
url, rev = repocache.get(self.root.join('test').strpath)
assert rev == 42
assert url == self.root.strpath
def test_repocache_notimeout(self):
repocache = cache.RepoCache()
repocache = svncommon.RepoCache()
repocache.timeout = 0
repocache.put(self.root.strpath, self.root.rev)
url, rev = repocache.get(self.root.strpath)
@ -164,7 +165,7 @@ class CommonCommandAndBindingTests(CommonSvnTests):
assert url == self.root.strpath
def test_repocache_outdated(self):
repocache = cache.RepoCache()
repocache = svncommon.RepoCache()
repocache.put(self.root.strpath, 42, timestamp=0)
url, rev = repocache.get(self.root.join('test').strpath)
assert rev == -1
@ -172,7 +173,7 @@ class CommonCommandAndBindingTests(CommonSvnTests):
def _test_getreporev(self):
""" this test runs so slow it's usually disabled """
old = cache.repositories.repos
old = svncommon.repositories.repos
try:
_repocache.clear()
root = self.root.new(rev=-1)

View File

@ -1,6 +1,6 @@
import py
from py.__.path.svn.urlcommand import InfoSvnCommand
from py.__.path.svn.testing.svntestbase import CommonCommandAndBindingTests, \
from py.__.path.svnurl import InfoSvnCommand
from py.__.path.testing.svntestbase import CommonCommandAndBindingTests, \
getrepowc, getsvnbin
import datetime
import time

View File

@ -1,9 +1,8 @@
import py
import sys
from py.__.path.svn.testing.svntestbase import CommonSvnTests, getrepowc, getsvnbin
from py.__.path.svn.wccommand import InfoSvnWCCommand, XMLWCStatus
from py.__.path.svn.wccommand import parse_wcinfotime
from py.__.path.svn import svncommon
from py.__.path.testing.svntestbase import CommonSvnTests, getrepowc, getsvnbin, make_test_repo
from py.__.path.svnwc import InfoSvnWCCommand, XMLWCStatus, parse_wcinfotime
from py.__.path import svnwc as svncommon
if sys.platform != 'win32':
def normpath(p):
@ -23,6 +22,27 @@ else:
def setup_module(mod):
getsvnbin()
class TestMakeRepo(object):
def setup_class(cls):
cls.repo = make_test_repo()
cls.wc = py.path.svnwc(py.test.ensuretemp("test-wc").join("wc"))
def test_empty_checkout(self):
self.wc.checkout(self.repo)
assert len(self.wc.listdir()) == 0
def test_commit(self):
self.wc.checkout(self.repo)
p = self.wc.join("a_file")
p.write("test file")
p.add()
rev = self.wc.commit("some test")
assert p.info().rev == 1
assert rev == 1
rev = self.wc.commit()
assert rev is None
class TestWCSvnCommandPath(CommonSvnTests):
def setup_class(cls):
repo, cls.root = getrepowc()
@ -427,8 +447,8 @@ class TestInfoSvnWCCommand:
def test_svn_1_2(self):
output = """
Path: test_wccommand.py
Name: test_wccommand.py
Path: test_svnwc.py
Name: test_svnwc.py
URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py
Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada
Revision: 28137
@ -454,8 +474,8 @@ class TestInfoSvnWCCommand:
def test_svn_1_3(self):
output = """
Path: test_wccommand.py
Name: test_wccommand.py
Path: test_svnwc.py
Name: test_svnwc.py
URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py
Repository Root: http://codespeak.net/svn
Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada