""" module defining a subversion path object based on the external command 'svn'. """ import os, sys, time, re, calendar import py from py import path, process from py.__.path import common from py.__.path.svn import svncommon from py.__.misc.cache import BuildcostAccessCache, AgingCache DEBUG=False class SvnCommandPath(svncommon.SvnPathBase): """ path implementation that offers access to (possibly remote) subversion repositories. """ _lsrevcache = BuildcostAccessCache(maxentries=128) _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0) def __new__(cls, path, rev=None): self = object.__new__(cls) if isinstance(path, cls): rev = path.rev path = path.strpath proto, uri = path.split("://", 1) host, uripath = uri.split('/', 1) # only check for bad chars in the non-protocol parts if (svncommon._check_for_bad_chars(host, svncommon.ALLOWED_CHARS_HOST) or svncommon._check_for_bad_chars(uripath, svncommon.ALLOWED_CHARS)): raise ValueError("bad char in path %s" % (path, )) path = path.rstrip('/') self.strpath = path self.rev = rev return self def __repr__(self): if self.rev == -1: return 'svnurl(%r)' % self.strpath else: return 'svnurl(%r, %r)' % (self.strpath, self.rev) def _svn(self, cmd, *args): if self.rev is None: return self._svnwrite(cmd, *args) else: args = ['-r', self.rev] + list(args) return self._svnwrite(cmd, *args) def _svnwrite(self, cmd, *args): l = ['svn %s' % cmd] args = ['"%s"' % self._escape(item) for item in args] l.extend(args) l.append('"%s"' % self._encodedurl()) # fixing the locale because we can't otherwise parse string = svncommon.fixlocale() + " ".join(l) if DEBUG: print "execing", string try: out = process.cmdexec(string) except py.process.cmdexec.Error, e: if (e.err.find('File Exists') != -1 or e.err.find('File already exists') != -1): raise py.error.EEXIST(self) raise return out def _encodedurl(self): return self._escape(self.strpath) def open(self, mode='r'): """ return an opened file with the given mode. """ assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline" assert self.check(file=1) # svn cat returns an empty file otherwise def popen(cmd): return os.popen(cmd) if self.rev is None: return popen(svncommon.fixlocale() + 'svn cat "%s"' % (self._escape(self.strpath), )) else: return popen(svncommon.fixlocale() + 'svn cat -r %s "%s"' % (self.rev, self._escape(self.strpath))) def dirpath(self, *args, **kwargs): """ return the directory path of the current path joined with any given path arguments. """ l = self.strpath.split(self.sep) if len(l) < 4: raise py.error.EINVAL(self, "base is not valid") elif len(l) == 4: return self.join(*args, **kwargs) else: return self.new(basename='').join(*args, **kwargs) # modifying methods (cache must be invalidated) def mkdir(self, *args, **kwargs): """ create & return the directory joined with args. You can provide a checkin message by giving a keyword argument 'msg'""" commit_msg=kwargs.get('msg', "mkdir by py lib invocation") createpath = self.join(*args) createpath._svnwrite('mkdir', '-m', commit_msg) self._lsnorevcache.delentry(createpath.dirpath().strpath) return createpath def copy(self, target, msg='copied by py lib invocation'): """ copy path to target with checkin message msg.""" if getattr(target, 'rev', None) is not None: raise py.error.EINVAL(target, "revisions are immutable") process.cmdexec('svn copy -m "%s" "%s" "%s"' %(msg, self._escape(self), self._escape(target))) self._lsnorevcache.delentry(target.dirpath().strpath) def rename(self, target, msg="renamed by py lib invocation"): """ rename this path to target with checkin message msg. """ if getattr(self, 'rev', None) is not None: raise py.error.EINVAL(self, "revisions are immutable") py.process.cmdexec('svn move -m "%s" --force "%s" "%s"' %( msg, self._escape(self), self._escape(target))) self._lsnorevcache.delentry(self.dirpath().strpath) self._lsnorevcache.delentry(self.strpath) def remove(self, rec=1, msg='removed by py lib invocation'): """ remove a file or directory (or a directory tree if rec=1) with checkin message msg.""" if self.rev is not None: raise py.error.EINVAL(self, "revisions are immutable") process.cmdexec('svn rm -m "%s" "%s"' %(msg, self._escape(self))) self._lsnorevcache.delentry(self.dirpath().strpath) def ensure(self, *args, **kwargs): """ ensure that an args-joined path exists (by default as a file). If you specify a keyword argument 'dir=True' then the path is forced to be a directory path. """ if getattr(self, 'rev', None) is not None: raise py.error.EINVAL(self, "revisions are immutable") target = self.join(*args) dir = kwargs.get('dir', 0) for x in target.parts(reverse=True): if x.check(): break else: raise py.error.ENOENT(target, "has not any valid base!") if x == target: if not x.check(dir=dir): raise dir and py.error.ENOTDIR(x) or py.error.EISDIR(x) return x tocreate = target.relto(x) basename = tocreate.split(self.sep, 1)[0] tempdir = py.path.local.mkdtemp() try: tempdir.ensure(tocreate, dir=dir) cmd = 'svn import -m "%s" "%s" "%s"' % ( "ensure %s" % self._escape(tocreate), self._escape(tempdir.join(basename)), x.join(basename)._encodedurl()) process.cmdexec(cmd) self._lsnorevcache.delentry(x.strpath) # !!! finally: tempdir.remove() return target # end of modifying methods def _propget(self, name): res = self._svn('propget', name) return res[:-1] # strip trailing newline def _proplist(self): res = self._svn('proplist') lines = res.split('\n') lines = map(str.strip, lines[1:]) return svncommon.PropListDict(self, lines) def _listdir_nameinfo(self): """ return sequence of name-info directory entries of self """ def builder(): try: res = self._svn('ls', '-v') except process.cmdexec.Error, e: if e.err.find('non-existent in that revision') != -1: raise py.error.ENOENT(self, e.err) elif e.err.find('File not found') != -1: raise py.error.ENOENT(self, e.err) elif e.err.find('not part of a repository')!=-1: raise py.error.ENOENT(self, e.err) elif e.err.find('Unable to open')!=-1: raise py.error.ENOENT(self, e.err) elif e.err.lower().find('method not allowed')!=-1: raise py.error.EACCES(self, e.err) raise py.error.Error(e.err) lines = res.split('\n') nameinfo_seq = [] for lsline in lines: if lsline: info = InfoSvnCommand(lsline) nameinfo_seq.append((info._name, info)) return nameinfo_seq if self.rev is not None: return self._lsrevcache.getorbuild((self.strpath, self.rev), builder) else: return self._lsnorevcache.getorbuild(self.strpath, builder) def log(self, rev_start=None, rev_end=1, verbose=False): """ return a list of LogEntry instances for this path. rev_start is the starting revision (defaulting to the first one). rev_end is the last revision (defaulting to HEAD). if verbose is True, then the LogEntry instances also know which files changed. """ assert self.check() #make it simpler for the pipe rev_start = rev_start is None and _Head or rev_start rev_end = rev_end is None and _Head or rev_end if rev_start is _Head and rev_end == 1: rev_opt = "" else: rev_opt = "-r %s:%s" % (rev_start, rev_end) verbose_opt = verbose and "-v" or "" xmlpipe = os.popen(svncommon.fixlocale() + 'svn log --xml %s %s "%s"' % (rev_opt, verbose_opt, self.strpath)) from xml.dom import minidom tree = minidom.parse(xmlpipe) result = [] for logentry in filter(None, tree.firstChild.childNodes): if logentry.nodeType == logentry.ELEMENT_NODE: result.append(LogEntry(logentry)) return result #01234567890123456789012345678901234567890123467 # 2256 hpk 165 Nov 24 17:55 __init__.py # XXX spotted by Guido, SVN 1.3.0 has different aligning, breaks the code!!! # 1312 johnny 1627 May 05 14:32 test_decorators.py # class InfoSvnCommand: # the '0?' part in the middle is an indication of whether the resource is # locked, see 'svn help ls' lspattern = re.compile( r'^ *(?P\d+) +(?P\S+) +(0? *(?P\d+))? ' '*(?P\w+ +\d{2} +[\d:]+) +(?P.*)$') def __init__(self, line): # this is a typical line from 'svn ls http://...' #_ 1127 jum 0 Jul 13 15:28 branch/ match = self.lspattern.match(line) data = match.groupdict() self._name = data['file'] if self._name[-1] == '/': self._name = self._name[:-1] self.kind = 'dir' else: self.kind = 'file' #self.has_props = l.pop(0) == 'P' self.created_rev = int(data['rev']) self.last_author = data['author'] self.size = data['size'] and int(data['size']) or 0 self.mtime = parse_time_with_missing_year(data['date']) self.time = self.mtime * 1000000 def __eq__(self, other): return self.__dict__ == other.__dict__ #____________________________________________________ # # helper functions #____________________________________________________ def parse_time_with_missing_year(timestr): """ analyze the time part from a single line of "svn ls -v" the svn output doesn't show the year makes the 'timestr' ambigous. """ t_now = time.gmtime() tparts = timestr.split() month = time.strptime(tparts.pop(0), '%b')[1] day = time.strptime(tparts.pop(0), '%d')[2] last = tparts.pop(0) # year or hour:minute try: year = time.strptime(last, '%Y')[0] hour = minute = 0 except ValueError: hour, minute = time.strptime(last, '%H:%M')[3:5] year = t_now[0] t_result = (year, month, day, hour, minute, 0,0,0,0) if t_result > t_now: year -= 1 t_result = (year, month, day, hour, minute, 0,0,0,0) return calendar.timegm(t_result) class PathEntry: def __init__(self, ppart): self.strpath = ppart.firstChild.nodeValue.encode('UTF-8') self.action = ppart.getAttribute('action').encode('UTF-8') if self.action == 'A': self.copyfrom_path = ppart.getAttribute('copyfrom-path').encode('UTF-8') if self.copyfrom_path: self.copyfrom_rev = int(ppart.getAttribute('copyfrom-rev')) class LogEntry: def __init__(self, logentry): self.rev = int(logentry.getAttribute('revision')) for lpart in filter(None, logentry.childNodes): if lpart.nodeType == lpart.ELEMENT_NODE: if lpart.nodeName == u'author': self.author = lpart.firstChild.nodeValue.encode('UTF-8') elif lpart.nodeName == u'msg': if lpart.firstChild: self.msg = lpart.firstChild.nodeValue.encode('UTF-8') else: self.msg = '' elif lpart.nodeName == u'date': #2003-07-29T20:05:11.598637Z timestr = lpart.firstChild.nodeValue.encode('UTF-8') self.date = svncommon.parse_apr_time(timestr) elif lpart.nodeName == u'paths': self.strpaths = [] for ppart in filter(None, lpart.childNodes): if ppart.nodeType == ppart.ELEMENT_NODE: self.strpaths.append(PathEntry(ppart)) def __repr__(self): return '' % ( self.rev, self.author, self.date) _Head = "HEAD"