test_ok2/py/path/svn/wccommand.py

817 lines
29 KiB
Python

"""
svn-Command based Implementation of a Subversion WorkingCopy Path.
SvnWCCommandPath is the main class.
SvnWC is an alias to this class.
"""
import os, sys, time, re, calendar
import py
from py.__.path import common
from py.__.path.svn import cache
from py.__.path.svn import svncommon
# When set to a true value, SvnWCCommandPath._svn prints each svn command
# line before executing it.
DEBUG = 0
# Matches one line of 'svn blame' output; the groups are the revision
# number, the committer name and the remaining line text.
rex_blame = re.compile(r'\s*(\d+)\s*(\S+) (.*)')
class SvnWCCommandPath(common.FSPathBase):
    """ path implementation offering access/modification to svn working copies.

        It has methods similar to the functions in os.path and similar to the
        commands of the svn client.  svn operations are carried out by
        running the 'svn' command line client on the wrapped local path.
    """
    sep = os.sep

    def __new__(cls, wcpath=None, auth=None):
        """ create a working copy path object for 'wcpath'.

            An instance of exactly this class is returned unchanged (the
            'auth' argument is not applied in that case); an instance of a
            subclass is reduced to its local path first.  Raises ValueError
            if the path contains characters svncommon does not allow.
        """
        self = object.__new__(cls)
        if isinstance(wcpath, cls):
            if wcpath.__class__ == cls:
                return wcpath
            wcpath = wcpath.localpath
        if svncommon._check_for_bad_chars(str(wcpath),
                                          svncommon.ALLOWED_CHARS):
            raise ValueError("bad char in wcpath %s" % (wcpath, ))
        self.localpath = py.path.local(wcpath)
        self.auth = auth
        return self

    strpath = property(lambda x: str(x.localpath), None, None, "string path")

    def __eq__(self, other):
        """ two paths are equal if their local paths are equal. """
        return self.localpath == getattr(other, 'localpath', None)

    def _geturl(self):
        """ return the repository url of this item, asking svn only once. """
        if getattr(self, '_url', None) is None:
            info = self.info()
            self._url = info.url #SvnPath(info.url, info.rev)
        assert isinstance(self._url, str)
        return self._url

    url = property(_geturl, None, None, "url of this WC item")

    def _escape(self, cmd):
        """ escape a command line fragment (delegates to svncommon). """
        return svncommon._escape_helper(cmd)

    def dump(self, obj):
        """ pickle object into path location"""
        return self.localpath.dump(obj)

    def svnurl(self):
        """ return current SvnPath for this WC-item. """
        info = self.info()
        return py.path.svnurl(info.url)

    def __repr__(self):
        return "svnwc(%r)" % (self.strpath) # , self._url)

    def __str__(self):
        return str(self.localpath)

    def _makeauthoptions(self):
        """ return the auth command line options ('' without auth). """
        if self.auth is None:
            return ''
        return self.auth.makecmdoptions()

    def _authsvn(self, cmd, args=None):
        """ run 'svn cmd' like _svn() but with the auth options appended. """
        if args:
            args = list(args)
        else:
            args = []
        args.append(self._makeauthoptions())
        return self._svn(cmd, *args)

    def _svn(self, cmd, *args):
        """ run 'svn <cmd> <args> "<strpath>"' and return its output.

            The command runs with LC_MESSAGES=C so that messages can be
            parsed.  Raises py.error.ENOENT / py.error.EEXIST when the
            error output of svn indicates a missing or an already existing
            file; any other command failure is re-raised unchanged.
        """
        parts = ['svn %s' % cmd]
        parts.extend([self._escape(item) for item in args])
        parts.append('"%s"' % self._escape(self.strpath))
        # try fixing the locale because we can't otherwise parse
        string = svncommon.fixlocale() + " ".join(parts)
        if DEBUG:
            sys.stdout.write("execing %s\n" % (string,))
        key = 'LC_MESSAGES'
        hold = os.environ.get(key)
        try:
            try:
                os.environ[key] = 'C'
                out = py.process.cmdexec(string)
            finally:
                # restore the previous state exactly; an empty value that
                # was set before must be put back, not removed
                if hold is not None:
                    os.environ[key] = hold
                elif key in os.environ:
                    del os.environ[key]
        except py.process.cmdexec.Error:
            e = sys.exc_info()[1]
            strerr = e.err.lower()
            if strerr.find('file not found') != -1:
                raise py.error.ENOENT(self)
            if (strerr.find('file exists') != -1 or
                    strerr.find('file already exists') != -1 or
                    strerr.find("can't create directory") != -1):
                raise py.error.EEXIST(self)
            raise
        return out

    def switch(self, url):
        """ switch to given URL. """
        self._authsvn('switch', [url])

    def checkout(self, url=None, rev=None):
        """ checkout from url to local wcpath.

            Without 'url' the url of this working copy item is used.  A
            'rev' of None or -1 means no explicit revision.
        """
        args = []
        if url is None:
            url = self.url
        if rev is None or rev == -1:
            if (py.std.sys.platform != 'win32' and
                    svncommon._getsvnversion() == '1.3'):
                url += "@HEAD"
        else:
            # svn 1.3 gets the revision as a peg revision on the url
            if svncommon._getsvnversion() == '1.3':
                url += "@%d" % rev
            else:
                args.append('-r' + str(rev))
        args.append(url)
        self._authsvn('co', args)

    def update(self, rev = 'HEAD'):
        """ update working copy item to given revision. (None -> HEAD). """
        if rev is None:
            rev = 'HEAD'
        self._authsvn('up', ['-r', rev])

    def write(self, content, mode='wb'):
        """ write content into local filesystem wc. """
        self.localpath.write(content, mode)

    def dirpath(self, *args):
        """ return the directory Path of the current Path. """
        return self.__class__(self.localpath.dirpath(*args), auth=self.auth)

    def _ensuredirs(self):
        """ create this directory and all missing parents via svn mkdir. """
        parent = self.dirpath()
        if parent.check(dir=0):
            parent._ensuredirs()
        if self.check(dir=0):
            self.mkdir()
        return self

    def ensure(self, *args, **kwargs):
        """ ensure that an args-joined path exists (by default as
            a file). if you specify a keyword argument 'dir=True'
            then the path is forced to be a directory path.

            An existing but unversioned path is added to svn.
        """
        try:
            p = self.join(*args)
            if p.check():
                if p.check(versioned=False):
                    p.add()
                return p
            if kwargs.get('dir', 0):
                return p._ensuredirs()
            parent = p.dirpath()
            parent._ensuredirs()
            p.write("")
            p.add()
            return p
        except:
            # error_enhance re-raises the active exception
            error_enhance(sys.exc_info())

    def mkdir(self, *args):
        """ create & return the directory joined with args. """
        if args:
            return self.join(*args).mkdir()
        else:
            self._svn('mkdir')
            return self

    def add(self):
        """ add ourself to svn """
        self._svn('add')

    def remove(self, rec=1, force=1):
        """ remove a file or a directory tree. 'rec'ursive is
            ignored and considered always true (because of
            underlying svn semantics).
        """
        assert rec, "svn cannot remove non-recursively"
        if not self.check(versioned=True):
            # not added to svn (anymore?), just remove
            py.path.local(self).remove()
            return
        flags = []
        if force:
            flags.append('--force')
        self._svn('remove', *flags)

    def copy(self, target):
        """ copy path to target."""
        # quote and escape both paths like _svn() does so that paths
        # containing spaces survive the shell
        py.process.cmdexec('svn copy "%s" "%s"' % (
            self._escape(str(self)), self._escape(str(target))))

    def rename(self, target):
        """ rename this path to target. """
        # quote and escape both paths like _svn() does so that paths
        # containing spaces survive the shell
        py.process.cmdexec('svn move --force "%s" "%s"' % (
            self._escape(str(self)), self._escape(str(target))))

    def lock(self):
        """ set a lock (exclusive) on the resource.

            Raises Exception if the svn command produced no output.
        """
        out = self._authsvn('lock').strip()
        if not out:
            # warning or error, raise exception; there is no output that
            # could be reported, so name the command and the path
            raise Exception("svn lock produced no output for %s" % (
                self.strpath,))

    def unlock(self):
        """ unset a previously set lock.

            Raises Exception if the svn output starts with 'svn:'.
        """
        out = self._authsvn('unlock').strip()
        if out.startswith('svn:'):
            # warning or error, raise exception
            raise Exception(out[4:])

    def cleanup(self):
        """ remove any locks from the resource """
        # XXX should be fixed properly!!!
        try:
            self.unlock()
        except (KeyboardInterrupt, SystemExit):
            # best effort only covers real failures, not interruption
            raise
        except Exception:
            pass

    def status(self, updates=0, rec=0, externals=0):
        """ return (collective) Status object for this file.

            'updates' additionally asks the repository for available
            updates, 'rec' makes the status recursive.  'externals' is
            not supported and raises ValueError.  The XML output of svn
            is tried first; if that command fails the plain text output
            is parsed instead.
        """
        # http://svnbook.red-bean.com/book.html#svn-ch-3-sect-4.3.1
        #             2201     2192        jum   test
        # XXX
        if externals:
            raise ValueError("XXX cannot perform status() "
                             "on external items yet")
        #1.2 supports: externals = '--ignore-externals'
        externals = ''
        if rec:
            rec = ''
        else:
            rec = '--non-recursive'
        # XXX does not work on all subversion versions
        #if not externals:
        #    externals = '--ignore-externals'
        if updates:
            updates = '-u'
        else:
            updates = ''
        options = (updates, rec, externals)
        try:
            out = self._authsvn(
                'status -v --xml --no-ignore %s %s %s' % options)
        except py.process.cmdexec.Error:
            out = self._authsvn(
                'status -v --no-ignore %s %s %s' % options)
            rootstatus = WCStatus(self).fromstring(out, self)
        else:
            rootstatus = XMLWCStatus(self).fromstring(out, self)
        return rootstatus

    def diff(self, rev=None):
        """ return a diff of the current path against revision rev (defaulting
            to the last one).
        """
        args = []
        if rev is not None:
            args.append("-r %d" % rev)
        out = self._authsvn('diff', args)
        return out

    def blame(self):
        """ return a list of tuples of three elements:
            (revision, commiter, line)

            The line content is read from the repository url of this
            path.  Raises ValueError if a line of the blame output does
            not have the expected format.
        """
        out = self._svn('blame')
        result = []
        blamelines = out.splitlines()
        reallines = py.path.svnurl(self.url).readlines()
        for blameline, line in zip(blamelines, reallines):
            m = rex_blame.match(blameline)
            if not m:
                # report the blame output line that failed to parse
                raise ValueError("output line %r of svn blame does not match "
                                 "expected format" % (blameline, ))
            rev, name, _ = m.groups()
            result.append((int(rev), name, line))
        return result

    _rex_commit = re.compile(r'.*Committed revision (\d+)\.$', re.DOTALL)

    def commit(self, msg='', rec=1):
        """ commit with support for non-recursive commits.

            Returns the committed revision number, or None if svn
            produced no output.
        """
        from py.__.path.svn import cache
        # XXX i guess escaping should be done better here?!?
        cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),)
        if not rec:
            cmd += ' -N'
        out = self._authsvn(cmd)
        # the cached info is stale after a commit
        try:
            del cache.info[self]
        except KeyError:
            pass
        if out:
            m = self._rex_commit.match(out)
            return int(m.group(1))

    def propset(self, name, value, *args):
        """ set property name to value on this path. """
        # the value is handed to svn through a temporary file
        d = py.path.local.mkdtemp()
        try:
            p = d.join('value')
            p.write(value)
            self._svn('propset', name, '--file', str(p), *args)
        finally:
            d.remove()

    def propget(self, name):
        """ get property name on this path. """
        res = self._svn('propget', name)
        return res[:-1] # strip trailing newline

    def propdel(self, name):
        """ delete property name on this path. """
        res = self._svn('propdel', name)
        return res[:-1] # strip trailing newline

    def proplist(self, rec=0):
        """ return a mapping of property names to property values.
            If rec is True, then return a dictionary mapping sub-paths
            to such mappings.
        """
        if rec:
            res = self._svn('proplist -R')
            return make_recursive_propdict(self, res)
        res = self._svn('proplist')
        # the first output line is a header, the rest are property names
        lines = [line.strip() for line in res.split('\n')[1:]]
        return svncommon.PropListDict(self, lines)

    def revert(self, rec=0):
        """ revert the local changes of this path. if rec is True, do so
            recursively. """
        if rec:
            result = self._svn('revert -R')
        else:
            result = self._svn('revert')
        return result

    def new(self, **kw):
        """ create a modified version of this path. A 'rev' argument
            indicates a new revision.
            the following keyword arguments modify various path parts:

              http://host.com/repo/path/file.ext
              |-----------------------|          dirname
                                        |------| basename
                                        |--|     purebasename
                                            |--| ext
        """
        if kw:
            localpath = self.localpath.new(**kw)
        else:
            localpath = self.localpath
        return self.__class__(localpath, auth=self.auth)

    def join(self, *args, **kwargs):
        """ return a new Path (with the same revision) which is composed
            of the self Path followed by 'args' path components.
        """
        if not args:
            return self
        localpath = self.localpath.join(*args, **kwargs)
        return self.__class__(localpath, auth=self.auth)

    def info(self, usecache=1):
        """ return an Info structure with svn-provided information.

            Results are kept in cache.info; pass usecache=0 to force a
            fresh 'svn info' call.  Raises py.error.ENOENT if the path is
            not a versioned resource.
        """
        info = usecache and cache.info.get(self)
        if not info:
            try:
                output = self._svn('info')
            except py.process.cmdexec.Error:
                e = sys.exc_info()[1]
                if e.err.find('Path is not a working copy directory') != -1:
                    raise py.error.ENOENT(self, e.err)
                elif e.err.find("is not under version control") != -1:
                    raise py.error.ENOENT(self, e.err)
                raise
            # XXX SVN 1.3 has output on stderr instead of stdout (while it does
            # return 0!), so a bit nasty, but we assume no output is output
            # to stderr...
            if (output.strip() == '' or
                    output.lower().find('not a versioned resource') != -1):
                raise py.error.ENOENT(self, output)
            info = InfoSvnWCCommand(output)

            # Can't reliably compare on Windows without access to win32api
            if py.std.sys.platform != 'win32':
                if info.path != self.localpath:
                    raise py.error.ENOENT(self, "not a versioned resource:" +
                            " %s != %s" % (info.path, self.localpath))
            cache.info[self] = info
        self.rev = info.rev
        return info

    def listdir(self, fil=None, sort=None):
        """ return a sequence of Paths.

        listdir will return either a tuple or a list of paths
        depending on implementation choices.  'fil' may be a filter
        callable or a string pattern; the '.svn' directory is never
        listed.
        """
        if isinstance(fil, str):
            fil = common.fnmatch(fil)
        # XXX unify argument naming with LocalPath.listdir
        def notsvn(path):
            return path.basename != '.svn'

        paths = []
        for localpath in self.localpath.listdir(notsvn):
            paths.append(self.__class__(localpath, auth=self.auth))

        if fil or sort:
            paths = list(filter(fil, paths))
            if callable(sort):
                paths.sort(sort)
            elif sort:
                paths.sort()
        return paths

    def open(self, mode='r'):
        """ return an opened file with the given mode. """
        return open(self.strpath, mode)

    def _getbyspec(self, spec):
        return self.localpath._getbyspec(spec)

    class Checkers(py.path.local.Checkers):
        """ local path checkers extended with a 'versioned' check. """
        def __init__(self, path):
            self.svnwcpath = path
            self.path = path.localpath

        def versioned(self):
            """ return True if 'svn info' works for the path. """
            try:
                self.svnwcpath.info()
            except (py.error.ENOENT, py.error.EEXIST):
                return False
            except py.process.cmdexec.Error:
                e = sys.exc_info()[1]
                if e.err.find('is not a working copy') != -1:
                    return False
                raise
            else:
                return True

    def log(self, rev_start=None, rev_end=1, verbose=False):
        """ return a list of LogEntry instances for this path.
            rev_start is the starting revision (None means HEAD, which
            is the default).
            rev_end is the last revision (defaulting to 1, None means
            HEAD).
            if verbose is True, then the LogEntry instances also know
            which files changed.

            Raises ValueError if the output of svn cannot be parsed.
        """
        from py.__.path.svn.urlcommand import _Head, LogEntry
        assert self.check()   # make it simpler for the pipe
        if rev_start is None:
            rev_start = _Head
        if rev_end is None:
            rev_end = _Head
        if rev_start is _Head and rev_end == 1:
            rev_opt = ""
        else:
            rev_opt = "-r %s:%s" % (rev_start, rev_end)
        if verbose:
            verbose_opt = "-v"
        else:
            verbose_opt = ""
        locale_env = svncommon.fixlocale()
        # some blather on stderr
        auth_opt = self._makeauthoptions()
        stdin, stdout, stderr = os.popen3(locale_env +
                                          'svn log --xml %s %s %s "%s"' % (
                                           rev_opt, verbose_opt, auth_opt,
                                           self.strpath))
        minidom, ExpatError = importxml()
        try:
            try:
                tree = minidom.parse(stdout)
            except ExpatError:
                raise ValueError('no such revision')
        finally:
            # do not leak the three pipes of the child process
            for pipe in (stdin, stdout, stderr):
                pipe.close()
        result = []
        for logentry in filter(None, tree.firstChild.childNodes):
            if logentry.nodeType == logentry.ELEMENT_NODE:
                result.append(LogEntry(logentry))
        return result

    def size(self):
        """ Return the size of the file content of the Path. """
        return self.info().size

    def mtime(self):
        """ Return the last modification time of the file. """
        return self.info().mtime

    def __hash__(self):
        return hash((self.strpath, self.__class__, self.auth))
