* simplify and refactor path tests to use funcargs instead of the layered xunit-setup mess

* port py/path to pass 3.11
* added py.builtin._totext, _tobytes, _isbytes, _istext helpers

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-08-31 19:51:25 +02:00
parent e336683cdb
commit c791610998
17 changed files with 1085 additions and 1175 deletions

View File

@ -149,9 +149,12 @@ initpkg(__name__,
'builtin.exec_' : ('./builtin/builtin31.py', 'exec_'),
'builtin._basestring' : ('./builtin/builtin31.py', '_basestring'),
'builtin._totext' : ('./builtin/builtin31.py', '_totext'),
'builtin._isbytes' : ('./builtin/builtin31.py', '_isbytes'),
'builtin._istext' : ('./builtin/builtin31.py', '_istext'),
'builtin.builtins' : ('./builtin/builtin31.py', 'builtins'),
'builtin.execfile' : ('./builtin/builtin31.py', 'execfile'),
'builtin.callable' : ('./builtin/builtin31.py', 'callable'),
'builtin.pickle' : ('./builtin/builtin31.py', 'pickle'),
# gateways into remote contexts
'execnet.__doc__' : ('./execnet/__init__.py', '__doc__'),

View File

@ -3,13 +3,21 @@ import sys
if sys.version_info >= (3, 0):
exec ("print_ = print ; exec_=exec")
import builtins
import pickle
# some backward compatibility helpers
_basestring = str
def _totext(obj, encoding):
if isinstance(obj, str):
obj = obj.encode(encoding)
return str(obj, encoding)
if isinstance(obj, bytes):
obj = obj.decode(encoding)
elif not isinstance(obj, str):
obj = str(obj)
return obj
def _isbytes(x):
return isinstance(x, bytes)
def _istext(x):
return isinstance(x, str)
def execfile(fn, globs=None, locs=None):
if globs is None:
@ -30,10 +38,18 @@ if sys.version_info >= (3, 0):
return hasattr(obj, "__call__")
else:
try:
import cPickle as pickle
except ImportError:
import pickle
_totext = unicode
_basestring = basestring
execfile = execfile
callable = callable
def _isbytes(x):
return isinstance(x, str)
def _istext(x):
return isinstance(x, unicode)
import __builtin__ as builtins
def print_(*args, **kwargs):

View File

@ -45,6 +45,8 @@ def test_frozenset():
assert len(s) == 1
def test_sorted():
if sys.version_info >= (2,5):
py.test.skip("python 2.5 needs no sorted tests")
for s in [py.builtin.sorted]:
def test():
assert s([3, 2, 1]) == [1, 2, 3]

View File

@ -166,7 +166,7 @@ class DebugInterpreter(ast.NodeVisitor):
explanations.append(explanation)
if result == is_or:
break
name = " or " if is_or else " and "
name = is_or and " or " or " and "
explanation = "(" + name.join(explanations) + ")"
return explanation, result

View File

@ -84,6 +84,7 @@ class PathBase(object):
def __div__(self, other):
return self.join(str(other))
__truediv__ = __div__ # py3k
def basename(self):
""" basename part of path. """
@ -106,7 +107,7 @@ class PathBase(object):
"""
return self.new(basename='').join(*args, **kwargs)
def read(self, mode='rb'):
def read(self, mode='r'):
""" read and return a bytestring from reading the path. """
if sys.version_info < (2,3):
for x in 'u', 'U':
@ -132,11 +133,10 @@ newline will be removed from the end of each line. """
f.close()
def load(self):
""" return object unpickled from self.read() """
""" (deprecated) return object unpickled from self.read() """
f = self.open('rb')
try:
from cPickle import load
return py.error.checked_call(load, f)
return py.error.checked_call(py.builtin.pickle.load, f)
finally:
f.close()
@ -253,6 +253,12 @@ newline will be removed from the end of each line. """
except AttributeError:
return cmp(str(self), str(other)) # self.path, other.path)
def __lt__(self, other):
try:
return self.strpath < other.strpath
except AttributeError:
return str(self) < str(other)
def visit(self, fil=None, rec=None, ignore=NeverRaised):
""" yields all paths below the current one
@ -271,7 +277,7 @@ newline will be removed from the end of each line. """
if rec:
if isinstance(rec, str):
rec = fnmatch(fil)
elif not callable(rec):
elif not py.builtin.callable(rec):
rec = lambda x: True
reclist = [self]
while reclist:
@ -286,6 +292,13 @@ newline will be removed from the end of each line. """
if p.check(dir=1) and (rec is None or rec(p)):
reclist.append(p)
def _sortlist(self, res, sort):
if sort:
if hasattr(sort, '__call__'):
res.sort(sort)
else:
res.sort()
class FNMatcher:
def __init__(self, pattern):
self.pattern = pattern

View File

@ -314,11 +314,8 @@ class LocalPath(FSBase):
childurl = self.join(name)
if fil is None or fil(childurl):
res.append(childurl)
if hasattr(sort, '__call__'):
res.sort(sort)
elif sort:
res.sort()
return res
self._sortlist(res, sort)
return res
def size(self):
""" return size of the underlying file object """
@ -358,7 +355,7 @@ class LocalPath(FSBase):
""" pickle object into path location"""
f = self.open('wb')
try:
py.error.checked_call(py.std.cPickle.dump, obj, f, bin)
py.error.checked_call(py.builtin.pickle.dump, obj, f, bin)
finally:
f.close()
@ -368,17 +365,20 @@ class LocalPath(FSBase):
py.error.checked_call(os.mkdir, str(p))
return p
def write(self, content, mode='wb'):
""" write string content into path. """
s = str(content)
def write(self, data, mode='w'):
""" write data into path. """
if 'b' in mode:
if not py.builtin._isbytes(data):
raise ValueError("can only process bytes")
else:
if not py.builtin._istext(data):
if not py.builtin._isbytes(data):
data = str(data)
else:
data = py.builtin._totext(data, sys.getdefaultencoding())
f = self.open(mode)
if not hasattr(s, 'decode'): # byte string
encoding = getattr(f, 'encoding', None)
if not encoding:
encoding = sys.getdefaultencoding()
s = s.encode(encoding)
try:
f.write(s)
f.write(data)
finally:
f.close()
@ -409,7 +409,7 @@ class LocalPath(FSBase):
else:
p.dirpath()._ensuredirs()
if not p.check(file=1):
p.write("")
p.open('w').close()
return p
def stat(self):
@ -542,10 +542,9 @@ class LocalPath(FSBase):
return mod
def sysexec(self, *argv):
""" return stdout-put from executing a system child process,
where the self path points to the binary (XXX or script)
to be executed. Note that this process is directly
invoked and not through a system shell.
""" return stdout text from executing a system child process,
where the 'self' path points to executable.
The process is directly invoked and not through a system shell.
"""
from subprocess import Popen, PIPE
argv = map(str, argv)
@ -555,6 +554,8 @@ class LocalPath(FSBase):
if ret != 0:
raise py.process.cmdexec.Error(ret, ret, str(self),
stdout, stderr,)
if py.builtin._isbytes(stdout):
stdout = py.builtin._totext(stdout, sys.getdefaultencoding())
return stdout
def sysfind(cls, name, checker=None):

View File

@ -97,7 +97,8 @@ class SvnCommandPath(svncommon.SvnPathBase):
def open(self, mode='r'):
""" return an opened file with the given mode. """
assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline"
if mode not in ("r", "rU",):
raise ValueError("mode %r not supported" % (mode,))
assert self.check(file=1) # svn cat returns an empty file otherwise
if self.rev is None:
return self._svnpopenauth('svn cat "%s"' % (
@ -120,9 +121,10 @@ class SvnCommandPath(svncommon.SvnPathBase):
# modifying methods (cache must be invalidated)
def mkdir(self, *args, **kwargs):
""" create & return the directory joined with args. You can provide
a checkin message by giving a keyword argument 'msg'"""
commit_msg=kwargs.get('msg', "mkdir by py lib invocation")
""" create & return the directory joined with args.
pass a 'msg' keyword argument to set the commit message.
"""
commit_msg = kwargs.get('msg', "mkdir by py lib invocation")
createpath = self.join(*args)
createpath._svnwrite('mkdir', '-m', commit_msg)
self._norev_delentry(createpath.dirpath())
@ -236,7 +238,7 @@ checkin message msg."""
info = InfoSvnCommand(lsline)
if info._name != '.': # svn 1.5 produces '.' dirs,
nameinfo_seq.append((info._name, info))
nameinfo_seq.sort()
return nameinfo_seq
auth = self.auth and self.auth.makecmdoptions() or None
if self.rev is not None:
@ -246,6 +248,25 @@ checkin message msg."""
return self._lsnorevcache.getorbuild((self.strpath, auth),
builder)
def listdir(self, fil=None, sort=None):
""" list directory contents, possibly filter by the given fil func
and possibly sorted.
"""
if isinstance(fil, str):
fil = common.FNMatcher(fil)
nameinfo_seq = self._listdir_nameinfo()
if len(nameinfo_seq) == 1:
name, info = nameinfo_seq[0]
if name == self.basename and info.kind == 'file':
#if not self.check(dir=1):
raise py.error.ENOTDIR(self)
paths = [self.join(name) for (name, info) in nameinfo_seq]
if fil:
paths = [x for x in paths if fil(x)]
self._sortlist(paths, sort)
return paths
def log(self, rev_start=None, rev_end=1, verbose=False):
""" return a list of LogEntry instances for this path.
rev_start is the starting revision (defaulting to the first one).

View File

@ -7,6 +7,7 @@ svn-Command based Implementation of a Subversion WorkingCopy Path.
import os, sys, time, re, calendar
import py
import subprocess
from py.__.path import common
#-----------------------------------------------------------
@ -234,29 +235,6 @@ class SvnPathBase(common.PathBase):
content = self._proplist()
return content
def listdir(self, fil=None, sort=None):
""" list directory contents, possibly filter by the given fil func
and possibly sorted.
"""
if isinstance(fil, str):
fil = common.FNMatcher(fil)
nameinfo_seq = self._listdir_nameinfo()
if len(nameinfo_seq) == 1:
name, info = nameinfo_seq[0]
if name == self.basename and info.kind == 'file':
#if not self.check(dir=1):
raise py.error.ENOTDIR(self)
paths = self._make_path_tuple(nameinfo_seq)
if fil or sort:
paths = filter(fil, paths)
paths = isinstance(paths, list) and paths or list(paths)
if hasattr(sort, '__call__'):
paths.sort(sort)
elif sort:
paths.sort()
return paths
def info(self):
""" return an Info structure with svn-provided information. """
parent = self.dirpath()
@ -280,23 +258,13 @@ class SvnPathBase(common.PathBase):
def _escape(self, cmd):
return _escape_helper(cmd)
def _make_path_tuple(self, nameinfo_seq):
""" return a tuple of paths from a nameinfo-tuple sequence.
"""
#assert self.rev is not None, "revision of %s should not be None here" % self
res = []
for name, info in nameinfo_seq:
child = self.join(name)
res.append(child)
return tuple(res)
def _childmaxrev(self):
""" return maximum revision number of childs (or self.rev if no childs) """
rev = self.rev
for name, info in self._listdir_nameinfo():
rev = max(rev, info.created_rev)
return rev
#def _childmaxrev(self):
# """ return maximum revision number of childs (or self.rev if no childs) """
# rev = self.rev
# for name, info in self._listdir_nameinfo():
# rev = max(rev, info.created_rev)
# return rev
#def _getlatestrevision(self):
# """ return latest repo-revision for this path. """
@ -553,11 +521,11 @@ class SvnWCCommandPath(common.PathBase):
args.append(url)
self._authsvn('co', args)
def update(self, rev = 'HEAD'):
def update(self, rev='HEAD'):
""" update working copy item to given revision. (None -> HEAD). """
self._authsvn('up', ['-r', rev])
self._authsvn('up', ['-r', rev, "--non-interactive"],)
def write(self, content, mode='wb'):
def write(self, content, mode='w'):
""" write content into local filesystem wc. """
self.localpath.write(content, mode)
@ -840,18 +808,10 @@ recursively. """
def notsvn(path):
return path.basename != '.svn'
paths = []
for localpath in self.localpath.listdir(notsvn):
p = self.__class__(localpath, auth=self.auth)
paths.append(p)
if fil or sort:
paths = filter(fil, paths)
paths = isinstance(paths, list) and paths or list(paths)
if hasattr(sort, '__call__'):
paths.sort(sort)
elif sort:
paths.sort()
paths = [self.__class__(p, auth=self.auth)
for p in self.localpath.listdir()
if notsvn(p) and (not fil or fil(p))]
self._sortlist(paths, sort)
return paths
def open(self, mode='r'):
@ -897,13 +857,23 @@ if verbose is True, then the LogEntry instances also know which files changed.
locale_env = fixlocale()
# some blather on stderr
auth_opt = self._makeauthoptions()
stdin, stdout, stderr = os.popen3(locale_env +
'svn log --xml %s %s %s "%s"' % (
rev_opt, verbose_opt, auth_opt,
self.strpath))
#stdin, stdout, stderr = os.popen3(locale_env +
# 'svn log --xml %s %s %s "%s"' % (
# rev_opt, verbose_opt, auth_opt,
# self.strpath))
cmd = locale_env + 'svn log --xml %s %s %s "%s"' % (
rev_opt, verbose_opt, auth_opt, self.strpath)
popen = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)
stdout, stderr = popen.communicate()
stdout = py.builtin._totext(stdout, sys.getdefaultencoding())
minidom,ExpatError = importxml()
try:
tree = minidom.parse(stdout)
tree = minidom.parseString(stdout)
except ExpatError:
raise ValueError('no such revision')
result = []
@ -1212,7 +1182,7 @@ def make_recursive_propdict(wcroot,
output,
rex = re.compile("Properties on '(.*)':")):
""" Return a dictionary of path->PropListDict mappings. """
lines = filter(None, output.split('\n'))
lines = [x for x in output.split('\n') if x]
pdict = {}
while lines:
line = lines.pop(0)
@ -1244,15 +1214,15 @@ class LogEntry:
for lpart in filter(None, logentry.childNodes):
if lpart.nodeType == lpart.ELEMENT_NODE:
if lpart.nodeName == 'author':
self.author = lpart.firstChild.nodeValue.encode('UTF-8')
self.author = lpart.firstChild.nodeValue
elif lpart.nodeName == 'msg':
if lpart.firstChild:
self.msg = lpart.firstChild.nodeValue.encode('UTF-8')
self.msg = lpart.firstChild.nodeValue
else:
self.msg = ''
elif lpart.nodeName == 'date':
#2003-07-29T20:05:11.598637Z
timestr = lpart.firstChild.nodeValue.encode('UTF-8')
timestr = lpart.firstChild.nodeValue
self.date = parse_apr_time(timestr)
elif lpart.nodeName == 'paths':
self.strpaths = []

View File

@ -1,123 +1,121 @@
import py
import sys
class CommonPathTests(object):
root = None # subclasses have to setup a 'root' attribute
class CommonFSTests(object):
def test_constructor_equality(self, path1):
p = path1.__class__(path1)
assert p == path1
def test_constructor_equality(self):
p = self.root.__class__(self.root)
assert p == self.root
def test_eq_nonstring(self, path1):
p1 = path1.join('sampledir')
p2 = path1.join('sampledir')
assert p1 == p2
def test_eq_nonstring(self):
path1 = self.root.join('sampledir')
path2 = self.root.join('sampledir')
assert path1 == path2
def test_new_identical(self, path1):
assert path1 == path1.new()
def test_new_identical(self):
assert self.root == self.root.new()
def test_join(self):
p = self.root.join('sampledir')
def test_join(self, path1):
p = path1.join('sampledir')
strp = str(p)
assert strp.endswith('sampledir')
assert strp.startswith(str(self.root))
assert strp.startswith(str(path1))
def test_join_normalized(self):
newpath = self.root.join(self.root.sep+'sampledir')
def test_join_normalized(self, path1):
newpath = path1.join(path1.sep+'sampledir')
strp = str(newpath)
assert strp.endswith('sampledir')
assert strp.startswith(str(self.root))
newpath = self.root.join((self.root.sep*2) + 'sampledir')
assert strp.startswith(str(path1))
newpath = path1.join((path1.sep*2) + 'sampledir')
strp = str(newpath)
assert strp.endswith('sampledir')
assert strp.startswith(str(self.root))
assert strp.startswith(str(path1))
def test_join_noargs(self):
newpath = self.root.join()
assert self.root == newpath
def test_join_noargs(self, path1):
newpath = path1.join()
assert path1 == newpath
def test_add_something(self):
p = self.root.join('sample')
def test_add_something(self, path1):
p = path1.join('sample')
p = p + 'dir'
assert p.check()
def test_parts(self):
newpath = self.root.join('sampledir', 'otherfile')
def test_parts(self, path1):
newpath = path1.join('sampledir', 'otherfile')
par = newpath.parts()[-3:]
assert par == [self.root, self.root.join('sampledir'), newpath]
assert par == [path1, path1.join('sampledir'), newpath]
revpar = newpath.parts(reverse=True)[:3]
assert revpar == [newpath, self.root.join('sampledir'), self.root]
assert revpar == [newpath, path1.join('sampledir'), path1]
def test_common(self):
other = self.root.join('sampledir')
x = other.common(self.root)
assert x == self.root
def test_common(self, path1):
other = path1.join('sampledir')
x = other.common(path1)
assert x == path1
#def test_parents_nonexisting_file(self):
# newpath = self.root / 'dirnoexist' / 'nonexisting file'
#def test_parents_nonexisting_file(self, path1):
# newpath = path1 / 'dirnoexist' / 'nonexisting file'
# par = list(newpath.parents())
# assert par[:2] == [self.root / 'dirnoexist', self.root]
# assert par[:2] == [path1 / 'dirnoexist', path1]
def test_basename_checks(self):
newpath = self.root.join('sampledir')
def test_basename_checks(self, path1):
newpath = path1.join('sampledir')
assert newpath.check(basename='sampledir')
assert newpath.check(notbasename='xyz')
assert newpath.basename == 'sampledir'
def test_basename(self):
newpath = self.root.join('sampledir')
def test_basename(self, path1):
newpath = path1.join('sampledir')
assert newpath.check(basename='sampledir')
assert newpath.basename, 'sampledir'
def test_dirpath(self):
newpath = self.root.join('sampledir')
assert newpath.dirpath() == self.root
def test_dirpath(self, path1):
newpath = path1.join('sampledir')
assert newpath.dirpath() == path1
def test_dirpath_with_args(self):
newpath = self.root.join('sampledir')
assert newpath.dirpath('x') == self.root.join('x')
def test_dirpath_with_args(self, path1):
newpath = path1.join('sampledir')
assert newpath.dirpath('x') == path1.join('x')
def test_newbasename(self):
newpath = self.root.join('samplefile')
def test_newbasename(self, path1):
newpath = path1.join('samplefile')
newbase = newpath.new(basename="samplefile2")
assert newbase.basename == "samplefile2"
assert newbase.dirpath() == newpath.dirpath()
def test_not_exists(self):
assert not self.root.join('does_not_exist').check()
assert self.root.join('does_not_exist').check(exists=0)
def test_not_exists(self, path1):
assert not path1.join('does_not_exist').check()
assert path1.join('does_not_exist').check(exists=0)
def test_exists(self):
assert self.root.join("samplefile").check()
assert self.root.join("samplefile").check(exists=1)
def test_exists(self, path1):
assert path1.join("samplefile").check()
assert path1.join("samplefile").check(exists=1)
def test_dir(self):
#print repr(self.root.join("sampledir"))
assert self.root.join("sampledir").check(dir=1)
assert self.root.join('samplefile').check(notdir=1)
assert not self.root.join("samplefile").check(dir=1)
def test_dir(self, path1):
#print repr(path1.join("sampledir"))
assert path1.join("sampledir").check(dir=1)
assert path1.join('samplefile').check(notdir=1)
assert not path1.join("samplefile").check(dir=1)
def test_fnmatch_file(self):
assert self.root.join("samplefile").check(fnmatch='s*e')
assert self.root.join("samplefile").check(notfnmatch='s*x')
assert not self.root.join("samplefile").check(fnmatch='s*x')
def test_fnmatch_file(self, path1):
assert path1.join("samplefile").check(fnmatch='s*e')
assert path1.join("samplefile").check(notfnmatch='s*x')
assert not path1.join("samplefile").check(fnmatch='s*x')
#def test_fnmatch_dir(self):
#def test_fnmatch_dir(self, path1):
# pattern = self.root.sep.join(['s*file'])
# sfile = self.root.join("samplefile")
# pattern = path1.sep.join(['s*file'])
# sfile = path1.join("samplefile")
# assert sfile.check(fnmatch=pattern)
def test_relto(self):
l=self.root.join("sampledir", "otherfile")
assert l.relto(self.root) == l.sep.join(["sampledir", "otherfile"])
assert l.check(relto=self.root)
assert self.root.check(notrelto=l)
assert not self.root.check(relto=l)
def test_relto(self, path1):
l=path1.join("sampledir", "otherfile")
assert l.relto(path1) == l.sep.join(["sampledir", "otherfile"])
assert l.check(relto=path1)
assert path1.check(notrelto=l)
assert not path1.check(relto=l)
def test_bestrelpath(self):
curdir = self.root
def test_bestrelpath(self, path1):
curdir = path1
sep = curdir.sep
s = curdir.bestrelpath(curdir.join("hello", "world"))
assert s == "hello" + sep + "world"
@ -128,80 +126,300 @@ class CommonPathTests(object):
assert curdir.bestrelpath("hello") == "hello"
def test_relto_not_relative(self):
l1=self.root.join("bcde")
l2=self.root.join("b")
def test_relto_not_relative(self, path1):
l1=path1.join("bcde")
l2=path1.join("b")
assert not l1.relto(l2)
assert not l2.relto(l1)
def test_listdir(self):
l = self.root.listdir()
assert self.root.join('sampledir') in l
assert self.root.join('samplefile') in l
def test_listdir(self, path1):
l = path1.listdir()
assert path1.join('sampledir') in l
assert path1.join('samplefile') in l
py.test.raises(py.error.ENOTDIR,
"self.root.join('samplefile').listdir()")
"path1.join('samplefile').listdir()")
def test_listdir_fnmatchstring(self):
l = self.root.listdir('s*dir')
def test_listdir_fnmatchstring(self, path1):
l = path1.listdir('s*dir')
assert len(l)
assert l[0], self.root.join('sampledir')
assert l[0], path1.join('sampledir')
def test_listdir_filter(self):
l = self.root.listdir(lambda x: x.check(dir=1))
assert self.root.join('sampledir') in l
assert not self.root.join('samplefile') in l
def test_listdir_filter(self, path1):
l = path1.listdir(lambda x: x.check(dir=1))
assert path1.join('sampledir') in l
assert not path1.join('samplefile') in l
def test_listdir_sorted(self):
l = self.root.listdir(lambda x: x.check(basestarts="sample"), sort=True)
assert self.root.join('sampledir') == l[0]
assert self.root.join('samplefile') == l[1]
assert self.root.join('samplepickle') == l[2]
def test_listdir_sorted(self, path1):
l = path1.listdir(lambda x: x.check(basestarts="sample"), sort=True)
assert path1.join('sampledir') == l[0]
assert path1.join('samplefile') == l[1]
assert path1.join('samplepickle') == l[2]
def test_visit_nofilter(self):
def test_visit_nofilter(self, path1):
l = []
for i in self.root.visit():
l.append(i.relto(self.root))
for i in path1.visit():
l.append(i.relto(path1))
assert "sampledir" in l
assert self.root.sep.join(["sampledir", "otherfile"]) in l
assert path1.sep.join(["sampledir", "otherfile"]) in l
def test_visit_norecurse(self):
def test_visit_norecurse(self, path1):
l = []
for i in self.root.visit(None, lambda x: x.basename != "sampledir"):
l.append(i.relto(self.root))
for i in path1.visit(None, lambda x: x.basename != "sampledir"):
l.append(i.relto(path1))
assert "sampledir" in l
assert not self.root.sep.join(["sampledir", "otherfile"]) in l
assert not path1.sep.join(["sampledir", "otherfile"]) in l
def test_visit_filterfunc_is_string(self):
def test_visit_filterfunc_is_string(self, path1):
l = []
for i in self.root.visit('*dir'):
l.append(i.relto(self.root))
for i in path1.visit('*dir'):
l.append(i.relto(path1))
assert len(l), 2
assert "sampledir" in l
assert "otherdir" in l
def test_visit_ignore(self):
p = self.root.join('nonexisting')
def test_visit_ignore(self, path1):
p = path1.join('nonexisting')
assert list(p.visit(ignore=py.error.ENOENT)) == []
def test_visit_endswith(self):
def test_visit_endswith(self, path1):
l = []
for i in self.root.visit(lambda x: x.check(endswith="file")):
l.append(i.relto(self.root))
assert self.root.sep.join(["sampledir", "otherfile"]) in l
for i in path1.visit(lambda x: x.check(endswith="file")):
l.append(i.relto(path1))
assert path1.sep.join(["sampledir", "otherfile"]) in l
assert "samplefile" in l
def test_endswith(self):
assert self.root.check(notendswith='.py')
x = self.root.join('samplefile')
def test_endswith(self, path1):
assert path1.check(notendswith='.py')
x = path1.join('samplefile')
assert x.check(endswith='file')
def test_cmp(self):
path1 = self.root.join('samplefile')
path2 = self.root.join('samplefile2')
assert cmp(path1, path2) == cmp('samplefile', 'samplefile2')
assert cmp(path1, path1) == 0
def test_cmp(self, path1):
path1 = path1.join('samplefile')
path2 = path1.join('samplefile2')
assert (path1 < path2) == ('samplefile' < 'samplefile2')
assert not (path1 < path1)
def test_simple_read(self):
x = self.root.join('samplefile').read('ru')
def test_simple_read(self, path1):
x = path1.join('samplefile').read('r')
assert x == 'samplefile\n'
def test_join_div_operator(self, path1):
newpath = path1 / '/sampledir' / '/test//'
newpath2 = path1.join('sampledir', 'test')
assert newpath == newpath2
def test_ext(self, path1):
newpath = path1.join('sampledir.ext')
assert newpath.ext == '.ext'
newpath = path1.join('sampledir')
assert not newpath.ext
def test_purebasename(self, path1):
newpath = path1.join('samplefile.py')
assert newpath.purebasename == 'samplefile'
def test_multiple_parts(self, path1):
newpath = path1.join('samplefile.py')
dirname, purebasename, basename, ext = newpath._getbyspec(
'dirname,purebasename,basename,ext')
assert str(path1).endswith(dirname) # be careful with win32 'drive'
assert purebasename == 'samplefile'
assert basename == 'samplefile.py'
assert ext == '.py'
def test_dotted_name_ext(self, path1):
newpath = path1.join('a.b.c')
ext = newpath.ext
assert ext == '.c'
assert newpath.ext == '.c'
def test_newext(self, path1):
newpath = path1.join('samplefile.py')
newext = newpath.new(ext='.txt')
assert newext.basename == "samplefile.txt"
assert newext.purebasename == "samplefile"
def test_readlines(self, path1):
fn = path1.join('samplefile')
contents = fn.readlines()
assert contents == ['samplefile\n']
def test_readlines_nocr(self, path1):
fn = path1.join('samplefile')
contents = fn.readlines(cr=0)
assert contents == ['samplefile', '']
def test_file(self, path1):
assert path1.join('samplefile').check(file=1)
def test_not_file(self, path1):
assert not path1.join("sampledir").check(file=1)
assert path1.join("sampledir").check(file=0)
def test_non_existent(self, path1):
assert path1.join("sampledir.nothere").check(dir=0)
assert path1.join("sampledir.nothere").check(file=0)
assert path1.join("sampledir.nothere").check(notfile=1)
assert path1.join("sampledir.nothere").check(notdir=1)
assert path1.join("sampledir.nothere").check(notexists=1)
assert not path1.join("sampledir.nothere").check(notfile=0)
# pattern = path1.sep.join(['s*file'])
# sfile = path1.join("samplefile")
# assert sfile.check(fnmatch=pattern)
def test_size(self, path1):
url = path1.join("samplefile")
assert url.size() > len("samplefile")
def test_mtime(self, path1):
url = path1.join("samplefile")
assert url.mtime() > 0
def test_relto_wrong_type(self, path1):
py.test.raises(TypeError, "path1.relto(42)")
def test_visit_filesonly(self, path1):
l = []
for i in path1.visit(lambda x: x.check(file=1)):
l.append(i.relto(path1))
assert not "sampledir" in l
assert path1.sep.join(["sampledir", "otherfile"]) in l
def test_load(self, path1):
p = path1.join('samplepickle')
obj = p.load()
assert type(obj) is dict
assert obj.get('answer',None) == 42
def test_visit_nodotfiles(self, path1):
l = []
for i in path1.visit(lambda x: x.check(dotfile=0)):
l.append(i.relto(path1))
assert "sampledir" in l
assert path1.sep.join(["sampledir", "otherfile"]) in l
assert not ".dotfile" in l
def test_endswith(self, path1):
def chk(p):
return p.check(endswith="pickle")
assert not chk(path1)
assert not chk(path1.join('samplefile'))
assert chk(path1.join('somepickle'))
def test_copy_file(self, path1):
otherdir = path1.join('otherdir')
initpy = otherdir.join('__init__.py')
copied = otherdir.join('copied')
initpy.copy(copied)
try:
assert copied.check()
s1 = initpy.read()
s2 = copied.read()
assert s1 == s2
finally:
if copied.check():
copied.remove()
def test_copy_dir(self, path1):
otherdir = path1.join('otherdir')
copied = path1.join('newdir')
try:
otherdir.copy(copied)
assert copied.check(dir=1)
assert copied.join('__init__.py').check(file=1)
s1 = otherdir.join('__init__.py').read()
s2 = copied.join('__init__.py').read()
assert s1 == s2
finally:
if copied.check(dir=1):
copied.remove(rec=1)
def test_remove_file(self, path1):
d = path1.ensure('todeleted')
assert d.check()
d.remove()
assert not d.check()
def test_remove_dir_recursive_by_default(self, path1):
d = path1.ensure('to', 'be', 'deleted')
assert d.check()
p = path1.join('to')
p.remove()
assert not p.check()
def test_mkdir_and_remove(self, path1):
tmpdir = path1
py.test.raises(py.error.EEXIST, tmpdir.mkdir, 'sampledir')
new = tmpdir.join('mktest1')
new.mkdir()
assert new.check(dir=1)
new.remove()
new = tmpdir.mkdir('mktest')
assert new.check(dir=1)
new.remove()
assert tmpdir.join('mktest') == new
def test_move_file(self, path1):
p = path1.join('samplefile')
newp = p.dirpath('moved_samplefile')
p.move(newp)
try:
assert newp.check(file=1)
assert not p.check()
finally:
dp = newp.dirpath()
if hasattr(dp, 'revert'):
dp.revert()
else:
newp.move(p)
assert p.check()
def test_move_directory(self, path1):
source = path1.join('sampledir')
dest = path1.join('moveddir')
source.move(dest)
assert dest.check(dir=1)
assert dest.join('otherfile').check(file=1)
assert not source.join('sampledir').check()
def setuptestfs(path):
if path.join('samplefile').check():
return
#print "setting up test fs for", repr(path)
samplefile = path.ensure('samplefile')
samplefile.write('samplefile\n')
execfile = path.ensure('execfile')
execfile.write('x=42')
execfilepy = path.ensure('execfile.py')
execfilepy.write('x=42')
d = {1:2, 'hello': 'world', 'answer': 42}
path.ensure('samplepickle').dump(d)
sampledir = path.ensure('sampledir', dir=1)
sampledir.ensure('otherfile')
otherdir = path.ensure('otherdir', dir=1)
otherdir.ensure('__init__.py')
module_a = otherdir.ensure('a.py')
if sys.version_info >= (2,6):
module_a.write('from .b import stuff as result\n')
else:
module_a.write('from b import stuff as result\n')
module_b = otherdir.ensure('b.py')
module_b.write('stuff="got it"\n')
module_c = otherdir.ensure('c.py')
module_c.write('''import py;
import otherdir.a
value = otherdir.a.result
''')
module_d = otherdir.ensure('d.py')
module_d.write('''import py;
from otherdir import a
value2 = a.result
''')

View File

@ -0,0 +1,71 @@
import py
from py.__.path import svnwc as svncommon
svnbin = py.path.local.sysfind('svn')
repodump = py.path.local(__file__).dirpath('repotest.dump')
from py.builtin import print_
def pytest_funcarg__repowc1(request):
if svnbin is None:
py.test.skip("svn binary not found")
modname = request.module.__name__
repo, wc = request.cached_setup(
setup=lambda: getrepowc("repo-"+modname, "wc-" + modname),
scope="module",
)
for x in ('test_remove', 'test_move', 'test_status_deleted'):
if request.function.__name__.startswith(x):
_savedrepowc = save_repowc(repo, wc)
request.addfinalizer(lambda: restore_repowc(_savedrepowc))
return repo, wc
def pytest_funcarg__repowc2(request):
name = request.function.__name__
return getrepowc("%s-repo-2" % name, "%s-wc-2" % name)
def getsvnbin():
if svnbin is None:
py.test.skip("svn binary not found")
return svnbin
# make a wc directory out of a given root url
# cache previously obtained wcs!
#
def getrepowc(reponame='basetestrepo', wcname='wc'):
repo = py.test.ensuretemp(reponame)
wcdir = py.test.ensuretemp(wcname)
repo.ensure(dir=1)
py.process.cmdexec('svnadmin create "%s"' %
svncommon._escape_helper(repo))
py.process.cmdexec('svnadmin load -q "%s" <"%s"' %
(svncommon._escape_helper(repo), repodump))
print_("created svn repository", repo)
wcdir.ensure(dir=1)
wc = py.path.svnwc(wcdir)
if py.std.sys.platform == 'win32':
repo = '/' + str(repo).replace('\\', '/')
wc.checkout(url='file://%s' % repo)
print_("checked out new repo into", wc)
return ("file://%s" % repo, wc)
def save_repowc(repo, wc):
repo = py.path.local(repo[len("file://"):])
assert repo.check()
savedrepo = repo.dirpath(repo.basename+".1")
savedwc = wc.dirpath(wc.basename+".1")
repo.copy(savedrepo)
wc.localpath.copy(savedwc.localpath)
return savedrepo, savedwc
def restore_repowc(obj):
savedrepo, savedwc = obj
repo = savedrepo.new(basename=savedrepo.basename[:-2])
assert repo.check()
wc = savedwc.new(basename=savedwc.basename[:-2])
assert wc.check()
wc.localpath.remove()
repo.remove()
savedrepo.move(repo)
savedwc.localpath.move(wc.localpath)

View File

@ -1,223 +0,0 @@
import py
from py.__.path.testing import common
def setuptestfs(path):
if path.join('samplefile').check():
return
#print "setting up test fs for", repr(path)
samplefile = path.ensure('samplefile')
samplefile.write('samplefile\n')
execfile = path.ensure('execfile')
execfile.write('x=42')
execfilepy = path.ensure('execfile.py')
execfilepy.write('x=42')
d = {1:2, 'hello': 'world', 'answer': 42}
path.ensure('samplepickle').dump(d)
sampledir = path.ensure('sampledir', dir=1)
sampledir.ensure('otherfile')
otherdir = path.ensure('otherdir', dir=1)
otherdir.ensure('__init__.py')
module_a = otherdir.ensure('a.py')
module_a.write('from b import stuff as result\n')
module_b = otherdir.ensure('b.py')
module_b.write('stuff="got it"\n')
module_c = otherdir.ensure('c.py')
module_c.write('''import py;
import otherdir.a
value = otherdir.a.result
''')
module_d = otherdir.ensure('d.py')
module_d.write('''import py;
from otherdir import a
value2 = a.result
''')
class CommonFSTests(common.CommonPathTests):
root = None # subclasses have to provide a current 'root' attribute
def test_join_div_operator(self):
newpath = self.root / '/sampledir' / '/test//'
newpath2 = self.root.join('sampledir', 'test')
assert newpath == newpath2
def test_ext(self):
newpath = self.root.join('sampledir.ext')
assert newpath.ext == '.ext'
newpath = self.root.join('sampledir')
assert not newpath.ext
def test_purebasename(self):
newpath = self.root.join('samplefile.py')
assert newpath.purebasename == 'samplefile'
def test_multiple_parts(self):
newpath = self.root.join('samplefile.py')
dirname, purebasename, basename, ext = newpath._getbyspec(
'dirname,purebasename,basename,ext')
assert str(self.root).endswith(dirname) # be careful with win32 'drive'
assert purebasename == 'samplefile'
assert basename == 'samplefile.py'
assert ext == '.py'
def test_dotted_name_ext(self):
newpath = self.root.join('a.b.c')
ext = newpath.ext
assert ext == '.c'
assert newpath.ext == '.c'
def test_newext(self):
newpath = self.root.join('samplefile.py')
newext = newpath.new(ext='.txt')
assert newext.basename == "samplefile.txt"
assert newext.purebasename == "samplefile"
def test_readlines(self):
fn = self.root.join('samplefile')
contents = fn.readlines()
assert contents == ['samplefile\n']
def test_readlines_nocr(self):
fn = self.root.join('samplefile')
contents = fn.readlines(cr=0)
assert contents == ['samplefile', '']
def test_file(self):
assert self.root.join('samplefile').check(file=1)
def test_not_file(self):
assert not self.root.join("sampledir").check(file=1)
assert self.root.join("sampledir").check(file=0)
def test_non_existent(self):
assert self.root.join("sampledir.nothere").check(dir=0)
assert self.root.join("sampledir.nothere").check(file=0)
assert self.root.join("sampledir.nothere").check(notfile=1)
assert self.root.join("sampledir.nothere").check(notdir=1)
assert self.root.join("sampledir.nothere").check(notexists=1)
assert not self.root.join("sampledir.nothere").check(notfile=0)
# pattern = self.root.sep.join(['s*file'])
# sfile = self.root.join("samplefile")
# assert sfile.check(fnmatch=pattern)
def test_size(self):
url = self.root.join("samplefile")
assert url.size() > len("samplefile")
def test_mtime(self):
url = self.root.join("samplefile")
assert url.mtime() > 0
def test_relto_wrong_type(self):
py.test.raises(TypeError, "self.root.relto(42)")
def test_visit_filesonly(self):
l = []
for i in self.root.visit(lambda x: x.check(file=1)):
l.append(i.relto(self.root))
assert not "sampledir" in l
assert self.root.sep.join(["sampledir", "otherfile"]) in l
def test_load(self):
p = self.root.join('samplepickle')
obj = p.load()
assert type(obj) is dict
assert obj.get('answer',None) == 42
def test_visit_nodotfiles(self):
l = []
for i in self.root.visit(lambda x: x.check(dotfile=0)):
l.append(i.relto(self.root))
assert "sampledir" in l
assert self.root.sep.join(["sampledir", "otherfile"]) in l
assert not ".dotfile" in l
def test_endswith(self):
def chk(p):
return p.check(endswith="pickle")
assert not chk(self.root)
assert not chk(self.root.join('samplefile'))
assert chk(self.root.join('somepickle'))
def test_copy_file(self):
otherdir = self.root.join('otherdir')
initpy = otherdir.join('__init__.py')
copied = otherdir.join('copied')
initpy.copy(copied)
try:
assert copied.check()
s1 = initpy.read()
s2 = copied.read()
assert s1 == s2
finally:
if copied.check():
copied.remove()
def test_copy_dir(self):
otherdir = self.root.join('otherdir')
copied = self.root.join('newdir')
try:
otherdir.copy(copied)
assert copied.check(dir=1)
assert copied.join('__init__.py').check(file=1)
s1 = otherdir.join('__init__.py').read()
s2 = copied.join('__init__.py').read()
assert s1 == s2
finally:
if copied.check(dir=1):
copied.remove(rec=1)
def test_remove_file(self):
d = self.root.ensure('todeleted')
assert d.check()
d.remove()
assert not d.check()
def test_remove_dir_recursive_by_default(self):
d = self.root.ensure('to', 'be', 'deleted')
assert d.check()
p = self.root.join('to')
p.remove()
assert not p.check()
def test_mkdir_and_remove(self):
tmpdir = self.root
py.test.raises(py.error.EEXIST, tmpdir.mkdir, 'sampledir')
new = tmpdir.join('mktest1')
new.mkdir()
assert new.check(dir=1)
new.remove()
new = tmpdir.mkdir('mktest')
assert new.check(dir=1)
new.remove()
assert tmpdir.join('mktest') == new
def test_move_file(self):
p = self.root.join('samplefile')
newp = p.dirpath('moved_samplefile')
p.move(newp)
try:
assert newp.check(file=1)
assert not p.check()
finally:
dp = newp.dirpath()
if hasattr(dp, 'revert'):
dp.revert()
else:
newp.move(p)
assert p.check()
def test_move_directory(self):
source = self.root.join('sampledir')
dest = self.root.join('moveddir')
source.move(dest)
assert dest.check(dir=1)
assert dest.join('otherfile').check(file=1)
assert not source.join('sampledir').check()

View File

@ -1,187 +1,31 @@
import sys
import py
from py import path, test, process
from py.__.path.testing.fscommon import CommonFSTests, setuptestfs
from py.__.path.testing.common import CommonFSTests
from py.__.path import svnwc as svncommon
from py.builtin import print_
repodump = py.path.local(__file__).dirpath('repotest.dump')
def getsvnbin():
svnbin = py.path.local.sysfind('svn')
if svnbin is None:
py.test.skip("svn binary not found")
return svnbin
# make a wc directory out of a given root url
# cache previously obtained wcs!
#
def getrepowc(reponame='basetestrepo', wcname='wc'):
repo = py.test.ensuretemp(reponame)
wcdir = py.test.ensuretemp(wcname)
if not repo.listdir():
#assert not wcdir.check()
repo.ensure(dir=1)
py.process.cmdexec('svnadmin create "%s"' %
svncommon._escape_helper(repo))
py.process.cmdexec('svnadmin load -q "%s" <"%s"' %
(svncommon._escape_helper(repo), repodump))
print_("created svn repository", repo)
wcdir.ensure(dir=1)
wc = py.path.svnwc(wcdir)
if py.std.sys.platform == 'win32':
repo = '/' + str(repo).replace('\\', '/')
wc.checkout(url='file://%s' % repo)
print_("checked out new repo into", wc)
else:
print_("using repository at", repo)
wc = py.path.svnwc(wcdir)
return ("file://%s" % repo, wc)
def save_repowc():
repo, wc = getrepowc()
repo = py.path.local(repo[len("file://"):])
assert repo.check()
savedrepo = repo.dirpath('repo_save')
savedwc = wc.dirpath('wc_save')
repo.copy(savedrepo)
wc.localpath.copy(savedwc.localpath)
return savedrepo, savedwc
def restore_repowc(obj):
savedrepo, savedwc = obj
repo, wc = getrepowc()
print (repo)
print (repo[len("file://"):])
repo = py.path.local(repo[len("file://"):])
print (repo)
assert repo.check()
# repositories have read only files on windows
#repo.chmod(0777, rec=True)
repo.remove()
wc.localpath.remove()
savedrepo.move(repo)
savedwc.localpath.move(wc.localpath)
# create an empty repository for testing purposes and return the url to it
def make_test_repo(name="test-repository"):
getsvnbin()
repo = py.test.ensuretemp(name)
try:
py.process.cmdexec('svnadmin create %s' % repo)
except:
repo.remove()
raise
if sys.platform == 'win32':
repo = '/' + str(repo).replace('\\', '/')
return py.path.svnurl("file://%s" % repo)
class CommonSvnTests(CommonFSTests):
def setup_method(self, meth):
bn = meth.__name__
for x in 'test_remove', 'test_move', 'test_status_deleted':
if bn.startswith(x):
self._savedrepowc = save_repowc()
def teardown_method(self, meth):
x = getattr(self, '_savedrepowc', None)
if x is not None:
restore_repowc(x)
del self._savedrepowc
def test_propget(self):
url = self.root.join("samplefile")
def test_propget(self, path1):
url = path1.join("samplefile")
value = url.propget('svn:eol-style')
assert value == 'native'
def test_proplist(self):
url = self.root.join("samplefile")
def test_proplist(self, path1):
url = path1.join("samplefile")
res = url.proplist()
assert res['svn:eol-style'] == 'native'
def test_info(self):
url = self.root.join("samplefile")
def test_info(self, path1):
url = path1.join("samplefile")
res = url.info()
assert res.size > len("samplefile") and res.created_rev >= 0
def test_log_simple(self):
url = self.root.join("samplefile")
def test_log_simple(self, path1):
url = path1.join("samplefile")
logentries = url.log()
for logentry in logentries:
assert logentry.rev == 1
assert hasattr(logentry, 'author')
assert hasattr(logentry, 'date')
class CommonCommandAndBindingTests(CommonSvnTests):
def test_trailing_slash_is_stripped(self):
# XXX we need to test more normalizing properties
url = self.root.join("/")
assert self.root == url
#def test_different_revs_compare_unequal(self):
# newpath = self.root.new(rev=1199)
# assert newpath != self.root
def test_exists_svn_root(self):
assert self.root.check()
#def test_not_exists_rev(self):
# url = self.root.__class__(self.rooturl, rev=500)
# assert url.check(exists=0)
#def test_nonexisting_listdir_rev(self):
# url = self.root.__class__(self.rooturl, rev=500)
# raises(py.error.ENOENT, url.listdir)
#def test_newrev(self):
# url = self.root.new(rev=None)
# assert url.rev == None
# assert url.strpath == self.root.strpath
# url = self.root.new(rev=10)
# assert url.rev == 10
#def test_info_rev(self):
# url = self.root.__class__(self.rooturl, rev=1155)
# url = url.join("samplefile")
# res = url.info()
# assert res.size > len("samplefile") and res.created_rev == 1155
# the following tests are easier if we have a path class
def test_repocache_simple(self):
repocache = svncommon.RepoCache()
repocache.put(self.root.strpath, 42)
url, rev = repocache.get(self.root.join('test').strpath)
assert rev == 42
assert url == self.root.strpath
def test_repocache_notimeout(self):
repocache = svncommon.RepoCache()
repocache.timeout = 0
repocache.put(self.root.strpath, self.root.rev)
url, rev = repocache.get(self.root.strpath)
assert rev == -1
assert url == self.root.strpath
def test_repocache_outdated(self):
repocache = svncommon.RepoCache()
repocache.put(self.root.strpath, 42, timestamp=0)
url, rev = repocache.get(self.root.join('test').strpath)
assert rev == -1
assert url == self.root.strpath
def _test_getreporev(self):
""" this test runs so slow it's usually disabled """
old = svncommon.repositories.repos
try:
_repocache.clear()
root = self.root.new(rev=-1)
url, rev = cache.repocache.get(root.strpath)
assert rev>=0
assert url == svnrepourl
finally:
repositories.repos = old
#cache.repositories.put(svnrepourl, 1200, 0)

View File

@ -32,14 +32,18 @@ class TestBuildcostAccess(BasicCacheAPITest):
# result into time()-time() == 0 which makes the below
# test fail randomly. Let's rather use incrementing
# numbers instead.
monkeypatch.setattr(cacheutil, 'gettime',
py.std.itertools.count().next)
l = [0]
def counter():
l[0] = l[0] + 1
return l[0]
monkeypatch.setattr(cacheutil, 'gettime', counter)
for x in range(cache.maxentries):
y = cache.getorbuild(x, lambda: x)
assert x == y
for x in range(cache.maxentries):
assert cache.getorbuild(x, None) == x
for x in range(cache.maxentries/2):
halfentries = int(cache.maxentries / 2)
for x in range(halfentries):
assert cache.getorbuild(x, None) == x
assert cache.getorbuild(x, None) == x
# evict one entry
@ -47,7 +51,7 @@ class TestBuildcostAccess(BasicCacheAPITest):
assert val == 42
# check that recently used ones are still there
# and are not build again
for x in range(cache.maxentries/2):
for x in range(halfentries):
assert cache.getorbuild(x, None) == x
assert cache.getorbuild(-1, None) == 42

View File

@ -1,53 +1,71 @@
import py
import sys
from py.path import local
from py.__.path.testing.fscommon import CommonFSTests, setuptestfs
from py.__.path.testing import common
class LocalSetup:
def setup_class(cls):
cls.root = py.test.ensuretemp(cls.__name__)
cls.root.ensure(dir=1)
setuptestfs(cls.root)
def pytest_funcarg__path1(request):
def setup():
path1 = request.config.mktemp("path1")
common.setuptestfs(path1)
return path1
def teardown(path1):
# post check
assert path1.join("samplefile").check()
return request.cached_setup(setup, teardown, scope="session")
def setup_method(self, method):
self.tmpdir = self.root.mkdir(method.__name__)
def pytest_funcarg__tmpdir(request):
basedir = request.config.getbasetemp()
if request.cls:
try:
basedir = basedir.mkdir(request.cls.__name__)
except py.error.EEXIST:
pass
for i in range(1000):
name = request.function.__name__
if i > 0:
name += str(i)
try:
return basedir.mkdir(name)
except py.error.EEXIST:
continue
raise ValueError("could not create tempdir")
def teardown_method(self, method):
assert self.root.join("samplefile").check()
class TestLocalPath(common.CommonFSTests):
def test_join_normpath(self, tmpdir):
assert tmpdir.join(".") == tmpdir
p = tmpdir.join("../%s" % tmpdir.basename)
assert p == tmpdir
p = tmpdir.join("..//%s/" % tmpdir.basename)
assert p == tmpdir
class TestLocalPath(LocalSetup, CommonFSTests):
def test_join_normpath(self):
assert self.tmpdir.join(".") == self.tmpdir
p = self.tmpdir.join("../%s" % self.tmpdir.basename)
assert p == self.tmpdir
p = self.tmpdir.join("..//%s/" % self.tmpdir.basename)
assert p == self.tmpdir
def test_gethash(self):
import md5
import sha
fn = self.tmpdir.join("testhashfile")
fn.write("hello")
assert fn.computehash("md5") == md5.md5("hello").hexdigest()
#assert fn.computehash("sha") == sha.sha("hello").hexdigest()
if sys.version_info >= (2,4):
assert fn.computehash("sha1") == sha.sha("hello").hexdigest()
def test_gethash(self, tmpdir):
try:
from md5 import md5
from sha import sha
except ImportError:
from hashlib import md5 as md5
from hashlib import sha1 as sha
fn = tmpdir.join("testhashfile")
data = 'hello'.encode('ascii')
fn.write(data, mode="wb")
assert fn.computehash("md5") == md5(data).hexdigest()
assert fn.computehash("sha1") == sha(data).hexdigest()
py.test.raises(ValueError, fn.computehash, "asdasd")
def test_remove_removes_readonly_file(self):
readonly_file = self.tmpdir.join('readonly').ensure()
def test_remove_removes_readonly_file(self, tmpdir):
readonly_file = tmpdir.join('readonly').ensure()
readonly_file.chmod(0)
readonly_file.remove()
assert not readonly_file.check(exists=1)
def test_remove_removes_readonly_dir(self):
readonly_dir = self.tmpdir.join('readonlydir').ensure(dir=1)
def test_remove_removes_readonly_dir(self, tmpdir):
readonly_dir = tmpdir.join('readonlydir').ensure(dir=1)
readonly_dir.chmod(int("500", 8))
readonly_dir.remove()
assert not readonly_dir.check(exists=1)
def test_remove_removes_dir_and_readonly_file(self):
readonly_dir = self.tmpdir.join('readonlydir').ensure(dir=1)
def test_remove_removes_dir_and_readonly_file(self, tmpdir):
readonly_dir = tmpdir.join('readonlydir').ensure(dir=1)
readonly_file = readonly_dir.join('readonlyfile').ensure()
readonly_file.chmod(0)
readonly_dir.remove()
@ -56,42 +74,34 @@ class TestLocalPath(LocalSetup, CommonFSTests):
def test_initialize_curdir(self):
assert str(local()) == py.std.os.getcwd()
def test_initialize_reldir(self):
old = self.root.chdir()
def test_initialize_reldir(self, path1):
old = path1.chdir()
try:
p = local('samplefile')
assert p.check()
finally:
old.chdir()
def test_eq_with_strings(self):
path1 = self.root.join('sampledir')
def test_eq_with_strings(self, path1):
path1 = path1.join('sampledir')
path2 = str(path1)
assert path1 == path2
assert path2 == path1
path3 = self.root.join('samplefile')
path3 = path1.join('samplefile')
assert path3 != path2
assert path2 != path3
def test_dump(self):
import tempfile
for bin in 0, 1:
try:
fd, name = tempfile.mkstemp()
f = py.std.os.fdopen(fd)
except AttributeError:
name = tempfile.mktemp()
f = open(name, 'w+')
try:
d = {'answer' : 42}
path = local(name)
path.dump(d, bin=bin)
from cPickle import load
dnew = load(f)
assert d == dnew
finally:
f.close()
py.std.os.remove(name)
@py.test.mark.multi(bin=(False, True))
def test_dump(self, tmpdir, bin):
path = tmpdir.join("dumpfile%s" % int(bin))
try:
d = {'answer' : 42}
path.dump(d, bin=bin)
f = path.open('rb+')
dnew = py.builtin.pickle.load(f)
assert d == dnew
finally:
f.close()
def test_setmtime(self):
import tempfile
@ -113,9 +123,9 @@ class TestLocalPath(LocalSetup, CommonFSTests):
finally:
py.std.os.remove(name)
def test_normpath(self):
new1 = self.root.join("/otherdir")
new2 = self.root.join("otherdir")
def test_normpath(self, path1):
new1 = path1.join("/otherdir")
new2 = path1.join("otherdir")
assert str(new1) == str(new2)
def test_mkdtemp_creation(self):
@ -134,8 +144,7 @@ class TestLocalPath(LocalSetup, CommonFSTests):
finally:
d.remove(rec=1)
def test_chdir(self):
tmpdir = self.tmpdir.realpath()
def test_chdir(self, tmpdir):
old = local()
try:
res = tmpdir.chdir()
@ -144,30 +153,28 @@ class TestLocalPath(LocalSetup, CommonFSTests):
finally:
old.chdir()
def test_ensure_filepath_withdir(self):
tmpdir = self.tmpdir
def test_ensure_filepath_withdir(self, tmpdir):
newfile = tmpdir.join('test1','test')
newfile.ensure()
assert newfile.check(file=1)
newfile.write("42")
newfile.ensure()
assert newfile.read() == "42"
s = newfile.read()
assert s == "42"
def test_ensure_filepath_withoutdir(self):
tmpdir = self.tmpdir
def test_ensure_filepath_withoutdir(self, tmpdir):
newfile = tmpdir.join('test1file')
t = newfile.ensure()
assert t == newfile
assert newfile.check(file=1)
def test_ensure_dirpath(self):
tmpdir = self.tmpdir
def test_ensure_dirpath(self, tmpdir):
newfile = tmpdir.join('test1','testfile')
t = newfile.ensure(dir=1)
assert t == newfile
assert newfile.check(dir=1)
def test_init_from_path(self):
def test_init_from_path(self, tmpdir):
l = local()
l2 = local(l)
assert l2 is l
@ -178,12 +185,11 @@ class TestLocalPath(LocalSetup, CommonFSTests):
assert l3.strpath == wc.strpath
assert not hasattr(l3, 'commit')
def test_long_filenames(self):
def test_long_filenames(self, tmpdir):
if sys.platform == "win32":
py.test.skip("win32: work around needed for path length limit")
# see http://codespeak.net/pipermail/py-dev/2008q2/000922.html
tmpdir = self.tmpdir
# testing paths > 260 chars (which is Windows' limitation, but
# depending on how the paths are used), but > 4096 (which is the
# Linux' limitation) - the behaviour of paths with names > 4096 chars
@ -195,7 +201,7 @@ class TestLocalPath(LocalSetup, CommonFSTests):
l2 = tmpdir.join(newfilename)
assert l2.read() == 'foo'
class TestExecutionOnWindows(LocalSetup):
class TestExecutionOnWindows:
disabled = py.std.sys.platform != 'win32'
def test_sysfind(self):
@ -203,7 +209,7 @@ class TestExecutionOnWindows(LocalSetup):
assert x.check(file=1)
assert py.path.local.sysfind('jaksdkasldqwe') is None
class TestExecution(LocalSetup):
class TestExecution:
disabled = py.std.sys.platform == 'win32'
def test_sysfind(self):
@ -211,20 +217,11 @@ class TestExecution(LocalSetup):
assert x.check(file=1)
assert py.path.local.sysfind('jaksdkasldqwe') is None
def test_sysfind_no_permisson(self):
dir = py.test.ensuretemp('sysfind')
env = py.std.os.environ
oldpath = env['PATH']
try:
noperm = dir.ensure('noperm', dir=True)
env['PATH'] += ":%s" % (noperm)
noperm.chmod(0)
assert py.path.local.sysfind('a') is None
finally:
env['PATH'] = oldpath
noperm.chmod(int("644", 8))
noperm.remove()
def test_sysfind_no_permisson_ignored(self, monkeypatch, tmpdir):
noperm = tmpdir.ensure('noperm', dir=True)
monkeypatch.setenv("PATH", noperm, prepend=":")
noperm.chmod(0)
assert py.path.local.sysfind('jaksdkasldqwe') is None
def test_sysfind_absolute(self):
x = py.path.local.sysfind('test')
@ -233,23 +230,18 @@ class TestExecution(LocalSetup):
assert y.check(file=1)
assert y == x
def test_sysfind_multiple(self):
dir = py.test.ensuretemp('sysfind')
env = py.std.os.environ
oldpath = env['PATH']
try:
env['PATH'] += ":%s:%s" % (dir.ensure('a'),
dir.join('b'))
dir.ensure('b', 'a')
checker = lambda x: x.dirpath().basename == 'b'
x = py.path.local.sysfind('a', checker=checker)
assert x.basename == 'a'
assert x.dirpath().basename == 'b'
checker = lambda x: None
assert py.path.local.sysfind('a', checker=checker) is None
finally:
env['PATH'] = oldpath
#dir.remove()
def test_sysfind_multiple(self, tmpdir, monkeypatch):
monkeypatch.setenv('PATH',
"%s:%s" % (tmpdir.ensure('a'),
tmpdir.join('b')),
prepend=":")
tmpdir.ensure('b', 'a')
checker = lambda x: x.dirpath().basename == 'b'
x = py.path.local.sysfind('a', checker=checker)
assert x.basename == 'a'
assert x.dirpath().basename == 'b'
checker = lambda x: None
assert py.path.local.sysfind('a', checker=checker) is None
def test_sysexec(self):
x = py.path.local.sysfind('ls')
@ -263,11 +255,10 @@ class TestExecution(LocalSetup):
x.sysexec('aksjdkasjd')
""")
def test_make_numbered_dir(self):
root = self.tmpdir
root.ensure('base.not_an_int', dir=1)
def test_make_numbered_dir(self, tmpdir):
tmpdir.ensure('base.not_an_int', dir=1)
for i in range(10):
numdir = local.make_numbered_dir(prefix='base.', rootdir=root,
numdir = local.make_numbered_dir(prefix='base.', rootdir=tmpdir,
keep=2, lock_timeout=0)
assert numdir.check()
assert numdir.basename == 'base.%d' %i
@ -278,62 +269,60 @@ class TestExecution(LocalSetup):
if i>=3:
assert not numdir.new(ext=str(i-3)).check()
def test_locked_make_numbered_dir(self):
root = self.tmpdir
def test_locked_make_numbered_dir(self, tmpdir):
for i in range(10):
numdir = local.make_numbered_dir(prefix='base2.', rootdir=root,
numdir = local.make_numbered_dir(prefix='base2.', rootdir=tmpdir,
keep=2)
assert numdir.check()
assert numdir.basename == 'base2.%d' %i
for j in range(i):
assert numdir.new(ext=str(j)).check()
def test_error_preservation(self):
py.test.raises (EnvironmentError, self.root.join('qwoeqiwe').mtime)
py.test.raises (EnvironmentError, self.root.join('qwoeqiwe').read)
def test_error_preservation(self, path1):
py.test.raises (EnvironmentError, path1.join('qwoeqiwe').mtime)
py.test.raises (EnvironmentError, path1.join('qwoeqiwe').read)
#def test_parentdirmatch(self):
# local.parentdirmatch('std', startmodule=__name__)
#
# importing tests
def test_pyimport(self):
obj = self.root.join('execfile.py').pyimport()
class TestImport:
def test_pyimport(self, path1):
obj = path1.join('execfile.py').pyimport()
assert obj.x == 42
assert obj.__name__ == 'execfile'
def test_pyimport_execfile_different_name(self):
obj = self.root.join('execfile.py').pyimport(modname="0x.y.z")
def test_pyimport_execfile_different_name(self, path1):
obj = path1.join('execfile.py').pyimport(modname="0x.y.z")
assert obj.x == 42
assert obj.__name__ == '0x.y.z'
def test_pyimport_a(self):
otherdir = self.root.join('otherdir')
def test_pyimport_a(self, path1):
otherdir = path1.join('otherdir')
mod = otherdir.join('a.py').pyimport()
assert mod.result == "got it"
assert mod.__name__ == 'otherdir.a'
def test_pyimport_b(self):
otherdir = self.root.join('otherdir')
def test_pyimport_b(self, path1):
otherdir = path1.join('otherdir')
mod = otherdir.join('b.py').pyimport()
assert mod.stuff == "got it"
assert mod.__name__ == 'otherdir.b'
def test_pyimport_c(self):
otherdir = self.root.join('otherdir')
def test_pyimport_c(self, path1):
otherdir = path1.join('otherdir')
mod = otherdir.join('c.py').pyimport()
assert mod.value == "got it"
def test_pyimport_d(self):
otherdir = self.root.join('otherdir')
def test_pyimport_d(self, path1):
otherdir = path1.join('otherdir')
mod = otherdir.join('d.py').pyimport()
assert mod.value2 == "got it"
def test_pyimport_and_import(self):
# XXX maybe a bit of a fragile test ...
p = py.test.ensuretemp("pyimport")
p.ensure('xxxpackage', '__init__.py')
mod1path = p.ensure('xxxpackage', 'module1.py')
def test_pyimport_and_import(self, tmpdir):
tmpdir.ensure('xxxpackage', '__init__.py')
mod1path = tmpdir.ensure('xxxpackage', 'module1.py')
mod1 = mod1path.pyimport()
assert mod1.__name__ == 'xxxpackage.module1'
from xxxpackage import module1
@ -355,48 +344,41 @@ class TestWINLocalPath:
#root = local(TestLocalPath.root)
disabled = py.std.sys.platform != 'win32'
def setup_class(cls):
cls.root = py.test.ensuretemp(cls.__name__)
def setup_method(self, method):
name = method.im_func.func_name
self.tmpdir = self.root.ensure(name, dir=1)
def test_owner_group_not_implemented(self):
py.test.raises(NotImplementedError, "self.root.stat().owner")
py.test.raises(NotImplementedError, "self.root.stat().group")
py.test.raises(NotImplementedError, "path1.stat().owner")
py.test.raises(NotImplementedError, "path1.stat().group")
def test_chmod_simple_int(self):
py.builtin.print_("self.root is", self.root)
mode = self.root.stat().mode
py.builtin.print_("path1 is", path1)
mode = path1.stat().mode
# Ensure that we actually change the mode to something different.
self.root.chmod(mode == 0 and 1 or 0)
path1.chmod(mode == 0 and 1 or 0)
try:
print(self.root.stat().mode)
print(path1.stat().mode)
print(mode)
assert self.root.stat().mode != mode
assert path1.stat().mode != mode
finally:
self.root.chmod(mode)
assert self.root.stat().mode == mode
path1.chmod(mode)
assert path1.stat().mode == mode
def test_path_comparison_lowercase_mixed(self):
t1 = self.root.join("a_path")
t2 = self.root.join("A_path")
t1 = path1.join("a_path")
t2 = path1.join("A_path")
assert t1 == t1
assert t1 == t2
def test_relto_with_mixed_case(self):
t1 = self.root.join("a_path", "fiLe")
t2 = self.root.join("A_path")
t1 = path1.join("a_path", "fiLe")
t2 = path1.join("A_path")
assert t1.relto(t2) == "fiLe"
def test_allow_unix_style_paths(self):
t1 = self.root.join('a_path')
assert t1 == str(self.root) + '\\a_path'
t1 = self.root.join('a_path/')
assert t1 == str(self.root) + '\\a_path'
t1 = self.root.join('dir/a_path')
assert t1 == str(self.root) + '\\dir\\a_path'
t1 = path1.join('a_path')
assert t1 == str(path1) + '\\a_path'
t1 = path1.join('a_path/')
assert t1 == str(path1) + '\\a_path'
t1 = path1.join('dir/a_path')
assert t1 == str(path1) + '\\dir\\a_path'
def test_sysfind_in_currentdir(self):
cmd = py.path.local.sysfind('cmd')
@ -411,20 +393,12 @@ class TestWINLocalPath:
class TestPOSIXLocalPath:
disabled = py.std.sys.platform == 'win32'
def setup_class(cls):
cls.root = py.test.ensuretemp(cls.__name__)
def setup_method(self, method):
name = method.im_func.func_name
self.tmpdir = self.root.ensure(name, dir=1)
def test_samefile(self):
assert self.tmpdir.samefile(self.tmpdir)
p = self.tmpdir.ensure("hello")
def test_samefile(self, tmpdir):
assert tmpdir.samefile(tmpdir)
p = tmpdir.ensure("hello")
assert p.samefile(p)
def test_hardlink(self):
tmpdir = self.tmpdir
def test_hardlink(self, tmpdir):
linkpath = tmpdir.join('test')
filepath = tmpdir.join('file')
filepath.write("Hello")
@ -432,16 +406,14 @@ class TestPOSIXLocalPath:
linkpath.mklinkto(filepath)
assert filepath.stat().nlink == nlink + 1
def test_symlink_are_identical(self):
tmpdir = self.tmpdir
def test_symlink_are_identical(self, tmpdir):
filepath = tmpdir.join('file')
filepath.write("Hello")
linkpath = tmpdir.join('test')
linkpath.mksymlinkto(filepath)
assert linkpath.readlink() == str(filepath)
def test_symlink_isfile(self):
tmpdir = self.tmpdir
def test_symlink_isfile(self, tmpdir):
linkpath = tmpdir.join('test')
filepath = tmpdir.join('file')
filepath.write("")
@ -449,8 +421,7 @@ class TestPOSIXLocalPath:
assert linkpath.check(file=1)
assert not linkpath.check(link=0, file=1)
def test_symlink_relative(self):
tmpdir = self.tmpdir
def test_symlink_relative(self, tmpdir):
linkpath = tmpdir.join('test')
filepath = tmpdir.join('file')
filepath.write("Hello")
@ -458,40 +429,35 @@ class TestPOSIXLocalPath:
assert linkpath.readlink() == "file"
assert filepath.read() == linkpath.read()
def test_symlink_not_existing(self):
tmpdir = self.tmpdir
def test_symlink_not_existing(self, tmpdir):
linkpath = tmpdir.join('testnotexisting')
assert not linkpath.check(link=1)
assert linkpath.check(link=0)
def test_relto_with_root(self):
y = self.root.join('x').relto(py.path.local('/'))
assert y[0] == str(self.root)[1]
def test_relto_with_root(self, path1, tmpdir):
y = path1.join('x').relto(py.path.local('/'))
assert y[0] == str(path1)[1]
def test_visit_recursive_symlink(self):
tmpdir = self.tmpdir
def test_visit_recursive_symlink(self, tmpdir):
linkpath = tmpdir.join('test')
linkpath.mksymlinkto(tmpdir)
visitor = tmpdir.visit(None, lambda x: x.check(link=0))
assert list(visitor) == [linkpath]
def test_symlink_isdir(self):
tmpdir = self.tmpdir
def test_symlink_isdir(self, tmpdir):
linkpath = tmpdir.join('test')
linkpath.mksymlinkto(tmpdir)
assert linkpath.check(dir=1)
assert not linkpath.check(link=0, dir=1)
def test_symlink_remove(self):
tmpdir = self.tmpdir.realpath()
def test_symlink_remove(self, tmpdir):
linkpath = tmpdir.join('test')
linkpath.mksymlinkto(linkpath) # point to itself
assert linkpath.check(link=1)
linkpath.remove()
assert not linkpath.check()
def test_realpath_file(self):
tmpdir = self.tmpdir
def test_realpath_file(self, tmpdir):
linkpath = tmpdir.join('test')
filepath = tmpdir.join('file')
filepath.write("")
@ -499,11 +465,11 @@ class TestPOSIXLocalPath:
realpath = linkpath.realpath()
assert realpath.basename == 'file'
def test_owner(self):
def test_owner(self, path1, tmpdir):
from pwd import getpwuid
from grp import getgrgid
stat = self.root.stat()
assert stat.path == self.root
stat = path1.stat()
assert stat.path == path1
uid = stat.uid
gid = stat.gid
@ -515,9 +481,9 @@ class TestPOSIXLocalPath:
assert gid == stat.gid
assert group == stat.group
def test_atime(self):
def test_atime(self, tmpdir):
import time
path = self.root.ensure('samplefile')
path = tmpdir.ensure('samplefile')
now = time.time()
atime1 = path.atime()
# we could wait here but timer resolution is very
@ -527,72 +493,70 @@ class TestPOSIXLocalPath:
duration = time.time() - now
assert (atime2-atime1) <= duration
def test_commondir(self):
def test_commondir(self, path1):
# XXX This is here in local until we find a way to implement this
# using the subversion command line api.
p1 = self.root.join('something')
p2 = self.root.join('otherthing')
assert p1.common(p2) == self.root
assert p2.common(p1) == self.root
p1 = path1.join('something')
p2 = path1.join('otherthing')
assert p1.common(p2) == path1
assert p2.common(p1) == path1
def test_commondir_nocommon(self):
def test_commondir_nocommon(self, path1):
# XXX This is here in local until we find a way to implement this
# using the subversion command line api.
p1 = self.root.join('something')
p2 = py.path.local(self.root.sep+'blabla')
p1 = path1.join('something')
p2 = py.path.local(path1.sep+'blabla')
assert p1.common(p2) == '/'
def test_join_to_root(self):
root = self.root.parts()[0]
def test_join_to_root(self, path1):
root = path1.parts()[0]
assert len(str(root)) == 1
assert str(root.join('a')) == '/a'
def test_join_root_to_root_with_no_abs(self):
nroot = self.root.join('/')
assert str(self.root) == str(nroot)
assert self.root == nroot
def test_join_root_to_root_with_no_abs(self, path1):
nroot = path1.join('/')
assert str(path1) == str(nroot)
assert path1 == nroot
def test_chmod_simple_int(self):
py.builtin.print_("self.root is", self.root)
mode = self.root.stat().mode
self.root.chmod(mode/2)
def test_chmod_simple_int(self, path1):
mode = path1.stat().mode
path1.chmod(int(mode/2))
try:
assert self.root.stat().mode != mode
assert path1.stat().mode != mode
finally:
self.root.chmod(mode)
assert self.root.stat().mode == mode
path1.chmod(mode)
assert path1.stat().mode == mode
def test_chmod_rec_int(self):
def test_chmod_rec_int(self, path1):
# XXX fragile test
py.builtin.print_("self.root is", self.root)
recfilter = lambda x: x.check(dotfile=0, link=0)
oldmodes = {}
for x in self.root.visit(rec=recfilter):
for x in path1.visit(rec=recfilter):
oldmodes[x] = x.stat().mode
self.root.chmod(int("772", 8), rec=recfilter)
path1.chmod(int("772", 8), rec=recfilter)
try:
for x in self.root.visit(rec=recfilter):
for x in path1.visit(rec=recfilter):
assert x.stat().mode & int("777", 8) == int("772", 8)
finally:
for x,y in oldmodes.items():
x.chmod(y)
def test_chown_identity(self):
owner = self.root.stat().owner
group = self.root.stat().group
self.root.chown(owner, group)
def test_chown_identity(self, path1):
owner = path1.stat().owner
group = path1.stat().group
path1.chown(owner, group)
def test_chown_dangling_link(self):
owner = self.root.stat().owner
group = self.root.stat().group
x = self.root.join('hello')
def test_chown_dangling_link(self, path1):
owner = path1.stat().owner
group = path1.stat().group
x = path1.join('hello')
x.mksymlinkto('qlwkejqwlek')
try:
self.root.chown(owner, group, rec=1)
path1.chown(owner, group, rec=1)
finally:
x.remove(rec=0)
def test_chown_identity_rec_mayfail(self):
owner = self.root.stat().owner
group = self.root.stat().group
self.root.chown(owner, group)
def test_chown_identity_rec_mayfail(self, path1):
owner = path1.stat().owner
group = path1.stat().group
path1.chown(owner, group)

View File

@ -1,8 +1,8 @@
import py
from py.__.path.testing import svntestbase
from py.path import SvnAuth
import svntestbase
from threading import Thread
import time
import sys
def make_repo_auth(repo, userdata):
""" write config to repo
@ -34,6 +34,7 @@ def serve_bg(repopath):
while port < 10010:
cmd = 'svnserve -d -T --listen-port=%d --pid-file=%s -r %s' % (
port, pidfile, repopath)
print(cmd)
try:
py.process.cmdexec(cmd)
except py.process.cmdexec.Error:
@ -41,43 +42,47 @@ def serve_bg(repopath):
else:
# XXX we assume here that the pid file gets written somewhere, I
# guess this should be relatively safe... (I hope, at least?)
while True:
pid = pidfile.read()
counter = pid = 0
while counter < 10:
counter += 1
try:
pid = pidfile.read()
except py.error.ENOENT:
pass
if pid:
break
# needs a bit more time to boot
time.sleep(0.1)
time.sleep(0.2)
return port, int(pid)
port += 1
raise IOError('could not start svnserve: %s' % (e,))
class TestSvnAuth(object):
def test_basic(self):
auth = py.path.SvnAuth('foo', 'bar')
auth = SvnAuth('foo', 'bar')
assert auth.username == 'foo'
assert auth.password == 'bar'
assert str(auth)
def test_makecmdoptions_uname_pw_makestr(self):
auth = py.path.SvnAuth('foo', 'bar')
auth = SvnAuth('foo', 'bar')
assert auth.makecmdoptions() == '--username="foo" --password="bar"'
def test_makecmdoptions_quote_escape(self):
auth = py.path.SvnAuth('fo"o', '"ba\'r"')
auth = SvnAuth('fo"o', '"ba\'r"')
assert auth.makecmdoptions() == '--username="fo\\"o" --password="\\"ba\'r\\""'
def test_makecmdoptions_no_cache_auth(self):
auth = py.path.SvnAuth('foo', 'bar', cache_auth=False)
auth = SvnAuth('foo', 'bar', cache_auth=False)
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
'--no-auth-cache')
def test_makecmdoptions_no_interactive(self):
auth = py.path.SvnAuth('foo', 'bar', interactive=False)
auth = SvnAuth('foo', 'bar', interactive=False)
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
'--non-interactive')
def test_makecmdoptions_no_interactive_no_cache_auth(self):
auth = py.path.SvnAuth('foo', 'bar', cache_auth=False,
auth = SvnAuth('foo', 'bar', cache_auth=False,
interactive=False)
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
'--no-auth-cache --non-interactive')
@ -93,7 +98,6 @@ class svnwc_no_svn(py.path.svnwc):
class TestSvnWCAuth(object):
def setup_method(self, meth):
self.auth = SvnAuth('user', 'pass', cache_auth=False)
svntestbase.getsvnbin()
def test_checkout(self):
wc = svnwc_no_svn('foo', auth=self.auth)
@ -122,6 +126,9 @@ class TestSvnWCAuth(object):
class svnurl_no_svn(py.path.svnurl):
cmdexec_output = 'test'
popen_output = 'test'
def __init__(self, *args, **kwargs):
py.path.svnurl.__init__(self, *args, **kwargs)
self.commands = []
def _cmdexec(self, cmd):
self.commands.append(cmd)
@ -133,7 +140,6 @@ class svnurl_no_svn(py.path.svnurl):
class TestSvnURLAuth(object):
def setup_method(self, meth):
svnurl_no_svn.commands = []
self.auth = SvnAuth('foo', 'bar')
def test_init(self):
@ -196,8 +202,10 @@ class TestSvnURLAuth(object):
assert parent.auth is self.auth
def test_mkdir(self):
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
u.mkdir('foo', msg='created dir foo')
u = svnurl_no_svn('http://foo.bar/svn/qweqwe', auth=self.auth)
assert not u.commands
u.mkdir(msg='created dir foo')
assert u.commands
assert '--username="foo" --password="bar"' in u.commands[0]
def test_copy(self):
@ -247,77 +255,64 @@ class TestSvnURLAuth(object):
u.propget('foo')
assert '--username="foo" --password="bar"' in u.commands[0]
class SvnAuthFunctionalTestBase(object):
def setup_class(cls):
svntestbase.getsvnbin()
if not py.test.config.option.runslowtests:
py.test.skip('skipping slow functional tests - use --runslowtests '
'to override')
class pytest_funcarg__setup:
def __init__(self, request):
if not request.config.option.runslowtests:
py.test.skip('use --runslowtests to run these tests')
def setup_method(self, meth):
func_name = meth.im_func.func_name
self.repo = svntestbase.make_test_repo('TestSvnAuthFunctional.%s' % (
func_name,))
repodir = str(self.repo)[7:]
tmpdir = request.getfuncargvalue("tmpdir")
repodir = tmpdir.join("repo")
py.process.cmdexec('svnadmin create %s' % repodir)
if sys.platform == 'win32':
repodir = '/' + str(repodir).replace('\\', '/')
self.repo = py.path.svnurl("file://%s" % repodir)
if py.std.sys.platform == 'win32':
# remove trailing slash...
repodir = repodir[1:]
self.repopath = py.path.local(repodir)
self.temppath = py.test.ensuretemp('TestSvnAuthFunctional.%s' % (
func_name))
self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False,
self.temppath = tmpdir.mkdir("temppath")
self.auth = SvnAuth('johnny', 'foo', cache_auth=False,
interactive=False)
self.port, self.pid = self._start_svnserve()
def teardown_method(self, method):
py.process.kill(self.pid)
def _start_svnserve(self):
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
try:
return serve_bg(self.repopath.dirpath())
except IOError:
py.test.skip(str(sys.exc_info()[1]))
self.port, self.pid = serve_bg(self.repopath.dirpath())
# XXX caching is too global
py.path.svnurl._lsnorevcache._dict.clear()
request.addfinalizer(lambda: py.process.kill(self.pid))
class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
def test_checkout_constructor_arg(self):
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
class TestSvnWCAuthFunctional:
def test_checkout_constructor_arg(self, setup):
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
assert wc.join('.svn').check()
def test_checkout_function_arg(self):
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
def test_checkout_function_arg(self, setup):
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
assert wc.join('.svn').check()
def test_checkout_failing_non_interactive(self):
port = self.port
auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
def test_checkout_failing_non_interactive(self, setup):
auth = SvnAuth('johnny', 'bar', cache_auth=False,
interactive=False)
wc = py.path.svnwc(self.temppath, auth)
wc = py.path.svnwc(setup.temppath, auth)
py.test.raises(Exception,
("wc.checkout('svn://localhost:%s/%s' % "
"(port, self.repopath.basename))"))
("wc.checkout('svn://localhost:%(port)s/%(repopath)s')" %
setup.__dict__))
def test_log(self):
port = self.port
wc = py.path.svnwc(self.temppath, self.auth)
def test_log(self, setup):
wc = py.path.svnwc(setup.temppath, setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
foo = wc.ensure('foo.txt')
wc.commit('added foo.txt')
log = foo.log()
assert len(log) == 1
assert log[0].msg == 'added foo.txt'
def test_switch(self):
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
def test_switch(self, setup):
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
svnurl = 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)
wc.checkout(svnurl)
wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
wc.commit('added foo dir with foo.txt file')
@ -327,30 +322,29 @@ class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
bar.switch(svnurl + '/foo')
assert bar.join('foo.txt')
def test_update(self):
port = self.port
wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
auth=self.auth)
wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
auth=self.auth)
def test_update(self, setup):
wc1 = py.path.svnwc(setup.temppath.ensure('wc1', dir=True),
auth=setup.auth)
wc2 = py.path.svnwc(setup.temppath.ensure('wc2', dir=True),
auth=setup.auth)
wc1.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
wc2.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
wc1.ensure('foo', dir=True)
wc1.commit('added foo dir')
wc2.update()
assert wc2.join('foo').check()
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
auth = SvnAuth('unknown', 'unknown', interactive=False)
wc2.auth = auth
py.test.raises(Exception, 'wc2.update()')
def test_lock_unlock_status(self):
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
def test_lock_unlock_status(self, setup):
port = setup.port
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
foo = wc.join('foo')
@ -361,16 +355,16 @@ class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
status = foo.status()
assert not status.locked
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
auth = SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.lock()')
py.test.raises(Exception, 'foo.unlock()')
def test_diff(self):
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
def test_diff(self, setup):
port = setup.port
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
wc.update()
@ -385,51 +379,52 @@ class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
diff = foo.diff(rev=rev)
assert '\n+bar\n' in diff
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
auth = SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.diff(rev=rev)')
class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
def test_listdir(self):
port = self.port
class TestSvnURLAuthFunctional:
def test_listdir(self, setup):
port = setup.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=setup.auth)
u.ensure('foo')
paths = u.listdir()
assert len(paths) == 1
assert paths[0].auth is self.auth
assert paths[0].auth is setup.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=auth)
py.test.raises(Exception, 'u.listdir()')
def test_copy(self):
port = self.port
def test_copy(self, setup):
port = setup.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=setup.auth)
foo = u.mkdir('foo')
assert foo.check()
bar = u.join('bar')
foo.copy(bar)
assert bar.check()
assert bar.auth is self.auth
assert bar.auth is setup.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=auth)
foo = u.join('foo')
bar = u.join('bar')
py.test.raises(Exception, 'foo.copy(bar)')
def test_write_read(self):
port = self.port
def test_write_read(self, setup):
port = setup.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=setup.auth)
foo = u.ensure('foo')
fp = foo.open()
try:
@ -440,7 +435,7 @@ class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=auth)
foo = u.join('foo')
py.test.raises(Exception, 'foo.open()')

View File

@ -1,56 +1,56 @@
import py
from py.__.path.svnurl import InfoSvnCommand
from py.__.path.testing.svntestbase import CommonCommandAndBindingTests, \
getrepowc, getsvnbin
from py.__.path.testing.svntestbase import CommonSvnTests
import datetime
import time
def setup_module(mod):
getsvnbin()
def pytest_funcarg__path1(request):
repo, wc = request.getfuncargvalue("repowc1")
return py.path.svnurl(repo)
class TestSvnURLCommandPath(CommonCommandAndBindingTests):
def setup_class(cls):
repo, wc = getrepowc()
cls.root = py.path.svnurl(repo)
class TestSvnURLCommandPath(CommonSvnTests):
@py.test.mark.xfail
def test_load(self, path1):
super(TestSvnURLCommandPath, self).test_load(path1)
def test_move_file(self): # overrides base class
p = self.root.ensure('origfile')
def test_move_file(self, path1): # overrides base class
p = path1.ensure('origfile')
newp = p.dirpath('newfile')
p.move(newp)
assert newp.check(file=1)
newp.remove()
assert not p.check()
def test_move_dir(self): # overrides base class
p = self.root.ensure('origdir', dir=1)
def test_move_dir(self, path1): # overrides base class
p = path1.ensure('origdir', dir=1)
newp = p.dirpath('newdir')
p.move(newp)
assert newp.check(dir=1)
newp.remove()
assert not p.check()
def test_svnurl_needs_arg(self):
def test_svnurl_needs_arg(self, path1):
py.test.raises(TypeError, "py.path.svnurl()")
def test_svnurl_does_not_accept_None_either(self):
def test_svnurl_does_not_accept_None_either(self, path1):
py.test.raises(Exception, "py.path.svnurl(None)")
def test_svnurl_characters_simple(self):
def test_svnurl_characters_simple(self, path1):
py.path.svnurl("svn+ssh://hello/world")
def test_svnurl_characters_at_user(self):
def test_svnurl_characters_at_user(self, path1):
py.path.svnurl("http://user@host.com/some/dir")
def test_svnurl_characters_at_path(self):
def test_svnurl_characters_at_path(self, path1):
py.test.raises(ValueError, 'py.path.svnurl("http://host.com/foo@bar")')
def test_svnurl_characters_colon_port(self):
def test_svnurl_characters_colon_port(self, path1):
py.path.svnurl("http://host.com:8080/some/dir")
def test_svnurl_characters_tilde_end(self):
def test_svnurl_characters_tilde_end(self, path1):
py.path.svnurl("http://host.com/some/file~")
def test_svnurl_characters_colon_path(self):
def test_svnurl_characters_colon_path(self, path1):
if py.std.sys.platform == 'win32':
# colons are allowed on win32, because they're part of the drive
# part of an absolute path... however, they shouldn't be allowed in
@ -58,42 +58,22 @@ class TestSvnURLCommandPath(CommonCommandAndBindingTests):
py.test.skip('XXX fixme win32')
py.test.raises(ValueError, 'py.path.svnurl("http://host.com/foo:bar")')
def test_export(self):
repo, wc = getrepowc('test_export_repo', 'test_export_wc')
foo = wc.join('foo').ensure(dir=True)
bar = foo.join('bar').ensure(file=True)
bar.write('bar\n')
foo.commit('testing something')
exportpath = py.test.ensuretemp('test_export_exportdir')
url = py.path.svnurl(repo + '/foo')
foo = url.export(exportpath.join('foo'))
assert foo == exportpath.join('foo')
assert isinstance(foo, py.path.local)
assert foo.join('bar').check()
assert not foo.join('.svn').check()
def test_export_rev(self):
repo, wc = getrepowc('test_export_rev_repo', 'test_export_rev_wc')
foo = wc.join('foo').ensure(dir=True)
bar = foo.join('bar').ensure(file=True)
bar.write('bar\n')
rev1 = foo.commit('testing something')
baz = foo.join('baz').ensure(file=True)
baz.write('baz\n')
rev2 = foo.commit('testing more')
exportpath = py.test.ensuretemp('test_export_rev_exportdir')
url = py.path.svnurl(repo + '/foo', rev=rev1)
foo1 = url.export(exportpath.join('foo1'))
assert foo1.check()
assert foo1.join('bar').check()
assert not foo1.join('baz').check()
url = py.path.svnurl(repo + '/foo', rev=rev2)
foo2 = url.export(exportpath.join('foo2'))
assert foo2.check()
assert foo2.join('bar').check()
assert foo2.join('baz').check()
def test_export(self, path1, tmpdir):
tmpdir = tmpdir.join("empty")
p = path1.export(tmpdir)
assert p == tmpdir # XXX should return None
n1 = [x.basename for x in tmpdir.listdir()]
n2 = [x.basename for x in path1.listdir()]
n1.sort()
n2.sort()
assert n1 == n2
assert not p.join('.svn').check()
rev = path1.mkdir("newdir")
tmpdir.remove()
assert not tmpdir.check()
path1.new(rev=1).export(tmpdir)
for p in tmpdir.listdir():
assert p.basename in n2
class TestSvnInfoCommand:

View File

@ -1,65 +1,54 @@
import py
import sys
from py.__.path.testing.svntestbase import CommonSvnTests, getrepowc, getsvnbin, make_test_repo
from py.__.path.testing.svntestbase import CommonSvnTests
from py.__.path.svnwc import InfoSvnWCCommand, XMLWCStatus, parse_wcinfotime
from py.__.path import svnwc as svncommon
if sys.platform != 'win32':
if sys.platform == 'win32':
def normpath(p):
return p
else:
try:
import win32api
except ImportError:
def normpath(p):
py.test.skip('this test requires win32api to run on windows')
else:
import os
def normpath(p):
p = win32api.GetShortPathName(p)
return os.path.normpath(os.path.normcase(p))
def normpath(p):
p = py.test.importorskip("win32").GetShortPathName(p)
return os.path.normpath(os.path.normcase(p))
def setup_module(mod):
getsvnbin()
def test_make_repo(path1, tmpdir):
repo = tmpdir.join("repo")
py.process.cmdexec('svnadmin create %s' % repo)
if sys.platform == 'win32':
repo = '/' + str(repo).replace('\\', '/')
repo = py.path.svnurl("file://%s" % repo)
wc = py.path.svnwc(tmpdir.join("wc"))
wc.checkout(repo)
assert wc.info().rev == 0
assert len(wc.listdir()) == 0
p = wc.join("a_file")
p.write("test file")
p.add()
rev = wc.commit("some test")
assert p.info().rev == 1
assert rev == 1
rev = wc.commit()
assert rev is None
class TestMakeRepo(object):
def setup_class(cls):
cls.repo = make_test_repo()
cls.wc = py.path.svnwc(py.test.ensuretemp("test-wc").join("wc"))
def test_empty_checkout(self):
self.wc.checkout(self.repo)
assert len(self.wc.listdir()) == 0
def test_commit(self):
self.wc.checkout(self.repo)
p = self.wc.join("a_file")
p.write("test file")
p.add()
rev = self.wc.commit("some test")
assert p.info().rev == 1
assert rev == 1
rev = self.wc.commit()
assert rev is None
def pytest_funcarg__path1(request):
repo, wc = request.getfuncargvalue("repowc1")
return wc
class TestWCSvnCommandPath(CommonSvnTests):
def setup_class(cls):
repo, cls.root = getrepowc()
def test_move_file(self): # overrides base class
def test_move_file(self, path1): # overrides base class
try:
super(TestWCSvnCommandPath, self).test_move_file()
super(TestWCSvnCommandPath, self).test_move_file(path1)
finally:
self.root.revert(rec=1)
path1.revert(rec=1)
def test_move_directory(self): # overrides base class
def test_move_directory(self, path1): # overrides base class
try:
super(TestWCSvnCommandPath, self).test_move_directory()
super(TestWCSvnCommandPath, self).test_move_directory(path1)
finally:
self.root.revert(rec=1)
path1.revert(rec=1)
def test_status_attributes_simple(self):
def test_status_attributes_simple(self, path1):
def assert_nochange(p):
s = p.status()
assert not s.modified
@ -68,12 +57,12 @@ class TestWCSvnCommandPath(CommonSvnTests):
assert not s.deleted
assert not s.replaced
dpath = self.root.join('sampledir')
assert_nochange(self.root.join('sampledir'))
assert_nochange(self.root.join('samplefile'))
dpath = path1.join('sampledir')
assert_nochange(path1.join('sampledir'))
assert_nochange(path1.join('samplefile'))
def test_status_added(self):
nf = self.root.join('newfile')
def test_status_added(self, path1):
nf = path1.join('newfile')
nf.write('hello')
nf.add()
try:
@ -85,8 +74,8 @@ class TestWCSvnCommandPath(CommonSvnTests):
finally:
nf.revert()
def test_status_change(self):
nf = self.root.join('samplefile')
def test_status_change(self, path1):
nf = path1.join('samplefile')
try:
nf.write(nf.read() + 'change')
s = nf.status()
@ -97,8 +86,8 @@ class TestWCSvnCommandPath(CommonSvnTests):
finally:
nf.revert()
def test_status_added_ondirectory(self):
sampledir = self.root.join('sampledir')
def test_status_added_ondirectory(self, path1):
sampledir = path1.join('sampledir')
try:
t2 = sampledir.mkdir('t2')
t1 = t2.join('t1')
@ -113,20 +102,20 @@ class TestWCSvnCommandPath(CommonSvnTests):
t2.revert(rec=1)
t2.localpath.remove(rec=1)
def test_status_unknown(self):
t1 = self.root.join('un1')
def test_status_unknown(self, path1):
t1 = path1.join('un1')
try:
t1.write('test')
s = self.root.status()
s = path1.status()
# Comparing just the file names, because paths are unpredictable
# on Windows. (long vs. 8.3 paths)
assert t1.basename in [item.basename for item in s.unknown]
finally:
t1.localpath.remove()
def test_status_unchanged(self):
r = self.root
s = self.root.status(rec=1)
def test_status_unchanged(self, path1):
r = path1
s = path1.status(rec=1)
# Comparing just the file names, because paths are unpredictable
# on Windows. (long vs. 8.3 paths)
assert r.join('samplefile').basename in [item.basename
@ -136,8 +125,8 @@ class TestWCSvnCommandPath(CommonSvnTests):
assert r.join('sampledir/otherfile').basename in [item.basename
for item in s.unchanged]
def test_status_update(self):
r = self.root
def test_status_update(self, path1):
r = path1
try:
r.update(rev=1)
s = r.status(updates=1, rec=1)
@ -149,20 +138,20 @@ class TestWCSvnCommandPath(CommonSvnTests):
finally:
r.update()
def test_status_replaced(self):
p = self.root.join("samplefile")
def test_status_replaced(self, path1):
p = path1.join("samplefile")
p.remove()
p.ensure(dir=0)
p.add()
try:
s = self.root.status()
s = path1.status()
assert p.basename in [item.basename for item in s.replaced]
finally:
self.root.revert(rec=1)
path1.revert(rec=1)
def test_status_ignored(self):
def test_status_ignored(self, path1):
try:
d = self.root.join('sampledir')
d = path1.join('sampledir')
p = py.path.local(d).join('ignoredfile')
p.ensure(file=True)
s = d.status()
@ -173,15 +162,11 @@ class TestWCSvnCommandPath(CommonSvnTests):
assert [x.basename for x in s.unknown] == []
assert [x.basename for x in s.ignored] == ['ignoredfile']
finally:
self.root.revert(rec=1)
path1.revert(rec=1)
def test_status_conflict(self):
if not py.test.config.option.runslowtests:
py.test.skip('skipping slow unit tests - use --runslowtests '
'to override')
wc = self.root
wccopy = py.path.svnwc(
py.test.ensuretemp('test_status_conflict_wccopy'))
def test_status_conflict(self, path1, tmpdir):
wc = path1
wccopy = py.path.svnwc(tmpdir.join("conflict_copy"))
wccopy.checkout(wc.url)
p = wc.ensure('conflictsamplefile', file=1)
p.write('foo')
@ -195,12 +180,9 @@ class TestWCSvnCommandPath(CommonSvnTests):
s = wccopy.status()
assert [x.basename for x in s.conflict] == ['conflictsamplefile']
def test_status_external(self):
if not py.test.config.option.runslowtests:
py.test.skip('skipping slow unit tests - use --runslowtests '
'to override')
otherrepo, otherwc = getrepowc('externalrepo', 'externalwc')
d = self.root.ensure('sampledir', dir=1)
def test_status_external(self, path1, repowc2):
otherrepo, otherwc = repowc2
d = path1.ensure('sampledir', dir=1)
try:
d.remove()
d.add()
@ -214,13 +196,13 @@ class TestWCSvnCommandPath(CommonSvnTests):
assert [x.basename for x in s.external] == ['otherwc']
assert 'otherwc' in [x.basename for x in s.unchanged]
finally:
self.root.revert(rec=1)
path1.revert(rec=1)
def test_status_deleted(self):
d = self.root.ensure('sampledir', dir=1)
def test_status_deleted(self, path1):
d = path1.ensure('sampledir', dir=1)
d.remove()
d.add()
self.root.commit()
path1.commit()
d.ensure('deletefile', dir=0)
d.commit()
s = d.status()
@ -232,7 +214,7 @@ class TestWCSvnCommandPath(CommonSvnTests):
assert 'deletefile' not in s.unchanged
assert [x.basename for x in s.deleted] == ['deletefile']
def test_status_noauthor(self):
def test_status_noauthor(self, path1):
# testing for XML without author - this used to raise an exception
xml = '''\
<entry path="/tmp/pytest-23/wc">
@ -243,129 +225,129 @@ class TestWCSvnCommandPath(CommonSvnTests):
</wc-status>
</entry>
'''
XMLWCStatus.fromstring(xml, self.root)
XMLWCStatus.fromstring(xml, path1)
def test_status_wrong_xml(self):
def test_status_wrong_xml(self, path1):
# testing for XML without author - this used to raise an exception
xml = '<entry path="/home/jean/zope/venv/projectdb/parts/development-products/DataGridField">\n<wc-status item="incomplete" props="none" revision="784">\n</wc-status>\n</entry>'
st = XMLWCStatus.fromstring(xml, self.root)
st = XMLWCStatus.fromstring(xml, path1)
assert len(st.incomplete) == 1
def test_diff(self):
p = self.root / 'anotherfile'
def test_diff(self, path1):
p = path1 / 'anotherfile'
out = p.diff(rev=2)
assert out.find('hello') != -1
def test_blame(self):
p = self.root.join('samplepickle')
def test_blame(self, path1):
p = path1.join('samplepickle')
lines = p.blame()
assert sum([l[0] for l in lines]) == len(lines)
for l1, l2 in zip(p.readlines(), [l[2] for l in lines]):
assert l1 == l2
assert [l[1] for l in lines] == ['hpk'] * len(lines)
p = self.root.join('samplefile')
p = path1.join('samplefile')
lines = p.blame()
assert sum([l[0] for l in lines]) == len(lines)
for l1, l2 in zip(p.readlines(), [l[2] for l in lines]):
assert l1 == l2
assert [l[1] for l in lines] == ['hpk'] * len(lines)
def test_join_abs(self):
s = str(self.root.localpath)
n = self.root.join(s, abs=1)
assert self.root == n
def test_join_abs(self, path1):
s = str(path1.localpath)
n = path1.join(s, abs=1)
assert path1 == n
def test_join_abs2(self):
assert self.root.join('samplefile', abs=1) == self.root.join('samplefile')
def test_join_abs2(self, path1):
assert path1.join('samplefile', abs=1) == path1.join('samplefile')
def test_str_gives_localpath(self):
assert str(self.root) == str(self.root.localpath)
def test_str_gives_localpath(self, path1):
assert str(path1) == str(path1.localpath)
def test_versioned(self):
assert self.root.check(versioned=1)
def test_versioned(self, path1):
assert path1.check(versioned=1)
# TODO: Why does my copy of svn think .svn is versioned?
#assert self.root.join('.svn').check(versioned=0)
assert self.root.join('samplefile').check(versioned=1)
assert not self.root.join('notexisting').check(versioned=1)
notexisting = self.root.join('hello').localpath
#assert path1.join('.svn').check(versioned=0)
assert path1.join('samplefile').check(versioned=1)
assert not path1.join('notexisting').check(versioned=1)
notexisting = path1.join('hello').localpath
try:
notexisting.write("")
assert self.root.join('hello').check(versioned=0)
assert path1.join('hello').check(versioned=0)
finally:
notexisting.remove()
def test_nonversioned_remove(self):
assert self.root.check(versioned=1)
somefile = self.root.join('nonversioned/somefile')
def test_nonversioned_remove(self, path1):
assert path1.check(versioned=1)
somefile = path1.join('nonversioned/somefile')
nonwc = py.path.local(somefile)
nonwc.ensure()
assert somefile.check()
assert not somefile.check(versioned=True)
somefile.remove() # this used to fail because it tried to 'svn rm'
def test_properties(self):
def test_properties(self, path1):
try:
self.root.propset('gaga', 'this')
assert self.root.propget('gaga') == 'this'
path1.propset('gaga', 'this')
assert path1.propget('gaga') == 'this'
# Comparing just the file names, because paths are unpredictable
# on Windows. (long vs. 8.3 paths)
assert self.root.basename in [item.basename for item in
self.root.status().prop_modified]
assert 'gaga' in self.root.proplist()
assert self.root.proplist()['gaga'] == 'this'
assert path1.basename in [item.basename for item in
path1.status().prop_modified]
assert 'gaga' in path1.proplist()
assert path1.proplist()['gaga'] == 'this'
finally:
self.root.propdel('gaga')
path1.propdel('gaga')
def test_proplist_recursive(self):
s = self.root.join('samplefile')
def test_proplist_recursive(self, path1):
s = path1.join('samplefile')
s.propset('gugu', 'that')
try:
p = self.root.proplist(rec=1)
p = path1.proplist(rec=1)
# Comparing just the file names, because paths are unpredictable
# on Windows. (long vs. 8.3 paths)
assert (self.root / 'samplefile').basename in [item.basename
assert (path1 / 'samplefile').basename in [item.basename
for item in p]
finally:
s.propdel('gugu')
def test_long_properties(self):
def test_long_properties(self, path1):
value = """
vadm:posix : root root 0100755
Properties on 'chroot/dns/var/bind/db.net.xots':
"""
try:
self.root.propset('gaga', value)
backvalue = self.root.propget('gaga')
path1.propset('gaga', value)
backvalue = path1.propget('gaga')
assert backvalue == value
#assert len(backvalue.split('\n')) == 1
finally:
self.root.propdel('gaga')
path1.propdel('gaga')
def test_ensure(self):
newpath = self.root.ensure('a', 'b', 'c')
def test_ensure(self, path1):
newpath = path1.ensure('a', 'b', 'c')
try:
assert newpath.check(exists=1, versioned=1)
newpath.write("hello")
newpath.ensure()
assert newpath.read() == "hello"
finally:
self.root.join('a').remove(force=1)
path1.join('a').remove(force=1)
def test_not_versioned(self):
p = self.root.localpath.mkdir('whatever')
f = self.root.localpath.ensure('testcreatedfile')
def test_not_versioned(self, path1):
p = path1.localpath.mkdir('whatever')
f = path1.localpath.ensure('testcreatedfile')
try:
assert self.root.join('whatever').check(versioned=0)
assert self.root.join('testcreatedfile').check(versioned=0)
assert not self.root.join('testcreatedfile').check(versioned=1)
assert path1.join('whatever').check(versioned=0)
assert path1.join('testcreatedfile').check(versioned=0)
assert not path1.join('testcreatedfile').check(versioned=1)
finally:
p.remove(rec=1)
f.remove()
def test_lock_unlock(self):
root = self.root
def test_lock_unlock(self, path1):
root = path1
somefile = root.join('somefile')
somefile.ensure(file=True)
# not yet added to repo
@ -388,9 +370,8 @@ class TestWCSvnCommandPath(CommonSvnTests):
py.test.raises(Exception, 'somefile,unlock()')
somefile.remove()
def test_commit_nonrecursive(self):
root = self.root
somedir = root.join('sampledir')
def test_commit_nonrecursive(self, path1):
somedir = path1.join('sampledir')
somedir.mkdir("subsubdir")
somedir.propset('foo', 'bar')
status = somedir.status()
@ -407,29 +388,28 @@ class TestWCSvnCommandPath(CommonSvnTests):
assert len(status.prop_modified) == 0
assert len(status.added) == 0
def test_commit_return_value(self):
root = self.root
testfile = root.join('test.txt').ensure(file=True)
def test_commit_return_value(self, path1):
testfile = path1.join('test.txt').ensure(file=True)
testfile.write('test')
rev = root.commit('testing')
rev = path1.commit('testing')
assert type(rev) == int
anotherfile = root.join('another.txt').ensure(file=True)
anotherfile = path1.join('another.txt').ensure(file=True)
anotherfile.write('test')
rev2 = root.commit('testing more')
rev2 = path1.commit('testing more')
assert type(rev2) == int
assert rev2 == rev + 1
#def test_log(self):
# l = self.root.log()
#def test_log(self, path1):
# l = path1.log()
# assert len(l) == 3 # might need to be upped if more tests are added
class XTestWCSvnCommandPathSpecial:
rooturl = 'http://codespeak.net/svn/py.path/trunk/dist/py.path/test/data'
#def test_update_none_rev(self):
#def test_update_none_rev(self, path1):
# path = tmpdir.join('checkouttest')
# wcpath = newpath(xsvnwc=str(path), url=self.rooturl)
# wcpath = newpath(xsvnwc=str(path), url=path1url)
# try:
# wcpath.checkout(rev=2100)
# wcpath.update()
@ -445,7 +425,7 @@ def test_parse_wcinfotime():
class TestInfoSvnWCCommand:
def test_svn_1_2(self):
def test_svn_1_2(self, path1):
output = """
Path: test_svnwc.py
Name: test_svnwc.py
@ -474,7 +454,7 @@ class TestInfoSvnWCCommand:
assert info.rev == 28137
def test_svn_1_3(self):
def test_svn_1_3(self, path1):
output = """
Path: test_svnwc.py
Name: test_svnwc.py
@ -503,28 +483,79 @@ class TestInfoSvnWCCommand:
assert info.rev == 28124
assert info.time == 1149021926000000.0
class TestWCSvnCommandPathEmptyRepo(object):
def setup_class(cls):
repo = py.test.ensuretemp("emptyrepo")
wcdir = py.test.ensuretemp("emptywc")
py.process.cmdexec('svnadmin create "%s"' %
svncommon._escape_helper(repo))
wc = py.path.svnwc(wcdir)
repopath = repo.strpath
if py.std.sys.platform.startswith('win32'):
# strange win problem, paths become something like file:///c:\\foo
repourl = 'file:///%s' % (repopath.replace('\\', '/'),)
else:
repourl = 'file://%s' % (repopath,)
wc.checkout(url=repourl)
cls.wc = wc
def test_info(self):
self.wc.info().rev = 0
def test_characters_at():
py.test.raises(ValueError, "py.path.svnwc('/tmp/@@@:')")
def test_characters_tilde():
py.path.svnwc('/tmp/test~')
class TestRepo:
def test_trailing_slash_is_stripped(self, path1):
# XXX we need to test more normalizing properties
url = path1.join("/")
assert path1 == url
#def test_different_revs_compare_unequal(self, path1):
# newpath = path1.new(rev=1199)
# assert newpath != path1
def test_exists_svn_root(self, path1):
assert path1.check()
#def test_not_exists_rev(self, path1):
# url = path1.__class__(path1url, rev=500)
# assert url.check(exists=0)
#def test_nonexisting_listdir_rev(self, path1):
# url = path1.__class__(path1url, rev=500)
# raises(py.error.ENOENT, url.listdir)
#def test_newrev(self, path1):
# url = path1.new(rev=None)
# assert url.rev == None
# assert url.strpath == path1.strpath
# url = path1.new(rev=10)
# assert url.rev == 10
#def test_info_rev(self, path1):
# url = path1.__class__(path1url, rev=1155)
# url = url.join("samplefile")
# res = url.info()
# assert res.size > len("samplefile") and res.created_rev == 1155
# the following tests are easier if we have a path class
def test_repocache_simple(self, path1):
repocache = svncommon.RepoCache()
repocache.put(path1.strpath, 42)
url, rev = repocache.get(path1.join('test').strpath)
assert rev == 42
assert url == path1.strpath
def test_repocache_notimeout(self, path1):
repocache = svncommon.RepoCache()
repocache.timeout = 0
repocache.put(path1.strpath, path1.rev)
url, rev = repocache.get(path1.strpath)
assert rev == -1
assert url == path1.strpath
def test_repocache_outdated(self, path1):
repocache = svncommon.RepoCache()
repocache.put(path1.strpath, 42, timestamp=0)
url, rev = repocache.get(path1.join('test').strpath)
assert rev == -1
assert url == path1.strpath
def _test_getreporev(self):
""" this test runs so slow it's usually disabled """
old = svncommon.repositories.repos
try:
_repocache.clear()
root = path1.new(rev=-1)
url, rev = cache.repocache.get(root.strpath)
assert rev>=0
assert url == svnrepourl
finally:
repositories.repos = old