[svn r52000] Merging the 'guido-auth-svn' branch back into the trunk. This means there's

a new class py.path.SvnAuth of which instances store user credentials and
auth config, and can be passed to py.path.svnurl and py.path.svnwc objects
to control SVN authentication behaviour.

--HG--
branch : trunk
This commit is contained in:
guido 2008-03-01 14:43:33 +01:00
parent f3f84fa36c
commit 17530e8ccd
9 changed files with 712 additions and 62 deletions

View File

@ -11,8 +11,8 @@ version = "1.0-pre-alpha"
initpkg(__name__, initpkg(__name__,
description = "pylib and py.test: agile development and test support library", description = "pylib and py.test: agile development and test support library",
revision = int('$LastChangedRevision: 51077 $'.split(':')[1][:-1]), revision = int('$LastChangedRevision: 52000 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2008-01-27 12:55:27 +0100 (Sun, 27 Jan 2008) $', lastchangedate = '$LastChangedDate: 2008-03-01 14:43:33 +0100 (Sat, 01 Mar 2008) $',
version = version, version = version,
url = "http://codespeak.net/py", url = "http://codespeak.net/py",
download_url = "XXX", # "http://codespeak.net/download/py/py-%s.tar.gz" %(version,), download_url = "XXX", # "http://codespeak.net/download/py/py-%s.tar.gz" %(version,),
@ -67,6 +67,7 @@ initpkg(__name__,
'path.svnwc' : ('./path/svn/wccommand.py', 'SvnWCCommandPath'), 'path.svnwc' : ('./path/svn/wccommand.py', 'SvnWCCommandPath'),
'path.svnurl' : ('./path/svn/urlcommand.py', 'SvnCommandPath'), 'path.svnurl' : ('./path/svn/urlcommand.py', 'SvnCommandPath'),
'path.local' : ('./path/local/local.py', 'LocalPath'), 'path.local' : ('./path/local/local.py', 'LocalPath'),
'path.SvnAuth' : ('./path/svn/svncommon.py', 'SvnAuth'),
# some nice slightly magic APIs # some nice slightly magic APIs
'magic.__doc__' : ('./magic/__init__.py', '__doc__'), 'magic.__doc__' : ('./magic/__init__.py', '__doc__'),

View File

@ -33,6 +33,9 @@ option = py.test.config.addoptions("execnet options",
action='store', dest='docpath', action='store', dest='docpath',
default="doc", type='string', default="doc", type='string',
help="relative path to doc output location (relative from py/)"), help="relative path to doc output location (relative from py/)"),
Option('', '--runslowtests',
action="store_true", dest="runslowtests", default=False,
help="run slow tests)"),
) )
dist_rsync_roots = ['.'] dist_rsync_roots = ['.']

View File

@ -4,6 +4,10 @@ import py, sys
from py.__.misc.killproc import killproc from py.__.misc.killproc import killproc
def test_win_killsubprocess(): def test_win_killsubprocess():
if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
py.test.skip("you\'re using an older version of windows, which "
"doesn\'t support 'taskkill' - py.misc.killproc is not "
"available")
tmp = py.test.ensuretemp("test_win_killsubprocess") tmp = py.test.ensuretemp("test_win_killsubprocess")
t = tmp.join("t.py") t = tmp.join("t.py")
t.write("import time ; time.sleep(100)") t.write("import time ; time.sleep(100)")

77
py/path/svn/auth.txt Normal file
View File

@ -0,0 +1,77 @@
SVN authentication support
==========================
This document describes authentication support for both py.path.svnwc and
py.path.svnurl (yet in its implemention phase). This feature allows using the
library in a completely automated fashion, without having to provide svn
credentials interactively.
Current implementation
----------------------
The credentials are passed to the constructor of the path objects, and are used
(transparently) for every action that accesses the server. Also, when provided,
they are passed recursively to all child objects created by methods such as
join(), ensure(), etc. (XXX currently only true for svnurl, not for svnwc)
To pass credentials to path objects, an SvnAuth class needs to be created to
hold them. This is then passed to the constructor or methods as the 'auth'
keyword argument. (XXX the latter currently only for svnwc, and preferrably
that needs to be removed in favour of an .auth attribute like in svnurl)
It is configurable whether the credentials are stored on disk. Storing them is
useful in certain situations (executive access to the repository will not
require the credentials to be passed) but might not be desired in others - for
instance if a webserver runs more than one application, one does not want to
pollute the webserver's home directory (if it even has one). This behaviour can
be controlled by passing a False value for the 'cache_auth' argument to
SvnAuth.
Also it is configurable what behaviour is displayed when the credentials do not
validate: if a keyword argument to the SvnAuth constructor called 'interactive'
has a True value (which is currently the default (XXX I think this should be
changed!)), an interactive prompt is displayed - this is useful for terminal
applications where you want to have an interactive fallback. When this has a
False value, an exception is raised (XXX define the exception properly).
Code examples
-------------
So, tying this together, code using this feature would look something like::
>>> auth = py.path.SvnAuth('user', 'pass', cache_auth=False,
... interactive=False)
>>> wcpath = py.path.svnwc(path, auth=auth)
>>> urlpath = py.path.svnurl(url, auth=auth)
Open issues
-----------
* How do we deal with externals properly?
It looks like the svn command-line client uses the credentials provided for
all externals, if possible, and either prompts for the password in
interactive mode, or barfs when --non-interactive is passed. I think it makes
sense to copy its behaviour here, pass the credentials to any child svn path
objects (as discussed above), and either let the command-line app ask for
creds or throw an exception when 'interactive' is set to False (see above).
Current idea: ignore this and let the client handle (so no passing auth
around to the children).
* Affected methods for svnwc:
- switch
- checkout
- update
- lock
- unlock
- diff (when using older revisions?)
- commit
- log
- status (for locking, etc.?)
* Affected methods for svnurl:
not appropriate - the auth is passed to the constructor rather or set to
path.auth rather than passed to all methods

View File

@ -65,6 +65,7 @@ class SvnPathBase(common.FSPathBase):
""" """
obj = object.__new__(self.__class__) obj = object.__new__(self.__class__)
obj.rev = kw.get('rev', self.rev) obj.rev = kw.get('rev', self.rev)
obj.auth = kw.get('auth', self.auth)
dirname, basename, purebasename, ext = self._getbyspec( dirname, basename, purebasename, ext = self._getbyspec(
"dirname,basename,purebasename,ext") "dirname,basename,purebasename,ext")
if 'basename' in kw: if 'basename' in kw:
@ -138,7 +139,7 @@ class SvnPathBase(common.FSPathBase):
args = tuple([arg.strip(self.sep) for arg in args]) args = tuple([arg.strip(self.sep) for arg in args])
parts = (self.strpath, ) + args parts = (self.strpath, ) + args
newpath = self.__class__(self.sep.join(parts), self.rev) newpath = self.__class__(self.sep.join(parts), self.rev, self.auth)
return newpath return newpath
def propget(self, name): def propget(self, name):
@ -330,3 +331,27 @@ def url_from_path(path):
fspath = '%s@HEAD' % (fspath,) fspath = '%s@HEAD' % (fspath,)
return 'file://%s' % (fspath,) return 'file://%s' % (fspath,)
class SvnAuth(object):
""" container for auth information for Subversion """
def __init__(self, username, password, cache_auth=True, interactive=True):
self.username = username
self.password = password
self.cache_auth = cache_auth
self.interactive = interactive
def makecmdoptions(self):
uname = self.username.replace('"', '\\"')
passwd = self.password.replace('"', '\\"')
ret = []
if uname:
ret.append('--username="%s"' % (uname,))
if passwd:
ret.append('--password="%s"' % (passwd,))
if not self.cache_auth:
ret.append('--no-auth-cache')
if not self.interactive:
ret.append('--non-interactive')
return ' '.join(ret)
def __str__(self):
return "<SvnAuth username=%s ...>" %(self.username,)

View File

@ -0,0 +1,479 @@
import py
from py.path import SvnAuth
import svntestbase
from threading import Thread
import time
from py.__.misc.killproc import killproc
from py.__.conftest import option
def make_repo_auth(repo, userdata):
""" write config to repo
user information in userdata is used for auth
userdata has user names as keys, and a tuple (password, readwrite) as
values, where 'readwrite' is either 'r' or 'rw'
"""
confdir = py.path.local(repo).join('conf')
confdir.join('svnserve.conf').write('''\
[general]
anon-access = none
password-db = passwd
authz-db = authz
realm = TestRepo
''')
authzdata = '[/]\n'
passwddata = '[users]\n'
for user in userdata:
authzdata += '%s = %s\n' % (user, userdata[user][1])
passwddata += '%s = %s\n' % (user, userdata[user][0])
confdir.join('authz').write(authzdata)
confdir.join('passwd').write(passwddata)
def serve_bg(repopath):
pidfile = py.path.local(repopath).join('pid')
port = 10000
e = None
while port < 10010:
cmd = 'svnserve -d -T --listen-port=%d --pid-file=%s -r %s' % (
port, pidfile, repopath)
try:
py.process.cmdexec(cmd)
except py.process.cmdexec.Error, e:
pass
else:
# XXX we assume here that the pid file gets written somewhere, I
# guess this should be relatively safe... (I hope, at least?)
while True:
pid = pidfile.read()
if pid:
break
# needs a bit more time to boot
time.sleep(0.1)
return port, int(pid)
port += 1
raise IOError('could not start svnserve: %s' % (e,))
class TestSvnAuth(object):
def test_basic(self):
auth = py.path.SvnAuth('foo', 'bar')
assert auth.username == 'foo'
assert auth.password == 'bar'
assert str(auth)
def test_makecmdoptions_uname_pw_makestr(self):
auth = py.path.SvnAuth('foo', 'bar')
assert auth.makecmdoptions() == '--username="foo" --password="bar"'
def test_makecmdoptions_quote_escape(self):
auth = py.path.SvnAuth('fo"o', '"ba\'r"')
assert auth.makecmdoptions() == '--username="fo\\"o" --password="\\"ba\'r\\""'
def test_makecmdoptions_no_cache_auth(self):
auth = py.path.SvnAuth('foo', 'bar', cache_auth=False)
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
'--no-auth-cache')
def test_makecmdoptions_no_interactive(self):
auth = py.path.SvnAuth('foo', 'bar', interactive=False)
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
'--non-interactive')
def test_makecmdoptions_no_interactive_no_cache_auth(self):
auth = py.path.SvnAuth('foo', 'bar', cache_auth=False,
interactive=False)
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
'--no-auth-cache --non-interactive')
class svnwc_no_svn(py.path.svnwc):
def __init__(self, *args, **kwargs):
self.commands = []
super(svnwc_no_svn, self).__init__(*args, **kwargs)
def _svn(self, *args):
self.commands.append(args)
class TestSvnWCAuth(object):
def setup_method(self, meth):
self.auth = SvnAuth('user', 'pass', cache_auth=False)
def test_checkout(self):
wc = svnwc_no_svn('foo', auth=self.auth)
wc.checkout('url')
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
'--no-auth-cache')
def test_commit(self):
wc = svnwc_no_svn('foo', auth=self.auth)
wc.commit('msg')
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
'--no-auth-cache')
def test_checkout_no_cache_auth(self):
wc = svnwc_no_svn('foo', auth=self.auth)
wc.checkout('url')
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
'--no-auth-cache')
def test_checkout_auth_from_constructor(self):
wc = svnwc_no_svn('foo', auth=self.auth)
wc.checkout('url')
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
'--no-auth-cache')
class svnurl_no_svn(py.path.svnurl):
cmdexec_output = 'test'
popen_output = 'test'
def _cmdexec(self, cmd):
self.commands.append(cmd)
return self.cmdexec_output
def _popen(self, cmd):
self.commands.append(cmd)
return self.popen_output
class TestSvnURLAuth(object):
def setup_method(self, meth):
svnurl_no_svn.commands = []
self.auth = SvnAuth('foo', 'bar')
def test_init(self):
u = svnurl_no_svn('http://foo.bar/svn')
assert u.auth is None
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
assert u.auth is self.auth
def test_new(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
new = u.new(basename='bar')
assert new.auth is self.auth
assert new.url == 'http://foo.bar/svn/bar'
def test_join(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
new = u.join('foo')
assert new.auth is self.auth
assert new.url == 'http://foo.bar/svn/foo'
def test_listdir(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
u.cmdexec_output = '''\
1717 johnny 1529 Nov 04 14:32 LICENSE.txt
1716 johnny 5352 Nov 04 14:28 README.txt
'''
paths = u.listdir()
assert paths[0].auth is self.auth
assert paths[1].auth is self.auth
assert paths[0].basename == 'LICENSE.txt'
def test_info(self):
u = svnurl_no_svn('http://foo.bar/svn/LICENSE.txt', auth=self.auth)
def dirpath(self):
return self
u.cmdexec_output = '''\
1717 johnny 1529 Nov 04 14:32 LICENSE.txt
1716 johnny 5352 Nov 04 14:28 README.txt
'''
org_dp = u.__class__.dirpath
u.__class__.dirpath = dirpath
try:
info = u.info()
finally:
u.dirpath = org_dp
assert info.size == 1529
def test_open(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
foo = u.join('foo')
foo.check = lambda *args, **kwargs: True
ret = foo.open()
assert ret == 'test'
assert '--username="foo" --password="bar"' in foo.commands[0]
def test_dirpath(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
parent = u.dirpath()
assert parent.auth is self.auth
def test_mkdir(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
u.mkdir('foo', msg='created dir foo')
assert '--username="foo" --password="bar"' in u.commands[0]
def test_copy(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
u2 = svnurl_no_svn('http://foo.bar/svn2')
u.copy(u2, 'copied dir')
assert '--username="foo" --password="bar"' in u.commands[0]
def test_rename(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
u.rename('http://foo.bar/svn/bar', 'moved foo to bar')
assert '--username="foo" --password="bar"' in u.commands[0]
def test_remove(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
u.remove(msg='removing foo')
assert '--username="foo" --password="bar"' in u.commands[0]
def test_export(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
target = py.path.local('/foo')
u.export(target)
assert '--username="foo" --password="bar"' in u.commands[0]
def test_log(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
u.popen_output = py.std.StringIO.StringIO('''\
<?xml version="1.0"?>
<log>
<logentry revision="51381">
<author>guido</author>
<date>2008-02-11T12:12:18.476481Z</date>
<msg>Creating branch to work on auth support for py.path.svn*.
</msg>
</logentry>
</log>
''')
u.check = lambda *args, **kwargs: True
ret = u.log(10, 20, verbose=True)
assert '--username="foo" --password="bar"' in u.commands[0]
assert len(ret) == 1
assert int(ret[0].rev) == 51381
assert ret[0].author == 'guido'
def test_propget(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
u.propget('foo')
assert '--username="foo" --password="bar"' in u.commands[0]
class SvnAuthFunctionalTestBase(object):
def setup_class(cls):
if not option.runslowtests:
py.test.skip('skipping slow functional tests - use --runslowtests '
'to override')
def setup_method(self, meth):
func_name = meth.im_func.func_name
self.repo = svntestbase.make_test_repo('TestSvnAuthFunctional.%s' % (
func_name,))
repodir = str(self.repo)[7:]
if py.std.sys.platform == 'win32':
# remove trailing slash...
repodir = repodir[1:]
self.repopath = py.path.local(repodir)
self.temppath = py.test.ensuretemp('TestSvnAuthFunctional.%s' % (
func_name))
self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False,
interactive=False)
def _start_svnserve(self):
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
try:
return serve_bg(self.repopath.dirpath())
except IOError, e:
py.test.skip(str(e))
class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
def test_checkout_constructor_arg(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
assert wc.join('.svn').check()
finally:
# XXX can we do this in a teardown_method too? not sure if that's
# guaranteed to get called...
killproc(pid)
def test_checkout_function_arg(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
assert wc.join('.svn').check()
finally:
killproc(pid)
def test_checkout_failing_non_interactive(self):
port, pid = self._start_svnserve()
try:
auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
interactive=False)
wc = py.path.svnwc(self.temppath, auth)
py.test.raises(Exception,
("wc.checkout('svn://localhost:%s/%s' % "
"(port, self.repopath.basename))"))
finally:
killproc(pid)
def test_log(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
foo = wc.ensure('foo.txt')
wc.commit('added foo.txt')
log = foo.log()
assert len(log) == 1
assert log[0].msg == 'added foo.txt'
finally:
killproc(pid)
def test_switch(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
wc.checkout(svnurl)
wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
wc.commit('added foo dir with foo.txt file')
wc.ensure('bar', dir=True)
wc.commit('added bar dir')
bar = wc.join('bar')
bar.switch(svnurl + '/foo')
assert bar.join('foo.txt')
finally:
killproc(pid)
def test_update(self):
port, pid = self._start_svnserve()
try:
wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
auth=self.auth)
wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
auth=self.auth)
wc1.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
wc2.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
wc1.ensure('foo', dir=True)
wc1.commit('added foo dir')
wc2.update()
assert wc2.join('foo').check()
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
wc2.auth = auth
py.test.raises(Exception, 'wc2.update()')
finally:
killproc(pid)
def test_lock_unlock_status(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
foo = wc.join('foo')
foo.lock()
status = foo.status()
assert status.locked
foo.unlock()
status = foo.status()
assert not status.locked
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.lock()')
py.test.raises(Exception, 'foo.unlock()')
finally:
killproc(pid)
def test_diff(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
wc.update()
rev = int(wc.status().rev)
foo = wc.join('foo')
foo.write('bar')
diff = foo.diff()
assert '\n+bar\n' in diff
foo.commit('added some content')
diff = foo.diff()
assert not diff
diff = foo.diff(rev=rev)
assert '\n+bar\n' in diff
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.diff(rev=rev)')
finally:
killproc(pid)
class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
def test_listdir(self):
port, pid = self._start_svnserve()
try:
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
u.ensure('foo')
paths = u.listdir()
assert len(paths) == 1
assert paths[0].auth is self.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
py.test.raises(Exception, 'u.listdir()')
finally:
killproc(pid)
def test_copy(self):
port, pid = self._start_svnserve()
try:
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
bar = u.join('bar')
foo.copy(bar)
assert bar.check()
assert bar.auth is self.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
foo = u.join('foo')
bar = u.join('bar')
py.test.raises(Exception, 'foo.copy(bar)')
finally:
killproc(pid)
def test_write_read(self):
port, pid = self._start_svnserve()
try:
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
fp = foo.open()
try:
data = fp.read()
finally:
fp.close()
assert data == ''
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
foo = u.join('foo')
py.test.raises(Exception, 'foo.open()')
finally:
killproc(pid)
# XXX rinse, repeat... :|

View File

@ -1,13 +1,27 @@
import py import py
import sys
from py.__.path.svn.testing.svntestbase import CommonSvnTests, getrepowc from py.__.path.svn.testing.svntestbase import CommonSvnTests, getrepowc
from py.__.path.svn.wccommand import InfoSvnWCCommand from py.__.path.svn.wccommand import InfoSvnWCCommand
from py.__.path.svn.wccommand import parse_wcinfotime from py.__.path.svn.wccommand import parse_wcinfotime
from py.__.path.svn import svncommon from py.__.path.svn import svncommon
if py.path.local.sysfind('svn') is None: if py.path.local.sysfind('svn') is None:
py.test.skip("cannot test py.path.svn, 'svn' binary not found") py.test.skip("cannot test py.path.svn, 'svn' binary not found")
if sys.platform != 'win32':
def normpath(p):
return p
else:
try:
import win32api
except ImportError:
def normpath(p):
py.test.skip('this test requires win32api to run on windows')
else:
import os
def normpath(p):
p = win32api.GetShortPathName(p)
return os.path.normpath(os.path.normcase(p))
class TestWCSvnCommandPath(CommonSvnTests): class TestWCSvnCommandPath(CommonSvnTests):
@ -253,7 +267,7 @@ class TestWCSvnCommandPath(CommonSvnTests):
try: try:
locked = root.status().locked locked = root.status().locked
assert len(locked) == 1 assert len(locked) == 1
assert str(locked[0]) == str(somefile) assert normpath(str(locked[0])) == normpath(str(somefile))
#assert somefile.locked() #assert somefile.locked()
py.test.raises(Exception, 'somefile.lock()') py.test.raises(Exception, 'somefile.lock()')
finally: finally:

View File

@ -21,10 +21,11 @@ class SvnCommandPath(svncommon.SvnPathBase):
_lsrevcache = BuildcostAccessCache(maxentries=128) _lsrevcache = BuildcostAccessCache(maxentries=128)
_lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0) _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0)
def __new__(cls, path, rev=None): def __new__(cls, path, rev=None, auth=None):
self = object.__new__(cls) self = object.__new__(cls)
if isinstance(path, cls): if isinstance(path, cls):
rev = path.rev rev = path.rev
auth = path.auth
path = path.strpath path = path.strpath
proto, uri = path.split("://", 1) proto, uri = path.split("://", 1)
host, uripath = uri.split('/', 1) host, uripath = uri.split('/', 1)
@ -36,6 +37,7 @@ class SvnCommandPath(svncommon.SvnPathBase):
path = path.rstrip('/') path = path.rstrip('/')
self.strpath = path self.strpath = path
self.rev = rev self.rev = rev
self.auth = auth
return self return self
def __repr__(self): def __repr__(self):
@ -44,7 +46,8 @@ class SvnCommandPath(svncommon.SvnPathBase):
else: else:
return 'svnurl(%r, %r)' % (self.strpath, self.rev) return 'svnurl(%r, %r)' % (self.strpath, self.rev)
def _svn(self, cmd, *args): def _svnwithrev(self, cmd, *args):
""" execute an svn command, append our own url and revision """
if self.rev is None: if self.rev is None:
return self._svnwrite(cmd, *args) return self._svnwrite(cmd, *args)
else: else:
@ -52,16 +55,28 @@ class SvnCommandPath(svncommon.SvnPathBase):
return self._svnwrite(cmd, *args) return self._svnwrite(cmd, *args)
def _svnwrite(self, cmd, *args): def _svnwrite(self, cmd, *args):
""" execute an svn command, append our own url """
l = ['svn %s' % cmd] l = ['svn %s' % cmd]
args = ['"%s"' % self._escape(item) for item in args] args = ['"%s"' % self._escape(item) for item in args]
l.extend(args) l.extend(args)
l.append('"%s"' % self._encodedurl()) l.append('"%s"' % self._encodedurl())
# fixing the locale because we can't otherwise parse # fixing the locale because we can't otherwise parse
string = svncommon.fixlocale() + " ".join(l) string = " ".join(l)
if DEBUG: if DEBUG:
print "execing", string print "execing", string
out = self._svncmdexecauth(string)
return out
def _svncmdexecauth(self, cmd):
""" execute an svn command 'as is' """
cmd = svncommon.fixlocale() + cmd
if self.auth is not None:
cmd += ' ' + self.auth.makecmdoptions()
return self._cmdexec(cmd)
def _cmdexec(self, cmd):
try: try:
out = process.cmdexec(string) out = process.cmdexec(cmd)
except py.process.cmdexec.Error, e: except py.process.cmdexec.Error, e:
if (e.err.find('File Exists') != -1 or if (e.err.find('File Exists') != -1 or
e.err.find('File already exists') != -1): e.err.find('File already exists') != -1):
@ -69,21 +84,33 @@ class SvnCommandPath(svncommon.SvnPathBase):
raise raise
return out return out
def _svnpopenauth(self, cmd):
""" execute an svn command, return a pipe for reading stdin """
cmd = svncommon.fixlocale() + cmd
if self.auth is not None:
cmd += ' ' + self.auth.makecmdoptions()
return self._popen(cmd)
def _popen(self, cmd):
return os.popen(cmd)
def _encodedurl(self): def _encodedurl(self):
return self._escape(self.strpath) return self._escape(self.strpath)
def _norev_delentry(self, path):
auth = self.auth and self.auth.makecmdoptions() or None
self._lsnorevcache.delentry((str(path), auth))
def open(self, mode='r'): def open(self, mode='r'):
""" return an opened file with the given mode. """ """ return an opened file with the given mode. """
assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline" assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline"
assert self.check(file=1) # svn cat returns an empty file otherwise assert self.check(file=1) # svn cat returns an empty file otherwise
def popen(cmd):
return os.popen(cmd)
if self.rev is None: if self.rev is None:
return popen(svncommon.fixlocale() + return self._svnpopenauth('svn cat "%s"' % (
'svn cat "%s"' % (self._escape(self.strpath), )) self._escape(self.strpath), ))
else: else:
return popen(svncommon.fixlocale() + return self._svnpopenauth('svn cat -r %s "%s"' % (
'svn cat -r %s "%s"' % (self.rev, self._escape(self.strpath))) self.rev, self._escape(self.strpath)))
def dirpath(self, *args, **kwargs): def dirpath(self, *args, **kwargs):
""" return the directory path of the current path joined """ return the directory path of the current path joined
@ -104,33 +131,33 @@ a checkin message by giving a keyword argument 'msg'"""
commit_msg=kwargs.get('msg', "mkdir by py lib invocation") commit_msg=kwargs.get('msg', "mkdir by py lib invocation")
createpath = self.join(*args) createpath = self.join(*args)
createpath._svnwrite('mkdir', '-m', commit_msg) createpath._svnwrite('mkdir', '-m', commit_msg)
self._lsnorevcache.delentry(createpath.dirpath().strpath) self._norev_delentry(createpath.dirpath())
return createpath return createpath
def copy(self, target, msg='copied by py lib invocation'): def copy(self, target, msg='copied by py lib invocation'):
""" copy path to target with checkin message msg.""" """ copy path to target with checkin message msg."""
if getattr(target, 'rev', None) is not None: if getattr(target, 'rev', None) is not None:
raise py.error.EINVAL(target, "revisions are immutable") raise py.error.EINVAL(target, "revisions are immutable")
process.cmdexec('svn copy -m "%s" "%s" "%s"' %(msg, self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg,
self._escape(self), self._escape(target))) self._escape(self), self._escape(target)))
self._lsnorevcache.delentry(target.dirpath().strpath) self._norev_delentry(target.dirpath())
def rename(self, target, msg="renamed by py lib invocation"): def rename(self, target, msg="renamed by py lib invocation"):
""" rename this path to target with checkin message msg. """ """ rename this path to target with checkin message msg. """
if getattr(self, 'rev', None) is not None: if getattr(self, 'rev', None) is not None:
raise py.error.EINVAL(self, "revisions are immutable") raise py.error.EINVAL(self, "revisions are immutable")
py.process.cmdexec('svn move -m "%s" --force "%s" "%s"' %( self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %(
msg, self._escape(self), self._escape(target))) msg, self._escape(self), self._escape(target)))
self._lsnorevcache.delentry(self.dirpath().strpath) self._norev_delentry(self.dirpath())
self._lsnorevcache.delentry(self.strpath) self._norev_delentry(self)
def remove(self, rec=1, msg='removed by py lib invocation'): def remove(self, rec=1, msg='removed by py lib invocation'):
""" remove a file or directory (or a directory tree if rec=1) with """ remove a file or directory (or a directory tree if rec=1) with
checkin message msg.""" checkin message msg."""
if self.rev is not None: if self.rev is not None:
raise py.error.EINVAL(self, "revisions are immutable") raise py.error.EINVAL(self, "revisions are immutable")
process.cmdexec('svn rm -m "%s" "%s"' %(msg, self._escape(self))) self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self)))
self._lsnorevcache.delentry(self.dirpath().strpath) self._norev_delentry(self.dirpath())
def export(self, topath): def export(self, topath):
""" export to a local path """ export to a local path
@ -143,7 +170,7 @@ checkin message msg."""
'"%s"' % (self._escape(topath),)] '"%s"' % (self._escape(topath),)]
if self.rev is not None: if self.rev is not None:
args = ['-r', str(self.rev)] + args args = ['-r', str(self.rev)] + args
process.cmdexec('svn export %s' % (' '.join(args),)) self._svncmdexecauth('svn export %s' % (' '.join(args),))
return topath return topath
def ensure(self, *args, **kwargs): def ensure(self, *args, **kwargs):
@ -173,19 +200,19 @@ checkin message msg."""
"ensure %s" % self._escape(tocreate), "ensure %s" % self._escape(tocreate),
self._escape(tempdir.join(basename)), self._escape(tempdir.join(basename)),
x.join(basename)._encodedurl()) x.join(basename)._encodedurl())
process.cmdexec(cmd) self._svncmdexecauth(cmd)
self._lsnorevcache.delentry(x.strpath) # !!! self._norev_delentry(x)
finally: finally:
tempdir.remove() tempdir.remove()
return target return target
# end of modifying methods # end of modifying methods
def _propget(self, name): def _propget(self, name):
res = self._svn('propget', name) res = self._svnwithrev('propget', name)
return res[:-1] # strip trailing newline return res[:-1] # strip trailing newline
def _proplist(self): def _proplist(self):
res = self._svn('proplist') res = self._svnwithrev('proplist')
lines = res.split('\n') lines = res.split('\n')
lines = map(str.strip, lines[1:]) lines = map(str.strip, lines[1:])
return svncommon.PropListDict(self, lines) return svncommon.PropListDict(self, lines)
@ -194,7 +221,7 @@ checkin message msg."""
""" return sequence of name-info directory entries of self """ """ return sequence of name-info directory entries of self """
def builder(): def builder():
try: try:
res = self._svn('ls', '-v') res = self._svnwithrev('ls', '-v')
except process.cmdexec.Error, e: except process.cmdexec.Error, e:
if e.err.find('non-existent in that revision') != -1: if e.err.find('non-existent in that revision') != -1:
raise py.error.ENOENT(self, e.err) raise py.error.ENOENT(self, e.err)
@ -214,10 +241,13 @@ checkin message msg."""
info = InfoSvnCommand(lsline) info = InfoSvnCommand(lsline)
nameinfo_seq.append((info._name, info)) nameinfo_seq.append((info._name, info))
return nameinfo_seq return nameinfo_seq
auth = self.auth and self.auth.makecmdoptions() or None
if self.rev is not None: if self.rev is not None:
return self._lsrevcache.getorbuild((self.strpath, self.rev), builder) return self._lsrevcache.getorbuild((self.strpath, self.rev, auth),
builder)
else: else:
return self._lsnorevcache.getorbuild(self.strpath, builder) return self._lsnorevcache.getorbuild((self.strpath, auth),
builder)
def log(self, rev_start=None, rev_end=1, verbose=False): def log(self, rev_start=None, rev_end=1, verbose=False):
""" return a list of LogEntry instances for this path. """ return a list of LogEntry instances for this path.
@ -234,9 +264,8 @@ if verbose is True, then the LogEntry instances also know which files changed.
else: else:
rev_opt = "-r %s:%s" % (rev_start, rev_end) rev_opt = "-r %s:%s" % (rev_start, rev_end)
verbose_opt = verbose and "-v" or "" verbose_opt = verbose and "-v" or ""
xmlpipe = os.popen(svncommon.fixlocale() + xmlpipe = self._svnpopenauth('svn log --xml %s %s "%s"' %
'svn log --xml %s %s "%s"' % (rev_opt, verbose_opt, self.strpath))
(rev_opt, verbose_opt, self.strpath))
from xml.dom import minidom from xml.dom import minidom
tree = minidom.parse(xmlpipe) tree = minidom.parse(xmlpipe)
result = [] result = []
@ -254,7 +283,7 @@ class InfoSvnCommand:
# the '0?' part in the middle is an indication of whether the resource is # the '0?' part in the middle is an indication of whether the resource is
# locked, see 'svn help ls' # locked, see 'svn help ls'
lspattern = re.compile( lspattern = re.compile(
r'^ *(?P<rev>\d+) +(?P<author>\S+) +(0? *(?P<size>\d+))? ' r'^ *(?P<rev>\d+) +(?P<author>.+?) +(0? *(?P<size>\d+))? '
'*(?P<date>\w+ +\d{2} +[\d:]+) +(?P<file>.*)$') '*(?P<date>\w+ +\d{2} +[\d:]+) +(?P<file>.*)$')
def __init__(self, line): def __init__(self, line):
# this is a typical line from 'svn ls http://...' # this is a typical line from 'svn ls http://...'

View File

@ -25,7 +25,7 @@ class SvnWCCommandPath(common.FSPathBase):
""" """
sep = os.sep sep = os.sep
def __new__(cls, wcpath=None): def __new__(cls, wcpath=None, auth=None):
self = object.__new__(cls) self = object.__new__(cls)
if isinstance(wcpath, cls): if isinstance(wcpath, cls):
if wcpath.__class__ == cls: if wcpath.__class__ == cls:
@ -35,6 +35,7 @@ class SvnWCCommandPath(common.FSPathBase):
svncommon.ALLOWED_CHARS): svncommon.ALLOWED_CHARS):
raise ValueError("bad char in wcpath %s" % (wcpath, )) raise ValueError("bad char in wcpath %s" % (wcpath, ))
self.localpath = py.path.local(wcpath) self.localpath = py.path.local(wcpath)
self.auth = auth
return self return self
strpath = property(lambda x: str(x.localpath), None, None, "string path") strpath = property(lambda x: str(x.localpath), None, None, "string path")
@ -63,13 +64,22 @@ class SvnWCCommandPath(common.FSPathBase):
info = self.info() info = self.info()
return py.path.svnurl(info.url) return py.path.svnurl(info.url)
def __repr__(self): def __repr__(self):
return "svnwc(%r)" % (self.strpath) # , self._url) return "svnwc(%r)" % (self.strpath) # , self._url)
def __str__(self): def __str__(self):
return str(self.localpath) return str(self.localpath)
def _makeauthoptions(self):
if self.auth is None:
return ''
return self.auth.makecmdoptions()
def _authsvn(self, cmd, args=None):
args = args and list(args) or []
args.append(self._makeauthoptions())
return self._svn(cmd, *args)
def _svn(self, cmd, *args): def _svn(self, cmd, *args):
l = ['svn %s' % cmd] l = ['svn %s' % cmd]
args = [self._escape(item) for item in args] args = [self._escape(item) for item in args]
@ -103,7 +113,7 @@ class SvnWCCommandPath(common.FSPathBase):
def switch(self, url): def switch(self, url):
""" switch to given URL. """ """ switch to given URL. """
self._svn('switch', url) self._authsvn('switch', [url])
def checkout(self, url=None, rev=None): def checkout(self, url=None, rev=None):
""" checkout from url to local wcpath. """ """ checkout from url to local wcpath. """
@ -119,11 +129,12 @@ class SvnWCCommandPath(common.FSPathBase):
url += "@%d" % rev url += "@%d" % rev
else: else:
args.append('-r' + str(rev)) args.append('-r' + str(rev))
self._svn('co', url, *args) args.append(url)
self._authsvn('co', args)
def update(self, rev = 'HEAD'): def update(self, rev = 'HEAD'):
""" update working copy item to given revision. (None -> HEAD). """ """ update working copy item to given revision. (None -> HEAD). """
self._svn('up -r %s' % rev) self._authsvn('up', ['-r', rev])
def write(self, content, mode='wb'): def write(self, content, mode='wb'):
""" write content into local filesystem wc. """ """ write content into local filesystem wc. """
@ -131,7 +142,7 @@ class SvnWCCommandPath(common.FSPathBase):
def dirpath(self, *args): def dirpath(self, *args):
""" return the directory Path of the current Path. """ """ return the directory Path of the current Path. """
return self.__class__(self.localpath.dirpath(*args)) return self.__class__(self.localpath.dirpath(*args), auth=self.auth)
def _ensuredirs(self): def _ensuredirs(self):
parent = self.dirpath() parent = self.dirpath()
@ -197,18 +208,21 @@ class SvnWCCommandPath(common.FSPathBase):
""" rename this path to target. """ """ rename this path to target. """
py.process.cmdexec("svn move --force %s %s" %(str(self), str(target))) py.process.cmdexec("svn move --force %s %s" %(str(self), str(target)))
_rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(\S+)\s+(.*)') # XXX a bit scary to assume there's always 2 spaces between username and
# path, however with win32 allowing spaces in user names there doesn't
# seem to be a more solid approach :(
_rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)')
def lock(self): def lock(self):
""" set a lock (exclusive) on the resource """ """ set a lock (exclusive) on the resource """
out = self._svn('lock').strip() out = self._authsvn('lock').strip()
if not out: if not out:
# warning or error, raise exception # warning or error, raise exception
raise Exception(out[4:]) raise Exception(out[4:])
def unlock(self): def unlock(self):
""" unset a previously set lock """ """ unset a previously set lock """
out = self._svn('unlock').strip() out = self._authsvn('unlock').strip()
if out.startswith('svn:'): if out.startswith('svn:'):
# warning or error, raise exception # warning or error, raise exception
raise Exception(out[4:]) raise Exception(out[4:])
@ -248,7 +262,8 @@ class SvnWCCommandPath(common.FSPathBase):
update_rev = None update_rev = None
out = self._svn('status -v %s %s %s' % (updates, rec, externals)) cmd = 'status -v %s %s %s' % (updates, rec, externals)
out = self._authsvn(cmd)
rootstatus = WCStatus(self) rootstatus = WCStatus(self)
for line in out.split('\n'): for line in out.split('\n'):
if not line.strip(): if not line.strip():
@ -266,7 +281,8 @@ class SvnWCCommandPath(common.FSPathBase):
wcpath = self.join(fn, abs=1) wcpath = self.join(fn, abs=1)
rootstatus.unknown.append(wcpath) rootstatus.unknown.append(wcpath)
elif c0 == 'X': elif c0 == 'X':
wcpath = self.__class__(self.localpath.join(fn, abs=1)) wcpath = self.__class__(self.localpath.join(fn, abs=1),
auth=self.auth)
rootstatus.external.append(wcpath) rootstatus.external.append(wcpath)
elif c0 == 'I': elif c0 == 'I':
wcpath = self.join(fn, abs=1) wcpath = self.join(fn, abs=1)
@ -334,10 +350,10 @@ class SvnWCCommandPath(common.FSPathBase):
""" return a diff of the current path against revision rev (defaulting """ return a diff of the current path against revision rev (defaulting
to the last one). to the last one).
""" """
if rev is None: args = []
out = self._svn('diff') if rev is not None:
else: args.append("-r %d" % rev)
out = self._svn('diff -r %d' % rev) out = self._authsvn('diff', args)
return out return out
def blame(self): def blame(self):
@ -365,7 +381,7 @@ class SvnWCCommandPath(common.FSPathBase):
cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),) cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),)
if not rec: if not rec:
cmd += ' -N' cmd += ' -N'
out = self._svn(cmd) out = self._authsvn(cmd)
try: try:
del cache.info[self] del cache.info[self]
except KeyError: except KeyError:
@ -431,7 +447,7 @@ recursively. """
localpath = self.localpath.new(**kw) localpath = self.localpath.new(**kw)
else: else:
localpath = self.localpath localpath = self.localpath
return self.__class__(localpath) return self.__class__(localpath, auth=self.auth)
def join(self, *args, **kwargs): def join(self, *args, **kwargs):
""" return a new Path (with the same revision) which is composed """ return a new Path (with the same revision) which is composed
@ -440,7 +456,7 @@ recursively. """
if not args: if not args:
return self return self
localpath = self.localpath.join(*args, **kwargs) localpath = self.localpath.join(*args, **kwargs)
return self.__class__(localpath) return self.__class__(localpath, auth=self.auth)
def info(self, usecache=1): def info(self, usecache=1):
""" return an Info structure with svn-provided information. """ """ return an Info structure with svn-provided information. """
@ -483,7 +499,7 @@ recursively. """
paths = [] paths = []
for localpath in self.localpath.listdir(notsvn): for localpath in self.localpath.listdir(notsvn):
p = self.__class__(localpath) p = self.__class__(localpath, auth=self.auth)
paths.append(p) paths.append(p)
if fil or sort: if fil or sort:
@ -534,11 +550,13 @@ if verbose is True, then the LogEntry instances also know which files changed.
else: else:
rev_opt = "-r %s:%s" % (rev_start, rev_end) rev_opt = "-r %s:%s" % (rev_start, rev_end)
verbose_opt = verbose and "-v" or "" verbose_opt = verbose and "-v" or ""
s = svncommon.fixlocale() locale_env = svncommon.fixlocale()
# some blather on stderr # some blather on stderr
stdin, stdout, stderr = os.popen3(s + 'svn log --xml %s %s "%s"' % ( auth_opt = self._makeauthoptions()
rev_opt, verbose_opt, stdin, stdout, stderr = os.popen3(locale_env +
self.strpath)) 'svn log --xml %s %s %s "%s"' % (
rev_opt, verbose_opt, auth_opt,
self.strpath))
from xml.dom import minidom from xml.dom import minidom
from xml.parsers.expat import ExpatError from xml.parsers.expat import ExpatError
try: try:
@ -562,7 +580,7 @@ if verbose is True, then the LogEntry instances also know which files changed.
return self.info().mtime return self.info().mtime
def __hash__(self): def __hash__(self):
return hash((self.strpath, self.__class__)) return hash((self.strpath, self.__class__, self.auth))
class WCStatus: class WCStatus: