[svn r56835] Refactored the wcpath.status() method, moved the parsing of the regexp to the

WCStatus class.

--HG--
branch : trunk
This commit is contained in:
guido 2008-07-29 12:07:41 +02:00
parent 53c3f58a33
commit 4549e188da
2 changed files with 107 additions and 87 deletions

View File

@ -177,6 +177,19 @@ class TestLocalPath(LocalSetup, CommonFSTests):
assert l3.strpath == wc.strpath
assert not hasattr(l3, 'commit')
def test_long_filenames(self):
tmpdir = self.tmpdir
# testing paths > 260 chars (which is Windows' limitation, but
# depending on how the paths are used), but > 4096 (which is the
# Linux' limitation) - the behaviour of paths with names > 4096 chars
# is undetermined
newfilename = '/test' * 60
l = tmpdir.join(newfilename)
l.ensure(file=True)
l.write('foo')
l2 = tmpdir.join(newfilename)
assert l2.read() == 'foo'
class TestExecutionOnWindows(LocalSetup):
disabled = py.std.sys.platform != 'win32'

View File

@ -208,11 +208,6 @@ class SvnWCCommandPath(common.FSPathBase):
""" rename this path to target. """
py.process.cmdexec("svn move --force %s %s" %(str(self), str(target)))
# XXX a bit scary to assume there's always 2 spaces between username and
# path, however with win32 allowing spaces in user names there doesn't
# seem to be a more solid approach :(
_rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)')
def lock(self):
""" set a lock (exclusive) on the resource """
out = self._authsvn('lock').strip()
@ -260,90 +255,9 @@ class SvnWCCommandPath(common.FSPathBase):
else:
updates = ''
update_rev = None
cmd = 'status -v %s %s %s' % (updates, rec, externals)
out = self._authsvn(cmd)
rootstatus = WCStatus(self)
for line in out.split('\n'):
if not line.strip():
continue
#print "processing %r" % line
flags, rest = line[:8], line[8:]
# first column
c0,c1,c2,c3,c4,c5,x6,c7 = flags
#if '*' in line:
# print "flags", repr(flags), "rest", repr(rest)
if c0 in '?XI':
fn = line.split(None, 1)[1]
if c0 == '?':
wcpath = self.join(fn, abs=1)
rootstatus.unknown.append(wcpath)
elif c0 == 'X':
wcpath = self.__class__(self.localpath.join(fn, abs=1),
auth=self.auth)
rootstatus.external.append(wcpath)
elif c0 == 'I':
wcpath = self.join(fn, abs=1)
rootstatus.ignored.append(wcpath)
continue
#elif c0 in '~!' or c4 == 'S':
# raise NotImplementedError("received flag %r" % c0)
m = self._rex_status.match(rest)
if not m:
if c7 == '*':
fn = rest.strip()
wcpath = self.join(fn, abs=1)
rootstatus.update_available.append(wcpath)
continue
if line.lower().find('against revision:')!=-1:
update_rev = int(rest.split(':')[1].strip())
continue
# keep trying
raise ValueError, "could not parse line %r" % line
else:
rev, modrev, author, fn = m.groups()
wcpath = self.join(fn, abs=1)
#assert wcpath.check()
if c0 == 'M':
assert wcpath.check(file=1), "didn't expect a directory with changed content here"
rootstatus.modified.append(wcpath)
elif c0 == 'A' or c3 == '+' :
rootstatus.added.append(wcpath)
elif c0 == 'D':
rootstatus.deleted.append(wcpath)
elif c0 == 'C':
rootstatus.conflict.append(wcpath)
elif c0 == '~':
rootstatus.kindmismatch.append(wcpath)
elif c0 == '!':
rootstatus.incomplete.append(wcpath)
elif c0 == 'R':
rootstatus.replaced.append(wcpath)
elif not c0.strip():
rootstatus.unchanged.append(wcpath)
else:
raise NotImplementedError("received flag %r" % c0)
if c1 == 'M':
rootstatus.prop_modified.append(wcpath)
# XXX do we cover all client versions here?
if c2 == 'L' or c5 == 'K':
rootstatus.locked.append(wcpath)
if c7 == '*':
rootstatus.update_available.append(wcpath)
if wcpath == self:
rootstatus.rev = rev
rootstatus.modrev = modrev
rootstatus.author = author
if update_rev:
rootstatus.update_rev = update_rev
continue
rootstatus = WCStatus(self).fromstring(out, self)
return rootstatus
def diff(self, rev=None):
@ -609,6 +523,99 @@ class WCStatus:
l.sort()
return l
# XXX a bit scary to assume there's always 2 spaces between username and
# path, however with win32 allowing spaces in user names there doesn't
# seem to be a more solid approach :(
_rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)')
@staticmethod
def fromstring(data, rootwcpath, rev=None, modrev=None, author=None):
""" return a new WCStatus object from data 's'
"""
rootstatus = WCStatus(rootwcpath, rev, modrev, author)
update_rev = None
for line in data.split('\n'):
if not line.strip():
continue
#print "processing %r" % line
flags, rest = line[:8], line[8:]
# first column
c0,c1,c2,c3,c4,c5,x6,c7 = flags
#if '*' in line:
# print "flags", repr(flags), "rest", repr(rest)
if c0 in '?XI':
fn = line.split(None, 1)[1]
if c0 == '?':
wcpath = rootwcpath.join(fn, abs=1)
rootstatus.unknown.append(wcpath)
elif c0 == 'X':
wcpath = rootwcpath.__class__(
rootwcpath.localpath.join(fn, abs=1),
auth=rootwcpath.auth)
rootstatus.external.append(wcpath)
elif c0 == 'I':
wcpath = rootwcpath.join(fn, abs=1)
rootstatus.ignored.append(wcpath)
continue
#elif c0 in '~!' or c4 == 'S':
# raise NotImplementedError("received flag %r" % c0)
m = WCStatus._rex_status.match(rest)
if not m:
if c7 == '*':
fn = rest.strip()
wcpath = rootwcpath.join(fn, abs=1)
rootstatus.update_available.append(wcpath)
continue
if line.lower().find('against revision:')!=-1:
update_rev = int(rest.split(':')[1].strip())
continue
# keep trying
raise ValueError, "could not parse line %r" % line
else:
rev, modrev, author, fn = m.groups()
wcpath = rootwcpath.join(fn, abs=1)
#assert wcpath.check()
if c0 == 'M':
assert wcpath.check(file=1), "didn't expect a directory with changed content here"
rootstatus.modified.append(wcpath)
elif c0 == 'A' or c3 == '+' :
rootstatus.added.append(wcpath)
elif c0 == 'D':
rootstatus.deleted.append(wcpath)
elif c0 == 'C':
rootstatus.conflict.append(wcpath)
elif c0 == '~':
rootstatus.kindmismatch.append(wcpath)
elif c0 == '!':
rootstatus.incomplete.append(wcpath)
elif c0 == 'R':
rootstatus.replaced.append(wcpath)
elif not c0.strip():
rootstatus.unchanged.append(wcpath)
else:
raise NotImplementedError("received flag %r" % c0)
if c1 == 'M':
rootstatus.prop_modified.append(wcpath)
# XXX do we cover all client versions here?
if c2 == 'L' or c5 == 'K':
rootstatus.locked.append(wcpath)
if c7 == '*':
rootstatus.update_available.append(wcpath)
if wcpath == rootwcpath:
rootstatus.rev = rev
rootstatus.modrev = modrev
rootstatus.author = author
if update_rev:
rootstatus.update_rev = update_rev
continue
return rootstatus
class InfoSvnWCCommand:
def __init__(self, output):
# Path: test