[svn r63027] new API:

py.execnet.MultiGateway
   py.execnet.MultiChannel
   py.execnet.GatewaySpec

with some docs and streamlined tests.

--HG--
branch : trunk
This commit is contained in:
hpk 2009-03-18 12:18:39 +01:00
parent 5803072b96
commit 87cd94a197
6 changed files with 264 additions and 195 deletions

View File

@ -27,8 +27,8 @@ version = "1.0.0a1"
initpkg(__name__,
description = "pylib and py.test: agile development and test support library",
revision = int('$LastChangedRevision: 62252 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2009-02-27 20:56:51 +0100 (Fri, 27 Feb 2009) $',
revision = int('$LastChangedRevision: 63027 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2009-03-18 12:18:39 +0100 (Wed, 18 Mar 2009) $',
version = version,
url = "http://pylib.org",
download_url = "http://codespeak.net/py/0.9.2/download.html",
@ -151,6 +151,9 @@ initpkg(__name__,
'execnet.SocketGateway' : ('./execnet/register.py', 'SocketGateway'),
'execnet.PopenGateway' : ('./execnet/register.py', 'PopenGateway'),
'execnet.SshGateway' : ('./execnet/register.py', 'SshGateway'),
'execnet.GatewaySpec' : ('./execnet/gwmanage.py', 'GatewaySpec'),
'execnet.MultiGateway' : ('./execnet/gwmanage.py', 'MultiGateway'),
'execnet.MultiChannel' : ('./execnet/gwmanage.py', 'MultiChannel'),
# execnet scripts
'execnet.RSync' : ('./execnet/rsync.py', 'RSync'),

View File

@ -38,18 +38,11 @@ Basic Features
With ''py.execnet'' you get the means
- to navigate through the network with Process, Thread, SSH
and Socket- gateways that allow you ...
- to distribute your program across a network and define
communication protocols from the client side, making
server maintenance superflous. In fact, there is no such
thing as a server. It's just another computer ... if it
doesn't run in a kernel-level jail [#]_ in which case
even that is virtualized.
- to execute python code fragements in remote processes and
- to interchange data between asynchronously executing code fragments
Available Gateways/Connection methods
Available Gateways
-----------------------------------------
You may use one of the following connection methods:
@ -68,10 +61,11 @@ You may use one of the following connection methods:
script. You can run this "server script" without
having the py lib installed on that remote system.
Remote execution approach
executing code remotely
-------------------------------------
All gateways offer one main high level function:
All gateways offer remote code execution via this high level function:
def remote_exec(source):
"""return channel object for communicating with the asynchronously
@ -94,29 +88,20 @@ an example:
>>> remote_pid != py.std.os.getpid()
True
`remote_exec` implements the idea to ``determine
protocol and remote code from the client/local side``.
This makes distributing a program run in an ad-hoc
manner (using e.g. :api:`py.execnet.SshGateway`) very easy.
You should not need to maintain software on the other sides
you are running your code at, other than the Python
executable itself.
.. _`Channel`:
.. _`channel-api`:
.. _`exchange data`:
The **Channel** interface for exchanging data across gateways
Bidirectionally exchange data between hosts
-------------------------------------------------------------
While executing custom strings on "the other side" is simple enough
it is often tricky to deal with. Therefore we want a way
to send data items to and fro between the distributedly running
program. The idea is to inject a Channel object for each
execution of source code. This Channel object allows two
program parts to send data to each other.
Here is the current interface::
A channel object allows to send and receive data between
two asynchronously running programs. When calling
`remote_exec` you will get a channel object back and
the code fragement running on the other side will
see a channel object in its global namespace.
Here is the interface of channel objects::
#
# API for sending and receiving anonymous values
@ -125,7 +110,7 @@ Here is the current interface::
sends the given item to the other side of the channel,
possibly blocking if the sender queue is full.
Note that items need to be marshallable (all basic
python types are):
python types are).
channel.receive():
receives an item that was sent from the other side,
@ -146,25 +131,93 @@ Here is the current interface::
A remote side blocking on receive() on this channel
will get woken up and see an EOFError exception.
Instantiating a gateway from a string-specification
---------------------------------------------------------
The complete Fileserver example
........................................
To specify Gateways with a String::
>>> import py
>>> gwspec = py.execnet.GatewaySpec("popen")
>>> gw = gwspec.makegateway()
>>> ex = gw.remote_exec("import sys ; channel.send(sys.executable)").receive()
>>> assert ex == py.std.sys.executable
>>>
current gateway types and specifications
+++++++++++++++++++++++++++++++++++++++++++++++
+------------------------+-------------------------------------------+
| ssh host | ssh:host:pythonexecutable:path |
+------------------------+-------------------------------------------+
| local subprocess | popen:python_executable:path |
+------------------------+-------------------------------------------+
| remote socket process | socket:host:port:python_executable:path |
+------------------------+-------------------------------------------+
examples of valid specifications
++++++++++++++++++++++++++++++++++++++
``ssh:wyvern:python2.4`` signifies a connection to a Python process on the machine reached via "ssh wyvern", current dir will be the 'pyexecnet-cache' subdirectory.
``socket:192.168.1.4`` signifies a connection to a Python Socket server process to the given IP on the default port 8888; current dir will be the 'pyexecnet-cache' directory.
``popen:python2.5`` signifies a connection to a python2.5 subprocess; current dir will be the current dir of the instantiator.
``popen::pytest-cache1`` signifies a connection to a subprocess using ``sys.executable``; current dir will be the `pytest-cache1`.
Examples for execnet usage
-------------------------------------------
Example: compare cwd() of Popen Gateways
++++++++++++++++++++++++++++++++++++++++
A PopenGateway has the same working directory as the instantiatior::
>>> import py, os
>>> gw = py.execnet.PopenGateway()
>>> ch = gw.remote_exec("import os; channel.send(os.getcwd())")
>>> res = ch.receive()
>>> assert res == os.getcwd()
>>> gw.exit()
Example: multichannels
++++++++++++++++++++++++++++++++++++++++
MultiChannels manage 1-n execution and communication:
>>> import py
>>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)")
>>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)")
>>> mch = py.execnet.MultiChannel([ch1, ch2])
>>> l = mch.receive_each()
>>> assert len(l) == 2
>>> assert 1 in l
>>> assert 2 in l
MultiGateways help with sending code to multiple remote places:
>>> import py
>>> mgw = py.execnet.MultiGateway([py.execnet.PopenGateway() for x in range(2)])
>>> mch = mgw.remote_exec("import os; channel.send(os.getcwd())")
>>> res = mch.receive_each()
>>> assert res == [os.getcwd()] * 2, res
>>> mgw.exit()
Example: receiving file contents from remote places
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
problem: retrieving contents of remote files::
import py
contentserverbootstrap = py.code.Source(
"""
# open a gateway to a fresh child process
gw = py.execnet.SshGateway('codespeak.net')
channel = gw.remote_exec("""
for fn in channel:
f = open(fn, 'rb')
try:
channel.send(f.read())
finally:
f.close()
""")
# open a gateway to a fresh child process
contentgateway = py.execnet.SshGateway('codespeak.net')
channel = contentgateway.remote_exec(contentserverbootstrap)
channel.send(f.read())
f.close()
""")
for fn in somefilelist:
channel.send(fn)
@ -218,7 +271,4 @@ Here are 20 lines of code making the above triangle happen::
gw = py.execnet.SocketGateway('localhost', cls.port)
print "initialized socket gateway to port", cls.port
.. [#] There is an interesting emerging `Jail`_ linux technology
as well as a host of others, of course.
.. _`Jail`: http://books.rsbac.org/unstable/x2223.html

View File

@ -140,6 +140,9 @@ class MultiGateway:
for gw in self.gateways:
channels.append(gw.remote_exec(source))
return MultiChannel(channels)
def exit(self):
for gw in self.gateways:
gw.exit()
class GatewayManager:
RemoteError = RemoteError

View File

@ -8,103 +8,7 @@
"""
import py
from py.__.execnet.gwmanage import GatewaySpec, GatewayManager
from py.__.execnet.gwmanage import HostRSync
class TestGatewaySpec:
"""
socket:hostname:port:path SocketGateway
popen[-executable][:path] PopenGateway
[ssh:]spec:path SshGateway
* [SshGateway]
"""
def test_popen(self):
for python in ('', 'python2.4'):
for joinpath in ('', 'abc', 'ab:cd', '/x/y'):
s = ":".join(["popen", python, joinpath])
print s
spec = GatewaySpec(s)
assert spec.address == "popen"
assert spec.python == python
assert spec.joinpath == joinpath
assert spec.type == "popen"
spec2 = GatewaySpec("popen" + joinpath)
self._equality(spec, spec2)
def test_ssh(self):
for prefix in ('ssh', ''): # ssh is default
for hostpart in ('x.y', 'xyz@x.y'):
for python in ('python', 'python2.5'):
for joinpath in ('', 'abc', 'ab:cd', '/tmp'):
specstring = ":".join([prefix, hostpart, python, joinpath])
if specstring[0] == ":":
specstring = specstring[1:]
print specstring
spec = GatewaySpec(specstring)
assert spec.address == hostpart
assert spec.python == python
if joinpath:
assert spec.joinpath == joinpath
else:
assert spec.joinpath == "pyexecnetcache"
assert spec.type == "ssh"
spec2 = GatewaySpec(specstring)
self._equality(spec, spec2)
def test_socket(self):
for hostpart in ('x.y', 'x', 'popen'):
for port in ":80", ":1000":
for joinpath in ('', ':abc', ':abc:de'):
spec = GatewaySpec("socket:" + hostpart + port + joinpath)
assert spec.address == (hostpart, int(port[1:]))
if joinpath[1:]:
assert spec.joinpath == joinpath[1:]
else:
assert spec.joinpath == "pyexecnetcache"
assert spec.type == "socket"
spec2 = GatewaySpec("socket:" + hostpart + port + joinpath)
self._equality(spec, spec2)
def _equality(self, spec1, spec2):
assert spec1 != spec2
assert hash(spec1) != hash(spec2)
assert not (spec1 == spec2)
class TestGatewaySpecAPI:
def test_popen_nopath_makegateway(self, testdir):
spec = GatewaySpec("popen")
gw = spec.makegateway()
p = gw.remote_exec("import os; channel.send(os.getcwd())").receive()
curdir = py.std.os.getcwd()
assert curdir == p
gw.exit()
def test_popen_makegateway(self, testdir):
spec = GatewaySpec("popen::" + str(testdir.tmpdir))
gw = spec.makegateway()
p = gw.remote_exec("import os; channel.send(os.getcwd())").receive()
assert spec.joinpath == p
gw.exit()
def test_popen_makegateway_python(self, testdir):
spec = GatewaySpec("popen:%s" % py.std.sys.executable)
gw = spec.makegateway()
res = gw.remote_exec("import sys ; channel.send(sys.executable)").receive()
assert py.std.sys.executable == py.std.sys.executable
gw.exit()
def test_ssh(self):
sshhost = py.test.config.getvalueorskip("sshhost")
spec = GatewaySpec("ssh:" + sshhost)
gw = spec.makegateway()
p = gw.remote_exec("import os ; channel.send(os.getcwd())").receive()
gw.exit()
@py.test.mark.xfail("implement socketserver test scenario")
def test_socketgateway(self):
gw = py.execnet.PopenGateway()
spec = GatewaySpec("ssh:" + sshhost)
from py.__.execnet.gwmanage import GatewayManager, HostRSync
class TestGatewayManagerPopen:
def test_hostmanager_popen_makegateway(self):
@ -191,56 +95,6 @@ class TestGatewayManagerPopen:
assert l[0].startswith(curwd)
assert l[0].endswith("hello")
from py.__.execnet.gwmanage import MultiChannel
class TestMultiChannel:
def test_multichannel_receive_each(self):
class pseudochannel:
def receive(self):
return 12
pc1 = pseudochannel()
pc2 = pseudochannel()
multichannel = MultiChannel([pc1, pc2])
l = multichannel.receive_each(withchannel=True)
assert len(l) == 2
assert l == [(pc1, 12), (pc2, 12)]
l = multichannel.receive_each(withchannel=False)
assert l == [12,12]
def test_multichannel_send_each(self):
gm = GatewayManager(['popen'] * 2)
mc = gm.multi_exec("""
import os
channel.send(channel.receive() + 1)
""")
mc.send_each(41)
l = mc.receive_each()
assert l == [42,42]
def test_multichannel_receive_queue(self):
gm = GatewayManager(['popen'] * 2)
mc = gm.multi_exec("""
import os
channel.send(os.getpid())
""")
queue = mc.make_receive_queue()
ch, item = queue.get(timeout=10)
ch2, item2 = queue.get(timeout=10)
assert ch != ch2
assert ch.gateway != ch2.gateway
assert item != item2
mc.waitclose()
def test_multichannel_waitclose(self):
l = []
class pseudochannel:
def waitclose(self):
l.append(0)
multichannel = MultiChannel([pseudochannel(), pseudochannel()])
multichannel.waitclose()
assert len(l) == 2
def pytest_pyfuncarg_source(pyfuncitem):
return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("source")
def pytest_pyfuncarg_dest(pyfuncitem):
@ -262,7 +116,7 @@ class TestHRSync:
assert 'somedir' in basenames
def test_hrsync_one_host(self, source, dest):
spec = GatewaySpec("popen::%s" % dest)
spec = py.execnet.GatewaySpec("popen::%s" % dest)
gw = spec.makegateway()
finished = []
rsync = HostRSync(source)

View File

@ -0,0 +1,101 @@
"""
tests for py.execnet.GatewaySpec
"""
import py
class TestGatewaySpec:
"""
socket:hostname:port:path SocketGateway
popen[-executable][:path] PopenGateway
[ssh:]spec:path SshGateway
* [SshGateway]
"""
def test_popen(self):
for python in ('', 'python2.4'):
for joinpath in ('', 'abc', 'ab:cd', '/x/y'):
s = ":".join(["popen", python, joinpath])
print s
spec = py.execnet.GatewaySpec(s)
assert spec.address == "popen"
assert spec.python == python
assert spec.joinpath == joinpath
assert spec.type == "popen"
spec2 = py.execnet.GatewaySpec("popen" + joinpath)
self._equality(spec, spec2)
def test_ssh(self):
for prefix in ('ssh', ''): # ssh is default
for hostpart in ('x.y', 'xyz@x.y'):
for python in ('python', 'python2.5'):
for joinpath in ('', 'abc', 'ab:cd', '/tmp'):
specstring = ":".join([prefix, hostpart, python, joinpath])
if specstring[0] == ":":
specstring = specstring[1:]
print specstring
spec = py.execnet.GatewaySpec(specstring)
assert spec.address == hostpart
assert spec.python == python
if joinpath:
assert spec.joinpath == joinpath
else:
assert spec.joinpath == "pyexecnetcache"
assert spec.type == "ssh"
spec2 = py.execnet.GatewaySpec(specstring)
self._equality(spec, spec2)
def test_socket(self):
for hostpart in ('x.y', 'x', 'popen'):
for port in ":80", ":1000":
for joinpath in ('', ':abc', ':abc:de'):
spec = py.execnet.GatewaySpec("socket:" + hostpart + port + joinpath)
assert spec.address == (hostpart, int(port[1:]))
if joinpath[1:]:
assert spec.joinpath == joinpath[1:]
else:
assert spec.joinpath == "pyexecnetcache"
assert spec.type == "socket"
spec2 = py.execnet.GatewaySpec("socket:" + hostpart + port + joinpath)
self._equality(spec, spec2)
def _equality(self, spec1, spec2):
assert spec1 != spec2
assert hash(spec1) != hash(spec2)
assert not (spec1 == spec2)
class TestGatewaySpecAPI:
def test_popen_nopath_makegateway(self, testdir):
spec = py.execnet.GatewaySpec("popen")
gw = spec.makegateway()
p = gw.remote_exec("import os; channel.send(os.getcwd())").receive()
curdir = py.std.os.getcwd()
assert curdir == p
gw.exit()
def test_popen_makegateway(self, testdir):
spec = py.execnet.GatewaySpec("popen::" + str(testdir.tmpdir))
gw = spec.makegateway()
p = gw.remote_exec("import os; channel.send(os.getcwd())").receive()
assert spec.joinpath == p
gw.exit()
def test_popen_makegateway_python(self, testdir):
spec = py.execnet.GatewaySpec("popen:%s" % py.std.sys.executable)
gw = spec.makegateway()
res = gw.remote_exec("import sys ; channel.send(sys.executable)").receive()
assert py.std.sys.executable == py.std.sys.executable
gw.exit()
def test_ssh(self):
sshhost = py.test.config.getvalueorskip("sshhost")
spec = py.execnet.GatewaySpec("ssh:" + sshhost)
gw = spec.makegateway()
p = gw.remote_exec("import os ; channel.send(os.getcwd())").receive()
gw.exit()
@py.test.mark.xfail("implement socketserver test scenario")
def test_socketgateway(self):
gw = py.execnet.PopenGateway()
spec = py.execnet.GatewaySpec("ssh:" + sshhost)

View File

@ -0,0 +1,58 @@
"""
tests for
- multi channels and multi gateways
"""
import py
class TestMultiChannelAndGateway:
def test_multichannel_receive_each(self):
class pseudochannel:
def receive(self):
return 12
pc1 = pseudochannel()
pc2 = pseudochannel()
multichannel = py.execnet.MultiChannel([pc1, pc2])
l = multichannel.receive_each(withchannel=True)
assert len(l) == 2
assert l == [(pc1, 12), (pc2, 12)]
l = multichannel.receive_each(withchannel=False)
assert l == [12,12]
def test_multichannel_send_each(self):
l = [py.execnet.PopenGateway() for x in range(2)]
gm = py.execnet.MultiGateway(l)
mc = gm.remote_exec("""
import os
channel.send(channel.receive() + 1)
""")
mc.send_each(41)
l = mc.receive_each()
assert l == [42,42]
def test_multichannel_receive_queue_for_two_subprocesses(self):
l = [py.execnet.PopenGateway() for x in range(2)]
gm = py.execnet.MultiGateway(l)
mc = gm.remote_exec("""
import os
channel.send(os.getpid())
""")
queue = mc.make_receive_queue()
ch, item = queue.get(timeout=10)
ch2, item2 = queue.get(timeout=10)
assert ch != ch2
assert ch.gateway != ch2.gateway
assert item != item2
mc.waitclose()
def test_multichannel_waitclose(self):
l = []
class pseudochannel:
def waitclose(self):
l.append(0)
multichannel = py.execnet.MultiChannel([pseudochannel(), pseudochannel()])
multichannel.waitclose()
assert len(l) == 2