""" svn-Command based Implementation of a Subversion WorkingCopy Path. SvnWCCommandPath is the main class. """ import os, sys, time, re, calendar import py import subprocess from py.__.path import common #----------------------------------------------------------- # Caching latest repository revision and repo-paths # (getting them is slow with the current implementations) # # XXX make mt-safe #----------------------------------------------------------- class cache: proplist = {} info = {} entries = {} prop = {} class RepoEntry: def __init__(self, url, rev, timestamp): self.url = url self.rev = rev self.timestamp = timestamp def __str__(self): return "repo: %s;%s %s" %(self.url, self.rev, self.timestamp) class RepoCache: """ The Repocache manages discovered repository paths and their revisions. If inside a timeout the cache will even return the revision of the root. """ timeout = 20 # seconds after which we forget that we know the last revision def __init__(self): self.repos = [] def clear(self): self.repos = [] def put(self, url, rev, timestamp=None): if rev is None: return if timestamp is None: timestamp = time.time() for entry in self.repos: if url == entry.url: entry.timestamp = timestamp entry.rev = rev #print "set repo", entry break else: entry = RepoEntry(url, rev, timestamp) self.repos.append(entry) #print "appended repo", entry def get(self, url): now = time.time() for entry in self.repos: if url.startswith(entry.url): if now < entry.timestamp + self.timeout: #print "returning immediate Etrny", entry return entry.url, entry.rev return entry.url, -1 return url, -1 repositories = RepoCache() # svn support code ALLOWED_CHARS = "_ -/\\=$.~+" #add characters as necessary when tested if sys.platform == "win32": ALLOWED_CHARS += ":" ALLOWED_CHARS_HOST = ALLOWED_CHARS + '@:' def _getsvnversion(ver=[]): try: return ver[0] except IndexError: v = py.process.cmdexec("svn -q --version") v.strip() v = '.'.join(v.split('.')[:2]) ver.append(v) return v def _escape_helper(text): text = str(text) if py.std.sys.platform != 'win32': text = str(text).replace('$', '\\$') return text def _check_for_bad_chars(text, allowed_chars=ALLOWED_CHARS): for c in str(text): if c.isalnum(): continue if c in allowed_chars: continue return True return False def checkbadchars(url): # (hpk) not quite sure about the exact purpose, guido w.? proto, uri = url.split("://", 1) if proto != "file": host, uripath = uri.split('/', 1) # only check for bad chars in the non-protocol parts if (_check_for_bad_chars(host, ALLOWED_CHARS_HOST) \ or _check_for_bad_chars(uripath, ALLOWED_CHARS)): raise ValueError("bad char in %r" % (url, )) #_______________________________________________________________ class SvnPathBase(common.PathBase): """ Base implementation for SvnPath implementations. """ sep = '/' def _geturl(self): return self.strpath url = property(_geturl, None, None, "url of this svn-path.") def __str__(self): """ return a string representation (including rev-number) """ return self.strpath def __hash__(self): return hash(self.strpath) def new(self, **kw): """ create a modified version of this path. A 'rev' argument indicates a new revision. the following keyword arguments modify various path parts: http://host.com/repo/path/file.ext |-----------------------| dirname |------| basename |--| purebasename |--| ext """ obj = object.__new__(self.__class__) obj.rev = kw.get('rev', self.rev) obj.auth = kw.get('auth', self.auth) dirname, basename, purebasename, ext = self._getbyspec( "dirname,basename,purebasename,ext") if 'basename' in kw: if 'purebasename' in kw or 'ext' in kw: raise ValueError("invalid specification %r" % kw) else: pb = kw.setdefault('purebasename', purebasename) ext = kw.setdefault('ext', ext) if ext and not ext.startswith('.'): ext = '.' + ext kw['basename'] = pb + ext kw.setdefault('dirname', dirname) kw.setdefault('sep', self.sep) if kw['basename']: obj.strpath = "%(dirname)s%(sep)s%(basename)s" % kw else: obj.strpath = "%(dirname)s" % kw return obj def _getbyspec(self, spec): """ get specified parts of the path. 'arg' is a string with comma separated path parts. The parts are returned in exactly the order of the specification. you may specify the following parts: http://host.com/repo/path/file.ext |-----------------------| dirname |------| basename |--| purebasename |--| ext """ res = [] parts = self.strpath.split(self.sep) for name in spec.split(','): name = name.strip() if name == 'dirname': res.append(self.sep.join(parts[:-1])) elif name == 'basename': res.append(parts[-1]) else: basename = parts[-1] i = basename.rfind('.') if i == -1: purebasename, ext = basename, '' else: purebasename, ext = basename[:i], basename[i:] if name == 'purebasename': res.append(purebasename) elif name == 'ext': res.append(ext) else: raise NameError("Don't know part %r" % name) return res def __eq__(self, other): """ return true if path and rev attributes each match """ return (str(self) == str(other) and (self.rev == other.rev or self.rev == other.rev)) def __ne__(self, other): return not self == other def join(self, *args): """ return a new Path (with the same revision) which is composed of the self Path followed by 'args' path components. """ if not args: return self args = tuple([arg.strip(self.sep) for arg in args]) parts = (self.strpath, ) + args newpath = self.__class__(self.sep.join(parts), self.rev, self.auth) return newpath def propget(self, name): """ return the content of the given property. """ value = self._propget(name) return value def proplist(self): """ list all property names. """ content = self._proplist() return content def info(self): """ return an Info structure with svn-provided information. """ parent = self.dirpath() nameinfo_seq = parent._listdir_nameinfo() bn = self.basename for name, info in nameinfo_seq: if name == bn: return info raise py.error.ENOENT(self) def size(self): """ Return the size of the file content of the Path. """ return self.info().size def mtime(self): """ Return the last modification time of the file. """ return self.info().mtime # shared help methods def _escape(self, cmd): return _escape_helper(cmd) #def _childmaxrev(self): # """ return maximum revision number of childs (or self.rev if no childs) """ # rev = self.rev # for name, info in self._listdir_nameinfo(): # rev = max(rev, info.created_rev) # return rev #def _getlatestrevision(self): # """ return latest repo-revision for this path. """ # url = self.strpath # path = self.__class__(url, None) # # # we need a long walk to find the root-repo and revision # while 1: # try: # rev = max(rev, path._childmaxrev()) # previous = path # path = path.dirpath() # except (IOError, process.cmdexec.Error): # break # if rev is None: # raise IOError, "could not determine newest repo revision for %s" % self # return rev class Checkers(common.Checkers): def dir(self): try: return self.path.info().kind == 'dir' except py.error.Error: return self._listdirworks() def _listdirworks(self): try: self.path.listdir() except py.error.ENOENT: return False else: return True def file(self): try: return self.path.info().kind == 'file' except py.error.ENOENT: return False def exists(self): try: return self.path.info() except py.error.ENOENT: return self._listdirworks() def parse_apr_time(timestr): i = timestr.rfind('.') if i == -1: raise ValueError("could not parse %s" % timestr) timestr = timestr[:i] parsedtime = time.strptime(timestr, "%Y-%m-%dT%H:%M:%S") return time.mktime(parsedtime) class PropListDict(dict): """ a Dictionary which fetches values (InfoSvnCommand instances) lazily""" def __init__(self, path, keynames): dict.__init__(self, [(x, None) for x in keynames]) self.path = path def __getitem__(self, key): value = dict.__getitem__(self, key) if value is None: value = self.path.propget(key) dict.__setitem__(self, key, value) return value def fixlocale(): if sys.platform != 'win32': return 'LC_ALL=C ' return '' # some nasty chunk of code to solve path and url conversion and quoting issues ILLEGAL_CHARS = '* | \ / : < > ? \t \n \x0b \x0c \r'.split(' ') if os.sep in ILLEGAL_CHARS: ILLEGAL_CHARS.remove(os.sep) ISWINDOWS = sys.platform == 'win32' _reg_allow_disk = re.compile(r'^([a-z]\:\\)?[^:]+$', re.I) def _check_path(path): illegal = ILLEGAL_CHARS[:] sp = path.strpath if ISWINDOWS: illegal.remove(':') if not _reg_allow_disk.match(sp): raise ValueError('path may not contain a colon (:)') for char in sp: if char not in string.printable or char in illegal: raise ValueError('illegal character %r in path' % (char,)) def path_to_fspath(path, addat=True): _check_path(path) sp = path.strpath if addat and path.rev != -1: sp = '%s@%s' % (sp, path.rev) elif addat: sp = '%s@HEAD' % (sp,) return sp def url_from_path(path): fspath = path_to_fspath(path, False) quote = py.std.urllib.quote if ISWINDOWS: match = _reg_allow_disk.match(fspath) fspath = fspath.replace('\\', '/') if match.group(1): fspath = '/%s%s' % (match.group(1).replace('\\', '/'), quote(fspath[len(match.group(1)):])) else: fspath = quote(fspath) else: fspath = quote(fspath) if path.rev != -1: fspath = '%s@%s' % (fspath, path.rev) else: fspath = '%s@HEAD' % (fspath,) return 'file://%s' % (fspath,) class SvnAuth(object): """ container for auth information for Subversion """ def __init__(self, username, password, cache_auth=True, interactive=True): self.username = username self.password = password self.cache_auth = cache_auth self.interactive = interactive def makecmdoptions(self): uname = self.username.replace('"', '\\"') passwd = self.password.replace('"', '\\"') ret = [] if uname: ret.append('--username="%s"' % (uname,)) if passwd: ret.append('--password="%s"' % (passwd,)) if not self.cache_auth: ret.append('--no-auth-cache') if not self.interactive: ret.append('--non-interactive') return ' '.join(ret) def __str__(self): return "" %(self.username,) rex_blame = re.compile(r'\s*(\d+)\s*(\S+) (.*)') class SvnWCCommandPath(common.PathBase): """ path implementation offering access/modification to svn working copies. It has methods similar to the functions in os.path and similar to the commands of the svn client. """ sep = os.sep def __new__(cls, wcpath=None, auth=None): self = object.__new__(cls) if isinstance(wcpath, cls): if wcpath.__class__ == cls: return wcpath wcpath = wcpath.localpath if _check_for_bad_chars(str(wcpath), ALLOWED_CHARS): raise ValueError("bad char in wcpath %s" % (wcpath, )) self.localpath = py.path.local(wcpath) self.auth = auth return self strpath = property(lambda x: str(x.localpath), None, None, "string path") def __eq__(self, other): return self.localpath == getattr(other, 'localpath', None) def _geturl(self): if getattr(self, '_url', None) is None: info = self.info() self._url = info.url #SvnPath(info.url, info.rev) assert isinstance(self._url, py.builtin._basestring) return self._url url = property(_geturl, None, None, "url of this WC item") def _escape(self, cmd): return _escape_helper(cmd) def dump(self, obj): """ pickle object into path location""" return self.localpath.dump(obj) def svnurl(self): """ return current SvnPath for this WC-item. """ info = self.info() return py.path.svnurl(info.url) def __repr__(self): return "svnwc(%r)" % (self.strpath) # , self._url) def __str__(self): return str(self.localpath) def _makeauthoptions(self): if self.auth is None: return '' return self.auth.makecmdoptions() def _authsvn(self, cmd, args=None): args = args and list(args) or [] args.append(self._makeauthoptions()) return self._svn(cmd, *args) def _svn(self, cmd, *args): l = ['svn %s' % cmd] args = [self._escape(item) for item in args] l.extend(args) l.append('"%s"' % self._escape(self.strpath)) # try fixing the locale because we can't otherwise parse string = fixlocale() + " ".join(l) try: try: key = 'LC_MESSAGES' hold = os.environ.get(key) os.environ[key] = 'C' out = py.process.cmdexec(string) finally: if hold: os.environ[key] = hold else: del os.environ[key] except py.process.cmdexec.Error: e = sys.exc_info()[1] strerr = e.err.lower() if strerr.find('file not found') != -1: raise py.error.ENOENT(self) if (strerr.find('file exists') != -1 or strerr.find('file already exists') != -1 or strerr.find("can't create directory") != -1): raise py.error.EEXIST(self) raise return out def switch(self, url): """ switch to given URL. """ self._authsvn('switch', [url]) def checkout(self, url=None, rev=None): """ checkout from url to local wcpath. """ args = [] if url is None: url = self.url if rev is None or rev == -1: if (py.std.sys.platform != 'win32' and _getsvnversion() == '1.3'): url += "@HEAD" else: if _getsvnversion() == '1.3': url += "@%d" % rev else: args.append('-r' + str(rev)) args.append(url) self._authsvn('co', args) def update(self, rev='HEAD'): """ update working copy item to given revision. (None -> HEAD). """ self._authsvn('up', ['-r', rev, "--non-interactive"],) def write(self, content, mode='w'): """ write content into local filesystem wc. """ self.localpath.write(content, mode) def dirpath(self, *args): """ return the directory Path of the current Path. """ return self.__class__(self.localpath.dirpath(*args), auth=self.auth) def _ensuredirs(self): parent = self.dirpath() if parent.check(dir=0): parent._ensuredirs() if self.check(dir=0): self.mkdir() return self def ensure(self, *args, **kwargs): """ ensure that an args-joined path exists (by default as a file). if you specify a keyword argument 'directory=True' then the path is forced to be a directory path. """ p = self.join(*args) if p.check(): if p.check(versioned=False): p.add() return p if kwargs.get('dir', 0): return p._ensuredirs() parent = p.dirpath() parent._ensuredirs() p.write("") p.add() return p def mkdir(self, *args): """ create & return the directory joined with args. """ if args: return self.join(*args).mkdir() else: self._svn('mkdir') return self def add(self): """ add ourself to svn """ self._svn('add') def remove(self, rec=1, force=1): """ remove a file or a directory tree. 'rec'ursive is ignored and considered always true (because of underlying svn semantics. """ assert rec, "svn cannot remove non-recursively" if not self.check(versioned=True): # not added to svn (anymore?), just remove py.path.local(self).remove() return flags = [] if force: flags.append('--force') self._svn('remove', *flags) def copy(self, target): """ copy path to target.""" py.process.cmdexec("svn copy %s %s" %(str(self), str(target))) def rename(self, target): """ rename this path to target. """ py.process.cmdexec("svn move --force %s %s" %(str(self), str(target))) def lock(self): """ set a lock (exclusive) on the resource """ out = self._authsvn('lock').strip() if not out: # warning or error, raise exception raise Exception(out[4:]) def unlock(self): """ unset a previously set lock """ out = self._authsvn('unlock').strip() if out.startswith('svn:'): # warning or error, raise exception raise Exception(out[4:]) def cleanup(self): """ remove any locks from the resource """ # XXX should be fixed properly!!! try: self.unlock() except: pass def status(self, updates=0, rec=0, externals=0): """ return (collective) Status object for this file. """ # http://svnbook.red-bean.com/book.html#svn-ch-3-sect-4.3.1 # 2201 2192 jum test # XXX if externals: raise ValueError("XXX cannot perform status() " "on external items yet") else: #1.2 supports: externals = '--ignore-externals' externals = '' if rec: rec= '' else: rec = '--non-recursive' # XXX does not work on all subversion versions #if not externals: # externals = '--ignore-externals' if updates: updates = '-u' else: updates = '' try: cmd = 'status -v --xml --no-ignore %s %s %s' % ( updates, rec, externals) out = self._authsvn(cmd) except py.process.cmdexec.Error: cmd = 'status -v --no-ignore %s %s %s' % ( updates, rec, externals) out = self._authsvn(cmd) rootstatus = WCStatus(self).fromstring(out, self) else: rootstatus = XMLWCStatus(self).fromstring(out, self) return rootstatus def diff(self, rev=None): """ return a diff of the current path against revision rev (defaulting to the last one). """ args = [] if rev is not None: args.append("-r %d" % rev) out = self._authsvn('diff', args) return out def blame(self): """ return a list of tuples of three elements: (revision, commiter, line) """ out = self._svn('blame') result = [] blamelines = out.splitlines() reallines = py.path.svnurl(self.url).readlines() for i, (blameline, line) in enumerate( zip(blamelines, reallines)): m = rex_blame.match(blameline) if not m: raise ValueError("output line %r of svn blame does not match " "expected format" % (line, )) rev, name, _ = m.groups() result.append((int(rev), name, line)) return result _rex_commit = re.compile(r'.*Committed revision (\d+)\.$', re.DOTALL) def commit(self, msg='', rec=1): """ commit with support for non-recursive commits """ # XXX i guess escaping should be done better here?!? cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),) if not rec: cmd += ' -N' out = self._authsvn(cmd) try: del cache.info[self] except KeyError: pass if out: m = self._rex_commit.match(out) return int(m.group(1)) def propset(self, name, value, *args): """ set property name to value on this path. """ d = py.path.local.mkdtemp() try: p = d.join('value') p.write(value) self._svn('propset', name, '--file', str(p), *args) finally: d.remove() def propget(self, name): """ get property name on this path. """ res = self._svn('propget', name) return res[:-1] # strip trailing newline def propdel(self, name): """ delete property name on this path. """ res = self._svn('propdel', name) return res[:-1] # strip trailing newline def proplist(self, rec=0): """ return a mapping of property names to property values. If rec is True, then return a dictionary mapping sub-paths to such mappings. """ if rec: res = self._svn('proplist -R') return make_recursive_propdict(self, res) else: res = self._svn('proplist') lines = res.split('\n') lines = [x.strip() for x in lines[1:]] return PropListDict(self, lines) def revert(self, rec=0): """ revert the local changes of this path. if rec is True, do so recursively. """ if rec: result = self._svn('revert -R') else: result = self._svn('revert') return result def new(self, **kw): """ create a modified version of this path. A 'rev' argument indicates a new revision. the following keyword arguments modify various path parts: http://host.com/repo/path/file.ext |-----------------------| dirname |------| basename |--| purebasename |--| ext """ if kw: localpath = self.localpath.new(**kw) else: localpath = self.localpath return self.__class__(localpath, auth=self.auth) def join(self, *args, **kwargs): """ return a new Path (with the same revision) which is composed of the self Path followed by 'args' path components. """ if not args: return self localpath = self.localpath.join(*args, **kwargs) return self.__class__(localpath, auth=self.auth) def info(self, usecache=1): """ return an Info structure with svn-provided information. """ info = usecache and cache.info.get(self) if not info: try: output = self._svn('info') except py.process.cmdexec.Error: e = sys.exc_info()[1] if e.err.find('Path is not a working copy directory') != -1: raise py.error.ENOENT(self, e.err) elif e.err.find("is not under version control") != -1: raise py.error.ENOENT(self, e.err) raise # XXX SVN 1.3 has output on stderr instead of stdout (while it does # return 0!), so a bit nasty, but we assume no output is output # to stderr... if (output.strip() == '' or output.lower().find('not a versioned resource') != -1): raise py.error.ENOENT(self, output) info = InfoSvnWCCommand(output) # Can't reliably compare on Windows without access to win32api if py.std.sys.platform != 'win32': if info.path != self.localpath: raise py.error.ENOENT(self, "not a versioned resource:" + " %s != %s" % (info.path, self.localpath)) cache.info[self] = info self.rev = info.rev return info def listdir(self, fil=None, sort=None): """ return a sequence of Paths. listdir will return either a tuple or a list of paths depending on implementation choices. """ if isinstance(fil, str): fil = common.FNMatcher(fil) # XXX unify argument naming with LocalPath.listdir def notsvn(path): return path.basename != '.svn' paths = [self.__class__(p, auth=self.auth) for p in self.localpath.listdir() if notsvn(p) and (not fil or fil(p))] self._sortlist(paths, sort) return paths def open(self, mode='r'): """ return an opened file with the given mode. """ return open(self.strpath, mode) def _getbyspec(self, spec): return self.localpath._getbyspec(spec) class Checkers(py.path.local.Checkers): def __init__(self, path): self.svnwcpath = path self.path = path.localpath def versioned(self): try: s = self.svnwcpath.info() except (py.error.ENOENT, py.error.EEXIST): return False except py.process.cmdexec.Error: e = sys.exc_info()[1] if e.err.find('is not a working copy')!=-1: return False if e.err.lower().find('not a versioned resource') != -1: return False raise else: return True def log(self, rev_start=None, rev_end=1, verbose=False): """ return a list of LogEntry instances for this path. rev_start is the starting revision (defaulting to the first one). rev_end is the last revision (defaulting to HEAD). if verbose is True, then the LogEntry instances also know which files changed. """ assert self.check() # make it simpler for the pipe rev_start = rev_start is None and "HEAD" or rev_start rev_end = rev_end is None and "HEAD" or rev_end if rev_start == "HEAD" and rev_end == 1: rev_opt = "" else: rev_opt = "-r %s:%s" % (rev_start, rev_end) verbose_opt = verbose and "-v" or "" locale_env = fixlocale() # some blather on stderr auth_opt = self._makeauthoptions() #stdin, stdout, stderr = os.popen3(locale_env + # 'svn log --xml %s %s %s "%s"' % ( # rev_opt, verbose_opt, auth_opt, # self.strpath)) cmd = locale_env + 'svn log --xml %s %s %s "%s"' % ( rev_opt, verbose_opt, auth_opt, self.strpath) popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, ) stdout, stderr = popen.communicate() stdout = py.builtin._totext(stdout, sys.getdefaultencoding()) minidom,ExpatError = importxml() try: tree = minidom.parseString(stdout) except ExpatError: raise ValueError('no such revision') result = [] for logentry in filter(None, tree.firstChild.childNodes): if logentry.nodeType == logentry.ELEMENT_NODE: result.append(LogEntry(logentry)) return result def size(self): """ Return the size of the file content of the Path. """ return self.info().size def mtime(self): """ Return the last modification time of the file. """ return self.info().mtime def __hash__(self): return hash((self.strpath, self.__class__, self.auth)) class WCStatus: attrnames = ('modified','added', 'conflict', 'unchanged', 'external', 'deleted', 'prop_modified', 'unknown', 'update_available', 'incomplete', 'kindmismatch', 'ignored', 'locked', 'replaced' ) def __init__(self, wcpath, rev=None, modrev=None, author=None): self.wcpath = wcpath self.rev = rev self.modrev = modrev self.author = author for name in self.attrnames: setattr(self, name, []) def allpath(self, sort=True, **kw): d = {} for name in self.attrnames: if name not in kw or kw[name]: for path in getattr(self, name): d[path] = 1 l = d.keys() if sort: l.sort() return l # XXX a bit scary to assume there's always 2 spaces between username and # path, however with win32 allowing spaces in user names there doesn't # seem to be a more solid approach :( _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)') def fromstring(data, rootwcpath, rev=None, modrev=None, author=None): """ return a new WCStatus object from data 's' """ rootstatus = WCStatus(rootwcpath, rev, modrev, author) update_rev = None for line in data.split('\n'): if not line.strip(): continue #print "processing %r" % line flags, rest = line[:8], line[8:] # first column c0,c1,c2,c3,c4,c5,x6,c7 = flags #if '*' in line: # print "flags", repr(flags), "rest", repr(rest) if c0 in '?XI': fn = line.split(None, 1)[1] if c0 == '?': wcpath = rootwcpath.join(fn, abs=1) rootstatus.unknown.append(wcpath) elif c0 == 'X': wcpath = rootwcpath.__class__( rootwcpath.localpath.join(fn, abs=1), auth=rootwcpath.auth) rootstatus.external.append(wcpath) elif c0 == 'I': wcpath = rootwcpath.join(fn, abs=1) rootstatus.ignored.append(wcpath) continue #elif c0 in '~!' or c4 == 'S': # raise NotImplementedError("received flag %r" % c0) m = WCStatus._rex_status.match(rest) if not m: if c7 == '*': fn = rest.strip() wcpath = rootwcpath.join(fn, abs=1) rootstatus.update_available.append(wcpath) continue if line.lower().find('against revision:')!=-1: update_rev = int(rest.split(':')[1].strip()) continue if line.lower().find('status on external') > -1: # XXX not sure what to do here... perhaps we want to # store some state instead of just continuing, as right # now it makes the top-level external get added twice # (once as external, once as 'normal' unchanged item) # because of the way SVN presents external items continue # keep trying raise ValueError("could not parse line %r" % line) else: rev, modrev, author, fn = m.groups() wcpath = rootwcpath.join(fn, abs=1) #assert wcpath.check() if c0 == 'M': assert wcpath.check(file=1), "didn't expect a directory with changed content here" rootstatus.modified.append(wcpath) elif c0 == 'A' or c3 == '+' : rootstatus.added.append(wcpath) elif c0 == 'D': rootstatus.deleted.append(wcpath) elif c0 == 'C': rootstatus.conflict.append(wcpath) elif c0 == '~': rootstatus.kindmismatch.append(wcpath) elif c0 == '!': rootstatus.incomplete.append(wcpath) elif c0 == 'R': rootstatus.replaced.append(wcpath) elif not c0.strip(): rootstatus.unchanged.append(wcpath) else: raise NotImplementedError("received flag %r" % c0) if c1 == 'M': rootstatus.prop_modified.append(wcpath) # XXX do we cover all client versions here? if c2 == 'L' or c5 == 'K': rootstatus.locked.append(wcpath) if c7 == '*': rootstatus.update_available.append(wcpath) if wcpath == rootwcpath: rootstatus.rev = rev rootstatus.modrev = modrev rootstatus.author = author if update_rev: rootstatus.update_rev = update_rev continue return rootstatus fromstring = staticmethod(fromstring) class XMLWCStatus(WCStatus): def fromstring(data, rootwcpath, rev=None, modrev=None, author=None): """ parse 'data' (XML string as outputted by svn st) into a status obj """ # XXX for externals, the path is shown twice: once # with external information, and once with full info as if # the item was a normal non-external... the current way of # dealing with this issue is by ignoring it - this does make # externals appear as external items as well as 'normal', # unchanged ones in the status object so this is far from ideal rootstatus = WCStatus(rootwcpath, rev, modrev, author) update_rev = None minidom, ExpatError = importxml() try: doc = minidom.parseString(data) except ExpatError: e = sys.exc_info()[1] raise ValueError(str(e)) urevels = doc.getElementsByTagName('against') if urevels: rootstatus.update_rev = urevels[-1].getAttribute('revision') for entryel in doc.getElementsByTagName('entry'): path = entryel.getAttribute('path') statusel = entryel.getElementsByTagName('wc-status')[0] itemstatus = statusel.getAttribute('item') if itemstatus == 'unversioned': wcpath = rootwcpath.join(path, abs=1) rootstatus.unknown.append(wcpath) continue elif itemstatus == 'external': wcpath = rootwcpath.__class__( rootwcpath.localpath.join(path, abs=1), auth=rootwcpath.auth) rootstatus.external.append(wcpath) continue elif itemstatus == 'ignored': wcpath = rootwcpath.join(path, abs=1) rootstatus.ignored.append(wcpath) continue elif itemstatus == 'incomplete': wcpath = rootwcpath.join(path, abs=1) rootstatus.incomplete.append(wcpath) continue rev = statusel.getAttribute('revision') if itemstatus == 'added' or itemstatus == 'none': rev = '0' modrev = '?' author = '?' date = '' else: #print entryel.toxml() commitel = entryel.getElementsByTagName('commit')[0] if commitel: modrev = commitel.getAttribute('revision') author = '' author_els = commitel.getElementsByTagName('author') if author_els: for c in author_els[0].childNodes: author += c.nodeValue date = '' for c in commitel.getElementsByTagName('date')[0]\ .childNodes: date += c.nodeValue wcpath = rootwcpath.join(path, abs=1) assert itemstatus != 'modified' or wcpath.check(file=1), ( 'did\'t expect a directory with changed content here') itemattrname = { 'normal': 'unchanged', 'unversioned': 'unknown', 'conflicted': 'conflict', 'none': 'added', }.get(itemstatus, itemstatus) attr = getattr(rootstatus, itemattrname) attr.append(wcpath) propsstatus = statusel.getAttribute('props') if propsstatus not in ('none', 'normal'): rootstatus.prop_modified.append(wcpath) if wcpath == rootwcpath: rootstatus.rev = rev rootstatus.modrev = modrev rootstatus.author = author rootstatus.date = date # handle repos-status element (remote info) rstatusels = entryel.getElementsByTagName('repos-status') if rstatusels: rstatusel = rstatusels[0] ritemstatus = rstatusel.getAttribute('item') if ritemstatus in ('added', 'modified'): rootstatus.update_available.append(wcpath) lockels = entryel.getElementsByTagName('lock') if len(lockels): rootstatus.locked.append(wcpath) return rootstatus fromstring = staticmethod(fromstring) class InfoSvnWCCommand: def __init__(self, output): # Path: test # URL: http://codespeak.net/svn/std.path/trunk/dist/std.path/test # Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada # Revision: 2151 # Node Kind: directory # Schedule: normal # Last Changed Author: hpk # Last Changed Rev: 2100 # Last Changed Date: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003) # Properties Last Updated: 2003-11-03 14:47:48 +0100 (Mon, 03 Nov 2003) d = {} for line in output.split('\n'): if not line.strip(): continue key, value = line.split(':', 1) key = key.lower().replace(' ', '') value = value.strip() d[key] = value try: self.url = d['url'] except KeyError: raise ValueError("Not a versioned resource") #raise ValueError, "Not a versioned resource %r" % path self.kind = d['nodekind'] == 'directory' and 'dir' or d['nodekind'] self.rev = int(d['revision']) self.path = py.path.local(d['path']) self.size = self.path.size() if 'lastchangedrev' in d: self.created_rev = int(d['lastchangedrev']) if 'lastchangedauthor' in d: self.last_author = d['lastchangedauthor'] if 'lastchangeddate' in d: self.mtime = parse_wcinfotime(d['lastchangeddate']) self.time = self.mtime * 1000000 def __eq__(self, other): return self.__dict__ == other.__dict__ def parse_wcinfotime(timestr): """ Returns seconds since epoch, UTC. """ # example: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003) m = re.match(r'(\d+-\d+-\d+ \d+:\d+:\d+) ([+-]\d+) .*', timestr) if not m: raise ValueError("timestring %r does not match" % timestr) timestr, timezone = m.groups() # do not handle timezone specially, return value should be UTC parsedtime = time.strptime(timestr, "%Y-%m-%d %H:%M:%S") return calendar.timegm(parsedtime) def make_recursive_propdict(wcroot, output, rex = re.compile("Properties on '(.*)':")): """ Return a dictionary of path->PropListDict mappings. """ lines = [x for x in output.split('\n') if x] pdict = {} while lines: line = lines.pop(0) m = rex.match(line) if not m: raise ValueError("could not parse propget-line: %r" % line) path = m.groups()[0] wcpath = wcroot.join(path, abs=1) propnames = [] while lines and lines[0].startswith(' '): propname = lines.pop(0).strip() propnames.append(propname) assert propnames, "must have found properties!" pdict[wcpath] = PropListDict(wcpath, propnames) return pdict def importxml(cache=[]): if cache: return cache from xml.dom import minidom from xml.parsers.expat import ExpatError cache.extend([minidom, ExpatError]) return cache class LogEntry: def __init__(self, logentry): self.rev = int(logentry.getAttribute('revision')) for lpart in filter(None, logentry.childNodes): if lpart.nodeType == lpart.ELEMENT_NODE: if lpart.nodeName == 'author': self.author = lpart.firstChild.nodeValue elif lpart.nodeName == 'msg': if lpart.firstChild: self.msg = lpart.firstChild.nodeValue else: self.msg = '' elif lpart.nodeName == 'date': #2003-07-29T20:05:11.598637Z timestr = lpart.firstChild.nodeValue self.date = parse_apr_time(timestr) elif lpart.nodeName == 'paths': self.strpaths = [] for ppart in filter(None, lpart.childNodes): if ppart.nodeType == ppart.ELEMENT_NODE: self.strpaths.append(PathEntry(ppart)) def __repr__(self): return '' % ( self.rev, self.author, self.date)