""" svn-Command based Implementation of a Subversion WorkingCopy Path. SvnWCCommandPath is the main class. SvnWC is an alias to this class. """ import os, sys, time, re, calendar from xml.dom import minidom from xml.parsers.expat import ExpatError import py from py.__.path import common from py.__.path.svn import cache from py.__.path.svn import svncommon DEBUG = 0 rex_blame = re.compile(r'\s*(\d+)\s*(\S+) (.*)') class SvnWCCommandPath(common.FSPathBase): """ path implementation offering access/modification to svn working copies. It has methods similar to the functions in os.path and similar to the commands of the svn client. """ sep = os.sep def __new__(cls, wcpath=None, auth=None): self = object.__new__(cls) if isinstance(wcpath, cls): if wcpath.__class__ == cls: return wcpath wcpath = wcpath.localpath if svncommon._check_for_bad_chars(str(wcpath), svncommon.ALLOWED_CHARS): raise ValueError("bad char in wcpath %s" % (wcpath, )) self.localpath = py.path.local(wcpath) self.auth = auth return self strpath = property(lambda x: str(x.localpath), None, None, "string path") def __eq__(self, other): return self.localpath == getattr(other, 'localpath', None) def _geturl(self): if getattr(self, '_url', None) is None: info = self.info() self._url = info.url #SvnPath(info.url, info.rev) assert isinstance(self._url, str) return self._url url = property(_geturl, None, None, "url of this WC item") def _escape(self, cmd): return svncommon._escape_helper(cmd) def dump(self, obj): """ pickle object into path location""" return self.localpath.dump(obj) def svnurl(self): """ return current SvnPath for this WC-item. """ info = self.info() return py.path.svnurl(info.url) def __repr__(self): return "svnwc(%r)" % (self.strpath) # , self._url) def __str__(self): return str(self.localpath) def _makeauthoptions(self): if self.auth is None: return '' return self.auth.makecmdoptions() def _authsvn(self, cmd, args=None): args = args and list(args) or [] args.append(self._makeauthoptions()) return self._svn(cmd, *args) def _svn(self, cmd, *args): l = ['svn %s' % cmd] args = [self._escape(item) for item in args] l.extend(args) l.append('"%s"' % self._escape(self.strpath)) # try fixing the locale because we can't otherwise parse string = svncommon.fixlocale() + " ".join(l) if DEBUG: print "execing", string try: try: key = 'LC_MESSAGES' hold = os.environ.get(key) os.environ[key] = 'C' out = py.process.cmdexec(string) finally: if hold: os.environ[key] = hold else: del os.environ[key] except py.process.cmdexec.Error, e: strerr = e.err.lower() if strerr.find('file not found') != -1: raise py.error.ENOENT(self) if (strerr.find('file exists') != -1 or strerr.find('file already exists') != -1 or strerr.find("can't create directory") != -1): raise py.error.EEXIST(self) raise return out def switch(self, url): """ switch to given URL. """ self._authsvn('switch', [url]) def checkout(self, url=None, rev=None): """ checkout from url to local wcpath. """ args = [] if url is None: url = self.url if rev is None or rev == -1: if (py.std.sys.platform != 'win32' and svncommon._getsvnversion() == '1.3'): url += "@HEAD" else: if svncommon._getsvnversion() == '1.3': url += "@%d" % rev else: args.append('-r' + str(rev)) args.append(url) self._authsvn('co', args) def update(self, rev = 'HEAD'): """ update working copy item to given revision. (None -> HEAD). """ self._authsvn('up', ['-r', rev]) def write(self, content, mode='wb'): """ write content into local filesystem wc. """ self.localpath.write(content, mode) def dirpath(self, *args): """ return the directory Path of the current Path. """ return self.__class__(self.localpath.dirpath(*args), auth=self.auth) def _ensuredirs(self): parent = self.dirpath() if parent.check(dir=0): parent._ensuredirs() if self.check(dir=0): self.mkdir() return self def ensure(self, *args, **kwargs): """ ensure that an args-joined path exists (by default as a file). if you specify a keyword argument 'directory=True' then the path is forced to be a directory path. """ try: p = self.join(*args) if p.check(): if p.check(versioned=False): p.add() return p if kwargs.get('dir', 0): return p._ensuredirs() parent = p.dirpath() parent._ensuredirs() p.write("") p.add() return p except: error_enhance(sys.exc_info()) def mkdir(self, *args): """ create & return the directory joined with args. """ if args: return self.join(*args).mkdir() else: self._svn('mkdir') return self def add(self): """ add ourself to svn """ self._svn('add') def remove(self, rec=1, force=1): """ remove a file or a directory tree. 'rec'ursive is ignored and considered always true (because of underlying svn semantics. """ assert rec, "svn cannot remove non-recursively" if not self.check(versioned=True): # not added to svn (anymore?), just remove py.path.local(self).remove() return flags = [] if force: flags.append('--force') self._svn('remove', *flags) def copy(self, target): """ copy path to target.""" py.process.cmdexec("svn copy %s %s" %(str(self), str(target))) def rename(self, target): """ rename this path to target. """ py.process.cmdexec("svn move --force %s %s" %(str(self), str(target))) def lock(self): """ set a lock (exclusive) on the resource """ out = self._authsvn('lock').strip() if not out: # warning or error, raise exception raise Exception(out[4:]) def unlock(self): """ unset a previously set lock """ out = self._authsvn('unlock').strip() if out.startswith('svn:'): # warning or error, raise exception raise Exception(out[4:]) def cleanup(self): """ remove any locks from the resource """ # XXX should be fixed properly!!! try: self.unlock() except: pass def status(self, updates=0, rec=0, externals=0): """ return (collective) Status object for this file. """ # http://svnbook.red-bean.com/book.html#svn-ch-3-sect-4.3.1 # 2201 2192 jum test # XXX if externals: raise ValueError("XXX cannot perform status() " "on external items yet") else: #1.2 supports: externals = '--ignore-externals' externals = '' if rec: rec= '' else: rec = '--non-recursive' # XXX does not work on all subversion versions #if not externals: # externals = '--ignore-externals' if updates: updates = '-u' else: updates = '' try: cmd = 'status -v --xml --no-ignore %s %s %s' % ( updates, rec, externals) out = self._authsvn(cmd) except py.process.cmdexec.Error: cmd = 'status -v --no-ignore %s %s %s' % ( updates, rec, externals) out = self._authsvn(cmd) rootstatus = WCStatus(self).fromstring(out, self) else: rootstatus = XMLWCStatus(self).fromstring(out, self) return rootstatus def diff(self, rev=None): """ return a diff of the current path against revision rev (defaulting to the last one). """ args = [] if rev is not None: args.append("-r %d" % rev) out = self._authsvn('diff', args) return out def blame(self): """ return a list of tuples of three elements: (revision, commiter, line)""" out = self._svn('blame') result = [] blamelines = out.splitlines() reallines = py.path.svnurl(self.url).readlines() for i, (blameline, line) in py.builtin.enumerate( zip(blamelines, reallines)): m = rex_blame.match(blameline) if not m: raise ValueError("output line %r of svn blame does not match " "expected format" % (line, )) rev, name, _ = m.groups() result.append((int(rev), name, line)) return result _rex_commit = re.compile(r'.*Committed revision (\d+)\.$', re.DOTALL) def commit(self, msg='', rec=1): """ commit with support for non-recursive commits """ from py.__.path.svn import cache # XXX i guess escaping should be done better here?!? cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),) if not rec: cmd += ' -N' out = self._authsvn(cmd) try: del cache.info[self] except KeyError: pass if out: m = self._rex_commit.match(out) return int(m.group(1)) def propset(self, name, value, *args): """ set property name to value on this path. """ d = py.path.local.mkdtemp() try: p = d.join('value') p.write(value) self._svn('propset', name, '--file', str(p), *args) finally: d.remove() def propget(self, name): """ get property name on this path. """ res = self._svn('propget', name) return res[:-1] # strip trailing newline def propdel(self, name): """ delete property name on this path. """ res = self._svn('propdel', name) return res[:-1] # strip trailing newline def proplist(self, rec=0): """ return a mapping of property names to property values. If rec is True, then return a dictionary mapping sub-paths to such mappings. """ if rec: res = self._svn('proplist -R') return make_recursive_propdict(self, res) else: res = self._svn('proplist') lines = res.split('\n') lines = map(str.strip, lines[1:]) return svncommon.PropListDict(self, lines) def revert(self, rec=0): """ revert the local changes of this path. if rec is True, do so recursively. """ if rec: result = self._svn('revert -R') else: result = self._svn('revert') return result def new(self, **kw): """ create a modified version of this path. A 'rev' argument indicates a new revision. the following keyword arguments modify various path parts: http://host.com/repo/path/file.ext |-----------------------| dirname |------| basename |--| purebasename |--| ext """ if kw: localpath = self.localpath.new(**kw) else: localpath = self.localpath return self.__class__(localpath, auth=self.auth) def join(self, *args, **kwargs): """ return a new Path (with the same revision) which is composed of the self Path followed by 'args' path components. """ if not args: return self localpath = self.localpath.join(*args, **kwargs) return self.__class__(localpath, auth=self.auth) def info(self, usecache=1): """ return an Info structure with svn-provided information. """ info = usecache and cache.info.get(self) if not info: try: output = self._svn('info') except py.process.cmdexec.Error, e: if e.err.find('Path is not a working copy directory') != -1: raise py.error.ENOENT(self, e.err) raise # XXX SVN 1.3 has output on stderr instead of stdout (while it does # return 0!), so a bit nasty, but we assume no output is output # to stderr... if (output.strip() == '' or output.lower().find('not a versioned resource') != -1): raise py.error.ENOENT(self, output) info = InfoSvnWCCommand(output) # Can't reliably compare on Windows without access to win32api if py.std.sys.platform != 'win32': if info.path != self.localpath: raise py.error.ENOENT(self, "not a versioned resource:" + " %s != %s" % (info.path, self.localpath)) cache.info[self] = info self.rev = info.rev return info def listdir(self, fil=None, sort=None): """ return a sequence of Paths. listdir will return either a tuple or a list of paths depending on implementation choices. """ if isinstance(fil, str): fil = common.fnmatch(fil) # XXX unify argument naming with LocalPath.listdir def notsvn(path): return path.basename != '.svn' paths = [] for localpath in self.localpath.listdir(notsvn): p = self.__class__(localpath, auth=self.auth) paths.append(p) if fil or sort: paths = filter(fil, paths) paths = isinstance(paths, list) and paths or list(paths) if callable(sort): paths.sort(sort) elif sort: paths.sort() return paths def open(self, mode='r'): """ return an opened file with the given mode. """ return open(self.strpath, mode) def _getbyspec(self, spec): return self.localpath._getbyspec(spec) class Checkers(py.path.local.Checkers): def __init__(self, path): self.svnwcpath = path self.path = path.localpath def versioned(self): try: s = self.svnwcpath.info() except (py.error.ENOENT, py.error.EEXIST): return False except py.process.cmdexec.Error, e: if e.err.find('is not a working copy')!=-1: return False raise else: return True def log(self, rev_start=None, rev_end=1, verbose=False): """ return a list of LogEntry instances for this path. rev_start is the starting revision (defaulting to the first one). rev_end is the last revision (defaulting to HEAD). if verbose is True, then the LogEntry instances also know which files changed. """ from py.__.path.svn.urlcommand import _Head, LogEntry assert self.check() # make it simpler for the pipe rev_start = rev_start is None and _Head or rev_start rev_end = rev_end is None and _Head or rev_end if rev_start is _Head and rev_end == 1: rev_opt = "" else: rev_opt = "-r %s:%s" % (rev_start, rev_end) verbose_opt = verbose and "-v" or "" locale_env = svncommon.fixlocale() # some blather on stderr auth_opt = self._makeauthoptions() stdin, stdout, stderr = os.popen3(locale_env + 'svn log --xml %s %s %s "%s"' % ( rev_opt, verbose_opt, auth_opt, self.strpath)) from xml.dom import minidom from xml.parsers.expat import ExpatError try: tree = minidom.parse(stdout) except ExpatError: # XXX not entirely sure about this exception... shouldn't it be # some py.error.* something? raise ValueError('no such revision') result = [] for logentry in filter(None, tree.firstChild.childNodes): if logentry.nodeType == logentry.ELEMENT_NODE: result.append(LogEntry(logentry)) return result def size(self): """ Return the size of the file content of the Path. """ return self.info().size def mtime(self): """ Return the last modification time of the file. """ return self.info().mtime def __hash__(self): return hash((self.strpath, self.__class__, self.auth)) class WCStatus: attrnames = ('modified','added', 'conflict', 'unchanged', 'external', 'deleted', 'prop_modified', 'unknown', 'update_available', 'incomplete', 'kindmismatch', 'ignored', 'locked', 'replaced' ) def __init__(self, wcpath, rev=None, modrev=None, author=None): self.wcpath = wcpath self.rev = rev self.modrev = modrev self.author = author for name in self.attrnames: setattr(self, name, []) def allpath(self, sort=True, **kw): d = {} for name in self.attrnames: if name not in kw or kw[name]: for path in getattr(self, name): d[path] = 1 l = d.keys() if sort: l.sort() return l # XXX a bit scary to assume there's always 2 spaces between username and # path, however with win32 allowing spaces in user names there doesn't # seem to be a more solid approach :( _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)') def fromstring(data, rootwcpath, rev=None, modrev=None, author=None): """ return a new WCStatus object from data 's' """ rootstatus = WCStatus(rootwcpath, rev, modrev, author) update_rev = None for line in data.split('\n'): if not line.strip(): continue #print "processing %r" % line flags, rest = line[:8], line[8:] # first column c0,c1,c2,c3,c4,c5,x6,c7 = flags #if '*' in line: # print "flags", repr(flags), "rest", repr(rest) if c0 in '?XI': fn = line.split(None, 1)[1] if c0 == '?': wcpath = rootwcpath.join(fn, abs=1) rootstatus.unknown.append(wcpath) elif c0 == 'X': wcpath = rootwcpath.__class__( rootwcpath.localpath.join(fn, abs=1), auth=rootwcpath.auth) rootstatus.external.append(wcpath) elif c0 == 'I': wcpath = rootwcpath.join(fn, abs=1) rootstatus.ignored.append(wcpath) continue #elif c0 in '~!' or c4 == 'S': # raise NotImplementedError("received flag %r" % c0) m = WCStatus._rex_status.match(rest) if not m: if c7 == '*': fn = rest.strip() wcpath = rootwcpath.join(fn, abs=1) rootstatus.update_available.append(wcpath) continue if line.lower().find('against revision:')!=-1: update_rev = int(rest.split(':')[1].strip()) continue if line.lower().find('status on external') > -1: # XXX not sure what to do here... perhaps we want to # store some state instead of just continuing, as right # now it makes the top-level external get added twice # (once as external, once as 'normal' unchanged item) # because of the way SVN presents external items continue # keep trying raise ValueError, "could not parse line %r" % line else: rev, modrev, author, fn = m.groups() wcpath = rootwcpath.join(fn, abs=1) #assert wcpath.check() if c0 == 'M': assert wcpath.check(file=1), "didn't expect a directory with changed content here" rootstatus.modified.append(wcpath) elif c0 == 'A' or c3 == '+' : rootstatus.added.append(wcpath) elif c0 == 'D': rootstatus.deleted.append(wcpath) elif c0 == 'C': rootstatus.conflict.append(wcpath) elif c0 == '~': rootstatus.kindmismatch.append(wcpath) elif c0 == '!': rootstatus.incomplete.append(wcpath) elif c0 == 'R': rootstatus.replaced.append(wcpath) elif not c0.strip(): rootstatus.unchanged.append(wcpath) else: raise NotImplementedError("received flag %r" % c0) if c1 == 'M': rootstatus.prop_modified.append(wcpath) # XXX do we cover all client versions here? if c2 == 'L' or c5 == 'K': rootstatus.locked.append(wcpath) if c7 == '*': rootstatus.update_available.append(wcpath) if wcpath == rootwcpath: rootstatus.rev = rev rootstatus.modrev = modrev rootstatus.author = author if update_rev: rootstatus.update_rev = update_rev continue return rootstatus fromstring = staticmethod(fromstring) class XMLWCStatus(WCStatus): def fromstring(data, rootwcpath, rev=None, modrev=None, author=None): """ parse 'data' (XML string as outputted by svn st) into a status obj """ # XXX for externals, the path is shown twice: once # with external information, and once with full info as if # the item was a normal non-external... the current way of # dealing with this issue is by ignoring it - this does make # externals appear as external items as well as 'normal', # unchanged ones in the status object so this is far from ideal rootstatus = WCStatus(rootwcpath, rev, modrev, author) update_rev = None try: doc = minidom.parseString(data) except ExpatError, e: raise ValueError(str(e)) urevels = doc.getElementsByTagName('against') if urevels: rootstatus.update_rev = urevels[-1].getAttribute('revision') for entryel in doc.getElementsByTagName('entry'): path = entryel.getAttribute('path') statusel = entryel.getElementsByTagName('wc-status')[0] itemstatus = statusel.getAttribute('item') if itemstatus == 'unversioned': wcpath = rootwcpath.join(path, abs=1) rootstatus.unknown.append(wcpath) continue elif itemstatus == 'external': wcpath = rootwcpath.__class__( rootwcpath.localpath.join(path, abs=1), auth=rootwcpath.auth) rootstatus.external.append(wcpath) continue elif itemstatus == 'ignored': wcpath = rootwcpath.join(path, abs=1) rootstatus.ignored.append(wcpath) continue rev = statusel.getAttribute('revision') if itemstatus == 'added' or itemstatus == 'none': rev = '0' modrev = '?' author = '?' date = '' else: #print entryel.toxml() commitel = entryel.getElementsByTagName('commit')[0] if commitel: modrev = commitel.getAttribute('revision') author = '' for c in commitel.getElementsByTagName('author')[0]\ .childNodes: author += c.nodeValue date = '' for c in commitel.getElementsByTagName('date')[0]\ .childNodes: date += c.nodeValue wcpath = rootwcpath.join(path, abs=1) assert itemstatus != 'modified' or wcpath.check(file=1), ( 'did\'t expect a directory with changed content here') itemattrname = { 'normal': 'unchanged', 'unversioned': 'unknown', 'conflicted': 'conflict', 'none': 'added', }.get(itemstatus, itemstatus) attr = getattr(rootstatus, itemattrname) attr.append(wcpath) propsstatus = statusel.getAttribute('props') if propsstatus not in ('none', 'normal'): rootstatus.prop_modified.append(wcpath) if wcpath == rootwcpath: rootstatus.rev = rev rootstatus.modrev = modrev rootstatus.author = author rootstatus.date = date # handle repos-status element (remote info) rstatusels = entryel.getElementsByTagName('repos-status') if rstatusels: rstatusel = rstatusels[0] ritemstatus = rstatusel.getAttribute('item') if ritemstatus in ('added', 'modified'): rootstatus.update_available.append(wcpath) lockels = entryel.getElementsByTagName('lock') if len(lockels): rootstatus.locked.append(wcpath) return rootstatus fromstring = staticmethod(fromstring) class InfoSvnWCCommand: def __init__(self, output): # Path: test # URL: http://codespeak.net/svn/std.path/trunk/dist/std.path/test # Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada # Revision: 2151 # Node Kind: directory # Schedule: normal # Last Changed Author: hpk # Last Changed Rev: 2100 # Last Changed Date: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003) # Properties Last Updated: 2003-11-03 14:47:48 +0100 (Mon, 03 Nov 2003) d = {} for line in output.split('\n'): if not line.strip(): continue key, value = line.split(':', 1) key = key.lower().replace(' ', '') value = value.strip() d[key] = value try: self.url = d['url'] except KeyError: raise ValueError, "Not a versioned resource" #raise ValueError, "Not a versioned resource %r" % path self.kind = d['nodekind'] == 'directory' and 'dir' or d['nodekind'] self.rev = int(d['revision']) self.path = py.path.local(d['path']) self.size = self.path.size() if 'lastchangedrev' in d: self.created_rev = int(d['lastchangedrev']) if 'lastchangedauthor' in d: self.last_author = d['lastchangedauthor'] if 'lastchangeddate' in d: self.mtime = parse_wcinfotime(d['lastchangeddate']) self.time = self.mtime * 1000000 def __eq__(self, other): return self.__dict__ == other.__dict__ def parse_wcinfotime(timestr): """ Returns seconds since epoch, UTC. """ # example: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003) m = re.match(r'(\d+-\d+-\d+ \d+:\d+:\d+) ([+-]\d+) .*', timestr) if not m: raise ValueError, "timestring %r does not match" % timestr timestr, timezone = m.groups() # do not handle timezone specially, return value should be UTC parsedtime = time.strptime(timestr, "%Y-%m-%d %H:%M:%S") return calendar.timegm(parsedtime) def make_recursive_propdict(wcroot, output, rex = re.compile("Properties on '(.*)':")): """ Return a dictionary of path->PropListDict mappings. """ lines = filter(None, output.split('\n')) pdict = {} while lines: line = lines.pop(0) m = rex.match(line) if not m: raise ValueError, "could not parse propget-line: %r" % line path = m.groups()[0] wcpath = wcroot.join(path, abs=1) propnames = [] while lines and lines[0].startswith(' '): propname = lines.pop(0).strip() propnames.append(propname) assert propnames, "must have found properties!" pdict[wcpath] = svncommon.PropListDict(wcpath, propnames) return pdict def error_enhance((cls, error, tb)): raise cls, error, tb