class WCStatus:
    """ collective status of a working copy path.

        For every name in 'attrnames' an instance carries a list of the
        paths that are in that state.
    """
    attrnames = ('modified','added', 'conflict', 'unchanged', 'external',
                'deleted', 'prop_modified', 'unknown', 'update_available',
                'incomplete', 'kindmismatch', 'ignored', 'locked', 'replaced'
                )

    def __init__(self, wcpath, rev=None, modrev=None, author=None):
        self.wcpath = wcpath
        self.rev = rev
        self.modrev = modrev
        self.author = author

        for name in self.attrnames:
            setattr(self, name, [])

    def allpath(self, sort=True, **kw):
        """ return the list of all paths found in the status lists.

            A status list is left out if its name is passed as a keyword
            argument with a false value.  Each path is listed once; the
            result is sorted unless 'sort' is false.
        """
        d = {}
        for name in self.attrnames:
            if name not in kw or kw[name]:
                for path in getattr(self, name):
                    d[path] = 1
        # list(d) instead of d.keys() so that sorting works regardless
        # of what keys() returns
        l = list(d)
        if sort:
            l.sort()
        return l

    # XXX a bit scary to assume there's always 2 spaces between username and
    # path, however with win32 allowing spaces in user names there doesn't
    # seem to be a more solid approach :(
    _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)')

    def fromstring(data, rootwcpath, rev=None, modrev=None, author=None):
        """ return a new WCStatus object parsed from 'data', the plain
            text output of 'svn status -v'.

            The first eight characters of each line are taken as flag
            columns, the rest as revision, last changed revision, author
            and path.  Raises ValueError for a line that cannot be parsed
            and NotImplementedError for an unknown first-column flag.
        """
        rootstatus = WCStatus(rootwcpath, rev, modrev, author)
        update_rev = None
        for line in data.split('\n'):
            if not line.strip():
                continue
            #print "processing %r" % line
            flags, rest = line[:8], line[8:]
            # first column
            c0,c1,c2,c3,c4,c5,x6,c7 = flags
            #if '*' in line:
            #    print "flags", repr(flags), "rest", repr(rest)

            if c0 in '?XI':
                # these lines only consist of the flag and the path
                fn = line.split(None, 1)[1]
                if c0 == '?':
                    wcpath = rootwcpath.join(fn, abs=1)
                    rootstatus.unknown.append(wcpath)
                elif c0 == 'X':
                    wcpath = rootwcpath.__class__(
                        rootwcpath.localpath.join(fn, abs=1),
                        auth=rootwcpath.auth)
                    rootstatus.external.append(wcpath)
                elif c0 == 'I':
                    wcpath = rootwcpath.join(fn, abs=1)
                    rootstatus.ignored.append(wcpath)

                continue

            #elif c0 in '~!' or c4 == 'S':
            #    raise NotImplementedError("received flag %r" % c0)

            m = WCStatus._rex_status.match(rest)
            if not m:
                if c7 == '*':
                    fn = rest.strip()
                    wcpath = rootwcpath.join(fn, abs=1)
                    rootstatus.update_available.append(wcpath)
                    continue
                if line.lower().find('against revision:')!=-1:
                    update_rev = int(rest.split(':')[1].strip())
                    continue
                if line.lower().find('status on external') > -1:
                    # XXX not sure what to do here... perhaps we want to
                    # store some state instead of just continuing, as right
                    # now it makes the top-level external get added twice
                    # (once as external, once as 'normal' unchanged item)
                    # because of the way SVN presents external items
                    continue
                # keep trying
                raise ValueError("could not parse line %r" % line)
            else:
                rev, modrev, author, fn = m.groups()
            wcpath = rootwcpath.join(fn, abs=1)
            #assert wcpath.check()
            if c0 == 'M':
                assert wcpath.check(file=1), "didn't expect a directory with changed content here"
                rootstatus.modified.append(wcpath)
            elif c0 == 'A' or c3 == '+' :
                rootstatus.added.append(wcpath)
            elif c0 == 'D':
                rootstatus.deleted.append(wcpath)
            elif c0 == 'C':
                rootstatus.conflict.append(wcpath)
            elif c0 == '~':
                rootstatus.kindmismatch.append(wcpath)
            elif c0 == '!':
                rootstatus.incomplete.append(wcpath)
            elif c0 == 'R':
                rootstatus.replaced.append(wcpath)
            elif not c0.strip():
                rootstatus.unchanged.append(wcpath)
            else:
                raise NotImplementedError("received flag %r" % c0)

            if c1 == 'M':
                rootstatus.prop_modified.append(wcpath)
            # XXX do we cover all client versions here?
            if c2 == 'L' or c5 == 'K':
                rootstatus.locked.append(wcpath)
            if c7 == '*':
                rootstatus.update_available.append(wcpath)

            if wcpath == rootwcpath:
                rootstatus.rev = rev
                rootstatus.modrev = modrev
                rootstatus.author = author
        # the 'against revision' line may come after the entry for the
        # root path, so the update revision is stored once all lines
        # have been seen
        if update_rev:
            rootstatus.update_rev = update_rev
        return rootstatus
    fromstring = staticmethod(fromstring)
class XMLWCStatus(WCStatus):
    """ WCStatus variant that is built from the XML output of svn status. """

    def fromstring(data, rootwcpath, rev=None, modrev=None, author=None):
        """ parse 'data' (XML string as outputted by svn st) into a status obj

            Returns a WCStatus instance.  Raises ValueError if 'data' is
            not well-formed XML.
        """
        # XXX for externals, the path is shown twice: once
        # with external information, and once with full info as if
        # the item was a normal non-external... the current way of
        # dealing with this issue is by ignoring it - this does make
        # externals appear as external items as well as 'normal',
        # unchanged ones in the status object so this is far from ideal
        rootstatus = WCStatus(rootwcpath, rev, modrev, author)
        minidom, ExpatError = importxml()
        try:
            doc = minidom.parseString(data)
        except ExpatError:
            e = sys.exc_info()[1]
            raise ValueError(str(e))
        urevels = doc.getElementsByTagName('against')
        if urevels:
            rootstatus.update_rev = urevels[-1].getAttribute('revision')
        # svn item states whose name differs from the WCStatus attribute;
        # 'missing' and 'obstructed' go where the text parser puts the
        # '!' and '~' flags
        attrname_by_item = {
            'normal': 'unchanged',
            'unversioned': 'unknown',
            'conflicted': 'conflict',
            'none': 'added',
            'missing': 'incomplete',
            'obstructed': 'kindmismatch',
        }
        for entryel in doc.getElementsByTagName('entry'):
            path = entryel.getAttribute('path')
            statusel = entryel.getElementsByTagName('wc-status')[0]
            itemstatus = statusel.getAttribute('item')

            if itemstatus == 'unversioned':
                wcpath = rootwcpath.join(path, abs=1)
                rootstatus.unknown.append(wcpath)
                continue
            elif itemstatus == 'external':
                wcpath = rootwcpath.__class__(
                    rootwcpath.localpath.join(path, abs=1),
                    auth=rootwcpath.auth)
                rootstatus.external.append(wcpath)
                continue
            elif itemstatus == 'ignored':
                wcpath = rootwcpath.join(path, abs=1)
                rootstatus.ignored.append(wcpath)
                continue

            rev = statusel.getAttribute('revision')
            # placeholders for items without commit information
            modrev = '?'
            author = '?'
            date = ''
            if itemstatus == 'added' or itemstatus == 'none':
                rev = '0'
            else:
                #print entryel.toxml()
                commitels = entryel.getElementsByTagName('commit')
                if commitels:
                    commitel = commitels[0]
                    modrev = commitel.getAttribute('revision')
                    author = ''
                    author_els = commitel.getElementsByTagName('author')
                    if author_els:
                        for c in author_els[0].childNodes:
                            author += c.nodeValue
                    date_els = commitel.getElementsByTagName('date')
                    if date_els:
                        for c in date_els[0].childNodes:
                            date += c.nodeValue

            wcpath = rootwcpath.join(path, abs=1)

            assert itemstatus != 'modified' or wcpath.check(file=1), (
                'did\'t expect a directory with changed content here')

            itemattrname = attrname_by_item.get(itemstatus, itemstatus)
            attr = getattr(rootstatus, itemattrname)
            attr.append(wcpath)

            propsstatus = statusel.getAttribute('props')
            if propsstatus not in ('none', 'normal'):
                rootstatus.prop_modified.append(wcpath)

            if wcpath == rootwcpath:
                rootstatus.rev = rev
                rootstatus.modrev = modrev
                rootstatus.author = author
                rootstatus.date = date

            # handle repos-status element (remote info)
            rstatusels = entryel.getElementsByTagName('repos-status')
            if rstatusels:
                rstatusel = rstatusels[0]
                ritemstatus = rstatusel.getAttribute('item')
                if ritemstatus in ('added', 'modified'):
                    rootstatus.update_available.append(wcpath)

            lockels = entryel.getElementsByTagName('lock')
            if len(lockels):
                rootstatus.locked.append(wcpath)

        return rootstatus
    fromstring = staticmethod(fromstring)
class InfoSvnWCCommand:
    """ information about a working copy item, parsed from the output
        of 'svn info'.

        Attributes set: url, kind, rev, path, size and, when the output
        contains them, created_rev, last_author, mtime and time.
    """
    def __init__(self, output):
        """ parse 'output', the text printed by 'svn info'.

            Raises ValueError if the output contains no URL line.
        """
        # Path: test
        # URL: http://codespeak.net/svn/std.path/trunk/dist/std.path/test
        # Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada
        # Revision: 2151
        # Node Kind: directory
        # Schedule: normal
        # Last Changed Author: hpk
        # Last Changed Rev: 2100
        # Last Changed Date: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003)
        # Properties Last Updated: 2003-11-03 14:47:48 +0100 (Mon, 03 Nov 2003)

        d = {}
        for line in output.split('\n'):
            # skip blank lines and anything that is not a 'Key: value' line
            if not line.strip() or ':' not in line:
                continue
            key, value = line.split(':', 1)
            # 'Last Changed Rev' -> 'lastchangedrev'
            key = key.lower().replace(' ', '')
            value = value.strip()
            d[key] = value
        try:
            self.url = d['url']
        except KeyError:
            raise ValueError("Not a versioned resource")
            #raise ValueError, "Not a versioned resource %r" % path
        if d['nodekind'] == 'directory':
            self.kind = 'dir'
        else:
            self.kind = d['nodekind']
        self.rev = int(d['revision'])
        self.path = py.path.local(d['path'])
        self.size = self.path.size()
        if 'lastchangedrev' in d:
            self.created_rev = int(d['lastchangedrev'])
        if 'lastchangedauthor' in d:
            self.last_author = d['lastchangedauthor']
        if 'lastchangeddate' in d:
            self.mtime = parse_wcinfotime(d['lastchangeddate'])
            self.time = self.mtime * 1000000

    def __eq__(self, other):
        """ equal if all attributes are equal; objects without
            attributes compare unequal instead of raising. """
        return self.__dict__ == getattr(other, '__dict__', None)

    def __ne__(self, other):
        # keep '!=' consistent with '=='
        return not self.__eq__(other)
