From 17530e8ccdf506cbdec4bad094c8930f8ed4cb94 Mon Sep 17 00:00:00 2001 From: guido Date: Sat, 1 Mar 2008 14:43:33 +0100 Subject: [PATCH] [svn r52000] Merging the 'guido-auth-svn' branch back into the trunk. This means there's a new class py.path.SvnAuth of which instances store user credentials and auth config, and can be passed to py.path.svnurl and py.path.svnwc objects to control SVN authentication behaviour. --HG-- branch : trunk --- py/__init__.py | 5 +- py/conftest.py | 3 + py/misc/testing/test_oskill.py | 4 + py/path/svn/auth.txt | 77 +++++ py/path/svn/svncommon.py | 27 +- py/path/svn/testing/test_auth.py | 479 ++++++++++++++++++++++++++ py/path/svn/testing/test_wccommand.py | 18 +- py/path/svn/urlcommand.py | 93 +++-- py/path/svn/wccommand.py | 68 ++-- 9 files changed, 712 insertions(+), 62 deletions(-) create mode 100644 py/path/svn/auth.txt create mode 100644 py/path/svn/testing/test_auth.py diff --git a/py/__init__.py b/py/__init__.py index e3475c3d0..e5e7345e8 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -11,8 +11,8 @@ version = "1.0-pre-alpha" initpkg(__name__, description = "pylib and py.test: agile development and test support library", - revision = int('$LastChangedRevision: 51077 $'.split(':')[1][:-1]), - lastchangedate = '$LastChangedDate: 2008-01-27 12:55:27 +0100 (Sun, 27 Jan 2008) $', + revision = int('$LastChangedRevision: 52000 $'.split(':')[1][:-1]), + lastchangedate = '$LastChangedDate: 2008-03-01 14:43:33 +0100 (Sat, 01 Mar 2008) $', version = version, url = "http://codespeak.net/py", download_url = "XXX", # "http://codespeak.net/download/py/py-%s.tar.gz" %(version,), @@ -67,6 +67,7 @@ initpkg(__name__, 'path.svnwc' : ('./path/svn/wccommand.py', 'SvnWCCommandPath'), 'path.svnurl' : ('./path/svn/urlcommand.py', 'SvnCommandPath'), 'path.local' : ('./path/local/local.py', 'LocalPath'), + 'path.SvnAuth' : ('./path/svn/svncommon.py', 'SvnAuth'), # some nice slightly magic APIs 'magic.__doc__' : ('./magic/__init__.py', '__doc__'), diff --git a/py/conftest.py b/py/conftest.py index d78ade99a..24000e8d8 100644 --- a/py/conftest.py +++ b/py/conftest.py @@ -33,6 +33,9 @@ option = py.test.config.addoptions("execnet options", action='store', dest='docpath', default="doc", type='string', help="relative path to doc output location (relative from py/)"), + Option('', '--runslowtests', + action="store_true", dest="runslowtests", default=False, + help="run slow tests)"), ) dist_rsync_roots = ['.'] diff --git a/py/misc/testing/test_oskill.py b/py/misc/testing/test_oskill.py index 4518332ae..7e956a764 100644 --- a/py/misc/testing/test_oskill.py +++ b/py/misc/testing/test_oskill.py @@ -4,6 +4,10 @@ import py, sys from py.__.misc.killproc import killproc def test_win_killsubprocess(): + if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'): + py.test.skip("you\'re using an older version of windows, which " + "doesn\'t support 'taskkill' - py.misc.killproc is not " + "available") tmp = py.test.ensuretemp("test_win_killsubprocess") t = tmp.join("t.py") t.write("import time ; time.sleep(100)") diff --git a/py/path/svn/auth.txt b/py/path/svn/auth.txt new file mode 100644 index 000000000..6d9b5af98 --- /dev/null +++ b/py/path/svn/auth.txt @@ -0,0 +1,77 @@ +SVN authentication support +========================== + +This document describes authentication support for both py.path.svnwc and +py.path.svnurl (yet in its implemention phase). This feature allows using the +library in a completely automated fashion, without having to provide svn +credentials interactively. + +Current implementation +---------------------- + +The credentials are passed to the constructor of the path objects, and are used +(transparently) for every action that accesses the server. Also, when provided, +they are passed recursively to all child objects created by methods such as +join(), ensure(), etc. (XXX currently only true for svnurl, not for svnwc) + +To pass credentials to path objects, an SvnAuth class needs to be created to +hold them. This is then passed to the constructor or methods as the 'auth' +keyword argument. (XXX the latter currently only for svnwc, and preferrably +that needs to be removed in favour of an .auth attribute like in svnurl) + +It is configurable whether the credentials are stored on disk. Storing them is +useful in certain situations (executive access to the repository will not +require the credentials to be passed) but might not be desired in others - for +instance if a webserver runs more than one application, one does not want to +pollute the webserver's home directory (if it even has one). This behaviour can +be controlled by passing a False value for the 'cache_auth' argument to +SvnAuth. + +Also it is configurable what behaviour is displayed when the credentials do not +validate: if a keyword argument to the SvnAuth constructor called 'interactive' +has a True value (which is currently the default (XXX I think this should be +changed!)), an interactive prompt is displayed - this is useful for terminal +applications where you want to have an interactive fallback. When this has a +False value, an exception is raised (XXX define the exception properly). + +Code examples +------------- + +So, tying this together, code using this feature would look something like:: + + >>> auth = py.path.SvnAuth('user', 'pass', cache_auth=False, + ... interactive=False) + >>> wcpath = py.path.svnwc(path, auth=auth) + >>> urlpath = py.path.svnurl(url, auth=auth) + +Open issues +----------- + +* How do we deal with externals properly? + + It looks like the svn command-line client uses the credentials provided for + all externals, if possible, and either prompts for the password in + interactive mode, or barfs when --non-interactive is passed. I think it makes + sense to copy its behaviour here, pass the credentials to any child svn path + objects (as discussed above), and either let the command-line app ask for + creds or throw an exception when 'interactive' is set to False (see above). + + Current idea: ignore this and let the client handle (so no passing auth + around to the children). + +* Affected methods for svnwc: + + - switch + - checkout + - update + - lock + - unlock + - diff (when using older revisions?) + - commit + - log + - status (for locking, etc.?) + +* Affected methods for svnurl: + + not appropriate - the auth is passed to the constructor rather or set to + path.auth rather than passed to all methods diff --git a/py/path/svn/svncommon.py b/py/path/svn/svncommon.py index d982ac365..c9ace8e66 100644 --- a/py/path/svn/svncommon.py +++ b/py/path/svn/svncommon.py @@ -65,6 +65,7 @@ class SvnPathBase(common.FSPathBase): """ obj = object.__new__(self.__class__) obj.rev = kw.get('rev', self.rev) + obj.auth = kw.get('auth', self.auth) dirname, basename, purebasename, ext = self._getbyspec( "dirname,basename,purebasename,ext") if 'basename' in kw: @@ -138,7 +139,7 @@ class SvnPathBase(common.FSPathBase): args = tuple([arg.strip(self.sep) for arg in args]) parts = (self.strpath, ) + args - newpath = self.__class__(self.sep.join(parts), self.rev) + newpath = self.__class__(self.sep.join(parts), self.rev, self.auth) return newpath def propget(self, name): @@ -330,3 +331,27 @@ def url_from_path(path): fspath = '%s@HEAD' % (fspath,) return 'file://%s' % (fspath,) +class SvnAuth(object): + """ container for auth information for Subversion """ + def __init__(self, username, password, cache_auth=True, interactive=True): + self.username = username + self.password = password + self.cache_auth = cache_auth + self.interactive = interactive + + def makecmdoptions(self): + uname = self.username.replace('"', '\\"') + passwd = self.password.replace('"', '\\"') + ret = [] + if uname: + ret.append('--username="%s"' % (uname,)) + if passwd: + ret.append('--password="%s"' % (passwd,)) + if not self.cache_auth: + ret.append('--no-auth-cache') + if not self.interactive: + ret.append('--non-interactive') + return ' '.join(ret) + + def __str__(self): + return "" %(self.username,) diff --git a/py/path/svn/testing/test_auth.py b/py/path/svn/testing/test_auth.py new file mode 100644 index 000000000..bf2a66cc6 --- /dev/null +++ b/py/path/svn/testing/test_auth.py @@ -0,0 +1,479 @@ +import py +from py.path import SvnAuth +import svntestbase +from threading import Thread +import time +from py.__.misc.killproc import killproc +from py.__.conftest import option + +def make_repo_auth(repo, userdata): + """ write config to repo + + user information in userdata is used for auth + userdata has user names as keys, and a tuple (password, readwrite) as + values, where 'readwrite' is either 'r' or 'rw' + """ + confdir = py.path.local(repo).join('conf') + confdir.join('svnserve.conf').write('''\ +[general] +anon-access = none +password-db = passwd +authz-db = authz +realm = TestRepo +''') + authzdata = '[/]\n' + passwddata = '[users]\n' + for user in userdata: + authzdata += '%s = %s\n' % (user, userdata[user][1]) + passwddata += '%s = %s\n' % (user, userdata[user][0]) + confdir.join('authz').write(authzdata) + confdir.join('passwd').write(passwddata) + +def serve_bg(repopath): + pidfile = py.path.local(repopath).join('pid') + port = 10000 + e = None + while port < 10010: + cmd = 'svnserve -d -T --listen-port=%d --pid-file=%s -r %s' % ( + port, pidfile, repopath) + try: + py.process.cmdexec(cmd) + except py.process.cmdexec.Error, e: + pass + else: + # XXX we assume here that the pid file gets written somewhere, I + # guess this should be relatively safe... (I hope, at least?) + while True: + pid = pidfile.read() + if pid: + break + # needs a bit more time to boot + time.sleep(0.1) + return port, int(pid) + port += 1 + raise IOError('could not start svnserve: %s' % (e,)) + +class TestSvnAuth(object): + def test_basic(self): + auth = py.path.SvnAuth('foo', 'bar') + assert auth.username == 'foo' + assert auth.password == 'bar' + assert str(auth) + + def test_makecmdoptions_uname_pw_makestr(self): + auth = py.path.SvnAuth('foo', 'bar') + assert auth.makecmdoptions() == '--username="foo" --password="bar"' + + def test_makecmdoptions_quote_escape(self): + auth = py.path.SvnAuth('fo"o', '"ba\'r"') + assert auth.makecmdoptions() == '--username="fo\\"o" --password="\\"ba\'r\\""' + + def test_makecmdoptions_no_cache_auth(self): + auth = py.path.SvnAuth('foo', 'bar', cache_auth=False) + assert auth.makecmdoptions() == ('--username="foo" --password="bar" ' + '--no-auth-cache') + + def test_makecmdoptions_no_interactive(self): + auth = py.path.SvnAuth('foo', 'bar', interactive=False) + assert auth.makecmdoptions() == ('--username="foo" --password="bar" ' + '--non-interactive') + + def test_makecmdoptions_no_interactive_no_cache_auth(self): + auth = py.path.SvnAuth('foo', 'bar', cache_auth=False, + interactive=False) + assert auth.makecmdoptions() == ('--username="foo" --password="bar" ' + '--no-auth-cache --non-interactive') + +class svnwc_no_svn(py.path.svnwc): + def __init__(self, *args, **kwargs): + self.commands = [] + super(svnwc_no_svn, self).__init__(*args, **kwargs) + + def _svn(self, *args): + self.commands.append(args) + +class TestSvnWCAuth(object): + def setup_method(self, meth): + self.auth = SvnAuth('user', 'pass', cache_auth=False) + + def test_checkout(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.checkout('url') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + + def test_commit(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.commit('msg') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + + def test_checkout_no_cache_auth(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.checkout('url') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + + def test_checkout_auth_from_constructor(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.checkout('url') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + +class svnurl_no_svn(py.path.svnurl): + cmdexec_output = 'test' + popen_output = 'test' + + def _cmdexec(self, cmd): + self.commands.append(cmd) + return self.cmdexec_output + + def _popen(self, cmd): + self.commands.append(cmd) + return self.popen_output + +class TestSvnURLAuth(object): + def setup_method(self, meth): + svnurl_no_svn.commands = [] + self.auth = SvnAuth('foo', 'bar') + + def test_init(self): + u = svnurl_no_svn('http://foo.bar/svn') + assert u.auth is None + + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + assert u.auth is self.auth + + def test_new(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + new = u.new(basename='bar') + assert new.auth is self.auth + assert new.url == 'http://foo.bar/svn/bar' + + def test_join(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + new = u.join('foo') + assert new.auth is self.auth + assert new.url == 'http://foo.bar/svn/foo' + + def test_listdir(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u.cmdexec_output = '''\ + 1717 johnny 1529 Nov 04 14:32 LICENSE.txt + 1716 johnny 5352 Nov 04 14:28 README.txt +''' + paths = u.listdir() + assert paths[0].auth is self.auth + assert paths[1].auth is self.auth + assert paths[0].basename == 'LICENSE.txt' + + def test_info(self): + u = svnurl_no_svn('http://foo.bar/svn/LICENSE.txt', auth=self.auth) + def dirpath(self): + return self + u.cmdexec_output = '''\ + 1717 johnny 1529 Nov 04 14:32 LICENSE.txt + 1716 johnny 5352 Nov 04 14:28 README.txt +''' + org_dp = u.__class__.dirpath + u.__class__.dirpath = dirpath + try: + info = u.info() + finally: + u.dirpath = org_dp + assert info.size == 1529 + + def test_open(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + foo = u.join('foo') + foo.check = lambda *args, **kwargs: True + ret = foo.open() + assert ret == 'test' + assert '--username="foo" --password="bar"' in foo.commands[0] + + def test_dirpath(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + parent = u.dirpath() + assert parent.auth is self.auth + + def test_mkdir(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u.mkdir('foo', msg='created dir foo') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_copy(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u2 = svnurl_no_svn('http://foo.bar/svn2') + u.copy(u2, 'copied dir') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_rename(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + u.rename('http://foo.bar/svn/bar', 'moved foo to bar') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_remove(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + u.remove(msg='removing foo') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_export(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + target = py.path.local('/foo') + u.export(target) + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_log(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + u.popen_output = py.std.StringIO.StringIO('''\ + + + +guido +2008-02-11T12:12:18.476481Z +Creating branch to work on auth support for py.path.svn*. + + + +''') + u.check = lambda *args, **kwargs: True + ret = u.log(10, 20, verbose=True) + assert '--username="foo" --password="bar"' in u.commands[0] + assert len(ret) == 1 + assert int(ret[0].rev) == 51381 + assert ret[0].author == 'guido' + + def test_propget(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u.propget('foo') + assert '--username="foo" --password="bar"' in u.commands[0] + +class SvnAuthFunctionalTestBase(object): + def setup_class(cls): + if not option.runslowtests: + py.test.skip('skipping slow functional tests - use --runslowtests ' + 'to override') + + def setup_method(self, meth): + func_name = meth.im_func.func_name + self.repo = svntestbase.make_test_repo('TestSvnAuthFunctional.%s' % ( + func_name,)) + repodir = str(self.repo)[7:] + if py.std.sys.platform == 'win32': + # remove trailing slash... + repodir = repodir[1:] + self.repopath = py.path.local(repodir) + self.temppath = py.test.ensuretemp('TestSvnAuthFunctional.%s' % ( + func_name)) + self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False, + interactive=False) + + def _start_svnserve(self): + make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')}) + try: + return serve_bg(self.repopath.dirpath()) + except IOError, e: + py.test.skip(str(e)) + +class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase): + def test_checkout_constructor_arg(self): + port, pid = self._start_svnserve() + try: + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + assert wc.join('.svn').check() + finally: + # XXX can we do this in a teardown_method too? not sure if that's + # guaranteed to get called... + killproc(pid) + + def test_checkout_function_arg(self): + port, pid = self._start_svnserve() + try: + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + assert wc.join('.svn').check() + finally: + killproc(pid) + + def test_checkout_failing_non_interactive(self): + port, pid = self._start_svnserve() + try: + auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False, + interactive=False) + wc = py.path.svnwc(self.temppath, auth) + py.test.raises(Exception, + ("wc.checkout('svn://localhost:%s/%s' % " + "(port, self.repopath.basename))")) + finally: + killproc(pid) + + def test_log(self): + port, pid = self._start_svnserve() + try: + wc = py.path.svnwc(self.temppath, self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + foo = wc.ensure('foo.txt') + wc.commit('added foo.txt') + log = foo.log() + assert len(log) == 1 + assert log[0].msg == 'added foo.txt' + finally: + killproc(pid) + + def test_switch(self): + port, pid = self._start_svnserve() + try: + wc = py.path.svnwc(self.temppath, auth=self.auth) + svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename) + wc.checkout(svnurl) + wc.ensure('foo', dir=True).ensure('foo.txt').write('foo') + wc.commit('added foo dir with foo.txt file') + wc.ensure('bar', dir=True) + wc.commit('added bar dir') + bar = wc.join('bar') + bar.switch(svnurl + '/foo') + assert bar.join('foo.txt') + finally: + killproc(pid) + + def test_update(self): + port, pid = self._start_svnserve() + try: + wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True), + auth=self.auth) + wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True), + auth=self.auth) + wc1.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + wc2.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + wc1.ensure('foo', dir=True) + wc1.commit('added foo dir') + wc2.update() + assert wc2.join('foo').check() + + auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) + wc2.auth = auth + py.test.raises(Exception, 'wc2.update()') + finally: + killproc(pid) + + def test_lock_unlock_status(self): + port, pid = self._start_svnserve() + try: + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename,)) + wc.ensure('foo', file=True) + wc.commit('added foo file') + foo = wc.join('foo') + foo.lock() + status = foo.status() + assert status.locked + foo.unlock() + status = foo.status() + assert not status.locked + + auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) + foo.auth = auth + py.test.raises(Exception, 'foo.lock()') + py.test.raises(Exception, 'foo.unlock()') + finally: + killproc(pid) + + def test_diff(self): + port, pid = self._start_svnserve() + try: + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename,)) + wc.ensure('foo', file=True) + wc.commit('added foo file') + wc.update() + rev = int(wc.status().rev) + foo = wc.join('foo') + foo.write('bar') + diff = foo.diff() + assert '\n+bar\n' in diff + foo.commit('added some content') + diff = foo.diff() + assert not diff + diff = foo.diff(rev=rev) + assert '\n+bar\n' in diff + + auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) + foo.auth = auth + py.test.raises(Exception, 'foo.diff(rev=rev)') + finally: + killproc(pid) + +class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase): + def test_listdir(self): + port, pid = self._start_svnserve() + try: + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=self.auth) + u.ensure('foo') + paths = u.listdir() + assert len(paths) == 1 + assert paths[0].auth is self.auth + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=auth) + py.test.raises(Exception, 'u.listdir()') + finally: + killproc(pid) + + def test_copy(self): + port, pid = self._start_svnserve() + try: + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=self.auth) + foo = u.ensure('foo') + bar = u.join('bar') + foo.copy(bar) + assert bar.check() + assert bar.auth is self.auth + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=auth) + foo = u.join('foo') + bar = u.join('bar') + py.test.raises(Exception, 'foo.copy(bar)') + finally: + killproc(pid) + + def test_write_read(self): + port, pid = self._start_svnserve() + try: + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=self.auth) + foo = u.ensure('foo') + fp = foo.open() + try: + data = fp.read() + finally: + fp.close() + assert data == '' + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=auth) + foo = u.join('foo') + py.test.raises(Exception, 'foo.open()') + finally: + killproc(pid) + + # XXX rinse, repeat... :| diff --git a/py/path/svn/testing/test_wccommand.py b/py/path/svn/testing/test_wccommand.py index 09da681e9..c15f3b89c 100644 --- a/py/path/svn/testing/test_wccommand.py +++ b/py/path/svn/testing/test_wccommand.py @@ -1,13 +1,27 @@ import py +import sys from py.__.path.svn.testing.svntestbase import CommonSvnTests, getrepowc from py.__.path.svn.wccommand import InfoSvnWCCommand from py.__.path.svn.wccommand import parse_wcinfotime from py.__.path.svn import svncommon - if py.path.local.sysfind('svn') is None: py.test.skip("cannot test py.path.svn, 'svn' binary not found") +if sys.platform != 'win32': + def normpath(p): + return p +else: + try: + import win32api + except ImportError: + def normpath(p): + py.test.skip('this test requires win32api to run on windows') + else: + import os + def normpath(p): + p = win32api.GetShortPathName(p) + return os.path.normpath(os.path.normcase(p)) class TestWCSvnCommandPath(CommonSvnTests): @@ -253,7 +267,7 @@ class TestWCSvnCommandPath(CommonSvnTests): try: locked = root.status().locked assert len(locked) == 1 - assert str(locked[0]) == str(somefile) + assert normpath(str(locked[0])) == normpath(str(somefile)) #assert somefile.locked() py.test.raises(Exception, 'somefile.lock()') finally: diff --git a/py/path/svn/urlcommand.py b/py/path/svn/urlcommand.py index 5f6ffd10d..696b5740d 100644 --- a/py/path/svn/urlcommand.py +++ b/py/path/svn/urlcommand.py @@ -21,10 +21,11 @@ class SvnCommandPath(svncommon.SvnPathBase): _lsrevcache = BuildcostAccessCache(maxentries=128) _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0) - def __new__(cls, path, rev=None): + def __new__(cls, path, rev=None, auth=None): self = object.__new__(cls) if isinstance(path, cls): rev = path.rev + auth = path.auth path = path.strpath proto, uri = path.split("://", 1) host, uripath = uri.split('/', 1) @@ -36,6 +37,7 @@ class SvnCommandPath(svncommon.SvnPathBase): path = path.rstrip('/') self.strpath = path self.rev = rev + self.auth = auth return self def __repr__(self): @@ -44,7 +46,8 @@ class SvnCommandPath(svncommon.SvnPathBase): else: return 'svnurl(%r, %r)' % (self.strpath, self.rev) - def _svn(self, cmd, *args): + def _svnwithrev(self, cmd, *args): + """ execute an svn command, append our own url and revision """ if self.rev is None: return self._svnwrite(cmd, *args) else: @@ -52,16 +55,28 @@ class SvnCommandPath(svncommon.SvnPathBase): return self._svnwrite(cmd, *args) def _svnwrite(self, cmd, *args): + """ execute an svn command, append our own url """ l = ['svn %s' % cmd] args = ['"%s"' % self._escape(item) for item in args] l.extend(args) l.append('"%s"' % self._encodedurl()) # fixing the locale because we can't otherwise parse - string = svncommon.fixlocale() + " ".join(l) + string = " ".join(l) if DEBUG: print "execing", string + out = self._svncmdexecauth(string) + return out + + def _svncmdexecauth(self, cmd): + """ execute an svn command 'as is' """ + cmd = svncommon.fixlocale() + cmd + if self.auth is not None: + cmd += ' ' + self.auth.makecmdoptions() + return self._cmdexec(cmd) + + def _cmdexec(self, cmd): try: - out = process.cmdexec(string) + out = process.cmdexec(cmd) except py.process.cmdexec.Error, e: if (e.err.find('File Exists') != -1 or e.err.find('File already exists') != -1): @@ -69,21 +84,33 @@ class SvnCommandPath(svncommon.SvnPathBase): raise return out + def _svnpopenauth(self, cmd): + """ execute an svn command, return a pipe for reading stdin """ + cmd = svncommon.fixlocale() + cmd + if self.auth is not None: + cmd += ' ' + self.auth.makecmdoptions() + return self._popen(cmd) + + def _popen(self, cmd): + return os.popen(cmd) + def _encodedurl(self): return self._escape(self.strpath) + def _norev_delentry(self, path): + auth = self.auth and self.auth.makecmdoptions() or None + self._lsnorevcache.delentry((str(path), auth)) + def open(self, mode='r'): """ return an opened file with the given mode. """ assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline" assert self.check(file=1) # svn cat returns an empty file otherwise - def popen(cmd): - return os.popen(cmd) if self.rev is None: - return popen(svncommon.fixlocale() + - 'svn cat "%s"' % (self._escape(self.strpath), )) + return self._svnpopenauth('svn cat "%s"' % ( + self._escape(self.strpath), )) else: - return popen(svncommon.fixlocale() + - 'svn cat -r %s "%s"' % (self.rev, self._escape(self.strpath))) + return self._svnpopenauth('svn cat -r %s "%s"' % ( + self.rev, self._escape(self.strpath))) def dirpath(self, *args, **kwargs): """ return the directory path of the current path joined @@ -104,33 +131,33 @@ a checkin message by giving a keyword argument 'msg'""" commit_msg=kwargs.get('msg', "mkdir by py lib invocation") createpath = self.join(*args) createpath._svnwrite('mkdir', '-m', commit_msg) - self._lsnorevcache.delentry(createpath.dirpath().strpath) + self._norev_delentry(createpath.dirpath()) return createpath def copy(self, target, msg='copied by py lib invocation'): """ copy path to target with checkin message msg.""" if getattr(target, 'rev', None) is not None: raise py.error.EINVAL(target, "revisions are immutable") - process.cmdexec('svn copy -m "%s" "%s" "%s"' %(msg, - self._escape(self), self._escape(target))) - self._lsnorevcache.delentry(target.dirpath().strpath) + self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg, + self._escape(self), self._escape(target))) + self._norev_delentry(target.dirpath()) def rename(self, target, msg="renamed by py lib invocation"): """ rename this path to target with checkin message msg. """ if getattr(self, 'rev', None) is not None: raise py.error.EINVAL(self, "revisions are immutable") - py.process.cmdexec('svn move -m "%s" --force "%s" "%s"' %( - msg, self._escape(self), self._escape(target))) - self._lsnorevcache.delentry(self.dirpath().strpath) - self._lsnorevcache.delentry(self.strpath) + self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %( + msg, self._escape(self), self._escape(target))) + self._norev_delentry(self.dirpath()) + self._norev_delentry(self) def remove(self, rec=1, msg='removed by py lib invocation'): """ remove a file or directory (or a directory tree if rec=1) with checkin message msg.""" if self.rev is not None: raise py.error.EINVAL(self, "revisions are immutable") - process.cmdexec('svn rm -m "%s" "%s"' %(msg, self._escape(self))) - self._lsnorevcache.delentry(self.dirpath().strpath) + self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self))) + self._norev_delentry(self.dirpath()) def export(self, topath): """ export to a local path @@ -143,7 +170,7 @@ checkin message msg.""" '"%s"' % (self._escape(topath),)] if self.rev is not None: args = ['-r', str(self.rev)] + args - process.cmdexec('svn export %s' % (' '.join(args),)) + self._svncmdexecauth('svn export %s' % (' '.join(args),)) return topath def ensure(self, *args, **kwargs): @@ -173,19 +200,19 @@ checkin message msg.""" "ensure %s" % self._escape(tocreate), self._escape(tempdir.join(basename)), x.join(basename)._encodedurl()) - process.cmdexec(cmd) - self._lsnorevcache.delentry(x.strpath) # !!! + self._svncmdexecauth(cmd) + self._norev_delentry(x) finally: tempdir.remove() return target # end of modifying methods def _propget(self, name): - res = self._svn('propget', name) + res = self._svnwithrev('propget', name) return res[:-1] # strip trailing newline def _proplist(self): - res = self._svn('proplist') + res = self._svnwithrev('proplist') lines = res.split('\n') lines = map(str.strip, lines[1:]) return svncommon.PropListDict(self, lines) @@ -194,7 +221,7 @@ checkin message msg.""" """ return sequence of name-info directory entries of self """ def builder(): try: - res = self._svn('ls', '-v') + res = self._svnwithrev('ls', '-v') except process.cmdexec.Error, e: if e.err.find('non-existent in that revision') != -1: raise py.error.ENOENT(self, e.err) @@ -214,10 +241,13 @@ checkin message msg.""" info = InfoSvnCommand(lsline) nameinfo_seq.append((info._name, info)) return nameinfo_seq + auth = self.auth and self.auth.makecmdoptions() or None if self.rev is not None: - return self._lsrevcache.getorbuild((self.strpath, self.rev), builder) + return self._lsrevcache.getorbuild((self.strpath, self.rev, auth), + builder) else: - return self._lsnorevcache.getorbuild(self.strpath, builder) + return self._lsnorevcache.getorbuild((self.strpath, auth), + builder) def log(self, rev_start=None, rev_end=1, verbose=False): """ return a list of LogEntry instances for this path. @@ -234,9 +264,8 @@ if verbose is True, then the LogEntry instances also know which files changed. else: rev_opt = "-r %s:%s" % (rev_start, rev_end) verbose_opt = verbose and "-v" or "" - xmlpipe = os.popen(svncommon.fixlocale() + - 'svn log --xml %s %s "%s"' % - (rev_opt, verbose_opt, self.strpath)) + xmlpipe = self._svnpopenauth('svn log --xml %s %s "%s"' % + (rev_opt, verbose_opt, self.strpath)) from xml.dom import minidom tree = minidom.parse(xmlpipe) result = [] @@ -254,7 +283,7 @@ class InfoSvnCommand: # the '0?' part in the middle is an indication of whether the resource is # locked, see 'svn help ls' lspattern = re.compile( - r'^ *(?P\d+) +(?P\S+) +(0? *(?P\d+))? ' + r'^ *(?P\d+) +(?P.+?) +(0? *(?P\d+))? ' '*(?P\w+ +\d{2} +[\d:]+) +(?P.*)$') def __init__(self, line): # this is a typical line from 'svn ls http://...' diff --git a/py/path/svn/wccommand.py b/py/path/svn/wccommand.py index 46956159a..f56d536f0 100644 --- a/py/path/svn/wccommand.py +++ b/py/path/svn/wccommand.py @@ -25,7 +25,7 @@ class SvnWCCommandPath(common.FSPathBase): """ sep = os.sep - def __new__(cls, wcpath=None): + def __new__(cls, wcpath=None, auth=None): self = object.__new__(cls) if isinstance(wcpath, cls): if wcpath.__class__ == cls: @@ -35,6 +35,7 @@ class SvnWCCommandPath(common.FSPathBase): svncommon.ALLOWED_CHARS): raise ValueError("bad char in wcpath %s" % (wcpath, )) self.localpath = py.path.local(wcpath) + self.auth = auth return self strpath = property(lambda x: str(x.localpath), None, None, "string path") @@ -63,13 +64,22 @@ class SvnWCCommandPath(common.FSPathBase): info = self.info() return py.path.svnurl(info.url) - def __repr__(self): return "svnwc(%r)" % (self.strpath) # , self._url) def __str__(self): return str(self.localpath) + def _makeauthoptions(self): + if self.auth is None: + return '' + return self.auth.makecmdoptions() + + def _authsvn(self, cmd, args=None): + args = args and list(args) or [] + args.append(self._makeauthoptions()) + return self._svn(cmd, *args) + def _svn(self, cmd, *args): l = ['svn %s' % cmd] args = [self._escape(item) for item in args] @@ -101,9 +111,9 @@ class SvnWCCommandPath(common.FSPathBase): raise return out - def switch(self, url): + def switch(self, url): """ switch to given URL. """ - self._svn('switch', url) + self._authsvn('switch', [url]) def checkout(self, url=None, rev=None): """ checkout from url to local wcpath. """ @@ -119,11 +129,12 @@ class SvnWCCommandPath(common.FSPathBase): url += "@%d" % rev else: args.append('-r' + str(rev)) - self._svn('co', url, *args) + args.append(url) + self._authsvn('co', args) def update(self, rev = 'HEAD'): """ update working copy item to given revision. (None -> HEAD). """ - self._svn('up -r %s' % rev) + self._authsvn('up', ['-r', rev]) def write(self, content, mode='wb'): """ write content into local filesystem wc. """ @@ -131,7 +142,7 @@ class SvnWCCommandPath(common.FSPathBase): def dirpath(self, *args): """ return the directory Path of the current Path. """ - return self.__class__(self.localpath.dirpath(*args)) + return self.__class__(self.localpath.dirpath(*args), auth=self.auth) def _ensuredirs(self): parent = self.dirpath() @@ -197,18 +208,21 @@ class SvnWCCommandPath(common.FSPathBase): """ rename this path to target. """ py.process.cmdexec("svn move --force %s %s" %(str(self), str(target))) - _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(\S+)\s+(.*)') + # XXX a bit scary to assume there's always 2 spaces between username and + # path, however with win32 allowing spaces in user names there doesn't + # seem to be a more solid approach :( + _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)') def lock(self): """ set a lock (exclusive) on the resource """ - out = self._svn('lock').strip() + out = self._authsvn('lock').strip() if not out: # warning or error, raise exception raise Exception(out[4:]) def unlock(self): """ unset a previously set lock """ - out = self._svn('unlock').strip() + out = self._authsvn('unlock').strip() if out.startswith('svn:'): # warning or error, raise exception raise Exception(out[4:]) @@ -248,7 +262,8 @@ class SvnWCCommandPath(common.FSPathBase): update_rev = None - out = self._svn('status -v %s %s %s' % (updates, rec, externals)) + cmd = 'status -v %s %s %s' % (updates, rec, externals) + out = self._authsvn(cmd) rootstatus = WCStatus(self) for line in out.split('\n'): if not line.strip(): @@ -266,7 +281,8 @@ class SvnWCCommandPath(common.FSPathBase): wcpath = self.join(fn, abs=1) rootstatus.unknown.append(wcpath) elif c0 == 'X': - wcpath = self.__class__(self.localpath.join(fn, abs=1)) + wcpath = self.__class__(self.localpath.join(fn, abs=1), + auth=self.auth) rootstatus.external.append(wcpath) elif c0 == 'I': wcpath = self.join(fn, abs=1) @@ -334,10 +350,10 @@ class SvnWCCommandPath(common.FSPathBase): """ return a diff of the current path against revision rev (defaulting to the last one). """ - if rev is None: - out = self._svn('diff') - else: - out = self._svn('diff -r %d' % rev) + args = [] + if rev is not None: + args.append("-r %d" % rev) + out = self._authsvn('diff', args) return out def blame(self): @@ -365,7 +381,7 @@ class SvnWCCommandPath(common.FSPathBase): cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),) if not rec: cmd += ' -N' - out = self._svn(cmd) + out = self._authsvn(cmd) try: del cache.info[self] except KeyError: @@ -431,7 +447,7 @@ recursively. """ localpath = self.localpath.new(**kw) else: localpath = self.localpath - return self.__class__(localpath) + return self.__class__(localpath, auth=self.auth) def join(self, *args, **kwargs): """ return a new Path (with the same revision) which is composed @@ -440,7 +456,7 @@ recursively. """ if not args: return self localpath = self.localpath.join(*args, **kwargs) - return self.__class__(localpath) + return self.__class__(localpath, auth=self.auth) def info(self, usecache=1): """ return an Info structure with svn-provided information. """ @@ -483,7 +499,7 @@ recursively. """ paths = [] for localpath in self.localpath.listdir(notsvn): - p = self.__class__(localpath) + p = self.__class__(localpath, auth=self.auth) paths.append(p) if fil or sort: @@ -534,11 +550,13 @@ if verbose is True, then the LogEntry instances also know which files changed. else: rev_opt = "-r %s:%s" % (rev_start, rev_end) verbose_opt = verbose and "-v" or "" - s = svncommon.fixlocale() + locale_env = svncommon.fixlocale() # some blather on stderr - stdin, stdout, stderr = os.popen3(s + 'svn log --xml %s %s "%s"' % ( - rev_opt, verbose_opt, - self.strpath)) + auth_opt = self._makeauthoptions() + stdin, stdout, stderr = os.popen3(locale_env + + 'svn log --xml %s %s %s "%s"' % ( + rev_opt, verbose_opt, auth_opt, + self.strpath)) from xml.dom import minidom from xml.parsers.expat import ExpatError try: @@ -562,7 +580,7 @@ if verbose is True, then the LogEntry instances also know which files changed. return self.info().mtime def __hash__(self): - return hash((self.strpath, self.__class__)) + return hash((self.strpath, self.__class__, self.auth)) class WCStatus: