* refactor some setup/teardown/ensuretemp usages to use funcargs

* introduce monkeypatch.syspath_prepend for safe monkey patching of module import path
* fix monkeypatch naming

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-09-05 23:21:58 +02:00
parent 783c714a2c
commit 518194537e
8 changed files with 102 additions and 82 deletions

View File

@ -48,7 +48,7 @@ def test_virtual_module_identity():
def test_importall(): def test_importall():
base = py.path.local(py.__file__).dirpath() base = py.path.local(py.__file__).dirpath()
nodirs = ( nodirs = [
base.join('test', 'testing', 'data'), base.join('test', 'testing', 'data'),
base.join('test', 'web'), base.join('test', 'web'),
base.join('path', 'gateway',), base.join('path', 'gateway',),
@ -57,10 +57,14 @@ def test_importall():
base.join('test', 'testing', 'import_test'), base.join('test', 'testing', 'import_test'),
base.join('bin'), base.join('bin'),
base.join('code', 'oldmagic.py'), base.join('code', 'oldmagic.py'),
base.join('code', '_assertionold.py'),
base.join('execnet', 'script'), base.join('execnet', 'script'),
base.join('compat', 'testing'), base.join('compat', 'testing'),
) ]
if sys.version_info >= (3,0):
nodirs.append(base.join('code', '_assertionold.py'))
else:
nodirs.append(base.join('code', '_assertionnew.py'))
def recurse(p): def recurse(p):
return p.check(dotfile=0) and p.basename != "attic" return p.check(dotfile=0) and p.basename != "attic"

View File

