[svn r62487] merging hostmanage branch:

* cleanup of the way distributed/remote sessions are setup up
* simplified config pickling
* configs are now more correctly wired on receival at remote sides
* introduced py.__.execnet.gwmanage helps managing calls to multiple hosts
* grouping all pickling related tests in test_pickle.py
  and showcasing a nice pyfunc_call hack

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-03 18:42:32 +01:00
parent fbe8315f76
commit a743caef18
26 changed files with 851 additions and 762 deletions

169
py/execnet/gwmanage.py Normal file
View File

@ -0,0 +1,169 @@
"""
instantiating, managing and rsyncing to hosts
Host specification strings and implied gateways:
socket:hostname:port:path SocketGateway
localhost[:path] PopenGateway
[ssh:]spec:path SshGateway
* [SshGateway]
on hostspec.makeconnection() a Host object
will be created which has an instantiated gateway.
the remote side will be chdir()ed to the specified path.
if no path was specified, do no chdir() at all.
"""
import py
import sys, os
from py.__.test.dsession.masterslave import MasterNode
from py.__.test import event
class GatewaySpec(object):
type = "ssh"
def __init__(self, spec):
if spec == "localhost" or spec.startswith("localhost:"):
self.address = "localhost"
self.joinpath = spec[len(self.address)+1:]
self.type = "popen"
elif spec.startswith("socket:"):
parts = spec[7:].split(":", 2)
self.address = parts.pop(0)
if parts:
port = int(parts.pop(0))
self.address = self.address, port
self.joinpath = parts and parts.pop(0) or ""
self.type = "socket"
else:
if spec.startswith("ssh:"):
spec = spec[4:]
parts = spec.split(":", 1)
self.address = parts.pop(0)
self.joinpath = parts and parts.pop(0) or ""
def inplacelocal(self):
return bool(self.type == "popen" and not self.joinpath)
def __str__(self):
return "<GatewaySpec %s:%s>" % (self.address, self.joinpath)
__repr__ = __str__
def makegateway(self, python=None, waitclose=True):
if self.type == "popen":
gw = py.execnet.PopenGateway(python=python)
elif self.type == "socket":
gw = py.execnet.SocketGateway(*self.address)
elif self.type == "ssh":
gw = py.execnet.SshGateway(self.address, remotepython=python)
if self.joinpath:
channel = gw.remote_exec("import os ; os.chdir(channel.receive())")
channel.send(self.joinpath)
if waitclose:
channel.waitclose()
else:
if waitclose:
gw.remote_exec("").waitclose()
gw.spec = self
return gw
class MultiChannel:
def __init__(self, channels):
self._channels = channels
def receive(self):
values = []
for ch in self._channels:
values.append(ch.receive())
return values
def wait(self):
for ch in self._channels:
ch.waitclose()
class GatewayManager:
def __init__(self, specs):
self.spec2gateway = {}
for spec in specs:
self.spec2gateway[GatewaySpec(spec)] = None
def trace(self, msg):
py._com.pyplugins.notify("trace_gatewaymanage", msg)
#print "trace", msg
def makegateways(self):
for spec, value in self.spec2gateway.items():
assert value is None
self.trace("makegateway %s" %(spec))
self.spec2gateway[spec] = spec.makegateway()
def multi_exec(self, source, inplacelocal=True):
source = py.code.Source(source)
channels = []
for spec, gw in self.spec2gateway.items():
if inplacelocal or not spec.inplacelocal():
channels.append(gw.remote_exec(source))
return MultiChannel(channels)
def multi_chdir(self, basename, inplacelocal=True):
self.multi_exec("import os ; os.chdir(%r)" % basename,
inplacelocal=inplacelocal).wait()
def rsync(self, source, notify=None, verbose=False, ignores=None):
rsync = HostRSync(source, verbose=verbose, ignores=ignores)
added = False
for spec, gateway in self.spec2gateway.items():
if not spec.inplacelocal():
self.trace("add_target_host %r" %(gateway,))
def finished():
if notify:
notify("rsyncrootready", spec, source)
rsync.add_target_host(gateway, finished=finished)
added = True
if added:
self.trace("rsyncing %r" % source)
rsync.send()
self.trace("rsyncing %r finished" % source)
else:
self.trace("rsync: nothing to do.")
def exit(self):
while self.spec2gateway:
spec, gw = self.spec2gateway.popitem()
self.trace("exiting gateway %s" % gw)
gw.exit()
class HostRSync(py.execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
ignores= None
if 'ignores' in kwargs:
ignores = kwargs.pop('ignores')
self._ignores = ignores or []
super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)
def filter(self, path):
path = py.path.local(path)
if not path.ext in ('.pyc', '.pyo'):
if not path.basename.endswith('~'):
if path.check(dotfile=0):
for x in self._ignores:
if path == x:
break
else:
return True
def add_target_host(self, gateway, finished=None):
remotepath = os.path.basename(self._sourcedir)
super(HostRSync, self).add_target(gateway, remotepath,
finishedcallback=finished,
delete=True,)
def _report_send_file(self, gateway, modified_rel_path):
if self._verbose:
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
remotepath = gateway.spec.joinpath
print '%s:%s <= %s' % (gateway.remoteaddress, remotepath, path)

View File