def parse_wcinfotime(timestr):
    """ Returns seconds since epoch, UTC.

        'timestr' is a date as printed by 'svn info'.  Raises ValueError
        if it does not have the expected format.
    """
    # example: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003)
    match = re.match(r'(\d+-\d+-\d+ \d+:\d+:\d+) ([+-]\d+) .*', timestr)
    if match is None:
        raise ValueError("timestring %r does not match" % timestr)
    # do not handle timezone specially, return value should be UTC;
    # the offset (second group) is matched but not used
    stamp = match.group(1)
    return calendar.timegm(time.strptime(stamp, "%Y-%m-%d %H:%M:%S"))
def make_recursive_propdict(wcroot,
                            output,
                            rex = re.compile("Properties on '(.*)':")):
    """ Return a dictionary of path->PropListDict mappings.

        'output' is the text printed by 'svn proplist -R': a header line
        naming a path, followed by indented property names.  Raises
        ValueError if a header line does not match 'rex'.
    """
    lines = [line for line in output.split('\n') if line]
    pdict = {}
    pos = 0
    total = len(lines)
    while pos < total:
        header = lines[pos]
        pos += 1
        m = rex.match(header)
        if not m:
            raise ValueError("could not parse propget-line: %r" % header)
        wcpath = wcroot.join(m.group(1), abs=1)
        # collect the indented property names that follow the header
        propnames = []
        while pos < total and lines[pos].startswith(' '):
            propnames.append(lines[pos].strip())
            pos += 1
        assert propnames, "must have found properties!"
        pdict[wcpath] = svncommon.PropListDict(wcpath, propnames)
    return pdict
def error_enhance((cls, error, tb)):
raise cls, error, tb
def importxml(cache=[]):
    """ return the list [minidom, ExpatError], importing the xml modules
        only on first use.

        The default 'cache' list is shared between calls on purpose: it
        remembers the imported objects.
    """
    if not cache:
        from xml.dom import minidom
        from xml.parsers.expat import ExpatError
        cache.extend([minidom, ExpatError])
    return cache