@ -689,8 +689,7 @@ class TestSshGateway(BasicRemoteExecution):
cls.sshhost = getspecssh().ssh cls.sshhost = getspecssh().ssh
cls.gw = py.execnet.SshGateway(cls.sshhost) cls.gw = py.execnet.SshGateway(cls.sshhost)
def test_sshconfig_functional(self): def test_sshconfig_functional(self, tmpdir):
tmpdir = py.test.ensuretemp("test_sshconfig")
ssh_config = tmpdir.join("ssh_config") ssh_config = tmpdir.join("ssh_config")
ssh_config.write( ssh_config.write(
"Host alias123\n" "Host alias123\n"

View File

@ -6,6 +6,7 @@
""" """
import py import py
import os
from py.__.execnet.gwmanage import GatewayManager, HostRSync from py.__.execnet.gwmanage import GatewayManager, HostRSync
class TestGatewayManagerPopen: class TestGatewayManagerPopen:
@ -75,7 +76,6 @@ class TestGatewayManagerPopen:
call = rec.popcall("pyexecnet_gwmanage_rsyncfinish") call = rec.popcall("pyexecnet_gwmanage_rsyncfinish")
def test_multi_chdir_popen_with_path(self, testdir): def test_multi_chdir_popen_with_path(self, testdir):
import os
hm = GatewayManager(["popen//chdir=hello"] * 2) hm = GatewayManager(["popen//chdir=hello"] * 2)
testdir.tmpdir.chdir() testdir.tmpdir.chdir()
hellopath = testdir.tmpdir.mkdir("hello").realpath() hellopath = testdir.tmpdir.mkdir("hello").realpath()
@ -117,7 +117,7 @@ class TestGatewayManagerPopen:
class pytest_funcarg__mysetup: class pytest_funcarg__mysetup:
def __init__(self, request): def __init__(self, request):
tmp = request.config.mktemp(request.function.__name__, numbered=True) tmp = request.getfuncargvalue('tmpdir')
self.source = tmp.mkdir("source") self.source = tmp.mkdir("source")
self.dest = tmp.mkdir("dest") self.dest = tmp.mkdir("dest")
request.getfuncargvalue("_pytest") # to have patching of py._com.comregistry request.getfuncargvalue("_pytest") # to have patching of py._com.comregistry

View File

@ -2,38 +2,37 @@ import py
from py.execnet import RSync from py.execnet import RSync
def setup_module(mod): def pytest_funcarg__gw1(request):
mod.gw = py.execnet.PopenGateway() return request.cached_setup(
mod.gw2 = py.execnet.PopenGateway() setup=py.execnet.PopenGateway,
teardown=lambda val: val.exit(),
scope="module"
)
pytest_funcarg__gw2 = pytest_funcarg__gw1
def teardown_module(mod): def pytest_funcarg__dirs(request):
mod.gw.exit() t = request.getfuncargvalue('tmpdir')
mod.gw2.exit() class dirs:
source = t.join("source")
dest1 = t.join("dest1")
dest2 = t.join("dest2")
return dirs
class TestRSync:
class DirSetup: def test_notargets(self, dirs):
def setup_method(self, method): rsync = RSync(dirs.source)
name = "%s.%s" %(self.__class__.__name__, method.__name__)
self.tmpdir = t = py.test.ensuretemp(name)
self.source = t.join("source")
self.dest1 = t.join("dest1")
self.dest2 = t.join("dest2")
class TestRSync(DirSetup):
def test_notargets(self):
rsync = RSync(self.source)
py.test.raises(IOError, "rsync.send()") py.test.raises(IOError, "rsync.send()")
assert rsync.send(raises=False) is None assert rsync.send(raises=False) is None
def test_dirsync(self): def test_dirsync(self, dirs, gw1, gw2):
dest = self.dest1 dest = dirs.dest1
dest2 = self.dest2 dest2 = dirs.dest2
source = self.source source = dirs.source
for s in ('content1', 'content2', 'content2-a-bit-longer'): for s in ('content1', 'content2', 'content2-a-bit-longer'):
source.ensure('subdir', 'file1').write(s) source.ensure('subdir', 'file1').write(s)
rsync = RSync(self.source) rsync = RSync(dirs.source)
rsync.add_target(gw, dest) rsync.add_target(gw1, dest)
rsync.add_target(gw2, dest2) rsync.add_target(gw2, dest2)
rsync.send() rsync.send()
assert dest.join('subdir').check(dir=1) assert dest.join('subdir').check(dir=1)
@ -49,76 +48,70 @@ class TestRSync(DirSetup):
source.join('subdir').remove('file1') source.join('subdir').remove('file1')
rsync = RSync(source) rsync = RSync(source)
rsync.add_target(gw2, dest2) rsync.add_target(gw2, dest2)
rsync.add_target(gw, dest) rsync.add_target(gw1, dest)
rsync.send() rsync.send()
assert dest.join('subdir', 'file1').check(file=1) assert dest.join('subdir', 'file1').check(file=1)
assert dest2.join('subdir', 'file1').check(file=1) assert dest2.join('subdir', 'file1').check(file=1)
rsync = RSync(source) rsync = RSync(source)
rsync.add_target(gw, dest, delete=True) rsync.add_target(gw1, dest, delete=True)
rsync.add_target(gw2, dest2) rsync.add_target(gw2, dest2)
rsync.send() rsync.send()
assert not dest.join('subdir', 'file1').check() assert not dest.join('subdir', 'file1').check()
assert dest2.join('subdir', 'file1').check() assert dest2.join('subdir', 'file1').check()
def test_dirsync_twice(self): def test_dirsync_twice(self, dirs, gw1, gw2):
source = self.source source = dirs.source
source.ensure("hello") source.ensure("hello")
rsync = RSync(source) rsync = RSync(source)
rsync.add_target(gw, self.dest1) rsync.add_target(gw1, dirs.dest1)
rsync.send() rsync.send()
assert self.dest1.join('hello').check() assert dirs.dest1.join('hello').check()
py.test.raises(IOError, "rsync.send()") py.test.raises(IOError, "rsync.send()")
assert rsync.send(raises=False) is None assert rsync.send(raises=False) is None
rsync.add_target(gw, self.dest2) rsync.add_target(gw1, dirs.dest2)
rsync.send() rsync.send()
assert self.dest2.join('hello').check() assert dirs.dest2.join('hello').check()
py.test.raises(IOError, "rsync.send()") py.test.raises(IOError, "rsync.send()")
assert rsync.send(raises=False) is None assert rsync.send(raises=False) is None
def test_rsync_default_reporting(self): def test_rsync_default_reporting(self, capsys, dirs, gw1):
source = self.source source = dirs.source
source.ensure("hello") source.ensure("hello")
cap = py.io.StdCapture() rsync = RSync(source)
try: rsync.add_target(gw1, dirs.dest1)
rsync = RSync(source) rsync.send()
rsync.add_target(gw, self.dest1) out, err = capsys.readouterr()
rsync.send()
finally:
out, err = cap.reset()
assert out.find("hello") != -1 assert out.find("hello") != -1
def test_rsync_non_verbose(self): def test_rsync_non_verbose(self, capsys, dirs, gw1):
source = self.source source = dirs.source
source.ensure("hello") source.ensure("hello")
cap = py.io.StdCapture() rsync = RSync(source, verbose=False)
try: rsync.add_target(gw1, dirs.dest1)
rsync = RSync(source, verbose=False) rsync.send()
rsync.add_target(gw, self.dest1) out, err = capsys.readouterr()
rsync.send()
finally:
out, err = cap.reset()
assert not out assert not out
assert not err assert not err
def test_symlink_rsync(self): def test_symlink_rsync(self, dirs, gw1):
if py.std.sys.platform == 'win32': if py.std.sys.platform == 'win32':
py.test.skip("symlinks are unsupported on Windows.") py.test.skip("symlinks are unsupported on Windows.")
source = self.source source = dirs.source
dest = self.dest1 dest = dirs.dest1
self.source.ensure("existant") dirs.source.ensure("existant")
source.join("rellink").mksymlinkto(source.join("existant"), absolute=0) source.join("rellink").mksymlinkto(source.join("existant"), absolute=0)
source.join('abslink').mksymlinkto(source.join("existant")) source.join('abslink').mksymlinkto(source.join("existant"))
rsync = RSync(source) rsync = RSync(source)
rsync.add_target(gw, dest) rsync.add_target(gw1, dest)
rsync.send() rsync.send()
assert dest.join('rellink').readlink() == dest.join("existant") assert dest.join('rellink').readlink() == dest.join("existant")
assert dest.join('abslink').readlink() == dest.join("existant") assert dest.join('abslink').readlink() == dest.join("existant")
def test_callback(self): def test_callback(self, dirs, gw1):
dest = self.dest1 dest = dirs.dest1
source = self.source source = dirs.source
source.ensure("existant").write("a" * 100) source.ensure("existant").write("a" * 100)
source.ensure("existant2").write("a" * 10) source.ensure("existant2").write("a" * 10)
total = {} total = {}
@ -127,14 +120,14 @@ class TestRSync(DirSetup):
rsync = RSync(source, callback=callback) rsync = RSync(source, callback=callback)
#rsync = RSync() #rsync = RSync()
rsync.add_target(gw, dest) rsync.add_target(gw1, dest)
rsync.send() rsync.send()
assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True} assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True}
def test_file_disappearing(self): def test_file_disappearing(self, dirs, gw1):
dest = self.dest1 dest = dirs.dest1
source = self.source source = dirs.source
source.ensure("ex").write("a" * 100) source.ensure("ex").write("a" * 100)
source.ensure("ex2").write("a" * 100) source.ensure("ex2").write("a" * 100)
@ -147,7 +140,7 @@ class TestRSync(DirSetup):
return True return True
rsync = DRsync(source) rsync = DRsync(source)
rsync.add_target(gw, dest) rsync.add_target(gw1, dest)
rsync.send() rsync.send()
assert rsync.x == 1 assert rsync.x == 1
assert len(dest.listdir()) == 1 assert len(dest.listdir()) == 1

View File

@ -10,8 +10,9 @@ def pytest_funcarg__repowc1(request):
py.test.skip("svn binary not found") py.test.skip("svn binary not found")
modname = request.module.__name__ modname = request.module.__name__
tmpdir = request.getfuncargvalue("tmpdir")
repo, wc = request.cached_setup( repo, wc = request.cached_setup(
setup=lambda: getrepowc("repo-"+modname, "wc-" + modname), setup=lambda: getrepowc(tmpdir, "repo-"+modname, "wc-" + modname),
scope="module", scope="module",
) )
for x in ('test_remove', 'test_move', 'test_status_deleted'): for x in ('test_remove', 'test_move', 'test_status_deleted'):
@ -21,8 +22,9 @@ def pytest_funcarg__repowc1(request):
return repo, wc return repo, wc
def pytest_funcarg__repowc2(request): def pytest_funcarg__repowc2(request):
tmpdir = request.getfuncargvalue("tmpdir")
name = request.function.__name__ name = request.function.__name__
return getrepowc("%s-repo-2" % name, "%s-wc-2" % name) return getrepowc(tmpdir, "%s-repo-2" % name, "%s-wc-2" % name)
def getsvnbin(): def getsvnbin():
if svnbin is None: if svnbin is None:
@ -32,9 +34,9 @@ def getsvnbin():
# make a wc directory out of a given root url # make a wc directory out of a given root url
# cache previously obtained wcs! # cache previously obtained wcs!
# #
def getrepowc(reponame='basetestrepo', wcname='wc'): def getrepowc(tmpdir, reponame='basetestrepo', wcname='wc'):
repo = py.test.ensuretemp(reponame) repo = tmpdir.mkdir(reponame)
wcdir = py.test.ensuretemp(wcname) wcdir = tmpdir.mkdir(wcname)
repo.ensure(dir=1) repo.ensure(dir=1)
py.process.cmdexec('svnadmin create "%s"' % py.process.cmdexec('svnadmin create "%s"' %
svncommon._escape_helper(repo)) svncommon._escape_helper(repo))