@ -72,10 +72,12 @@ class PopenGateway(PopenCmdGateway):
""" This Gateway provides interaction with a newly started
python subprocess.
"""
def __init__(self, python=sys.executable):
def __init__(self, python=None):
""" instantiate a gateway to a subprocess
started with the given 'python' executable.
"""
if python is None:
python = sys.executable
cmd = '%s -u -c "exec input()"' % python
super(PopenGateway, self).__init__(cmd)
@ -143,7 +145,7 @@ class SshGateway(PopenCmdGateway):
established via the 'ssh' command line binary.
The remote side needs to have a Python interpreter executable.
"""
def __init__(self, sshaddress, remotepython='python',
def __init__(self, sshaddress, remotepython=None,
identity=None, ssh_config=None):
""" instantiate a remote ssh process with the
given 'sshaddress' and remotepython version.
@ -151,6 +153,8 @@ class SshGateway(PopenCmdGateway):
DEPRECATED: you may specify an 'identity' filepath.
"""
self.remoteaddress = sshaddress
if remotepython is None:
remotepython = "python"
remotecmd = '%s -u -c "exec input()"' % (remotepython,)
cmdline = [sshaddress, remotecmd]
# XXX Unix style quoting

View File

@ -73,10 +73,11 @@ class RSync(object):
if channel not in self._to_send:
self._to_send[channel] = []
self._to_send[channel].append(modified_rel_path)
#print "sending", modified_rel_path, data and len(data) or 0, checksum
if data is not None:
f.close()
if checksum is not None and checksum == md5.md5(data).digest():
if checksum is not None and checksum == md5(data).digest():
data = None # not really modified
else:
# ! there is a reason for the interning:

View File

@ -589,10 +589,12 @@ class TestSshGateway(BasicRemoteExecution):
def test_sshaddress(self):
assert self.gw.remoteaddress == py.test.config.option.sshhost
@py.test.mark.xfail("XXX ssh-gateway error handling")
def test_connexion_failes_on_non_existing_hosts(self):
py.test.raises(IOError,
"py.execnet.SshGateway('nowhere.codespeak.net')")
@py.test.mark.xfail("XXX ssh-gateway error handling")
def test_deprecated_identity(self):
py.test.deprecated_call(
py.test.raises, IOError,

View File

@ -0,0 +1,235 @@
"""
tests for
- host specifications
- managing hosts
- manage rsyncing of hosts
"""
import py
from py.__.execnet.gwmanage import GatewaySpec, GatewayManager
from py.__.execnet.gwmanage import HostRSync
class TestGatewaySpec:
"""
socket:hostname:port:path SocketGateway
localhost[:path] PopenGateway
[ssh:]spec:path SshGateway
* [SshGateway]
"""
def test_localhost_nopath(self):
for joinpath in ('', ':abc', ':ab:cd', ':/x/y'):
spec = GatewaySpec("localhost" + joinpath)
assert spec.address == "localhost"
assert spec.joinpath == joinpath[1:]
assert spec.type == "popen"
spec2 = GatewaySpec("localhost" + joinpath)
self._equality(spec, spec2)
if joinpath == "":
assert spec.inplacelocal()
else:
assert not spec.inplacelocal()
def test_ssh(self):
for prefix in ('ssh:', ''): # ssh is default
for hostpart in ('x.y', 'xyz@x.y'):
for joinpath in ('', ':abc', ':ab:cd', ':/tmp'):
specstring = prefix + hostpart + joinpath
spec = GatewaySpec(specstring)
assert spec.address == hostpart
assert spec.joinpath == joinpath[1:]
assert spec.type == "ssh"
spec2 = GatewaySpec(specstring)
self._equality(spec, spec2)
assert not spec.inplacelocal()
def test_socket(self):
for hostpart in ('x.y', 'x', 'localhost'):
for port in ":80", ":1000":
for joinpath in ('', ':abc', ':abc:de'):
spec = GatewaySpec("socket:" + hostpart + port + joinpath)
assert spec.address == (hostpart, int(port[1:]))
assert spec.joinpath == joinpath[1:]
assert spec.type == "socket"
spec2 = GatewaySpec("socket:" + hostpart + port + joinpath)
self._equality(spec, spec2)
assert not spec.inplacelocal()
def _equality(self, spec1, spec2):
assert spec1 != spec2
assert hash(spec1) != hash(spec2)
assert not (spec1 == spec2)
class TestGatewaySpecAPI:
def test_localhost_nopath_makegateway(self, testdir):
spec = GatewaySpec("localhost")
gw = spec.makegateway()
p = gw.remote_exec("import os; channel.send(os.getcwd())").receive()
curdir = py.std.os.getcwd()
assert curdir == p
gw.exit()
def test_localhost_makegateway(self, testdir):
spec = GatewaySpec("localhost:" + str(testdir.tmpdir))
gw = spec.makegateway()
p = gw.remote_exec("import os; channel.send(os.getcwd())").receive()
assert spec.joinpath == p
gw.exit()
def test_localhost_makegateway_python(self, testdir):
spec = GatewaySpec("localhost")
gw = spec.makegateway(python=py.std.sys.executable)
res = gw.remote_exec("import sys ; channel.send(sys.executable)").receive()
assert py.std.sys.executable == res
gw.exit()
def test_ssh(self):
sshhost = py.test.config.getvalueorskip("sshhost")
spec = GatewaySpec("ssh:" + sshhost)
gw = spec.makegateway()
p = gw.remote_exec("import os ; channel.send(os.getcwd())").receive()
gw.exit()
@py.test.mark.xfail("implement socketserver test scenario")
def test_socketgateway(self):
gw = py.execnet.PopenGateway()
spec = GatewaySpec("ssh:" + sshhost)
class TestGatewayManagerLocalhost:
def test_hostmanager_localhosts_makegateway(self):
hm = GatewayManager(["localhost"] * 2)
hm.makegateways()
assert len(hm.spec2gateway) == 2
hm.exit()
assert not len(hm.spec2gateway)
def test_hostmanager_localhosts_rsync(self, source):
hm = GatewayManager(["localhost"] * 2)
hm.makegateways()
assert len(hm.spec2gateway) == 2
for gw in hm.spec2gateway.values():
gw.remote_exec = None
l = []
hm.rsync(source, notify=lambda *args: l.append(args))
assert not l
hm.exit()
assert not len(hm.spec2gateway)
def test_hostmanager_rsync_localhost_with_path(self, source, dest):
hm = GatewayManager(["localhost:%s" %dest] * 1)
hm.makegateways()
source.ensure("dir1", "dir2", "hello")
l = []
hm.rsync(source, notify=lambda *args: l.append(args))
assert len(l) == 1
assert l[0] == ("rsyncrootready", hm.spec2gateway.keys()[0], source)
hm.exit()
dest = dest.join(source.basename)
assert dest.join("dir1").check()
assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check()
def XXXtest_ssh_rsync_samehost_twice(self):
#XXX we have no easy way to have a temp directory remotely!
option = py.test.config.option
if option.sshhost is None:
py.test.skip("no known ssh target, use -S to set one")
host1 = Host("%s" % (option.sshhost, ))
host2 = Host("%s" % (option.sshhost, ))
hm = HostManager(config, hosts=[host1, host2])
events = []
hm.init_rsync(events.append)
print events
assert 0
def test_multi_chdir_localhost_with_path(self, testdir):
import os
hm = GatewayManager(["localhost:hello"] * 2)
testdir.tmpdir.chdir()
hellopath = testdir.tmpdir.mkdir("hello")
hm.makegateways()
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
assert l == [str(hellopath)] * 2
py.test.raises(Exception, 'hm.multi_chdir("world", inplacelocal=False)')
worldpath = hellopath.mkdir("world")
hm.multi_chdir("world", inplacelocal=False)
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
assert len(l) == 2
assert l[0] == l[1]
curwd = os.getcwd()
assert l[0].startswith(curwd)
assert l[0].endswith("world")
def test_multi_chdir_localhost(self, testdir):
import os
hm = GatewayManager(["localhost"] * 2)
testdir.tmpdir.chdir()
hellopath = testdir.tmpdir.mkdir("hello")
hm.makegateways()
hm.multi_chdir("hello", inplacelocal=False)
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
assert len(l) == 2
assert l == [os.getcwd()] * 2
hm.multi_chdir("hello")
l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive()
assert len(l) == 2
assert l[0] == l[1]
curwd = os.getcwd()
assert l[0].startswith(curwd)
assert l[0].endswith("hello")
from py.__.execnet.gwmanage import MultiChannel
class TestMultiChannel:
def test_multichannel_receive(self):
class pseudochannel:
def receive(self):
return 12
multichannel = MultiChannel([pseudochannel(), pseudochannel()])
l = multichannel.receive()
assert len(l) == 2
assert l == [12, 12]
def test_multichannel_wait(self):
l = []
class pseudochannel:
def waitclose(self):
l.append(0)
multichannel = MultiChannel([pseudochannel(), pseudochannel()])
multichannel.wait()
assert len(l) == 2
def pytest_pyfuncarg_source(pyfuncitem):
return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("source")
def pytest_pyfuncarg_dest(pyfuncitem):
return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("dest")
class TestHRSync:
def test_hrsync_filter(self, source, dest):
source.ensure("dir", "file.txt")
source.ensure(".svn", "entries")
source.ensure(".somedotfile", "moreentries")
source.ensure("somedir", "editfile~")
syncer = HostRSync(source)
l = list(source.visit(rec=syncer.filter,
fil=syncer.filter))
assert len(l) == 3
basenames = [x.basename for x in l]
assert 'dir' in basenames
assert 'file.txt' in basenames
assert 'somedir' in basenames
def test_hrsync_one_host(self, source, dest):
spec = GatewaySpec("localhost:%s" % dest)
gw = spec.makegateway()
finished = []
rsync = HostRSync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))
source.join("hello.py").write("world")
rsync.send()
gw.exit()
assert dest.join(source.basename, "hello.py").check()
assert len(finished) == 1

View File

@ -30,7 +30,7 @@ class TestRSync(DirSetup):
dest2 = self.dest2
source = self.source
for s in ('content1', 'content2-a-bit-longer'):
for s in ('content1', 'content2', 'content2-a-bit-longer'):
source.ensure('subdir', 'file1').write(s)
rsync = RSync(self.source)
rsync.add_target(gw, dest)
@ -42,6 +42,9 @@ class TestRSync(DirSetup):
assert dest2.join('subdir').check(dir=1)
assert dest2.join('subdir', 'file1').check(file=1)
assert dest2.join('subdir', 'file1').read() == s
for x in dest, dest2:
fn = x.join("subdir", "file1")
fn.setmtime(0)
source.join('subdir').remove('file1')
rsync = RSync(source)

View File

@ -431,9 +431,6 @@ class FSCollector(Collector):
if len(picklestate) == 3:
# root node
name, config, relpath = picklestate
if not config._initialized:
raise ValueError("incomplete unpickling of "
"config object, need call to _initafterpickle()?")
fspath = config.topdir.join(relpath)
fsnode = config.getfsnode(fspath)
self.__dict__.update(fsnode.__dict__)

View File

@ -25,7 +25,6 @@ class CmdOptions(object):
class Config(object):
""" central bus for dealing with configuration/initialization data. """
Option = py.compat.optparse.Option # deprecated
_initialized = False
_sessionclass = None
def __init__(self, pytestplugins=None, topdir=None):
@ -67,9 +66,8 @@ class Config(object):
""" parse cmdline arguments into this config object.
Note that this can only be called once per testing process.
"""
assert not self._initialized, (
assert not hasattr(self, 'args'), (
"can only parse cmdline args at most once per Config object")
self._initialized = True
self._preparse(args)
args = self._parser.parse_setoption(args, self.option)
if not args:
@ -80,40 +78,31 @@ class Config(object):
# config objects are usually pickled across system
# barriers but they contain filesystem paths.
# upon getstate/setstate we take care to do everything
# relative to our "topdir".
# relative to "topdir".
def __getstate__(self):
return self._makerepr()
def __setstate__(self, repr):
self._repr = repr
def _initafterpickle(self, topdir):
self.__init__(
#issue1
#pytestplugins=py.test._PytestPlugins(py._com.pyplugins),
topdir=topdir,
)
self._mergerepr(self._repr)
self._initialized = True
del self._repr
def _makerepr(self):
l = []
for path in self.args:
path = py.path.local(path)
l.append(path.relto(self.topdir))
return l, self.option
def _mergerepr(self, repr):
# before any conftests are loaded we
# need to set the per-process singleton
# (also seens py.test.config) to have consistent
# option handling
global config_per_process
config_per_process = self
def __setstate__(self, repr):
# warning global side effects:
# * registering to py lib plugins
# * setting py.test.config
self.__init__(
pytestplugins=py.test._PytestPlugins(py._com.pyplugins),
topdir=py.path.local(),
)
# we have to set py.test.config because preparse()
# might load conftest files which have
# py.test.config.addoptions() lines in them
py.test.config = self
args, cmdlineopts = repr
self.args = [self.topdir.join(x) for x in args]
args = [self.topdir.join(x) for x in args]
self.option = cmdlineopts
self._preparse(self.args)
self._preparse(args)
self.args = args
def getcolitems(self):
return [self.getfsnode(arg) for arg in self.args]

View File

@ -6,10 +6,6 @@
import py
from py.__.test import event
import py.__.test.custompdb
from py.__.test.dsession.hostmanage import HostManager
Item = py.test.collect.Item
Collector = py.test.collect.Collector
from py.__.test.runner import basic_run_report, basic_collect_report
from py.__.test.session import Session
from py.__.test import outcome
@ -80,11 +76,9 @@ class DSession(Session):
def main(self, colitems=None):
colitems = self.getinitialitems(colitems)
#self.bus.notify(event.TestrunStart())
self.sessionstarts()
self.setup_hosts()
exitstatus = self.loop(colitems)
#self.bus.notify(event.TestrunFinish(exitstatus=exitstatus))
self.teardown_hosts()
self.sessionfinishes()
return exitstatus
@ -189,7 +183,7 @@ class DSession(Session):
colitems = self.filteritems(colitems)
senditems = []
for next in colitems:
if isinstance(next, Item):
if isinstance(next, py.test.collect.Item):
senditems.append(next)
else:
self.bus.notify("collectionstart", event.CollectionStart(next))
@ -235,14 +229,13 @@ class DSession(Session):
def setup_hosts(self):
""" setup any neccessary resources ahead of the test run. """
self.hm = HostManager(self)
from py.__.test.dsession.hostmanage import HostManager
self.hm = HostManager(self.config)
self.hm.setup_hosts(putevent=self.queue.put)
def teardown_hosts(self):
""" teardown any resources after a test run. """
for host in self.host2pending:
host.gw.exit()
self.hm.teardown_hosts()
# debugging function
def dump_picklestate(item):

View File

@ -1,206 +1,76 @@
import py
import sys, os
from py.__.test.dsession.masterslave import MasterNode
from py.__.execnet.gwmanage import GatewayManager
from py.__.test import event
class Host(object):
""" Host location representation for distributed testing. """
_hostname2list = {}
def __init__(self, spec, addrel="", python=None):
parts = spec.split(':', 1)
self.hostname = parts.pop(0)
self.relpath = parts and parts.pop(0) or ""
if self.hostname == "localhost" and not self.relpath:
self.inplacelocal = True
addrel = "" # inplace localhosts cannot have additions
else:
self.inplacelocal = False
if not self.relpath:
self.relpath = "pytestcache-%s" % self.hostname
if addrel:
self.relpath += "/" + addrel # XXX too os-dependent
assert not parts
assert self.inplacelocal or self.relpath
self.hostid = self._getuniqueid(self.hostname)
self.python = python
def __getstate__(self):
return (self.hostname, self.relpath, self.hostid)
def __setstate__(self, repr):
self.hostname, self.relpath, self.hostid = repr
def _getuniqueid(self, hostname):
l = self._hostname2list.setdefault(hostname, [])
hostid = hostname + "-%d" % len(l)
l.append(hostid)
return hostid
def initgateway(self):
python = self.python or "python"
if self.hostname == "localhost":
self.gw = py.execnet.PopenGateway(python=python)
else:
self.gw = py.execnet.SshGateway(self.hostname,
remotepython=python)
if self.inplacelocal:
self.gw.remote_exec(py.code.Source(
sethomedir, "sethomedir()"
)).waitclose()
self.gw_remotepath = None
else:
assert self.relpath
channel = self.gw.remote_exec(py.code.Source(
gethomedir,
sethomedir, "sethomedir()",
getpath_relto_home, """
channel.send(getpath_relto_home(%r))
""" % self.relpath,
))
self.gw_remotepath = channel.receive()
def __str__(self):
return "<Host id=%s %s:%s>" % (self.hostid, self.hostname, self.relpath)
__repr__ = __str__
def __hash__(self):
return hash(self.hostid)
def __eq__(self, other):
return self.hostid == other.hostid
def __ne__(self, other):
return not self.hostid == other.hostid
class HostRSync(py.execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
ignores= None
if 'ignores' in kwargs:
ignores = kwargs.pop('ignores')
self._ignores = ignores or []
super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)
def filter(self, path):
path = py.path.local(path)
if not path.ext in ('.pyc', '.pyo'):
if not path.basename.endswith('~'):
if path.check(dotfile=0):
for x in self._ignores:
if path == x:
break
else:
return True
def add_target_host(self, host, destrelpath="", notify=None):
remotepath = host.gw_remotepath
key = host.hostname, host.relpath
if host.inplacelocal:
remotepath = self._sourcedir
self._synced[key] = True
elif destrelpath:
remotepath = os.path.join(remotepath, destrelpath)
synced = key in self._synced
if notify:
notify(
event.HostRSyncing(host, py.path.local(self._sourcedir),
remotepath, synced))
def hostrsynced(host=host):
if notify:
notify(
event.HostRSyncRootReady(host, self._sourcedir))
if key in self._synced:
hostrsynced()
return
self._synced[key] = True
super(HostRSync, self).add_target(host.gw, remotepath,
finishedcallback=hostrsynced,
delete=True,
)
return remotepath
def gethosts(config, addrel):
def getconfighosts(config):
if config.option.numprocesses:
hosts = ['localhost'] * config.option.numprocesses
else:
hosts = config.getvalue("dist_hosts")
python = config.option.executable or "python"
hosts = [Host(x, addrel, python=python) for x in hosts]
assert hosts is not None
return hosts
class HostManager(object):
def __init__(self, session, hosts=None):
self.session = session
roots = self.session.config.getvalue_pathlist("dist_rsync_roots")
addrel = ""
if roots is None:
roots = [self.session.config.topdir]
addrel = self.session.config.topdir.basename
self._addrel = addrel
def __init__(self, config, hosts=None):
self.config = config
roots = self.config.getvalue_pathlist("rsyncroots")
if not roots:
roots = self.config.getvalue_pathlist("dist_rsync_roots")
self.roots = roots
if hosts is None:
hosts = gethosts(self.session.config, addrel)
self.hosts = hosts
hosts = getconfighosts(self.config)
self.gwmanager = GatewayManager(hosts)
def prepare_gateways(self):
for host in self.hosts:
host.initgateway()
self.session.bus.notify("hostgatewayready", event.HostGatewayReady(host, self.roots))
def makegateways(self):
old = self.config.topdir.chdir()
try:
self.gwmanager.makegateways()
finally:
old.chdir()
def init_rsync(self):
self.prepare_gateways()
# send each rsync root
ignores = self.session.config.getvalue_pathlist("dist_rsync_ignore")
for root in self.roots:
rsync = HostRSync(root, ignores=ignores,
verbose=self.session.config.option.verbose)
if self._addrel:
destrelpath = ""
else:
destrelpath = root.basename
for host in self.hosts:
rsync.add_target_host(host, destrelpath)
rsync.send(raises=False)
self.session.bus.notify("rsyncfinished", event.RsyncFinished())
def rsync_roots(self):
""" make sure that all remote gateways
have the same set of roots in their
current directory.
"""
# we change to the topdir sot that
# PopenGateways will have their cwd
# such that unpickling configs will
# pick it up as the right topdir
# (for other gateways this chdir is irrelevant)
self.makegateways()
options = {
'ignores': self.config.getvalue_pathlist("dist_rsync_ignore"),
'verbose': self.config.option.verbose
}
if self.roots:
# send each rsync root
for root in self.roots:
self.gwmanager.rsync(root, **options)
else:
# we transfer our topdir as the root
# but need to be careful regarding
self.gwmanager.rsync(self.config.topdir, **options)
self.gwmanager.multi_chdir(self.config.topdir.basename, inplacelocal=False)
self.config.bus.notify("rsyncfinished", event.RsyncFinished())
def setup_hosts(self, putevent):
self.init_rsync()
for host in self.hosts:
self.rsync_roots()
nice = self.config.getvalue("dist_nicelevel")
if nice != 0:
self.gwmanager.multi_exec("""
import os
if hasattr(os, 'nice'):
os.nice(%r)
""" % nice).wait()
for host, gateway in self.gwmanager.spec2gateway.items():
host.node = MasterNode(host,
self.session.config,
gateway,
self.config,
putevent)
#
# helpers
#
def gethomedir():
import os
homedir = os.environ.get('HOME', '')
if not homedir:
homedir = os.environ.get('HOMEPATH', '.')
return os.path.abspath(homedir)
def getpath_relto_home(targetpath):
import os
if not os.path.isabs(targetpath):
homedir = gethomedir()
targetpath = os.path.join(homedir, targetpath)
return os.path.normpath(targetpath)
def sethomedir():
import os
homedir = os.environ.get('HOME', '')
if not homedir:
homedir = os.environ.get('HOMEPATH', '.')
os.chdir(homedir)
def makehostup(host=None):
if host is None:
host = Host("localhost")
platinfo = {}
for name in 'platform', 'executable', 'version_info':
platinfo["sys."+name] = getattr(sys, name)
return event.HostUp(host, platinfo)
def teardown_hosts(self):
self.gwmanager.exit()

View File

@ -8,11 +8,11 @@ from py.__.test.dsession.mypickle import PickleChannel
class MasterNode(object):
ENDMARK = -1
def __init__(self, host, config, putevent):
def __init__(self, host, gateway, config, putevent):
self.host = host
self.config = config
self.putevent = putevent
self.channel = install_slave(host, config)
self.channel = install_slave(host, gateway, config)
self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
self._down = False
@ -23,7 +23,7 @@ class MasterNode(object):
""" this gets called for each object we receive from
the other side and if the channel closes.
Note that the callback runs in the receiver
Note that channel callbacks run in the receiver
thread of execnet gateways - we need to
avoid raising exceptions or doing heavy work.
"""
@ -60,63 +60,33 @@ class MasterNode(object):
def shutdown(self):
self.channel.send(None)
#
# a config needs to be available on the other side for options
# and for reconstructing collection trees (topdir, conftest access)
#
def send_and_receive_pickled_config(channel, config, remote_topdir):
channel.send((config, remote_topdir))
backconfig = channel.receive()
assert config is backconfig # ImmutablePickling :)
return backconfig
def receive_and_send_pickled_config(channel):
config,topdir = channel.receive()
config._initafterpickle(topdir)
channel.send(config)
return config
# setting up slave code
def install_slave(host, config):
channel = host.gw.remote_exec(source="""
def install_slave(host, gateway, config):
channel = gateway.remote_exec(source="""
from py.__.test.dsession.mypickle import PickleChannel
from py.__.test.dsession.masterslave import SlaveNode
channel = PickleChannel(channel)
from py.__.test.dsession import masterslave
config = masterslave.receive_and_send_pickled_config(channel)
slavenode = masterslave.SlaveNode(channel, config)
slavenode = SlaveNode(channel)
slavenode.run()
""")
channel = PickleChannel(channel)
remote_topdir = host.gw_remotepath
if remote_topdir is None:
assert host.inplacelocal
remote_topdir = config.topdir
send_and_receive_pickled_config(channel, config, remote_topdir)
channel.send(host)
channel.send((host, config))
return channel
class SlaveNode(object):
def __init__(self, channel, config):
def __init__(self, channel):
self.channel = channel
self.config = config
import os
if hasattr(os, 'nice'):
nice_level = config.getvalue('dist_nicelevel')
os.nice(nice_level)
def __repr__(self):
host = getattr(self, 'host', '<uninitialized>')
return "<%s host=%s>" %(self.__class__.__name__, host)
return "<%s channel=%s>" %(self.__class__.__name__, self.channel)
def sendevent(self, eventname, *args, **kwargs):
self.channel.send((eventname, args, kwargs))
def run(self):
self.config.pytestplugins.do_configure(self.config)
from py.__.test.dsession.hostmanage import makehostup
channel = self.channel
self.host = host = channel.receive()
host, self.config = channel.receive()
self.config.pytestplugins.do_configure(self.config)
self.sendevent("hostup", makehostup(host))
try:
while 1:
@ -140,3 +110,14 @@ class SlaveNode(object):
runner = item._getrunner()
testrep = runner(item)
self.sendevent("itemtestreport", testrep)
def makehostup(host=None):
from py.__.execnet.gwmanage import GatewaySpec
import sys
if host is None:
host = GatewaySpec("localhost")
platinfo = {}
for name in 'platform', 'executable', 'version_info':
platinfo["sys."+name] = getattr(sys, name)
return event.HostUp(host, platinfo)