View File

@ -325,9 +325,8 @@ class TestImport:
from xxxpackage import module1 from xxxpackage import module1
assert module1 is mod1 assert module1 is mod1
def test_pypkgdir(): def test_pypkgdir(tmpdir):
datadir = py.test.ensuretemp("pypkgdir") pkg = tmpdir.ensure('pkg1', dir=1)
pkg = datadir.ensure('pkg1', dir=1)
pkg.ensure("__init__.py") pkg.ensure("__init__.py")
pkg.ensure("subdir/__init__.py") pkg.ensure("subdir/__init__.py")
assert pkg.pypkgpath() == pkg assert pkg.pypkgpath() == pkg

View File

@ -48,7 +48,7 @@ object, however.
.. _`monkeypatch blog post`: http://tetamap.wordpress.com/2009/03/03/monkeypatching-in-unit-tests-done-right/ .. _`monkeypatch blog post`: http://tetamap.wordpress.com/2009/03/03/monkeypatching-in-unit-tests-done-right/
""" """
import py, os import py, os, sys
def pytest_funcarg__monkeypatch(request): def pytest_funcarg__monkeypatch(request):
"""The returned ``monkeypatch`` funcarg provides these """The returned ``monkeypatch`` funcarg provides these
@ -60,6 +60,7 @@ def pytest_funcarg__monkeypatch(request):
monkeypatch.delitem(obj, name, raising=True) monkeypatch.delitem(obj, name, raising=True)
monkeypatch.setenv(name, value, prepend=False) monkeypatch.setenv(name, value, prepend=False)
monkeypatch.delenv(name, value, raising=True) monkeypatch.delenv(name, value, raising=True)
monkeypatch.syspath_prepend(path)
All modifications will be undone when the requesting All modifications will be undone when the requesting
test function finished its execution. For the ``del`` test function finished its execution. For the ``del``
@ -111,6 +112,11 @@ class MonkeyPatch:
def delenv(self, name, raising=True): def delenv(self, name, raising=True):
self.delitem(os.environ, name, raising=raising) self.delitem(os.environ, name, raising=raising)
def syspath_prepend(self, path):
if not hasattr(self, '_savesyspath'):
self._savesyspath = sys.path[:]
sys.path.insert(0, str(path))
def undo(self): def undo(self):
for obj, name, value in self._setattr: for obj, name, value in self._setattr:
if value is not notset: if value is not notset:
@ -124,7 +130,8 @@ class MonkeyPatch:
else: else:
dictionary[name] = value dictionary[name] = value
self._setitem[:] = [] self._setitem[:] = []
if hasattr(self, '_savesyspath'):
sys.path[:] = self._savesyspath
def test_setattr(): def test_setattr():
class A: class A:
@ -243,3 +250,19 @@ def test_monkeypatch_plugin(testdir):
res = reprec.countoutcomes() res = reprec.countoutcomes()
assert tuple(res) == (1, 0, 0), res assert tuple(res) == (1, 0, 0), res
def test_syspath_prepend():
old = list(sys.path)
try:
monkeypatch = MonkeyPatch()
monkeypatch.syspath_prepend('world')
monkeypatch.syspath_prepend('hello')
assert sys.path[0] == "hello"
assert sys.path[1] == "world"
monkeypatch.undo()
assert sys.path == old
monkeypatch.undo()
assert sys.path == old
finally:
sys.path[:] = old

View File

@ -10,7 +10,7 @@ class TestCaptureManager:
try: try:
assert capman._getmethod(config, None) == "sys" assert capman._getmethod(config, None) == "sys"
finally: finally:
monkeypatch.finalize() monkeypatch.undo()
def test_configure_per_fspath(self, testdir): def test_configure_per_fspath(self, testdir):
config = testdir.parseconfig(testdir.tmpdir) config = testdir.parseconfig(testdir.tmpdir)