View File

@ -1,5 +1,6 @@
from py.__.test.dsession.dsession import DSession, LoopState
from py.__.test.dsession.hostmanage import Host, makehostup
from py.__.test.dsession.masterslave import makehostup
from py.__.execnet.gwmanage import GatewaySpec
from py.__.test.runner import basic_collect_report
from py.__.test import event
from py.__.test import outcome
@ -37,7 +38,7 @@ class TestDSession:
item = testdir.getitem("def test_func(): pass")
rep = run(item)
session = DSession(item._config)
host = Host("localhost")
host = GatewaySpec("localhost")
host.node = MockNode()
assert not session.host2pending
session.addhost(host)
@ -52,7 +53,7 @@ class TestDSession:
item = testdir.getitem("def test_func(): pass")
rep = run(item)
session = DSession(item._config)
host = Host("localhost")
host = GatewaySpec("localhost")
host.node = MockNode()
session.addhost(host)
session.senditems([item])
@ -77,9 +78,9 @@ class TestDSession:
def test_triggertesting_item(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
host1.node = MockNode()
host2 = Host("localhost")
host2 = GatewaySpec("localhost")
host2.node = MockNode()
session.addhost(host1)
session.addhost(host2)
@ -114,7 +115,7 @@ class TestDSession:
def test_rescheduleevent(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
host1.node = MockNode()
session.addhost(host1)
ev = event.RescheduleItems([item])
@ -138,7 +139,7 @@ class TestDSession:
item = testdir.getitem("def test_func(): pass")
# setup a session with one host
session = DSession(item._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
host1.node = MockNode()
session.addhost(host1)
@ -163,10 +164,10 @@ class TestDSession:
# setup a session with two hosts
session = DSession(item1._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
host1.node = MockNode()
session.addhost(host1)
host2 = Host("localhost")
host2 = GatewaySpec("localhost")
host2.node = MockNode()
session.addhost(host2)
@ -184,13 +185,13 @@ class TestDSession:
assert testrep.failed
assert testrep.colitem == item1
assert str(testrep.longrepr).find("crashed") != -1
assert str(testrep.longrepr).find(host.hostname) != -1
assert str(testrep.longrepr).find(host.address) != -1
def test_hostup_adds_to_available(self, testdir):
item = testdir.getitem("def test_func(): pass")
# setup a session with two hosts
session = DSession(item._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
hostup = makehostup(host1)
session.queueevent("hostup", hostup)
loopstate = LoopState([item])
@ -210,7 +211,7 @@ class TestDSession:
def runthrough(self, item):
session = DSession(item._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
host1.node = MockNode()
session.addhost(host1)
loopstate = LoopState([item])
@ -247,7 +248,7 @@ class TestDSession:
""")
modcol._config.option.exitfirst = True
session = DSession(modcol._config)
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
host1.node = MockNode()
session.addhost(host1)
items = basic_collect_report(modcol).result
@ -269,7 +270,7 @@ class TestDSession:
def test_shuttingdown_filters_events(self, testdir, EventRecorder):
item = testdir.getitem("def test_func(): pass")
session = DSession(item._config)
host = Host("localhost")
host = GatewaySpec("localhost")
session.addhost(host)
loopstate = LoopState([])
loopstate.shuttingdown = True
@ -315,7 +316,7 @@ class TestDSession:
item = testdir.getitem("def test_func(): pass")
session = DSession(item._config)
host = Host("localhost")
host = GatewaySpec("localhost")
host.node = MockNode()
session.addhost(host)
session.senditems([item])
@ -337,7 +338,7 @@ class TestDSession:
pass
""")
session = DSession(modcol._config)
host = Host("localhost")
host = GatewaySpec("localhost")
host.node = MockNode()
session.addhost(host)

View File

@ -4,39 +4,39 @@
import py
from py.__.test.dsession.dsession import DSession
from py.__.test.dsession.hostmanage import HostManager, Host
from test_masterslave import EventQueue
import os
class TestAsyncFunctional:
def test_conftest_options(self, testdir):
testdir.makepyfile(conftest="""
print "importing conftest"
p1 = testdir.tmpdir.ensure("dir", 'p1.py')
p1.dirpath("__init__.py").write("")
p1.dirpath("conftest.py").write(py.code.Source("""
print "importing conftest", __file__
import py
Option = py.test.config.Option
option = py.test.config.addoptions("someopt",
Option('--someopt', action="store_true", dest="someopt", default=False))
""",
)
p1 = testdir.makepyfile("""
dist_rsync_roots = ['../dir']
print "added options", option
print "config file seen from conftest", py.test.config
"""))
p1.write(py.code.Source("""
import py, conftest
def test_1():
import py, conftest
print "config from test_1", py.test.config
print "conftest from test_1", conftest.__file__
print "test_1: py.test.config.option.someopt", py.test.config.option.someopt
print "test_1: conftest", conftest
print "test_1: conftest.option.someopt", conftest.option.someopt
assert conftest.option.someopt
""", __init__="#")
print p1
config = py.test.config._reparse(['-n1', p1, '--someopt'])
dsession = DSession(config)
eq = EventQueue(config.bus)
dsession.main()
ev, = eq.geteventargs("itemtestreport")
if not ev.passed:
print ev
assert 0
"""))
result = testdir.runpytest('-n1', p1, '--someopt')
assert result.ret == 0
extra = result.stdout.fnmatch_lines([
"*1 passed*",
])
def test_dist_some_tests(self, testdir):
testdir.makepyfile(conftest="dist_hosts=['localhost']\n")
@ -61,7 +61,7 @@ class TestAsyncFunctional:
assert ev.failed
# see that the host is really down
ev, = eq.geteventargs("hostdown")
assert ev.host.hostname == "localhost"
assert ev.host.address == "localhost"
ev, = eq.geteventargs("testrunfinish")
def test_distribution_rsync_roots_example(self, testdir):

View File

@ -3,186 +3,16 @@
"""
import py
from py.__.test.dsession.hostmanage import HostRSync, Host, HostManager, gethosts
from py.__.test.dsession.hostmanage import sethomedir, gethomedir, getpath_relto_home
from py.__.test.dsession.hostmanage import HostManager, getconfighosts
from py.__.execnet.gwmanage import GatewaySpec as Host
from py.__.test import event
class TestHost:
def _gethostinfo(self, testdir, relpath=""):
exampledir = testdir.mkdir("gethostinfo")
if relpath:
exampledir = exampledir.join(relpath)
hostinfo = Host("localhost:%s" % exampledir)
return hostinfo
def test_defaultpath(self):
x = Host("localhost:")
assert x.hostname == "localhost"
assert not x.relpath
def test_addrel(self):
host = Host("localhost:", addrel="whatever")
assert host.inplacelocal
assert not host.relpath
host = Host("localhost:/tmp", addrel="base")
assert host.relpath == "/tmp/base"
host = Host("localhost:tmp", addrel="base2")
assert host.relpath == "tmp/base2"
def test_path(self):
x = Host("localhost:/tmp")
assert x.relpath == "/tmp"
assert x.hostname == "localhost"
assert not x.inplacelocal
def test_equality(self):
x = Host("localhost:")
y = Host("localhost:")
assert x != y
assert not (x == y)
def test_hostid(self):
x = Host("localhost:")
y = Host("localhost:")
assert x.hostid != y.hostid
x = Host("localhost:/tmp")
y = Host("localhost:")
assert x.hostid != y.hostid
def test_non_existing_hosts(self):
host = Host("alskdjalsdkjasldkajlsd")
py.test.raises((py.process.cmdexec.Error, IOError, EOFError),
host.initgateway)
def test_remote_has_homedir_as_currentdir(self, testdir):
host = self._gethostinfo(testdir)
old = py.path.local.get_temproot().chdir()
try:
host.initgateway()
channel = host.gw.remote_exec(py.code.Source(
gethomedir, """
import os
homedir = gethomedir()
curdir = os.getcwd()
channel.send((curdir, homedir))
"""))
remote_curdir, remote_homedir = channel.receive()
assert remote_curdir == remote_homedir
finally:
old.chdir()
def test_initgateway_localhost_relpath(self):
host = Host("localhost:somedir")
host.initgateway()
assert host.gw
try:
homedir = py.path.local._gethomedir()
expected = homedir.join("somedir")
assert host.gw_remotepath == str(expected)
finally:
host.gw.exit()
def test_initgateway_python(self):
host = Host("localhost", python="python2.4")
l = []
def p(python):
l.append(python)
raise ValueError
py.magic.patch(py.execnet, 'PopenGateway', p)
try:
py.test.raises(ValueError, host.initgateway)
finally:
py.magic.revert(py.execnet, 'PopenGateway')
assert l[0] == host.python
def test_initgateway_ssh_and_remotepath(self):
hostspec = py.test.config.option.sshhost
if not hostspec:
py.test.skip("no known ssh target, use -S to set one")
host = Host("%s" % (hostspec))
# this test should be careful to not write/rsync anything
# as the remotepath is the default location
# and may be used in the real world
host.initgateway()
assert host.gw
assert host.gw_remotepath.endswith(host.relpath)
channel = host.gw.remote_exec("""
import os
homedir = os.environ['HOME']
relpath = channel.receive()
path = os.path.join(homedir, relpath)
channel.send(path)
""")
channel.send(host.relpath)
res = channel.receive()
assert res == host.gw_remotepath
def pytest_pyfuncarg_source(pyfuncitem):
return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("source")
def pytest_pyfuncarg_dest(pyfuncitem):
return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("dest")
class TestSyncing:
def _gethostinfo(self, dest):
hostinfo = Host("localhost:%s" % dest)
return hostinfo
def test_hrsync_filter(self, source, dest):
source.ensure("dir", "file.txt")
source.ensure(".svn", "entries")
source.ensure(".somedotfile", "moreentries")
source.ensure("somedir", "editfile~")
syncer = HostRSync(source)
l = list(source.visit(rec=syncer.filter,
fil=syncer.filter))
assert len(l) == 3
basenames = [x.basename for x in l]
assert 'dir' in basenames
assert 'file.txt' in basenames
assert 'somedir' in basenames
def test_hrsync_localhost_inplace(self, source, dest):
h1 = Host("localhost")
events = []
rsync = HostRSync(source)
h1.initgateway()
rsync.add_target_host(h1, notify=events.append)
assert events
l = [x for x in events
if isinstance(x, event.HostRSyncing)]
assert len(l) == 1
ev = l[0]
assert ev.host == h1
assert ev.root == ev.remotepath
l = [x for x in events
if isinstance(x, event.HostRSyncRootReady)]
assert len(l) == 1
ev = l[0]
assert ev.root == source
assert ev.host == h1
def test_hrsync_one_host(self, source, dest):
h1 = self._gethostinfo(dest)
finished = []
rsync = HostRSync(source)
h1.initgateway()
rsync.add_target_host(h1)
source.join("hello.py").write("world")
rsync.send()
assert dest.join("hello.py").check()
def test_hrsync_same_host_twice(self, source, dest):
h1 = self._gethostinfo(dest)
h2 = self._gethostinfo(dest)
finished = []
rsync = HostRSync(source)
l = []
h1.initgateway()
h2.initgateway()
res1 = rsync.add_target_host(h1)
assert res1
res2 = rsync.add_target_host(h2)
assert not res2
dest = py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("dest")
return dest
class TestHostManager:
def gethostmanager(self, source, dist_hosts, dist_rsync_roots=None):
@ -192,25 +22,37 @@ class TestHostManager:
source.join("conftest.py").write("\n".join(l))
config = py.test.config._reparse([source])
assert config.topdir == source
session = config.initsession()
hm = HostManager(session)
assert hm.hosts
hm = HostManager(config)
assert hm.gwmanager.spec2gateway
return hm
def test_hostmanager_custom_hosts(self, source, dest):
def xxtest_hostmanager_custom_hosts(self, source, dest):
session = py.test.config._reparse([source]).initsession()
hm = HostManager(session, hosts=[1,2,3])
hm = HostManager(session.config, hosts=[1,2,3])
assert hm.hosts == [1,2,3]
def test_hostmanager_init_rsync_topdir(self, source, dest):
def test_hostmanager_rsync_roots_no_roots(self, source, dest):
source.ensure("dir1", "file1").write("hello")
config = py.test.config._reparse([source])
hm = HostManager(config, hosts=["localhost:%s" % dest])
assert hm.config.topdir == source == config.topdir
hm.rsync_roots()
p, = hm.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive()
p = py.path.local(p)
print "remote curdir", p
assert p == dest.join(config.topdir.basename)
assert p.join("dir1").check()
assert p.join("dir1", "file1").check()
def test_hostmanager_rsync_roots_roots(self, source, dest):
dir2 = source.ensure("dir1", "dir2", dir=1)
dir2.ensure("hello")
hm = self.gethostmanager(source,
dist_hosts = ["localhost:%s" % dest]
dist_hosts = ["localhost:%s" % dest],
dist_rsync_roots = ['dir1']
)
assert hm.session.config.topdir == source
hm.init_rsync()
dest = dest.join(source.basename)
assert hm.config.topdir == source
hm.rsync_roots()
assert dest.join("dir1").check()
assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check()
@ -222,8 +64,8 @@ class TestHostManager:
dist_hosts = ["localhost:%s" % dest],
dist_rsync_roots = [str(source)]
)
assert hm.session.config.topdir == source
hm.init_rsync()
assert hm.config.topdir == source
hm.rsync_roots()
dest = dest.join(source.basename)
assert dest.join("dir1").check()
assert dest.join("dir1", "dir2").check()
@ -238,9 +80,9 @@ class TestHostManager:
dist_rsync_roots = ['dir1/dir2']
"""))
session = py.test.config._reparse([source]).initsession()
hm = HostManager(session,
hosts=[Host("localhost:" + str(dest))])
hm.init_rsync()
hm = HostManager(session.config,
hosts=["localhost:" + str(dest)])
hm.rsync_roots()
assert dest.join("dir2").check()
assert not dest.join("dir1").check()
assert not dest.join("bogus").check()
@ -252,39 +94,43 @@ class TestHostManager:
dir2.ensure("hello")
source.join("conftest.py").write(py.code.Source("""
dist_rsync_ignore = ['dir1/dir2', 'dir5/dir6']
dist_rsync_roots = ['dir1', 'dir5']
"""))
session = py.test.config._reparse([source]).initsession()
hm = HostManager(session,
hosts=[Host("localhost:" + str(dest))])
hm.init_rsync()
hm = HostManager(session.config,
hosts=["localhost:" + str(dest)])
hm.rsync_roots()
assert dest.join("dir1").check()
assert not dest.join("dir1", "dir2").check()
assert dest.join("dir5","file").check()
assert not dest.join("dir6").check()
def test_hostmanage_optimise_localhost(self, source, dest):
hosts = [Host("localhost") for i in range(3)]
session = py.test.config._reparse([source]).initsession()
hm = HostManager(session, hosts=hosts)
hm.init_rsync()
for host in hosts:
assert host.inplacelocal
assert host.gw_remotepath is None
assert not host.relpath
hosts = ["localhost"] * 3
source.join("conftest.py").write("dist_rsync_roots = ['a']")
source.ensure('a', dir=1)
config = py.test.config._reparse([source])
hm = HostManager(config, hosts=hosts)
hm.rsync_roots()
for gwspec in hm.gwmanager.spec2gateway:
assert gwspec.inplacelocal()
assert not gwspec.joinpath
def test_hostmanage_setup_hosts(self, source):
hosts = [Host("localhost") for i in range(3)]
session = py.test.config._reparse([source]).initsession()
hm = HostManager(session, hosts=hosts)
hosts = ["localhost"] * 3
source.join("conftest.py").write("dist_rsync_roots = ['a']")
source.ensure('a', dir=1)
config = py.test.config._reparse([source])
hm = HostManager(config, hosts=hosts)
queue = py.std.Queue.Queue()
hm.setup_hosts(putevent=queue.put)
for host in hm.hosts:
for host in hm.gwmanager.spec2gateway:
eventcall = queue.get(timeout=2.0)
name, args, kwargs = eventcall
assert name == "hostup"
for host in hm.hosts:
for host in hm.gwmanager.spec2gateway:
host.node.shutdown()
for host in hm.hosts:
for host in hm.gwmanager.spec2gateway:
eventcall = queue.get(timeout=2.0)
name, args, kwargs = eventcall
print name, args, kwargs
@ -304,24 +150,8 @@ class TestHostManager:
assert 0
def test_getpath_relto_home():
x = getpath_relto_home("hello")
assert x == py.path.local._gethomedir().join("hello")
x = getpath_relto_home(".")
assert x == py.path.local._gethomedir()
def test_sethomedir():
old = py.path.local.get_temproot().chdir()
try:
sethomedir()
curdir = py.path.local()
finally:
old.chdir()
assert py.path.local._gethomedir() == curdir
def test_gethosts():
def test_getconfighosts():
config = py.test.config._reparse(['-n3'])
hosts = gethosts(config, '')
hosts = getconfighosts(config)
assert len(hosts) == 3

View File

@ -1,7 +1,7 @@
import py
from py.__.test.dsession.masterslave import MasterNode
from py.__.test.dsession.hostmanage import Host
from py.__.execnet.gwmanage import GatewaySpec
class EventQueue:
def __init__(self, bus, queue=None):
@ -43,22 +43,28 @@ class MySetup:
config = py.test.config._reparse([])
self.config = config
self.queue = py.std.Queue.Queue()
self.host = Host("localhost")
self.host.initgateway()
self.node = MasterNode(self.host, self.config, putevent=self.queue.put)
self.host = GatewaySpec("localhost")
self.gateway = self.host.makegateway()
self.node = MasterNode(self.host, self.gateway, self.config, putevent=self.queue.put)
assert not self.node.channel.isclosed()
return self.node
def finalize(self):
if hasattr(self, 'host'):
print "exiting:", self.host.gw
self.host.gw.exit()
print "exiting:", self.gateway
self.gateway.exit()
def pytest_pyfuncarg_mysetup(pyfuncitem):
mysetup = MySetup(pyfuncitem)
pyfuncitem.addfinalizer(mysetup.finalize)
return mysetup
def pytest_pyfuncarg_testdir(__call__, pyfuncitem):
# decorate to make us always change to testdir
testdir = __call__.execute(firstresult=True)
testdir.chdir()
return testdir
class TestMasterSlaveConnection:
def test_crash_invalid_item(self, mysetup):

View File

@ -11,7 +11,6 @@
from __future__ import generators
import py
from py.__.test.session import Session
from py.__.test.outcome import Failed, Passed, Skipped
from py.__.test.dsession.mypickle import PickleChannel
from py.__.test import event
from py.__.test.looponfail import util
@ -67,27 +66,29 @@ class RemoteControl(object):
msg = " ".join([str(x) for x in args])
print "RemoteControl:", msg
def initgateway(self):
return py.execnet.PopenGateway(self.executable)
def setup(self, out=None):
if hasattr(self, 'gateway'):
raise ValueError("already have gateway %r" % self.gateway)
if out is None:
out = py.io.TerminalWriter()
from py.__.test.dsession import masterslave
if hasattr(self, 'gateway'):
raise ValueError("already have gateway %r" % self.gateway)
self.trace("setting up slave session")
self.gateway = py.execnet.PopenGateway(self.executable)
old = self.config.topdir.chdir()
try:
self.gateway = self.initgateway()
finally:
old.chdir()
channel = self.gateway.remote_exec(source="""
from py.__.test.dsession.mypickle import PickleChannel
channel = PickleChannel(channel)
from py.__.test.looponfail.remote import slave_runsession
from py.__.test.dsession import masterslave
config = masterslave.receive_and_send_pickled_config(channel)
fullwidth, hasmarkup = channel.receive()
channel = PickleChannel(channel)
config, fullwidth, hasmarkup = channel.receive()
slave_runsession(channel, config, fullwidth, hasmarkup)
""", stdout=out, stderr=out)
channel = PickleChannel(channel)
masterslave.send_and_receive_pickled_config(
channel, self.config, remote_topdir=self.config.topdir)
channel.send((out.fullwidth, out.hasmarkup))
channel.send((self.config, out.fullwidth, out.hasmarkup))
self.trace("set up of slave session complete")
self.channel = channel

View File

@ -3,6 +3,10 @@ import py
class DefaultPlugin:
""" Plugin implementing defaults and general options. """
def pytest_pyfunc_call(self, pyfuncitem, args, kwargs):
pyfuncitem.obj(*args, **kwargs)
return
def pytest_collect_file(self, path, parent):
ext = path.ext
pb = path.purebasename

View File

@ -24,6 +24,8 @@ class MonkeyPatch:
for obj, name, value in self._setattr:
if value is not notset:
setattr(obj, name, value)
else:
delattr(obj, name)
for dictionary, name, value in self._setitem:
if value is notset:
del dictionary[name]
@ -42,6 +44,12 @@ def test_setattr():
monkeypatch.finalize()
assert A.x == 1
monkeypatch.setattr(A, 'y', 3)
assert A.y == 3
monkeypatch.finalize()
assert not hasattr(A, 'y')
def test_setitem():
d = {'x': 1}
monkeypatch = MonkeyPatch()

View File

@ -204,6 +204,8 @@ class TmpTestdir:
p = self.tmpdir.join("conftest.py")
if not p.check():
plugins = [x for x in self.plugins if isinstance(x, str)]
if not plugins:
return
p.write("import py ; pytest_plugins = %r" % plugins)
else:
if self.plugins:

View File

@ -103,9 +103,9 @@ class TerminalReporter:
def pyevent_hostup(self, event):
d = event.platinfo.copy()
d['hostid'] = event.host.hostid
d['host'] = event.host.address
d['version'] = repr_pythonversion(d['sys.version_info'])
self.write_line("HOSTUP: %(hostid)s %(sys.platform)s "
self.write_line("HOSTUP: %(host)s %(sys.platform)s "
"%(sys.executable)s - Python %(version)s" %
d)
@ -113,7 +113,7 @@ class TerminalReporter:
host = event.host
error = event.error
if error:
self.write_line("HostDown %s: %s" %(host.hostid, error))
self.write_line("HostDown %s: %s" %(host, error))
def pyevent_itemstart(self, event):
if self.config.option.verbose:
@ -311,16 +311,16 @@ def repr_pythonversion(v=None):
from py.__.test import event
from py.__.test.runner import basic_run_report
from py.__.test.dsession.hostmanage import Host, makehostup
from py.__.test.dsession.masterslave import makehostup
class TestTerminal:
def test_hostup(self, testdir, linecomp):
from py.__.execnet.gwmanage import GatewaySpec
item = testdir.getitem("def test_func(): pass")
rep = TerminalReporter(item._config, linecomp.stringio)
host = Host("localhost")
rep.pyevent_hostup(makehostup(host))
rep.pyevent_hostup(makehostup())
linecomp.assert_contains_lines([
"*%s %s %s - Python %s" %(host.hostid, sys.platform,
"*localhost %s %s - Python %s" %(sys.platform,
sys.executable, repr_pythonversion(sys.version_info))
])
@ -409,11 +409,12 @@ class TestTerminal:
])
def test_hostready_crash(self, testdir, linecomp):
from py.__.execnet.gwmanage import GatewaySpec
modcol = testdir.getmodulecol("""
def test_one():
pass
""", configargs=("-v",))
host1 = Host("localhost")
host1 = GatewaySpec("localhost")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
rep.pyevent_hostgatewayready(event.HostGatewayReady(host1, None))
linecomp.assert_contains_lines([

View File

@ -348,11 +348,8 @@ class Function(FunctionMixin, py.test.collect.Item):
""" execute the given test function. """
if not self._deprecated_testexecution():
kw = self.lookup_allargs()
pytest_pyfunc_call = self._config.pytestplugins.getfirst("pytest_pyfunc_call")
if pytest_pyfunc_call is not None:
if pytest_pyfunc_call(pyfuncitem=self, args=self._args, kwargs=kw):
return
self.obj(*self._args, **kw)
ret = self._config.pytestplugins.call_firstresult(
"pytest_pyfunc_call", pyfuncitem=self, args=self._args, kwargs=kw)
def lookup_allargs(self):
kwargs = {}

View File

@ -117,7 +117,11 @@ def importplugin(importspec):
try:
return __import__(importspec)
except ImportError, e:
if str(e).find(importspec) == -1:
raise
try:
return __import__("py.__.test.plugin.%s" %(importspec), None, None, '__doc__')
except ImportError:
except ImportError, e:
if str(e).find(importspec) == -1:
raise
return __import__(importspec) # show the original exception

View File

@ -12,7 +12,7 @@ from py.__.test import event, outcome
Item = py.test.collect.Item
Collector = py.test.collect.Collector
from runner import basic_collect_report
from py.__.test.dsession.hostmanage import makehostup
from py.__.test.dsession.masterslave import makehostup
class Session(object):
"""

View File

@ -1,52 +0,0 @@
import py
def pytest_pyfuncarg_pickletransport(pyfuncitem):
return PickleTransport()
class PickleTransport:
def __init__(self):
from py.__.test.dsession.mypickle import ImmutablePickler
self.p1 = ImmutablePickler(uneven=0)
self.p2 = ImmutablePickler(uneven=1)
def p1_to_p2(self, obj):
return self.p2.loads(self.p1.dumps(obj))
def p2_to_p1(self, obj):
return self.p1.loads(self.p2.dumps(obj))
def unifyconfig(self, config):
p2config = self.p1_to_p2(config)
p2config._initafterpickle(config.topdir)
return p2config
class TestPickling:
def test_pickle_config(self, pickletransport):
config1 = py.test.config._reparse([])
p2config = pickletransport.unifyconfig(config1)
assert p2config.topdir == config1.topdir
config_back = pickletransport.p2_to_p1(p2config)
assert config_back is config1
def test_pickle_module(self, testdir, pickletransport):
modcol1 = testdir.getmodulecol("def test_one(): pass")
pickletransport.unifyconfig(modcol1._config)
modcol2a = pickletransport.p1_to_p2(modcol1)
modcol2b = pickletransport.p1_to_p2(modcol1)
assert modcol2a is modcol2b
modcol1_back = pickletransport.p2_to_p1(modcol2a)
assert modcol1_back
def test_pickle_func(self, testdir, pickletransport):
modcol1 = testdir.getmodulecol("def test_one(): pass")
pickletransport.unifyconfig(modcol1._config)
item = modcol1.collect_by_name("test_one")
item2a = pickletransport.p1_to_p2(item)
assert item is not item2a # of course
assert item2a.name == item.name
modback = pickletransport.p2_to_p1(item2a.parent)
assert modback is modcol1

View File

@ -290,150 +290,6 @@ class TestConfig_gettopdir:
assert gettopdir([c]) == a
assert gettopdir([c, Z]) == tmp
class TestConfigPickling:
@py.test.mark(xfail=True, issue="config's pytestplugins/bus initialization")
def test_config_initafterpickle_plugin(self, testdir):
testdir.makepyfile(__init__="", conftest="x=1; y=2")
hello = testdir.makepyfile(hello="")
tmp = testdir.tmpdir
config = py.test.config._reparse([hello])
config2 = py.test.config._reparse([tmp.dirpath()])
config2._initialized = False # we have to do that from tests
config2._repr = config._makerepr()
config2._initafterpickle(topdir=tmp.dirpath())
# we check that config "remote" config objects
# have correct plugin initialization
#XXX assert config2.pytestplugins.pm._plugins
#XXX assert config2.bus.isregistered(config2.pytestplugins.forward_event)
assert config2.bus == py._com.pyplugins
assert config2.pytestplugins.pm == py._com.pyplugins
def test_config_initafterpickle_some(self, testdir):
tmp = testdir.tmpdir
tmp.ensure("__init__.py")
tmp.ensure("conftest.py").write("x=1 ; y=2")
hello = tmp.ensure("test_hello.py")
config = py.test.config._reparse([hello])
config2 = testdir.Config()
config2._initialized = False # we have to do that from tests
config2._repr = config._makerepr()
config2._initafterpickle(tmp.dirpath())
for col1, col2 in zip(config.getcolitems(), config2.getcolitems()):
assert col1.fspath == col2.fspath
cols = config2.getcolitems()
assert len(cols) == 1
col = cols[0]
assert col.name == 'test_hello.py'
assert col.parent.name == tmp.basename
assert col.parent.parent is None
def test_config_make_and__mergerepr(self, testdir):
tmp = testdir.tmpdir
tmp.ensure("__init__.py")
tmp.ensure("conftest.py").write("x=1")
config = py.test.config._reparse([tmp])
repr = config._makerepr()
config.option.verbose = 42
repr2 = config._makerepr()
print "hello"
config = testdir.Config()
config._mergerepr(repr)
print config._conftest.getconftestmodules(None)
assert config.getvalue('x') == 1
config = testdir.Config()
config._preparse([])
py.test.raises(KeyError, "config.getvalue('x')")
config = testdir.Config()
config._mergerepr(repr2)
assert config.option.verbose == 42
def test_config_rconfig(self, testdir):
tmp = testdir.tmpdir
tmp.ensure("__init__.py")
tmp.ensure("conftest.py").write(py.code.Source("""
import py
Option = py.test.config.Option
option = py.test.config.addoptions("testing group",
Option('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value."))
"""))
config = py.test.config._reparse([tmp, "-G", "11"])
assert config.option.gdest == 11
repr = config._makerepr()
config = testdir.Config()
py.test.raises(AttributeError, "config.option.gdest")
config2 = testdir.Config()
config2._mergerepr(repr)
option = config2.addoptions("testing group",
config2.Option('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value."))
assert config2.option.gdest == 11
assert option.gdest == 11
def test_config_picklability(self, tmpdir):
import cPickle
config = py.test.config._reparse([tmpdir])
s = cPickle.dumps(config)
newconfig = cPickle.loads(s)
assert not hasattr(newconfig, "topdir")
assert not newconfig._initialized
assert not hasattr(newconfig, 'args')
newconfig._initafterpickle(config.topdir)
assert newconfig.topdir == config.topdir
assert newconfig._initialized
assert newconfig.args == [tmpdir]
def test_config_and_collector_pickling_missing_initafter(self, tmpdir):
from cPickle import Pickler, Unpickler
config = py.test.config._reparse([tmpdir])
col = config.getfsnode(config.topdir)
io = py.std.cStringIO.StringIO()
pickler = Pickler(io)
pickler.dump(config)
pickler.dump(col)
io.seek(0)
unpickler = Unpickler(io)
newconfig = unpickler.load()
# we don't call _initafterpickle ... so
py.test.raises(ValueError, "unpickler.load()")
def test_config_and_collector_pickling(self, tmpdir):
from cPickle import Pickler, Unpickler
dir1 = tmpdir.ensure("somedir", dir=1)
config = py.test.config._reparse([tmpdir])
col = config.getfsnode(config.topdir)
col1 = col.join(dir1.basename)
assert col1.parent is col
io = py.std.cStringIO.StringIO()
pickler = Pickler(io)
pickler.dump(config)
pickler.dump(col)
pickler.dump(col1)
pickler.dump(col)
io.seek(0)
unpickler = Unpickler(io)
newconfig = unpickler.load()
topdir = tmpdir.ensure("newtopdir", dir=1)
newconfig._initafterpickle(topdir)
topdir.ensure("somedir", dir=1)
newcol = unpickler.load()
newcol2 = unpickler.load()
newcol3 = unpickler.load()
assert newcol2._config is newconfig
assert newcol2.parent == newcol
assert newcol._config is newconfig
assert newconfig.topdir == topdir
assert newcol3 is newcol
assert newcol.fspath == topdir
assert newcol2.fspath.basename == dir1.basename
assert newcol2.fspath.relto(topdir)
def test_options_on_small_file_do_not_blow_up(testdir):
def runfiletest(opts):

View File

@ -0,0 +1,187 @@
import py
def pytest_pyfuncarg_pickletransport(pyfuncitem):
return ImmutablePickleTransport()
def pytest_pyfunc_call(__call__, pyfuncitem, args, kwargs):
# for each function call we patch py._com.pyplugins
# so that the unpickling of config objects
# (which bind to this mechanism) doesn't do harm
# usually config objects are no meant to be unpickled in
# the same system
oldconfig = py.test.config
oldcom = py._com.pyplugins
print "setting py.test.config to None"
py.test.config = None
py._com.pyplugins = py._com.PyPlugins()
try:
return __call__.execute(firstresult=True)
finally:
print "setting py.test.config to", oldconfig
py.test.config = oldconfig
py._com.pyplugins = oldcom
class ImmutablePickleTransport:
def __init__(self):
from py.__.test.dsession.mypickle import ImmutablePickler
self.p1 = ImmutablePickler(uneven=0)
self.p2 = ImmutablePickler(uneven=1)
def p1_to_p2(self, obj):
return self.p2.loads(self.p1.dumps(obj))
def p2_to_p1(self, obj):
return self.p1.loads(self.p2.dumps(obj))
def unifyconfig(self, config):
p2config = self.p1_to_p2(config)
p2config._initafterpickle(config.topdir)
return p2config
class TestImmutablePickling:
def test_pickle_config(self, testdir, pickletransport):
config1 = testdir.parseconfig()
assert config1.topdir == testdir.tmpdir
testdir.chdir()
p2config = pickletransport.p1_to_p2(config1)
assert p2config.topdir == config1.topdir
config_back = pickletransport.p2_to_p1(p2config)
assert config_back is config1
def test_pickle_modcol(self, testdir, pickletransport):
modcol1 = testdir.getmodulecol("def test_one(): pass")
modcol2a = pickletransport.p1_to_p2(modcol1)
modcol2b = pickletransport.p1_to_p2(modcol1)
assert modcol2a is modcol2b
modcol1_back = pickletransport.p2_to_p1(modcol2a)
assert modcol1_back
def test_pickle_func(self, testdir, pickletransport):
modcol1 = testdir.getmodulecol("def test_one(): pass")
item = modcol1.collect_by_name("test_one")
testdir.chdir()
item2a = pickletransport.p1_to_p2(item)
assert item is not item2a # of course
assert item2a.name == item.name
modback = pickletransport.p2_to_p1(item2a.parent)
assert modback is modcol1
class TestConfigPickling:
def test_config_getstate_setstate(self, testdir):
from py.__.test.config import Config
testdir.makepyfile(__init__="", conftest="x=1; y=2")
hello = testdir.makepyfile(hello="")
tmp = testdir.tmpdir
testdir.chdir()
config1 = testdir.parseconfig(hello)
config2 = Config()
config2.__setstate__(config1.__getstate__())
assert config2.topdir == py.path.local()
config2_relpaths = [x.relto(config2.topdir) for x in config2.args]
config1_relpaths = [x.relto(config1.topdir) for x in config1.args]
assert config2_relpaths == config1_relpaths
for name, value in config1.option.__dict__.items():
assert getattr(config2.option, name) == value
assert config2.getvalue("x") == 1
def test_config_rconfig(self, testdir):
tmp = testdir.tmpdir
tmp.ensure("__init__.py")
testdir.makeconftest("""
class ConftestPlugin:
def pytest_addoption(self, parser):
group = parser.addgroup("testing group")
group.addoption('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value.")
""")
config = testdir.parseconfig(tmp, "-G", "11")
assert config.option.gdest == 11
repr = config.__getstate__()
config = testdir.Config()
py.test.raises(AttributeError, "config.option.gdest")
config2 = testdir.Config()
config2.__setstate__(repr)
option = config2.addoptions("testing group",
config2.Option('-G', '--glong', action="store", default=42,
type="int", dest="gdest", help="g value."))
assert config2.option.gdest == 11
assert option.gdest == 11
def test_config_picklability(self, testdir):
import cPickle
config = testdir.parseconfig()
s = cPickle.dumps(config)
newconfig = cPickle.loads(s)
assert hasattr(newconfig, "topdir")
assert newconfig.topdir == py.path.local()
def test_collector_implicit_config_pickling(self, testdir):
from cPickle import Pickler, Unpickler
tmpdir = testdir.tmpdir
testdir.chdir()
testdir.makepyfile(hello="def test_x(): pass")
config = testdir.parseconfig(tmpdir)
col = config.getfsnode(config.topdir)
io = py.std.cStringIO.StringIO()
pickler = Pickler(io)
pickler.dump(col)
io.seek(0)
unpickler = Unpickler(io)
col2 = unpickler.load()
assert col2.name == col.name
assert col2.listnames() == col.listnames()
def test_config_and_collector_pickling(self, testdir):
from cPickle import Pickler, Unpickler
tmpdir = testdir.tmpdir
dir1 = tmpdir.ensure("somedir", dir=1)
config = testdir.parseconfig()
col = config.getfsnode(config.topdir)
col1 = col.join(dir1.basename)
assert col1.parent is col
io = py.std.cStringIO.StringIO()
pickler = Pickler(io)
pickler.dump(col)
pickler.dump(col1)
pickler.dump(col)
io.seek(0)
unpickler = Unpickler(io)
topdir = tmpdir.ensure("newtopdir", dir=1)
topdir.ensure("somedir", dir=1)
old = topdir.chdir()
try:
newcol = unpickler.load()
newcol2 = unpickler.load()
newcol3 = unpickler.load()
assert newcol2._config is newcol._config
assert newcol2.parent == newcol
assert newcol2._config.topdir == topdir
assert newcol.fspath == topdir
assert newcol2.fspath.basename == dir1.basename
assert newcol2.fspath.relto(topdir)
finally:
old.chdir()
def test_config__setstate__wired_correctly_in_childprocess(testdir):
from py.__.test.dsession.mypickle import PickleChannel
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import py
from py.__.test.dsession.mypickle import PickleChannel
channel = PickleChannel(channel)
config = channel.receive()
assert py.test.config.pytestplugins.pyplugins == py._com.pyplugins, "pyplugins wrong"
assert py.test.config.bus == py._com.pyplugins, "bus wrong"
""")
channel = PickleChannel(channel)
config = testdir.parseconfig()
channel.send(config)
channel.waitclose() # this will raise
gw.exit()