remove py.execnet, substitute py.execnet usages with "execnet" ones.

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-10-02 16:58:57 +02:00
parent 496e3b1138
commit ab9f6a75ad
54 changed files with 66 additions and 3851 deletions

View File

@ -3,7 +3,7 @@ import sys
sys.path.insert(0, sys.argv[1])
import py
toolpath = py.magic.autopath()
toolpath = py.path.local(__file__)
binpath = py.path.local(py.__file__).dirpath('bin')
def error(msg):

View File

@ -78,7 +78,7 @@ class VirtualEnv(object):
def makegateway(self):
python = self._cmd('python')
return py.execnet.makegateway("popen//python=%s" %(python,))
return execnet.makegateway("popen//python=%s" %(python,))
def pcall(self, cmd, *args, **kw):
self.ensure()

View File

@ -17,15 +17,18 @@ def pytest_addoption(parser):
def pytest_funcarg__specssh(request):
return getspecssh(request.config)
def pytest_funcarg__specsocket(request):
return getsocketspec(request.config)
def getgspecs(config=None):
if config is None:
config = py.test.config
return [execnet.XSpec(spec)
for spec in config.getvalueorskip("gspecs")]
# configuration information for tests
def getgspecs(config=None):
if config is None:
config = py.test.config
return [py.execnet.XSpec(spec)
return [execnet.XSpec(spec)
for spec in config.getvalueorskip("gspecs")]
def getspecssh(config=None):

View File

@ -3,7 +3,7 @@
"""
small utility for hot-syncing a svn repository through ssh.
uses py.execnet.
uses execnet.
"""
@ -105,7 +105,7 @@ def get_svn_youngest(repo):
return int(rev)
def getgateway(host, keyfile=None):
return py.execnet.SshGateway(host, identity=keyfile)
return execnet.SshGateway(host, identity=keyfile)
if __name__ == '__main__':
if len(sys.argv) < 3:

View File

@ -95,7 +95,7 @@ def error(*args):
def getinfo(sshname, ssh_config=None, loginfo=sys.stdout):
debug("connecting to", sshname)
try:
gw = py.execnet.SshGateway(sshname, ssh_config=ssh_config)
gw = execnet.SshGateway(sshname, ssh_config=ssh_config)
except IOError:
error("could not get sshagteway", sshname)
else:

View File

@ -1,6 +1,8 @@
Changes between 1.0.x and 'trunk'
=====================================
* remove py.execnet code and substitute all usages with 'execnet' proper
* fix issue50 - cached_setup now caches more to expectations
for test functions with multiple arguments.

View File

@ -2,263 +2,11 @@
py.execnet: *elastic* distributed programming
==============================================================================
``execnet`` helps you to:
* ad-hoc instantiate local or remote Python Processes
* send code for execution in one or many processes
* send and receive data between processes through channels
One of it's unique features is that it uses a **zero-install**
technique: no manual installation steps are required on
remote places, only a basic working Python interpreter
and some input/output connection to it.
There is a `EuroPython2009 talk`_ from July 2009 with
examples and some pictures.
.. contents::
:local:
:depth: 2
.. _`EuroPython2009 talk`: http://codespeak.net/download/py/ep2009-execnet.pdf
Gateways: immediately spawn local or remote process
===================================================
In order to send code to a remote place or a subprocess
you need to instantiate a so-called Gateway object.
There are currently three Gateway classes:
* :api:`py.execnet.PopenGateway` to open a subprocess
on the local machine. Useful for making use
of multiple processors to to contain code execution
in a separated environment.
* :api:`py.execnet.SshGateway` to connect to
a remote ssh server and distribute execution to it.
* :api:`py.execnet.SocketGateway` a way to connect to
a remote Socket based server. *Note* that this method
requires a manually started
:source:py/execnet/script/socketserver.py
script. You can run this "server script" without
having the py lib installed on the remote system
and you can setup it up as permanent service.
remote_exec: execute source code remotely
===================================================
All gateways offer remote code execution via this high level function::
def remote_exec(source):
"""return channel object for communicating with the asynchronously
executing 'source' code which will have a corresponding 'channel'
object in its executing namespace."""
With `remote_exec` you send source code to the other
side and get both a local and a remote Channel_ object,
which you can use to have the local and remote site
communicate data in a structured way. Here is
an example for reading the PID::
>>> import py
>>> gw = py.execnet.PopenGateway()
>>> channel = gw.remote_exec("""
... import os
... channel.send(os.getpid())
... """)
>>> remote_pid = channel.receive()
>>> remote_pid != py.std.os.getpid()
True
.. _`Channel`:
.. _`channel-api`:
.. _`exchange data`:
Channels: bidirectionally exchange data between hosts
=======================================================
A channel object allows to send and receive data between
two asynchronously running programs. When calling
`remote_exec` you will get a channel object back and
the code fragment running on the other side will
see a channel object in its global namespace.
Here is the interface of channel objects::
#
# API for sending and receiving anonymous values
#
channel.send(item):
sends the given item to the other side of the channel,
possibly blocking if the sender queue is full.
Note that items need to be marshallable (all basic
python types are).
channel.receive():
receives an item that was sent from the other side,
possibly blocking if there is none.
Note that exceptions from the other side will be
reraised as gateway.RemoteError exceptions containing
a textual representation of the remote traceback.
channel.waitclose(timeout=None):
wait until this channel is closed. Note that a closed
channel may still hold items that will be received or
send. Note that exceptions from the other side will be
reraised as gateway.RemoteError exceptions containing
a textual representation of the remote traceback.
channel.close():
close this channel on both the local and the remote side.
A remote side blocking on receive() on this channel
will get woken up and see an EOFError exception.
.. _xspec:
XSpec: string specification for gateway type and configuration
===============================================================
``py.execnet`` supports a simple extensible format for
specifying and configuring Gateways for remote execution.
You can use a string specification to instantiate a new gateway,
for example a new SshGateway::
gateway = py.execnet.makegateway("ssh=myhost")
Let's look at some examples for valid specifications.
Specification for an ssh connection to `wyvern`, running on python2.4 in the (newly created) 'mycache' subdirectory::
ssh=wyvern//python=python2.4//chdir=mycache
Specification of a python2.5 subprocess; with a low CPU priority ("nice" level). Current dir will be the current dir of the instantiator (that's true for all 'popen' specifications unless they specify 'chdir')::
popen//python=2.5//nice=20
Specification of a Python Socket server process that listens on 192.168.1.4:8888; current dir will be the 'pyexecnet-cache' sub directory which is used a default for all remote processes::
socket=192.168.1.4:8888
More generally, a specification string has this general format::
key1=value1//key2=value2//key3=value3
If you omit a value, a boolean true value is assumed. Currently
the following key/values are supported:
* ``popen`` for a PopenGateway
* ``ssh=host`` for a SshGateway
* ``socket=address:port`` for a SocketGateway
* ``python=executable`` for specifying Python executables
* ``chdir=path`` change remote working dir to given relative or absolute path
* ``nice=value`` decrease remote nice level if platforms supports it
Examples of py.execnet usage
===============================================================
Compare cwd() of Popen Gateways
----------------------------------------
A PopenGateway has the same working directory as the instantiatior::
>>> import py, os
>>> gw = py.execnet.PopenGateway()
>>> ch = gw.remote_exec("import os; channel.send(os.getcwd())")
>>> res = ch.receive()
>>> assert res == os.getcwd()
>>> gw.exit()
Synchronously receive results from two sub processes
-----------------------------------------------------
Use MultiChannels for receiving multiple results from remote code::
>>> import py
>>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)")
>>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)")
>>> mch = py.execnet.MultiChannel([ch1, ch2])
>>> l = mch.receive_each()
>>> assert len(l) == 2
>>> assert 1 in l
>>> assert 2 in l
Asynchronously receive results from two sub processes
-----------------------------------------------------
Use ``MultiChannel.make_receive_queue()`` for asynchronously receiving
multiple results from remote code. This standard Queue provides
``(channel, result)`` tuples which allows to determine where
a result comes from::
>>> import py
>>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)")
>>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)")
>>> mch = py.execnet.MultiChannel([ch1, ch2])
>>> queue = mch.make_receive_queue()
>>> chan1, res1 = queue.get() # you may also specify a timeout
>>> chan2, res2 = queue.get()
>>> res1 + res2
3
>>> assert chan1 in (ch1, ch2)
>>> assert chan2 in (ch1, ch2)
>>> assert chan1 != chan2
Receive file contents from remote SSH account
-----------------------------------------------------
Here is a small program that you can use to retrieve
contents of remote files::
import py
# open a gateway to a fresh child process
gw = py.execnet.SshGateway('codespeak.net')
channel = gw.remote_exec("""
for fn in channel:
f = open(fn, 'rb')
channel.send(f.read())
f.close()
""")
for fn in somefilelist:
channel.send(fn)
content = channel.receive()
# process content
# later you can exit / close down the gateway
gw.exit()
Instantiate a socket server in a new subprocess
-----------------------------------------------------
The following example opens a PopenGateway, i.e. a python
child process, and starts a socket server within that process
and then opens a second gateway to the freshly started
socketserver::
import py
popengw = py.execnet.PopenGateway()
socketgw = py.execnet.SocketGateway.new_remote(popengw, ("127.0.0.1", 0))
print socketgw._rinfo() # print some info about the remote environment
Sending a module / checking if run through remote_exec
--------------------------------------------------------------
You can pass a module object to ``remote_exec`` in which case
its source code will be sent. No dependencies will be transferred
so the module must be self-contained or only use modules that are
installed on the "other" side. Module code can detect if it is
running in a remote_exec situation by checking for the special
``__name__`` attribute like this::
if __name__ == '__channelexec__':
# ... call module functions ...
Since pylib 1.1 "py.execnet" is separated out of hte lib and now
available through the standalone `execnet standalone package`_.
If you have usages of the "py.execnet.*" 1.0 API you can likely
rename all occurences of the string ``py.execnet.`` with the
string ``execnet.``.
.. _`execnet standalone package`: http://codespeak.net/execnet

View File

@ -364,7 +364,7 @@ remote environment. For this you can implement the newgateway hook:
def pytest_gwmanage_newgateway(gateway, platinfo):
""" called after a gateway is instantiated. """
The ``gateway`` object here has a ``spec`` attribute which is an ``py.execnet.XSpec``
The ``gateway`` object here has a ``spec`` attribute which is an ``execnet.XSpec``
object, which has attributes that map key/values as specified from a ``--txspec``
option. The platinfo object is a dictionary with information about the remote process:

View File

@ -165,7 +165,7 @@ and to offer a new mysetup method:
host = self.config.option.ssh
if host is None:
py.test.skip("specify ssh host with --ssh")
return py.execnet.SshGateway(host)
return execnet.SshGateway(host)
Now any test function can use the ``mysetup.getsshconnection()`` method like this:

View File

@ -9,7 +9,7 @@ NUM_PROCESSES = 5
channels = []
for i in range(NUM_PROCESSES):
gw = py.execnet.PopenGateway() # or use SSH or socket gateways
gw = execnet.PopenGateway() # or use SSH or socket gateways
channel = gw.remote_exec("""
import time
secs = channel.receive()
@ -19,7 +19,7 @@ for i in range(NUM_PROCESSES):
channels.append(channel)
print "*** instantiated subprocess", gw
mc = py.execnet.MultiChannel(channels)
mc = execnet.MultiChannel(channels)
queue = mc.make_receive_queue()
print "***", "verifying that timeout on receiving results from blocked subprocesses works"

View File

@ -10,7 +10,7 @@ showcasing features of the channel object:
import py
gw = py.execnet.PopenGateway()
gw = execnet.PopenGateway()
outchan = gw.remote_exec("""
import sys

View File

@ -82,7 +82,7 @@ def get_svn_youngest(repo):
return int(rev)
def getgateway(host, keyfile=None):
return py.execnet.SshGateway(host, identity=keyfile)
return execnet.SshGateway(host, identity=keyfile)
if __name__ == '__main__':
if len(sys.argv) < 3:

View File

@ -95,7 +95,7 @@ def error(*args):
def getinfo(sshname, ssh_config=None, loginfo=sys.stdout):
debug("connecting to", sshname)
try:
gw = py.execnet.SshGateway(sshname, ssh_config=ssh_config)
gw = execnet.SshGateway(sshname, ssh_config=ssh_config)
except IOError:
error("could not get sshagteway", sshname)
else:

View File

@ -20,5 +20,5 @@ class MySetup:
host = self.config.option.ssh
if host is None:
py.test.skip("specify ssh host with --ssh")
return py.execnet.SshGateway(host)
return execnet.SshGateway(host)

View File

@ -4,15 +4,13 @@
advanced testing and development support library:
- `py.test`_: cross-project testing tool with many advanced features
- `py.execnet`_: ad-hoc code distribution to SSH, Socket and local sub processes
- `py.path`_: path abstractions over local and subversion files
- `py.code`_: dynamic code compile and traceback printing support
Compatibility: Linux, Win32, OSX, Python versions 2.3-2.6.
Compatibility: Linux, Win32, OSX, Python versions 2.4 through to 3.1.
For questions please check out http://pylib.org/contact.html
.. _`py.test`: http://pylib.org/test.html
.. _`py.execnet`: http://pylib.org/execnet.html
.. _`py.path`: http://pylib.org/path.html
.. _`py.code`: http://pylib.org/code.html
@ -159,21 +157,6 @@ initpkg(__name__,
'builtin.execfile' : ('./builtin/builtin31.py', 'execfile'),
'builtin.callable' : ('./builtin/builtin31.py', 'callable'),
# gateways into remote contexts
'execnet.__doc__' : ('./execnet/__init__.py', '__doc__'),
'execnet._HookSpecs' : ('./execnet/gateway_base.py', 'ExecnetAPI'),
'execnet.SocketGateway' : ('./execnet/gateway.py', 'SocketGateway'),
'execnet.PopenGateway' : ('./execnet/gateway.py', 'PopenGateway'),
'execnet.SshGateway' : ('./execnet/gateway.py', 'SshGateway'),
'execnet.HostNotFound' : ('./execnet/gateway.py', 'HostNotFound'),
'execnet.XSpec' : ('./execnet/xspec.py', 'XSpec'),
'execnet.makegateway' : ('./execnet/xspec.py', 'makegateway'),
'execnet.MultiGateway' : ('./execnet/multi.py', 'MultiGateway'),
'execnet.MultiChannel' : ('./execnet/multi.py', 'MultiChannel'),
# execnet scripts
'execnet.RSync' : ('./execnet/rsync.py', 'RSync'),
# input-output helping
'io.__doc__' : ('./io/__init__.py', '__doc__'),
'io.dupfile' : ('./io/capture.py', 'dupfile'),

View File

@ -1 +0,0 @@
""" ad-hoc networking mechanism """

View File

@ -1,354 +0,0 @@
"""
gateway code for initiating popen, socket and ssh connections.
(c) 2004-2009, Holger Krekel and others
"""
import sys, os, inspect, socket, atexit, weakref
import py
from py.__.execnet.gateway_base import Message, Popen2IO, SocketIO
from py.__.execnet import gateway_base
debug = False
class GatewayCleanup:
def __init__(self):
self._activegateways = weakref.WeakKeyDictionary()
atexit.register(self.cleanup_atexit)
def register(self, gateway):
assert gateway not in self._activegateways
self._activegateways[gateway] = True
def unregister(self, gateway):
del self._activegateways[gateway]
def cleanup_atexit(self):
if debug:
debug.writeslines(["="*20, "cleaning up", "=" * 20])
debug.flush()
for gw in list(self._activegateways):
gw.exit()
#gw.join() # should work as well
class ExecnetAPI:
def pyexecnet_gateway_init(self, gateway):
""" signal initialisation of new gateway. """
def pyexecnet_gateway_exit(self, gateway):
""" signal exitting of gateway. """
class InitiatingGateway(gateway_base.BaseGateway):
""" initialize gateways on both sides of a inputoutput object. """
# XXX put the next two global variables into an Execnet object
# which intiaties gateways and passes in appropriate values.
_cleanup = GatewayCleanup()
hook = ExecnetAPI()
def __init__(self, io):
self._remote_bootstrap_gateway(io)
super(InitiatingGateway, self).__init__(io=io, _startcount=1)
self._initreceive()
self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry)
self.hook.pyexecnet_gateway_init(gateway=self)
self._cleanup.register(self)
def __repr__(self):
""" return string representing gateway type and status. """
if hasattr(self, 'remoteaddress'):
addr = '[%s]' % (self.remoteaddress,)
else:
addr = ''
try:
r = (self._receiverthread.isAlive() and "receiving" or
"not receiving")
s = "sending" # XXX
i = len(self._channelfactory.channels())
except AttributeError:
r = s = "uninitialized"
i = "no"
return "<%s%s %s/%s (%s active channels)>" %(
self.__class__.__name__, addr, r, s, i)
def exit(self):
""" Try to stop all exec and IO activity. """
try:
self._cleanup.unregister(self)
except KeyError:
return # we assume it's already happened
self._stopexec()
self._stopsend()
self.hook.pyexecnet_gateway_exit(gateway=self)
def _remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely
initialized counterpart Gateway (which may or may not succeed).
Note that the other sides gateways starts enumerating
its channels with even numbers while the sender
gateway starts with odd numbers. This allows to
uniquely identify channels across both sides.
"""
bootstrap = [extra]
bootstrap += [inspect.getsource(gateway_base)]
bootstrap += [io.server_stmt,
"io.write('1'.encode('ascii'))",
"SlaveGateway(io=io, _startcount=2).serve()",
]
source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code")
#open("/tmp/bootstrap.py", 'w').write(source)
repr_source = repr(source) + "\n"
io.write(repr_source.encode('ascii'))
s = io.read(1)
assert s == "1".encode('ascii')
def _rinfo(self, update=False):
""" return some sys/env information from remote. """
if update or not hasattr(self, '_cache_rinfo'):
ch = self.remote_exec(rinfo_source)
self._cache_rinfo = RInfo(**ch.receive())
return self._cache_rinfo
def remote_exec(self, source):
""" return channel object and connect it to a remote
execution thread where the given 'source' executes
and has the sister 'channel' object in its global
namespace.
"""
source = str(py.code.Source(source))
channel = self.newchannel()
self._send(Message.CHANNEL_OPEN(channel.id, source))
return channel
def remote_init_threads(self, num=None):
""" start up to 'num' threads for subsequent
remote_exec() invocations to allow concurrent
execution.
"""
if hasattr(self, '_remotechannelthread'):
raise IOError("remote threads already running")
from py.__.thread import pool
source = py.code.Source(pool, """
execpool = WorkerPool(maxthreads=%r)
gw = channel.gateway
while 1:
task = gw._execqueue.get()
if task is None:
gw._stopsend()
execpool.shutdown()
execpool.join()
raise gw._StopExecLoop
execpool.dispatch(gw.executetask, task)
""" % num)
self._remotechannelthread = self.remote_exec(source)
def _remote_redirect(self, stdout=None, stderr=None):
""" return a handle representing a redirection of a remote
end's stdout to a local file object. with handle.close()
the redirection will be reverted.
"""
# XXX implement a remote_exec_in_globals(...)
# to send ThreadOut implementation over
clist = []
for name, out in ('stdout', stdout), ('stderr', stderr):
if out:
outchannel = self.newchannel()
outchannel.setcallback(getattr(out, 'write', out))
channel = self.remote_exec("""
import sys
outchannel = channel.receive()
ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
""" % name)
channel.send(outchannel)
clist.append(channel)
for c in clist:
c.waitclose()
class Handle:
def close(_):
for name, out in ('stdout', stdout), ('stderr', stderr):
if out:
c = self.remote_exec("""
import sys
channel.gateway._ThreadOut(sys, %r).resetdefault()
""" % name)
c.waitclose()
return Handle()
class RInfo:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __repr__(self):
info = ", ".join(["%s=%s" % item
for item in self.__dict__.items()])
return "<RInfo %r>" % info
rinfo_source = """
import sys, os
channel.send(dict(
executable = sys.executable,
version_info = tuple([sys.version_info[i] for i in range(5)]),
platform = sys.platform,
cwd = os.getcwd(),
pid = os.getpid(),
))
"""
class PopenCmdGateway(InitiatingGateway):
def __init__(self, args):
from subprocess import Popen, PIPE
self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
io = Popen2IO(p.stdin, p.stdout)
super(PopenCmdGateway, self).__init__(io=io)
def exit(self):
super(PopenCmdGateway, self).exit()
self._popen.poll()
popen_bootstrapline = "import sys ; exec(eval(sys.stdin.readline()))"
class PopenGateway(PopenCmdGateway):
""" This Gateway provides interaction with a newly started
python subprocess.
"""
def __init__(self, python=None):
""" instantiate a gateway to a subprocess
started with the given 'python' executable.
"""
if not python:
python = sys.executable
args = [str(python), '-c', popen_bootstrapline]
super(PopenGateway, self).__init__(args)
def _remote_bootstrap_gateway(self, io, extra=''):
# have the subprocess use the same PYTHONPATH and py lib
x = py.path.local(py.__file__).dirpath().dirpath()
ppath = os.environ.get('PYTHONPATH', '')
plist = [str(x)] + ppath.split(':')
s = "\n".join([extra,
"import sys ; sys.path[:0] = %r" % (plist,),
"import os ; os.environ['PYTHONPATH'] = %r" % ppath,
inspect.getsource(stdouterrin_setnull),
"stdouterrin_setnull()",
""
])
super(PopenGateway, self)._remote_bootstrap_gateway(io, s)
class SocketGateway(InitiatingGateway):
""" This Gateway provides interaction with a remote process
by connecting to a specified socket. On the remote
side you need to manually start a small script
(py/execnet/script/socketserver.py) that accepts
SocketGateway connections.
"""
def __init__(self, host, port):
""" instantiate a gateway to a process accessed
via a host/port specified socket.
"""
self.host = host = str(host)
self.port = port = int(port)
self.remoteaddress = '%s:%d' % (self.host, self.port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
except socket.gaierror:
raise HostNotFound(str(sys.exc_info()[1]))
io = SocketIO(sock)
super(SocketGateway, self).__init__(io=io)
def new_remote(cls, gateway, hostport=None):
""" return a new (connected) socket gateway, instatiated
indirectly through the given 'gateway'.
"""
if hostport is None:
host, port = ('', 0) # XXX works on all platforms?
else:
host, port = hostport
mydir = py.path.local(__file__).dirpath()
socketserverbootstrap = py.code.Source(
mydir.join('script', 'socketserver.py').read('r'), """
import socket
sock = bind_and_listen((%r, %r))
port = sock.getsockname()
channel.send(port)
startserver(sock)
""" % (host, port)
)
# execute the above socketserverbootstrap on the other side
channel = gateway.remote_exec(socketserverbootstrap)
(realhost, realport) = channel.receive()
#gateway._trace("new_remote received"
# "port=%r, hostname = %r" %(realport, hostname))
return py.execnet.SocketGateway(host, realport)
new_remote = classmethod(new_remote)
class HostNotFound(Exception):
pass
class SshGateway(PopenCmdGateway):
""" This Gateway provides interaction with a remote Python process,
established via the 'ssh' command line binary.
The remote side needs to have a Python interpreter executable.
"""
def __init__(self, sshaddress, remotepython=None, ssh_config=None):
""" instantiate a remote ssh process with the
given 'sshaddress' and remotepython version.
you may specify an ssh_config file.
"""
self.remoteaddress = sshaddress
if remotepython is None:
remotepython = "python"
args = ['ssh', '-C' ]
if ssh_config is not None:
args.extend(['-F', str(ssh_config)])
remotecmd = '%s -c "%s"' %(remotepython, popen_bootstrapline)
args.extend([sshaddress, remotecmd])
super(SshGateway, self).__init__(args)
def _remote_bootstrap_gateway(self, io, s=""):
extra = "\n".join([
str(py.code.Source(stdouterrin_setnull)),
"stdouterrin_setnull()",
s,
])
try:
super(SshGateway, self)._remote_bootstrap_gateway(io, extra)
except EOFError:
ret = self._popen.wait()
if ret == 255:
raise HostNotFound(self.remoteaddress)
def stdouterrin_setnull():
""" redirect file descriptors 0 and 1 (and possibly 2) to /dev/null.
note that this function may run remotely without py lib support.
"""
# complete confusion (this is independent from the sys.stdout
# and sys.stderr redirection that gateway.remote_exec() can do)
# note that we redirect fd 2 on win too, since for some reason that
# blocks there, while it works (sending to stderr if possible else
# ignoring) on *nix
import sys, os
if not hasattr(os, 'dup'): # jython
return
try:
devnull = os.devnull
except AttributeError:
if os.name == 'nt':
devnull = 'NUL'
else:
devnull = '/dev/null'
# stdin
sys.stdin = os.fdopen(os.dup(0), 'r', 1)
fd = os.open(devnull, os.O_RDONLY)
os.dup2(fd, 0)
os.close(fd)
# stdout
sys.stdout = os.fdopen(os.dup(1), 'w', 1)
fd = os.open(devnull, os.O_WRONLY)
os.dup2(fd, 1)
# stderr for win32
if os.name == 'nt':
sys.stderr = os.fdopen(os.dup(2), 'w', 1)
os.dup2(fd, 2)
os.close(fd)

View File

@ -1,757 +0,0 @@
"""
base execnet gateway code, a quick overview.
the code of this module is sent to the "other side"
as a means of bootstrapping a Gateway object
capable of receiving and executing code,
and routing data through channels.
Gateways operate on InputOutput objects offering
a write and a read(n) method.
Once bootstrapped a higher level protocol
based on Messages is used. Messages are serialized
to and from InputOutput objects. The details of this protocol
are locally defined in this module. There is no need
for standardizing or versioning the protocol.
After bootstrapping the BaseGateway opens a receiver thread which
accepts encoded messages and triggers actions to interpret them.
Sending of channel data items happens directly through
write operations to InputOutput objects so there is no
separate thread.
Code execution messages are put into an execqueue from
which they will be taken for execution. gateway.serve()
will take and execute such items, one by one. This means
that by incoming default execution is single-threaded.
The receiver thread terminates if the remote side sends
a gateway termination message or if the IO-connection drops.
It puts an end symbol into the execqueue so
that serve() can cleanly finish as well.
(C) 2004-2009 Holger Krekel, Armin Rigo and others
"""
import sys, os, weakref
import threading, traceback, socket, struct
try:
import queue
except ImportError:
import Queue as queue
if sys.version_info > (3, 0):
exec("""def do_exec(co, loc):
exec(co, loc)""")
unicode = str
else:
exec("""def do_exec(co, loc):
exec co in loc""")
bytes = str
def str(*args):
raise EnvironmentError(
"use unicode or bytes, not cross-python ambigous 'str'")
default_encoding = "UTF-8"
sysex = (KeyboardInterrupt, SystemExit)
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w')
# ___________________________________________________________________________
#
# input output classes
# ___________________________________________________________________________
class SocketIO:
server_stmt = "io = SocketIO(clientsock)"
error = (socket.error, EOFError)
def __init__(self, sock):
self.sock = sock
try:
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY
except socket.error:
e = sys.exc_info()[1]
sys.stderr.write("WARNING: cannot set socketoption")
self.readable = self.writeable = True
def read(self, numbytes):
"Read exactly 'bytes' bytes from the socket."
buf = bytes()
while len(buf) < numbytes:
t = self.sock.recv(numbytes - len(buf))
if not t:
raise EOFError
buf += t
return buf
def write(self, data):
assert isinstance(data, bytes)
self.sock.sendall(data)
def close_read(self):
if self.readable:
try:
self.sock.shutdown(0)
except socket.error:
pass
self.readable = None
def close_write(self):
if self.writeable:
try:
self.sock.shutdown(1)
except socket.error:
pass
self.writeable = None
class Popen2IO:
server_stmt = """
import os, sys, tempfile
io = Popen2IO(sys.stdout, sys.stdin)
sys.stdout = tempfile.TemporaryFile('w')
sys.stdin = tempfile.TemporaryFile('r')
"""
error = (IOError, OSError, EOFError)
def __init__(self, outfile, infile):
# we need raw byte streams
self.outfile, self.infile = outfile, infile
if sys.platform == "win32":
import msvcrt
msvcrt.setmode(infile.fileno(), os.O_BINARY)
msvcrt.setmode(outfile.fileno(), os.O_BINARY)
self.readable = self.writeable = True
def read(self, numbytes):
"""Read exactly 'numbytes' bytes from the pipe. """
try:
data = self.infile.buffer.read(numbytes)
except AttributeError:
data = self.infile.read(numbytes)
if len(data) < numbytes:
raise EOFError
return data
def write(self, data):
"""write out all data bytes. """
assert isinstance(data, bytes)
try:
self.outfile.buffer.write(data)
except AttributeError:
self.outfile.write(data)
self.outfile.flush()
def close_read(self):
if self.readable:
self.infile.close()
self.readable = None
def close_write(self):
try:
self.outfile.close()
except EnvironmentError:
pass
self.writeable = None
# ___________________________________________________________________________
#
# Messages
# ___________________________________________________________________________
# the header format
HDR_FORMAT = "!hhii"
HDR_SIZE = struct.calcsize(HDR_FORMAT)
is3k = sys.version_info >= (3,0)
class Message:
""" encapsulates Messages and their wire protocol. """
_types = {}
def __init__(self, channelid=0, data=''):
self.channelid = channelid
self.data = data
def writeto(self, io):
# XXX marshal.dumps doesn't work for exchanging data across Python
# version :-((( XXX check this statement wrt python2.4 through 3.1
data = self.data
if isinstance(data, bytes):
dataformat = 1 + int(is3k)
else:
if isinstance(data, unicode):
dataformat = 3
else:
data = repr(self.data) # argh
dataformat = 4
data = data.encode(default_encoding)
header = struct.pack(HDR_FORMAT, self.msgtype, dataformat,
self.channelid, len(data))
io.write(header + data)
def readfrom(cls, io):
header = io.read(HDR_SIZE)
(msgtype, dataformat,
senderid, stringlen) = struct.unpack(HDR_FORMAT, header)
data = io.read(stringlen)
if dataformat == 1:
if is3k:
# remote was python2-str, we are 3k-text
data = data.decode(default_encoding)
elif dataformat == 2:
# remote was python3-bytes
pass
else:
data = data.decode(default_encoding)
if dataformat == 3:
pass
elif dataformat == 4:
data = eval(data, {}) # reversed argh
else:
raise ValueError("bad data format")
return cls._types[msgtype](senderid, data)
readfrom = classmethod(readfrom)
def __repr__(self):
r = repr(self.data)
if len(r) > 50:
return "<Message.%s channelid=%d len=%d>" %(self.__class__.__name__,
self.channelid, len(r))
else:
return "<Message.%s channelid=%d %r>" %(self.__class__.__name__,
self.channelid, self.data)
def _setupmessages():
class CHANNEL_OPEN(Message):
def received(self, gateway):
channel = gateway._channelfactory.new(self.channelid)
gateway._local_schedulexec(channel=channel, sourcetask=self.data)
class CHANNEL_NEW(Message):
def received(self, gateway):
""" receive a remotely created new (sub)channel. """
newid = self.data
newchannel = gateway._channelfactory.new(newid)
gateway._channelfactory._local_receive(self.channelid, newchannel)
class CHANNEL_DATA(Message):
def received(self, gateway):
gateway._channelfactory._local_receive(self.channelid, self.data)
class CHANNEL_CLOSE(Message):
def received(self, gateway):
gateway._channelfactory._local_close(self.channelid)
class CHANNEL_CLOSE_ERROR(Message):
def received(self, gateway):
remote_error = gateway._channelfactory.RemoteError(self.data)
gateway._channelfactory._local_close(self.channelid, remote_error)
class CHANNEL_LAST_MESSAGE(Message):
def received(self, gateway):
gateway._channelfactory._local_close(self.channelid, sendonly=True)
classes = [CHANNEL_OPEN, CHANNEL_NEW, CHANNEL_DATA,
CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE]
for i, cls in enumerate(classes):
Message._types[i] = cls
cls.msgtype = i
setattr(Message, cls.__name__, cls)
_setupmessages()
def geterrortext(excinfo):
try:
l = traceback.format_exception(*excinfo)
errortext = "".join(l)
except sysex:
raise
except:
errortext = '%s: %s' % (excinfo[0].__name__,
excinfo[1])
return errortext
class RemoteError(EOFError):
""" Contains an Exceptions from the other side. """
def __init__(self, formatted):
self.formatted = formatted
EOFError.__init__(self)
def __str__(self):
return self.formatted
def __repr__(self):
return "%s: %s" %(self.__class__.__name__, self.formatted)
def warn(self):
# XXX do this better
sys.stderr.write("Warning: unhandled %r\n" % (self,))
NO_ENDMARKER_WANTED = object()
class Channel(object):
"""Communication channel between two possibly remote threads of code. """
RemoteError = RemoteError
def __init__(self, gateway, id):
assert isinstance(id, int)
self.gateway = gateway
self.id = id
self._items = queue.Queue()
self._closed = False
self._receiveclosed = threading.Event()
self._remoteerrors = []
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
# we first execute the callback on all already received
# items. We need to hold the receivelock to prevent
# race conditions with newly arriving items.
# after having cleared the queue we register
# the callback only if the channel is not closed already.
_callbacks = self.gateway._channelfactory._callbacks
_receivelock = self.gateway._receivelock
_receivelock.acquire()
try:
if self._items is None:
raise IOError("%r has callback already registered" %(self,))
items = self._items
self._items = None
while 1:
try:
olditem = items.get(block=False)
except queue.Empty:
if not (self._closed or self._receiveclosed.isSet()):
_callbacks[self.id] = (callback, endmarker)
break
else:
if olditem is ENDMARKER:
items.put(olditem) # for other receivers
if endmarker is not NO_ENDMARKER_WANTED:
callback(endmarker)
break
else:
callback(olditem)
finally:
_receivelock.release()
def __repr__(self):
flag = self.isclosed() and "closed" or "open"
return "<Channel id=%d %s>" % (self.id, flag)
def __del__(self):
if self.gateway is None: # can be None in tests
return
self.gateway._trace("Channel(%d).__del__" % self.id)
# no multithreading issues here, because we have the last ref to 'self'
if self._closed:
# state transition "closed" --> "deleted"
for error in self._remoteerrors:
error.warn()
elif self._receiveclosed.isSet():
# state transition "sendonly" --> "deleted"
# the remote channel is already in "deleted" state, nothing to do
pass
else:
# state transition "opened" --> "deleted"
if self._items is None: # has_callback
Msg = Message.CHANNEL_LAST_MESSAGE
else:
Msg = Message.CHANNEL_CLOSE
self.gateway._send(Msg(self.id))
def _getremoteerror(self):
try:
return self._remoteerrors.pop(0)
except IndexError:
return None
#
# public API for channel objects
#
def isclosed(self):
""" return True if the channel is closed. A closed
channel may still hold items.
"""
return self._closed
def makefile(self, mode='w', proxyclose=False):
""" return a file-like object.
mode: 'w' for writes, 'r' for reads
proxyclose: if true file.close() will
trigger a channel.close() call.
"""
if mode == "w":
return ChannelFileWrite(channel=self, proxyclose=proxyclose)
elif mode == "r":
return ChannelFileRead(channel=self, proxyclose=proxyclose)
raise ValueError("mode %r not availabe" %(mode,))
def close(self, error=None):
""" close down this channel on both sides. """
if not self._closed:
# state transition "opened/sendonly" --> "closed"
# threads warning: the channel might be closed under our feet,
# but it's never damaging to send too many CHANNEL_CLOSE messages
put = self.gateway._send
if error is not None:
put(Message.CHANNEL_CLOSE_ERROR(self.id, error))
else:
put(Message.CHANNEL_CLOSE(self.id))
if isinstance(error, RemoteError):
self._remoteerrors.append(error)
self._closed = True # --> "closed"
self._receiveclosed.set()
queue = self._items
if queue is not None:
queue.put(ENDMARKER)
self.gateway._channelfactory._no_longer_opened(self.id)
def waitclose(self, timeout=None):
""" wait until this channel is closed (or the remote side
otherwise signalled that no more data was being sent).
The channel may still hold receiveable items, but not receive
more. waitclose() reraises exceptions from executing code on
the other side as channel.RemoteErrors containing a a textual
representation of the remote traceback.
"""
self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state
if not self._receiveclosed.isSet():
raise IOError("Timeout")
error = self._getremoteerror()
if error:
raise error
def send(self, item):
"""sends the given item to the other side of the channel,
possibly blocking if the sender queue is full.
Note that an item needs to be marshallable.
"""
if self.isclosed():
raise IOError("cannot send to %r" %(self,))
if isinstance(item, Channel):
data = Message.CHANNEL_NEW(self.id, item.id)
else:
data = Message.CHANNEL_DATA(self.id, item)
self.gateway._send(data)
def receive(self):
"""receives an item that was sent from the other side,
possibly blocking if there is none.
Note that exceptions from the other side will be
reraised as channel.RemoteError exceptions containing
a textual representation of the remote traceback.
"""
queue = self._items
if queue is None:
raise IOError("calling receive() on channel with receiver callback")
x = queue.get()
if x is ENDMARKER:
queue.put(x) # for other receivers
raise self._getremoteerror() or EOFError()
else:
return x
def __iter__(self):
return self
def next(self):
try:
return self.receive()
except EOFError:
raise StopIteration
__next__ = next
ENDMARKER = object()
class ChannelFactory(object):
RemoteError = RemoteError
def __init__(self, gateway, startcount=1):
self._channels = weakref.WeakValueDictionary()
self._callbacks = {}
self._writelock = threading.Lock()
self.gateway = gateway
self.count = startcount
self.finished = False
def new(self, id=None):
""" create a new Channel with 'id' (or create new id if None). """
self._writelock.acquire()
try:
if self.finished:
raise IOError("connexion already closed: %s" % (self.gateway,))
if id is None:
id = self.count
self.count += 2
channel = Channel(self.gateway, id)
self._channels[id] = channel
return channel
finally:
self._writelock.release()
def channels(self):
return list(self._channels.values())
#
# internal methods, called from the receiver thread
#
def _no_longer_opened(self, id):
try:
del self._channels[id]
except KeyError:
pass
try:
callback, endmarker = self._callbacks.pop(id)
except KeyError:
pass
else:
if endmarker is not NO_ENDMARKER_WANTED:
callback(endmarker)
def _local_close(self, id, remoteerror=None, sendonly=False):
channel = self._channels.get(id)
if channel is None:
# channel already in "deleted" state
if remoteerror:
remoteerror.warn()
else:
# state transition to "closed" state
if remoteerror:
channel._remoteerrors.append(remoteerror)
if not sendonly: # otherwise #--> "sendonly"
channel._closed = True # --> "closed"
channel._receiveclosed.set()
queue = channel._items
if queue is not None:
queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_receive(self, id, data):
# executes in receiver thread
try:
callback, endmarker = self._callbacks[id]
except KeyError:
channel = self._channels.get(id)
queue = channel and channel._items
if queue is None:
pass # drop data
else:
queue.put(data)
else:
callback(data) # even if channel may be already closed
def _finished_receiving(self):
self._writelock.acquire()
try:
self.finished = True
finally:
self._writelock.release()
for id in list(self._channels):
self._local_close(id, sendonly=True)
for id in list(self._callbacks):
self._no_longer_opened(id)
class ChannelFile(object):
def __init__(self, channel, proxyclose=True):
self.channel = channel
self._proxyclose = proxyclose
def close(self):
if self._proxyclose:
self.channel.close()
def __repr__(self):
state = self.channel.isclosed() and 'closed' or 'open'
return '<ChannelFile %d %s>' %(self.channel.id, state)
class ChannelFileWrite(ChannelFile):
def write(self, out):
self.channel.send(out)
def flush(self):
pass
class ChannelFileRead(ChannelFile):
def __init__(self, channel, proxyclose=True):
super(ChannelFileRead, self).__init__(channel, proxyclose)
self._buffer = ""
def read(self, n):
while len(self._buffer) < n:
try:
self._buffer += self.channel.receive()
except EOFError:
self.close()
break
ret = self._buffer[:n]
self._buffer = self._buffer[n:]
return ret
def readline(self):
i = self._buffer.find("\n")
if i != -1:
return self.read(i+1)
line = self.read(len(self._buffer)+1)
while line and line[-1] != "\n":
c = self.read(1)
if not c:
break
line += c
return line
class BaseGateway(object):
exc_info = sys.exc_info
class _StopExecLoop(Exception):
pass
def __init__(self, io, _startcount=2):
""" initialize core gateway, using the given inputoutput object.
"""
self._io = io
self._channelfactory = ChannelFactory(self, _startcount)
self._receivelock = threading.RLock()
def _initreceive(self):
self._receiverthread = threading.Thread(name="receiver",
target=self._thread_receiver)
self._receiverthread.setDaemon(1)
self._receiverthread.start()
def _trace(self, msg):
if debug:
try:
debug.write(unicode(msg) + "\n")
debug.flush()
except sysex:
raise
except:
sys.stderr.write("exception during tracing\n")
def _thread_receiver(self):
""" thread to read and handle Messages half-sync-half-async. """
self._trace("starting to receive")
try:
while 1:
try:
msg = Message.readfrom(self._io)
self._trace("received <- %r" % msg)
_receivelock = self._receivelock
_receivelock.acquire()
try:
msg.received(self)
finally:
_receivelock.release()
except sysex:
break
except EOFError:
break
except:
self._trace(geterrortext(self.exc_info()))
break
finally:
# XXX we need to signal fatal error states to
# channels/callbacks, particularly ones
# where the other side just died.
self._stopexec()
try:
self._stopsend()
except IOError:
self._trace('IOError on _stopsend()')
self._channelfactory._finished_receiving()
if threading: # might be None during shutdown/finalization
self._trace('leaving %r' % threading.currentThread())
def _send(self, msg):
if msg is None:
self._io.close_write()
else:
try:
msg.writeto(self._io)
except:
excinfo = self.exc_info()
self._trace(geterrortext(excinfo))
else:
self._trace('sent -> %r' % msg)
def _stopsend(self):
self._send(None)
def _stopexec(self):
pass
def _local_schedulexec(self, channel, sourcetask):
channel.close("execution disallowed")
# _____________________________________________________________________
#
# High Level Interface
# _____________________________________________________________________
#
def newchannel(self):
""" return new channel object. """
return self._channelfactory.new()
def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity)
to stop. the joinexec parameter is obsolete.
"""
current = threading.currentThread()
if self._receiverthread.isAlive():
self._trace("joining receiver thread")
self._receiverthread.join()
class SlaveGateway(BaseGateway):
def _stopexec(self):
self._execqueue.put(None)
def _local_schedulexec(self, channel, sourcetask):
self._execqueue.put((channel, sourcetask))
def serve(self, joining=True):
self._execqueue = queue.Queue()
self._initreceive()
try:
while 1:
item = self._execqueue.get()
if item is None:
self._stopsend()
break
try:
self.executetask(item)
except self._StopExecLoop:
break
finally:
self._trace("serve")
if joining:
self.join()
def executetask(self, item):
""" execute channel/source items. """
channel, source = item
try:
loc = { 'channel' : channel, '__name__': '__channelexec__'}
#open("task.py", 'w').write(source)
self._trace("execution starts: %s" % repr(source)[:50])
try:
co = compile(source+'\n', '', 'exec')
do_exec(co, loc)
finally:
self._trace("execution finished")
except sysex:
pass
except self._StopExecLoop:
channel.close()
raise
except:
excinfo = self.exc_info()
self._trace("got exception %s" % excinfo[1])
errortext = geterrortext(excinfo)
channel.close(errortext)
else:
channel.close()

View File

@ -1,71 +0,0 @@
"""
Support for working with multiple channels and gateways
(c) 2008-2009, Holger Krekel and others
"""
import py
try:
import queue
except ImportError:
import Queue as queue
NO_ENDMARKER_WANTED = object()
class MultiGateway:
def __init__(self, gateways):
self.gateways = gateways
def remote_exec(self, source):
channels = []
for gw in self.gateways:
channels.append(gw.remote_exec(source))
return MultiChannel(channels)
def exit(self):
for gw in self.gateways:
gw.exit()
class MultiChannel:
def __init__(self, channels):
self._channels = channels
def send_each(self, item):
for ch in self._channels:
ch.send(item)
def receive_each(self, withchannel=False):
assert not hasattr(self, '_queue')
l = []
for ch in self._channels:
obj = ch.receive()
if withchannel:
l.append((ch, obj))
else:
l.append(obj)
return l
def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
try:
return self._queue
except AttributeError:
self._queue = queue.Queue()
for ch in self._channels:
def putreceived(obj, channel=ch):
self._queue.put((channel, obj))
if endmarker is NO_ENDMARKER_WANTED:
ch.setcallback(putreceived)
else:
ch.setcallback(putreceived, endmarker=endmarker)
return self._queue
def waitclose(self):
first = None
for ch in self._channels:
try:
ch.waitclose()
except ch.RemoteError:
if first is None:
first = py.std.sys.exc_info()
if first:
py.builtin._reraise(first[0], first[1], first[2])

View File

@ -1,201 +0,0 @@
"""
1:N rsync implemenation on top of execnet.
(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
"""
import py, os, stat
md5 = py.builtin._tryimport('hashlib', 'md5').md5
Queue = py.builtin._tryimport('queue', 'Queue').Queue
class RSync(object):
""" This class allows to send a directory structure (recursively)
to one or multiple remote filesystems.
There is limited support for symlinks, which means that symlinks
pointing to the sourcetree will be send "as is" while external
symlinks will be just copied (regardless of existance of such
a path on remote side).
"""
def __init__(self, sourcedir, callback=None, verbose=True):
self._sourcedir = str(sourcedir)
self._verbose = verbose
assert callback is None or py.builtin.callable(callback)
self._callback = callback
self._channels = {}
self._receivequeue = Queue()
self._links = []
def filter(self, path):
return True
def _end_of_channel(self, channel):
if channel in self._channels:
# too early! we must have got an error
channel.waitclose()
# or else we raise one
raise IOError('connection unexpectedly closed: %s ' % (
channel.gateway,))
def _process_link(self, channel):
for link in self._links:
channel.send(link)
# completion marker, this host is done
channel.send(42)
def _done(self, channel):
""" Call all callbacks
"""
finishedcallback = self._channels.pop(channel)
if finishedcallback:
finishedcallback()
def _list_done(self, channel):
# sum up all to send
if self._callback:
s = sum([self._paths[i] for i in self._to_send[channel]])
self._callback("list", s, channel)
def _send_item(self, channel, data):
""" Send one item
"""
modified_rel_path, checksum = data
modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
try:
f = open(modifiedpath, 'rb')
data = f.read()
except IOError:
data = None
# provide info to progress callback function
modified_rel_path = "/".join(modified_rel_path)
if data is not None:
self._paths[modified_rel_path] = len(data)
else:
self._paths[modified_rel_path] = 0
if channel not in self._to_send:
self._to_send[channel] = []
self._to_send[channel].append(modified_rel_path)
#print "sending", modified_rel_path, data and len(data) or 0, checksum
if data is not None:
f.close()
if checksum is not None and checksum == md5(data).digest():
data = None # not really modified
else:
self._report_send_file(channel.gateway, modified_rel_path)
channel.send(data)
def _report_send_file(self, gateway, modified_rel_path):
if self._verbose:
print("%s <= %s" %(gateway, modified_rel_path))
def send(self, raises=True):
""" Sends a sourcedir to all added targets. Flag indicates
whether to raise an error or return in case of lack of
targets
"""
if not self._channels:
if raises:
raise IOError("no targets available, maybe you "
"are trying call send() twice?")
return
# normalize a trailing '/' away
self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x'))
# send directory structure and file timestamps/sizes
self._send_directory_structure(self._sourcedir)
# paths and to_send are only used for doing
# progress-related callbacks
self._paths = {}
self._to_send = {}
# send modified file to clients
while self._channels:
channel, req = self._receivequeue.get()
if req is None:
self._end_of_channel(channel)
else:
command, data = req
if command == "links":
self._process_link(channel)
elif command == "done":
self._done(channel)
elif command == "ack":
if self._callback:
self._callback("ack", self._paths[data], channel)
elif command == "list_done":
self._list_done(channel)
elif command == "send":
self._send_item(channel, data)
del data
else:
assert "Unknown command %s" % command
def add_target(self, gateway, destdir,
finishedcallback=None, **options):
""" Adds a remote target specified via a 'gateway'
and a remote destination directory.
"""
assert finishedcallback is None or py.builtin.callable(finishedcallback)
for name in options:
assert name in ('delete',)
def itemcallback(req):
self._receivequeue.put((channel, req))
channel = gateway.remote_exec(REMOTE_SOURCE)
channel.setcallback(itemcallback, endmarker = None)
channel.send((str(destdir), options))
self._channels[channel] = finishedcallback
def _broadcast(self, msg):
for channel in self._channels:
channel.send(msg)
def _send_link(self, basename, linkpoint):
self._links.append(("link", basename, linkpoint))
def _send_directory(self, path):
# dir: send a list of entries
names = []
subpaths = []
for name in os.listdir(path):
p = os.path.join(path, name)
if self.filter(p):
names.append(name)
subpaths.append(p)
self._broadcast(names)
for p in subpaths:
self._send_directory_structure(p)
def _send_link_structure(self, path):
linkpoint = os.readlink(path)
basename = path[len(self._sourcedir) + 1:]
if not linkpoint.startswith(os.sep):
# relative link, just send it
# XXX: do sth with ../ links
self._send_link(basename, linkpoint)
elif linkpoint.startswith(self._sourcedir):
self._send_link(basename, linkpoint[len(self._sourcedir) + 1:])
else:
self._send_link(basename, linkpoint)
self._broadcast(None)
def _send_directory_structure(self, path):
try:
st = os.lstat(path)
except OSError:
self._broadcast((0, 0))
return
if stat.S_ISREG(st.st_mode):
# regular file: send a timestamp/size pair
self._broadcast((st.st_mtime, st.st_size))
elif stat.S_ISDIR(st.st_mode):
self._send_directory(path)
elif stat.S_ISLNK(st.st_mode):
self._send_link_structure(path)
else:
raise ValueError("cannot sync %r" % (path,))
REMOTE_SOURCE = py.path.local(__file__).dirpath().\
join('rsync_remote.py').open().read() + "\nf()"

View File

@ -1,92 +0,0 @@
def f():
import os, stat, shutil
try:
from hashlib import md5
except ImportError:
from md5 import md5
destdir, options = channel.receive()
modifiedfiles = []
def remove(path):
assert path.startswith(destdir)
try:
os.unlink(path)
except OSError:
# assume it's a dir
shutil.rmtree(path)
def receive_directory_structure(path, relcomponents):
try:
st = os.lstat(path)
except OSError:
st = None
msg = channel.receive()
if isinstance(msg, list):
if st and not stat.S_ISDIR(st.st_mode):
os.unlink(path)
st = None
if not st:
os.makedirs(path)
entrynames = {}
for entryname in msg:
receive_directory_structure(os.path.join(path, entryname),
relcomponents + [entryname])
entrynames[entryname] = True
if options.get('delete'):
for othername in os.listdir(path):
if othername not in entrynames:
otherpath = os.path.join(path, othername)
remove(otherpath)
elif msg is not None:
checksum = None
if st:
if stat.S_ISREG(st.st_mode):
msg_mtime, msg_size = msg
if msg_size != st.st_size:
pass
elif msg_mtime != st.st_mtime:
f = open(path, 'rb')
checksum = md5(f.read()).digest()
f.close()
else:
return # already fine
else:
remove(path)
channel.send(("send", (relcomponents, checksum)))
modifiedfiles.append((path, msg))
receive_directory_structure(destdir, [])
STRICT_CHECK = False # seems most useful this way for py.test
channel.send(("list_done", None))
for path, (time, size) in modifiedfiles:
data = channel.receive()
channel.send(("ack", path[len(destdir) + 1:]))
if data is not None:
if STRICT_CHECK and len(data) != size:
raise IOError('file modified during rsync: %r' % (path,))
f = open(path, 'wb')
f.write(data)
f.close()
try:
os.utime(path, (time, time))
except OSError:
pass
del data
channel.send(("links", None))
msg = channel.receive()
while msg is not 42:
# we get symlink
_type, relpath, linkpoint = msg
assert _type == "link"
path = os.path.join(destdir, relpath)
try:
remove(path)
except OSError:
pass
os.symlink(os.path.join(destdir, linkpoint), path)
msg = channel.receive()
channel.send(("done", None))

View File

@ -1 +0,0 @@
#

View File

@ -1,14 +0,0 @@
import os, sys
import subprocess
if __name__ == '__main__':
directory = os.path.dirname(os.path.abspath(sys.argv[0]))
script = os.path.join(directory, 'socketserver.py')
while 1:
cmdlist = ["python", script]
cmdlist.extend(sys.argv[1:])
text = "starting subcommand: " + " ".join(cmdlist)
print(text)
process = subprocess.Popen(cmdlist)
process.wait()

View File

@ -1,16 +0,0 @@
"""
send a "quit" signal to a remote server
"""
import sys
import socket
hostport = sys.argv[1]
host, port = hostport.split(':')
hostport = (host, int(port))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(hostport)
sock.sendall('"raise KeyboardInterrupt"\n')

View File

@ -1,85 +0,0 @@
#! /usr/bin/env python
"""
a remote python shell
for injection into startserver.py
"""
import sys, os, socket, select
try:
clientsock
except NameError:
print("client side starting")
import sys
host, port = sys.argv[1].split(':')
port = int(port)
myself = open(os.path.abspath(sys.argv[0]), 'rU').read()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.sendall(repr(myself)+'\n')
print("send boot string")
inputlist = [ sock, sys.stdin ]
try:
while 1:
r,w,e = select.select(inputlist, [], [])
if sys.stdin in r:
line = raw_input()
sock.sendall(line + '\n')
if sock in r:
line = sock.recv(4096)
sys.stdout.write(line)
sys.stdout.flush()
except:
import traceback
print(traceback.print_exc())
sys.exit(1)
print("server side starting")
# server side
#
from traceback import print_exc
from threading import Thread
class promptagent(Thread):
def __init__(self, clientsock):
Thread.__init__(self)
self.clientsock = clientsock
def run(self):
print("Entering thread prompt loop")
clientfile = self.clientsock.makefile('w')
filein = self.clientsock.makefile('r')
loc = self.clientsock.getsockname()
while 1:
try:
clientfile.write('%s %s >>> ' % loc)
clientfile.flush()
line = filein.readline()
if len(line)==0: raise EOFError("nothing")
#print >>sys.stderr,"got line: " + line
if line.strip():
oldout, olderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = clientfile, clientfile
try:
try:
exec(compile(line + '\n','<remote pyin>', 'single'))
except:
print_exc()
finally:
sys.stdout=oldout
sys.stderr=olderr
clientfile.flush()
except EOFError:
e = sys.exc_info()[1]
sys.stderr.write("connection close, prompt thread returns")
break
#print >>sys.stdout, "".join(apply(format_exception,sys.exc_info()))
self.clientsock.close()
prompter = promptagent(clientsock)
prompter.start()
print("promptagent - thread started")

View File

@ -1,102 +0,0 @@
#! /usr/bin/env python
"""
start socket based minimal readline exec server
"""
# this part of the program only executes on the server side
#
progname = 'socket_readline_exec_server-1.2'
import sys, socket, os
try:
import fcntl
except ImportError:
fcntl = None
debug = 0
if debug: # and not os.isatty(sys.stdin.fileno()):
f = open('/tmp/execnet-socket-pyout.log', 'w')
old = sys.stdout, sys.stderr
sys.stdout = sys.stderr = f
#import py
#compile = py.code.compile
def print_(*args):
print(" ".join(str(arg) for arg in args))
if sys.version_info > (3, 0):
exec("""def exec_(source, locs):
exec(source, locs)""")
else:
exec("""def exec_(source, locs):
exec source in locs""")
def exec_from_one_connection(serversock):
print_(progname, 'Entering Accept loop', serversock.getsockname())
clientsock,address = serversock.accept()
print_(progname, 'got new connection from %s %s' % address)
clientfile = clientsock.makefile('rb')
print_("reading line")
# rstrip so that we can use \r\n for telnet testing
source = clientfile.readline().rstrip()
clientfile.close()
g = {'clientsock' : clientsock, 'address' : address}
source = eval(source)
if source:
co = compile(source+'\n', source, 'exec')
print_(progname, 'compiled source, executing')
try:
exec_(co, g)
finally:
print_(progname, 'finished executing code')
# background thread might hold a reference to this (!?)
#clientsock.close()
def bind_and_listen(hostport):
if isinstance(hostport, str):
host, port = hostport.split(':')
hostport = (host, int(port))
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set close-on-exec
if hasattr(fcntl, 'FD_CLOEXEC'):
old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
# allow the address to be re-used in a reasonable amount of time
if os.name == 'posix' and sys.platform != 'cygwin':
serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversock.bind(hostport)
serversock.listen(5)
return serversock
def startserver(serversock, loop=False):
try:
while 1:
try:
exec_from_one_connection(serversock)
except (KeyboardInterrupt, SystemExit):
raise
except:
if debug:
import traceback
traceback.print_exc()
else:
excinfo = sys.exc_info()
print_("got exception", excinfo[1])
if not loop:
break
finally:
print_("leaving socketserver execloop")
serversock.shutdown(2)
if __name__ == '__main__':
import sys
if len(sys.argv)>1:
hostport = sys.argv[1]
else:
hostport = ':8888'
serversock = bind_and_listen(hostport)
startserver(serversock, loop=False)

View File

@ -1,91 +0,0 @@
"""
A windows service wrapper for the py.execnet socketserver.
To use, run:
python socketserverservice.py register
net start ExecNetSocketServer
"""
import sys
import os
import time
import win32serviceutil
import win32service
import win32event
import win32evtlogutil
import servicemanager
import threading
import socketserver
appname = 'ExecNetSocketServer'
class SocketServerService(win32serviceutil.ServiceFramework):
_svc_name_ = appname
_svc_display_name_ = "%s" % appname
_svc_deps_ = ["EventLog"]
def __init__(self, args):
# The exe-file has messages for the Event Log Viewer.
# Register the exe-file as event source.
#
# Probably it would be better if this is done at installation time,
# so that it also could be removed if the service is uninstalled.
# Unfortunately it cannot be done in the 'if __name__ == "__main__"'
# block below, because the 'frozen' exe-file does not run this code.
#
win32evtlogutil.AddSourceToRegistry(self._svc_display_name_,
servicemanager.__file__,
"Application")
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
self.WAIT_TIME = 1000 # in milliseconds
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
def SvcDoRun(self):
# Redirect stdout and stderr to prevent "IOError: [Errno 9]
# Bad file descriptor". Windows services don't have functional
# output streams.
sys.stdout = sys.stderr = open('nul', 'w')
# Write a 'started' event to the event log...
win32evtlogutil.ReportEvent(self._svc_display_name_,
servicemanager.PYS_SERVICE_STARTED,
0, # category
servicemanager.EVENTLOG_INFORMATION_TYPE,
(self._svc_name_, ''))
print("Begin: %s" % (self._svc_display_name_))
hostport = ':8888'
print('Starting py.execnet SocketServer on %s' % hostport)
serversock = socketserver.bind_and_listen(hostport)
thread = threading.Thread(target=socketserver.startserver,
args=(serversock,),
kwargs={'loop':True})
thread.setDaemon(True)
thread.start()
# wait to be stopped or self.WAIT_TIME to pass
while True:
result = win32event.WaitForSingleObject(self.hWaitStop,
self.WAIT_TIME)
if result == win32event.WAIT_OBJECT_0:
break
# write a 'stopped' event to the event log.
win32evtlogutil.ReportEvent(self._svc_display_name_,
servicemanager.PYS_SERVICE_STOPPED,
0, # category
servicemanager.EVENTLOG_INFORMATION_TYPE,
(self._svc_name_, ''))
print("End: %s" % appname)
if __name__ == '__main__':
# Note that this code will not be run in the 'frozen' exe-file!!!
win32serviceutil.HandleCommandLine(SocketServerService)

View File

@ -1,9 +0,0 @@
import rlcompleter2
rlcompleter2.setup()
import register, sys
try:
hostport = sys.argv[1]
except:
hostport = ':8888'
gw = register.ServerGateway(hostport)

View File

@ -1,272 +0,0 @@
"""
Simple marshal format (based on pickle) designed to work across Python versions.
"""
import sys
import struct
_INPY3 = _REALLY_PY3 = sys.version_info > (3, 0)
class SerializeError(Exception):
pass
class SerializationError(SerializeError):
"""Error while serializing an object."""
class UnserializableType(SerializationError):
"""Can't serialize a type."""
class UnserializationError(SerializeError):
"""Error while unserializing an object."""
class VersionMismatch(UnserializationError):
"""Data from a previous or later format."""
class Corruption(UnserializationError):
"""The pickle format appears to have been corrupted."""
if _INPY3:
def b(s):
return s.encode("ascii")
else:
b = str
FOUR_BYTE_INT_MAX = 2147483647
_int4_format = struct.Struct("!i")
_float_format = struct.Struct("!d")
# Protocol constants
VERSION_NUMBER = 1
VERSION = b(chr(VERSION_NUMBER))
PY2STRING = b('s')
PY3STRING = b('t')
UNICODE = b('u')
BYTES = b('b')
NEWLIST = b('l')
BUILDTUPLE = b('T')
SETITEM = b('m')
NEWDICT = b('d')
INT = b('i')
FLOAT = b('f')
STOP = b('S')
class CrossVersionOptions(object):
pass
class Serializer(object):
def __init__(self, stream):
self.stream = stream
def save(self, obj):
self.stream.write(VERSION)
self._save(obj)
self.stream.write(STOP)
def _save(self, obj):
tp = type(obj)
try:
dispatch = self.dispatch[tp]
except KeyError:
raise UnserializableType("can't serialize %s" % (tp,))
dispatch(self, obj)
dispatch = {}
def save_bytes(self, bytes_):
self.stream.write(BYTES)
self._write_byte_sequence(bytes_)
dispatch[bytes] = save_bytes
if _INPY3:
def save_string(self, s):
self.stream.write(PY3STRING)
self._write_unicode_string(s)
else:
def save_string(self, s):
self.stream.write(PY2STRING)
self._write_byte_sequence(s)
def save_unicode(self, s):
self.stream.write(UNICODE)
self._write_unicode_string(s)
dispatch[unicode] = save_unicode
dispatch[str] = save_string
def _write_unicode_string(self, s):
try:
as_bytes = s.encode("utf-8")
except UnicodeEncodeError:
raise SerializationError("strings must be utf-8 encodable")
self._write_byte_sequence(as_bytes)
def _write_byte_sequence(self, bytes_):
self._write_int4(len(bytes_), "string is too long")
self.stream.write(bytes_)
def save_int(self, i):
self.stream.write(INT)
self._write_int4(i)
dispatch[int] = save_int
def save_float(self, flt):
self.stream.write(FLOAT)
self.stream.write(_float_format.pack(flt))
dispatch[float] = save_float
def _write_int4(self, i, error="int must be less than %i" %
(FOUR_BYTE_INT_MAX,)):
if i > FOUR_BYTE_INT_MAX:
raise SerializationError(error)
self.stream.write(_int4_format.pack(i))
def save_list(self, L):
self.stream.write(NEWLIST)
self._write_int4(len(L), "list is too long")
for i, item in enumerate(L):
self._write_setitem(i, item)
dispatch[list] = save_list
def _write_setitem(self, key, value):
self._save(key)
self._save(value)
self.stream.write(SETITEM)
def save_dict(self, d):
self.stream.write(NEWDICT)
for key, value in d.items():
self._write_setitem(key, value)
dispatch[dict] = save_dict
def save_tuple(self, tup):
for item in tup:
self._save(item)
self.stream.write(BUILDTUPLE)
self._write_int4(len(tup), "tuple is too long")
dispatch[tuple] = save_tuple
class _UnserializationOptions(object):
pass
class _Py2UnserializationOptions(_UnserializationOptions):
def __init__(self, py3_strings_as_str=False):
self.py3_strings_as_str = py3_strings_as_str
class _Py3UnserializationOptions(_UnserializationOptions):
def __init__(self, py2_strings_as_str=False):
self.py2_strings_as_str = py2_strings_as_str
if _INPY3:
UnserializationOptions = _Py3UnserializationOptions
else:
UnserializationOptions = _Py2UnserializationOptions
class _Stop(Exception):
pass
class Unserializer(object):
def __init__(self, stream, options=UnserializationOptions()):
self.stream = stream
self.options = options
def load(self):
self.stack = []
version = ord(self.stream.read(1))
if version != VERSION_NUMBER:
raise VersionMismatch("%i != %i" % (version, VERSION_NUMBER))
try:
while True:
opcode = self.stream.read(1)
if not opcode:
raise EOFError
try:
loader = self.opcodes[opcode]
except KeyError:
raise Corruption("unkown opcode %s" % (opcode,))
loader(self)
except _Stop:
if len(self.stack) != 1:
raise UnserializationError("internal unserialization error")
return self.stack[0]
else:
raise Corruption("didn't get STOP")
opcodes = {}
def load_int(self):
i = self._read_int4()
self.stack.append(i)
opcodes[INT] = load_int
def load_float(self):
binary = self.stream.read(_float_format.size)
self.stack.append(_float_format.unpack(binary)[0])
opcodes[FLOAT] = load_float
def _read_int4(self):
return _int4_format.unpack(self.stream.read(4))[0]
def _read_byte_string(self):
length = self._read_int4()
as_bytes = self.stream.read(length)
return as_bytes
def load_py3string(self):
as_bytes = self._read_byte_string()
if not _INPY3 and self.options.py3_strings_as_str:
# XXX Should we try to decode into latin-1?
self.stack.append(as_bytes)
else:
self.stack.append(as_bytes.decode("utf-8"))
opcodes[PY3STRING] = load_py3string
def load_py2string(self):
as_bytes = self._read_byte_string()
if _INPY3 and self.options.py2_strings_as_str:
s = as_bytes.decode("latin-1")
else:
s = as_bytes
self.stack.append(s)
opcodes[PY2STRING] = load_py2string
def load_bytes(self):
s = self._read_byte_string()
self.stack.append(s)
opcodes[BYTES] = load_bytes
def load_unicode(self):
self.stack.append(self._read_byte_string().decode("utf-8"))
opcodes[UNICODE] = load_unicode
def load_newlist(self):
length = self._read_int4()
self.stack.append([None] * length)
opcodes[NEWLIST] = load_newlist
def load_setitem(self):
if len(self.stack) < 3:
raise Corruption("not enough items for setitem")
value = self.stack.pop()
key = self.stack.pop()
self.stack[-1][key] = value
opcodes[SETITEM] = load_setitem
def load_newdict(self):
self.stack.append({})
opcodes[NEWDICT] = load_newdict
def load_buildtuple(self):
length = self._read_int4()
tup = tuple(self.stack[-length:])
del self.stack[-length:]
self.stack.append(tup)
opcodes[BUILDTUPLE] = load_buildtuple
def load_stop(self):
raise _Stop
opcodes[STOP] = load_stop

View File

@ -1,79 +0,0 @@
"""
(c) 2008-2009, holger krekel
"""
import py
class XSpec:
""" Execution Specification: key1=value1//key2=value2 ...
* keys need to be unique within the specification scope
* neither key nor value are allowed to contain "//"
* keys are not allowed to contain "="
* keys are not allowed to start with underscore
* if no "=value" is given, assume a boolean True value
"""
# XXX allow customization, for only allow specific key names
popen = ssh = socket = python = chdir = nice = None
def __init__(self, string):
self._spec = string
for keyvalue in string.split("//"):
i = keyvalue.find("=")
if i == -1:
key, value = keyvalue, True
else:
key, value = keyvalue[:i], keyvalue[i+1:]
if key[0] == "_":
raise AttributeError("%r not a valid XSpec key" % key)
if key in self.__dict__:
raise ValueError("duplicate key: %r in %r" %(key, string))
setattr(self, key, value)
def __getattr__(self, name):
if name[0] == "_":
raise AttributeError(name)
return None
def __repr__(self):
return "<XSpec %r>" %(self._spec,)
def __str__(self):
return self._spec
def __hash__(self):
return hash(self._spec)
def __eq__(self, other):
return self._spec == getattr(other, '_spec', None)
def __ne__(self, other):
return self._spec != getattr(other, '_spec', None)
def _samefilesystem(self):
return bool(self.popen and not self.chdir)
def makegateway(spec):
if not isinstance(spec, XSpec):
spec = XSpec(spec)
if spec.popen:
gw = py.execnet.PopenGateway(python=spec.python)
elif spec.ssh:
gw = py.execnet.SshGateway(spec.ssh, remotepython=spec.python)
elif spec.socket:
assert not spec.python, "socket: specifying python executables not supported"
hostport = spec.socket.split(":")
gw = py.execnet.SocketGateway(*hostport)
else:
raise ValueError("no gateway type found for %r" % (spec._spec,))
gw.spec = spec
if spec.chdir or spec.nice:
channel = gw.remote_exec("""
import os
path, nice = channel.receive()
if path:
if not os.path.exists(path):
os.mkdir(path)
os.chdir(path)
if nice and hasattr(os, 'nice'):
os.nice(nice)
""")
nice = spec.nice and int(spec.nice) or 0
channel.send((spec.chdir, nice))
channel.waitclose()
return gw

View File

@ -52,7 +52,7 @@ class PathServer:
if __name__ == '__main__':
import py
gw = py.execnet.PopenGateway()
gw = execnet.PopenGateway()
channel = gw._channelfactory.new()
srv = PathServer(channel)
c = gw.remote_exec("""

View File

@ -11,8 +11,8 @@ channel.send(srv.p2c(py.path.local("/tmp")))
'''
#gw = py.execnet.SshGateway('codespeak.net')
gw = py.execnet.PopenGateway()
#gw = execnet.SshGateway('codespeak.net')
gw = execnet.PopenGateway()
gw.remote_init_threads(5)
c = gw.remote_exec(SRC, stdout=py.std.sys.stdout, stderr=py.std.sys.stderr)
subchannel = gw._channelfactory.new()

View File

@ -252,7 +252,8 @@ class Config(object):
xspeclist.extend([xspec[i+1:]] * num)
if not xspeclist:
raise self.Error("MISSING test execution (tx) nodes: please specify --tx")
return [py.execnet.XSpec(x) for x in xspeclist]
import execnet
return [execnet.XSpec(x) for x in xspeclist]
def getrsyncdirs(self):
config = self

View File

@ -10,5 +10,5 @@ Generator = py.test.collect.Generator
Function = py.test.collect.Function
Instance = py.test.collect.Instance
pytest_plugins = "default runner capture terminal keyword xfail tmpdir execnetcleanup monkeypatch recwarn pdb pastebin unittest helpconfig nose assertion".split()
pytest_plugins = "default runner capture terminal keyword xfail tmpdir monkeypatch recwarn pdb pastebin unittest helpconfig nose assertion".split()

View File

@ -4,7 +4,8 @@
import py
import sys, os
from py.__.execnet.gateway_base import RemoteError
import execnet
from execnet.gateway_base import RemoteError
class GatewayManager:
RemoteError = RemoteError
@ -13,8 +14,8 @@ class GatewayManager:
self.specs = []
self.hook = hook
for spec in specs:
if not isinstance(spec, py.execnet.XSpec):
spec = py.execnet.XSpec(spec)
if not isinstance(spec, execnet.XSpec):
spec = execnet.XSpec(spec)
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
self.specs.append(spec)
@ -22,7 +23,7 @@ class GatewayManager:
def makegateways(self):
assert not self.gateways
for spec in self.specs:
gw = py.execnet.makegateway(spec)
gw = execnet.makegateway(spec)
self.gateways.append(gw)
gw.id = "[%s]" % len(self.gateways)
self.hook.pytest_gwmanage_newgateway(
@ -39,7 +40,7 @@ class GatewayManager:
else:
if remote:
l.append(gw)
return py.execnet.MultiGateway(gateways=l)
return execnet.MultiGateway(gateways=l)
def multi_exec(self, source, inplacelocal=True):
""" remote execute code on all gateways.
@ -87,7 +88,7 @@ class GatewayManager:
gw = self.gateways.pop()
gw.exit()
class HostRSync(py.execnet.RSync):
class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):

View File

@ -13,7 +13,7 @@
"""
import py
from py.__.execnet.gateway_base import Channel
from execnet.gateway_base import Channel
import sys, os, struct
#debug = open("log-mypickle-%d" % os.getpid(), 'w')

View File

@ -7,10 +7,9 @@
otherwise changes to source code can crash
the controlling process which should never happen.
"""
from __future__ import generators
import py
import sys
import execnet
from py.__.test.session import Session
from py.__.test.dist.mypickle import PickleChannel
from py.__.test.looponfail import util
@ -55,7 +54,7 @@ class RemoteControl(object):
py.builtin.print_("RemoteControl:", msg)
def initgateway(self):
return py.execnet.PopenGateway()
return execnet.PopenGateway()
def setup(self, out=None):
if out is None:

View File

@ -1,41 +0,0 @@
"""
cleanup execnet gateways during test function runs.
"""
import py
pytest_plugins = "xfail"
def pytest_configure(config):
config.pluginmanager.register(Execnetcleanup())
class Execnetcleanup:
_gateways = None
def __init__(self, debug=False):
self._debug = debug
def pyexecnet_gateway_init(self, gateway):
if self._gateways is not None:
self._gateways.append(gateway)
def pyexecnet_gateway_exit(self, gateway):
if self._gateways is not None:
self._gateways.remove(gateway)
def pytest_sessionstart(self, session):
self._gateways = []
def pytest_sessionfinish(self, session, exitstatus):
l = []
for gw in self._gateways:
gw.exit()
l.append(gw)
#for gw in l:
# gw.join()
def pytest_pyfunc_call(self, __multicall__, pyfuncitem):
if self._gateways is not None:
gateways = self._gateways[:]
res = __multicall__.execute()
while len(self._gateways) > len(gateways):
self._gateways[-1].exit()
return res

View File

@ -11,15 +11,13 @@ long_description = """
advanced testing and development support library:
- `py.test`_: cross-project testing tool with many advanced features
- `py.execnet`_: ad-hoc code distribution to SSH, Socket and local sub processes
- `py.path`_: path abstractions over local and subversion files
- `py.code`_: dynamic code compile and traceback printing support
Compatibility: Linux, Win32, OSX, Python versions 2.3-2.6.
Compatibility: Linux, Win32, OSX, Python versions 2.4 through to 3.1.
For questions please check out http://pylib.org/contact.html
.. _`py.test`: http://pylib.org/test.html
.. _`py.execnet`: http://pylib.org/execnet.html
.. _`py.path`: http://pylib.org/path.html
.. _`py.code`: http://pylib.org/code.html
@ -63,8 +61,6 @@ def main():
'py.cmdline',
'py.code',
'py.compat',
'py.execnet',
'py.execnet.script',
'py.io',
'py.log',
'py.path',

View File

@ -1 +0,0 @@
#

View File

@ -1,46 +0,0 @@
import py
def pytest_generate_tests(metafunc):
if 'gw' in metafunc.funcargnames:
if hasattr(metafunc.cls, 'gwtype'):
gwtypes = [metafunc.cls.gwtype]
else:
gwtypes = ['popen', 'socket', 'ssh']
for gwtype in gwtypes:
metafunc.addcall(id=gwtype, param=gwtype)
def pytest_funcarg__gw(request):
scope = "session"
if request.param == "popen":
return request.cached_setup(
setup=py.execnet.PopenGateway,
teardown=lambda gw: gw.exit(),
extrakey=request.param,
scope=scope)
elif request.param == "socket":
return request.cached_setup(
setup=setup_socket_gateway,
teardown=teardown_socket_gateway,
extrakey=request.param,
scope=scope)
elif request.param == "ssh":
return request.cached_setup(
setup=lambda: setup_ssh_gateway(request),
teardown=lambda gw: gw.exit(),
extrakey=request.param,
scope=scope)
def setup_socket_gateway():
proxygw = py.execnet.PopenGateway()
gw = py.execnet.SocketGateway.new_remote(proxygw, ("127.0.0.1", 0))
gw.proxygw = proxygw
return gw
def teardown_socket_gateway(gw):
gw.exit()
gw.proxygw.exit()
def setup_ssh_gateway(request):
sshhost = request.getfuncargvalue('specssh').ssh
gw = py.execnet.SshGateway(sshhost)
return gw

View File

@ -1,198 +0,0 @@
import py
import sys, os, subprocess, inspect
from py.__.execnet import gateway_base, gateway
from py.__.execnet.gateway_base import Message, Channel, ChannelFactory
def test_subprocess_interaction(anypython):
line = gateway.popen_bootstrapline
compile(line, 'xyz', 'exec')
args = [str(anypython), '-c', line]
popen = subprocess.Popen(args, bufsize=0, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
def send(line):
popen.stdin.write(line.encode('ascii'))
if sys.version_info > (3,0): # 3k still buffers
popen.stdin.flush()
def receive():
return popen.stdout.readline().decode('ascii')
try:
source = py.code.Source(read_write_loop, "read_write_loop()")
repr_source = repr(str(source)) + "\n"
sendline = repr_source
send(sendline)
s = receive()
assert s == "ok\n"
send("hello\n")
s = receive()
assert s == "received: hello\n"
send("world\n")
s = receive()
assert s == "received: world\n"
finally:
popen.stdin.close()
popen.stdout.close()
popen.wait()
def read_write_loop():
import os, sys
sys.stdout.write("ok\n")
sys.stdout.flush()
while 1:
try:
line = sys.stdin.readline()
sys.stdout.write("received: %s" % line)
sys.stdout.flush()
except (IOError, EOFError):
break
def pytest_generate_tests(metafunc):
if 'anypython' in metafunc.funcargnames:
for name in 'python3.1', 'python2.4', 'python2.5', 'python2.6':
metafunc.addcall(id=name, param=name)
def pytest_funcarg__anypython(request):
name = request.param
executable = py.path.local.sysfind(name)
if executable is None:
py.test.skip("no %s found" % (name,))
return executable
def test_io_message(anypython, tmpdir):
check = tmpdir.join("check.py")
check.write(py.code.Source(gateway_base, """
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
import tempfile
temp_out = BytesIO()
temp_in = BytesIO()
io = Popen2IO(temp_out, temp_in)
for i, msg_cls in Message._types.items():
print ("checking %s %s" %(i, msg_cls))
for data in "hello", "hello".encode('ascii'):
msg1 = msg_cls(i, data)
msg1.writeto(io)
x = io.outfile.getvalue()
io.outfile.truncate(0)
io.outfile.seek(0)
io.infile.seek(0)
io.infile.write(x)
io.infile.seek(0)
msg2 = Message.readfrom(io)
assert msg1.channelid == msg2.channelid, (msg1, msg2)
assert msg1.data == msg2.data
print ("all passed")
"""))
#out = py.process.cmdexec("%s %s" %(executable,check))
out = anypython.sysexec(check)
print (out)
assert "all passed" in out
def test_popen_io(anypython, tmpdir):
check = tmpdir.join("check.py")
check.write(py.code.Source(gateway_base, """
do_exec(Popen2IO.server_stmt, globals())
io.write("hello".encode('ascii'))
s = io.read(1)
assert s == "x".encode('ascii')
"""))
from subprocess import Popen, PIPE
args = [str(anypython), str(check)]
proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
proc.stdin.write("x".encode('ascii'))
stdout, stderr = proc.communicate()
print (stderr)
ret = proc.wait()
assert "hello".encode('ascii') in stdout
def test_rinfo_source(anypython, tmpdir):
check = tmpdir.join("check.py")
check.write(py.code.Source("""
class Channel:
def send(self, data):
assert eval(repr(data), {}) == data
channel = Channel()
""", gateway.rinfo_source, """
print ('all passed')
"""))
out = anypython.sysexec(check)
print (out)
assert "all passed" in out
def test_geterrortext(anypython, tmpdir):
check = tmpdir.join("check.py")
check.write(py.code.Source(gateway_base, """
class Arg:
pass
errortext = geterrortext((Arg, "1", 4))
assert "Arg" in errortext
import sys
try:
raise ValueError("17")
except ValueError:
excinfo = sys.exc_info()
s = geterrortext(excinfo)
assert "17" in s
print ("all passed")
"""))
out = anypython.sysexec(check)
print (out)
assert "all passed" in out
def test_stdouterrin_setnull():
cap = py.io.StdCaptureFD()
from py.__.execnet.gateway import stdouterrin_setnull
stdouterrin_setnull()
import os
os.write(1, "hello".encode('ascii'))
if os.name == "nt":
os.write(2, "world")
os.read(0, 1)
out, err = cap.reset()
assert not out
assert not err
class TestMessage:
def test_wire_protocol(self):
for cls in Message._types.values():
one = py.io.BytesIO()
data = '23'.encode('ascii')
cls(42, data).writeto(one)
two = py.io.BytesIO(one.getvalue())
msg = Message.readfrom(two)
assert isinstance(msg, cls)
assert msg.channelid == 42
assert msg.data == data
assert isinstance(repr(msg), str)
# == "<Message.%s channelid=42 '23'>" %(msg.__class__.__name__, )
class TestPureChannel:
def setup_method(self, method):
self.fac = ChannelFactory(None)
def test_factory_create(self):
chan1 = self.fac.new()
assert chan1.id == 1
chan2 = self.fac.new()
assert chan2.id == 3
def test_factory_getitem(self):
chan1 = self.fac.new()
assert self.fac._channels[chan1.id] == chan1
chan2 = self.fac.new()
assert self.fac._channels[chan2.id] == chan2
def test_channel_timeouterror(self):
channel = self.fac.new()
py.test.raises(IOError, channel.waitclose, timeout=0.01)
def test_channel_makefile_incompatmode(self):
channel = self.fac.new()
py.test.raises(ValueError, 'channel.makefile("rw")')

View File

@ -1,545 +0,0 @@
"""
mostly functional tests of gateways.
"""
import os, sys, time
import py
from py.__.execnet import gateway_base, gateway
queue = py.builtin._tryimport('queue', 'Queue')
TESTTIMEOUT = 10.0 # seconds
class TestBasicRemoteExecution:
def test_correct_setup(self, gw):
assert gw._receiverthread.isAlive()
def test_repr_doesnt_crash(self, gw):
assert isinstance(repr(gw), str)
def test_attribute__name__(self, gw):
channel = gw.remote_exec("channel.send(__name__)")
name = channel.receive()
assert name == "__channelexec__"
def test_correct_setup_no_py(self, gw):
channel = gw.remote_exec("""
import sys
channel.send(list(sys.modules))
""")
remotemodules = channel.receive()
assert 'py' not in remotemodules, (
"py should not be imported on remote side")
def test_remote_exec_waitclose(self, gw):
channel = gw.remote_exec('pass')
channel.waitclose(TESTTIMEOUT)
def test_remote_exec_waitclose_2(self, gw):
channel = gw.remote_exec('def gccycle(): pass')
channel.waitclose(TESTTIMEOUT)
def test_remote_exec_waitclose_noarg(self, gw):
channel = gw.remote_exec('pass')
channel.waitclose()
def test_remote_exec_error_after_close(self, gw):
channel = gw.remote_exec('pass')
channel.waitclose(TESTTIMEOUT)
py.test.raises(IOError, channel.send, 0)
def test_remote_exec_channel_anonymous(self, gw):
channel = gw.remote_exec('''
obj = channel.receive()
channel.send(obj)
''')
channel.send(42)
result = channel.receive()
assert result == 42
class TestChannelBasicBehaviour:
def test_channel_close_and_then_receive_error(self, gw):
channel = gw.remote_exec('raise ValueError')
py.test.raises(channel.RemoteError, channel.receive)
def test_channel_finish_and_then_EOFError(self, gw):
channel = gw.remote_exec('channel.send(42)')
x = channel.receive()
assert x == 42
py.test.raises(EOFError, channel.receive)
py.test.raises(EOFError, channel.receive)
py.test.raises(EOFError, channel.receive)
def test_channel_close_and_then_receive_error_multiple(self, gw):
channel = gw.remote_exec('channel.send(42) ; raise ValueError')
x = channel.receive()
assert x == 42
py.test.raises(channel.RemoteError, channel.receive)
def test_channel__local_close(self, gw):
channel = gw._channelfactory.new()
gw._channelfactory._local_close(channel.id)
channel.waitclose(0.1)
def test_channel__local_close_error(self, gw):
channel = gw._channelfactory.new()
gw._channelfactory._local_close(channel.id,
channel.RemoteError("error"))
py.test.raises(channel.RemoteError, channel.waitclose, 0.01)
def test_channel_error_reporting(self, gw):
channel = gw.remote_exec('def foo():\n return foobar()\nfoo()\n')
try:
channel.receive()
except channel.RemoteError:
e = sys.exc_info()[1]
assert str(e).startswith('Traceback (most recent call last):')
assert str(e).find('NameError: global name \'foobar\' '
'is not defined') > -1
else:
py.test.fail('No exception raised')
def test_channel_syntax_error(self, gw):
# missing colon
channel = gw.remote_exec('def foo()\n return 1\nfoo()\n')
try:
channel.receive()
except channel.RemoteError:
e = sys.exc_info()[1]
assert str(e).startswith('Traceback (most recent call last):')
assert str(e).find('SyntaxError') > -1
def test_channel_iter(self, gw):
channel = gw.remote_exec("""
for x in range(3):
channel.send(x)
""")
l = list(channel)
assert l == [0, 1, 2]
def test_channel_passing_over_channel(self, gw):
channel = gw.remote_exec('''
c = channel.gateway.newchannel()
channel.send(c)
c.send(42)
''')
c = channel.receive()
x = c.receive()
assert x == 42
# check that the both sides previous channels are really gone
channel.waitclose(TESTTIMEOUT)
#assert c.id not in gw._channelfactory
newchan = gw.remote_exec('''
assert %d not in channel.gateway._channelfactory._channels
''' % (channel.id))
newchan.waitclose(TESTTIMEOUT)
assert channel.id not in gw._channelfactory._channels
def test_channel_receiver_callback(self, gw):
l = []
#channel = gw.newchannel(receiver=l.append)
channel = gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
channel.setcallback(callback=l.append)
py.test.raises(IOError, channel.receive)
channel.waitclose(TESTTIMEOUT)
assert len(l) == 3
assert l[:2] == [42,13]
assert isinstance(l[2], channel.__class__)
def test_channel_callback_after_receive(self, gw):
l = []
channel = gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
x = channel.receive()
assert x == 42
channel.setcallback(callback=l.append)
py.test.raises(IOError, channel.receive)
channel.waitclose(TESTTIMEOUT)
assert len(l) == 2
assert l[0] == 13
assert isinstance(l[1], channel.__class__)
def test_waiting_for_callbacks(self, gw):
l = []
def callback(msg):
import time; time.sleep(0.2)
l.append(msg)
channel = gw.remote_exec(source='''
channel.send(42)
''')
channel.setcallback(callback)
channel.waitclose(TESTTIMEOUT)
assert l == [42]
def test_channel_callback_stays_active(self, gw):
self.check_channel_callback_stays_active(gw, earlyfree=True)
def check_channel_callback_stays_active(self, gw, earlyfree=True):
# with 'earlyfree==True', this tests the "sendonly" channel state.
l = []
channel = gw.remote_exec(source='''
try:
import thread
except ImportError:
import _thread as thread
import time
def producer(subchannel):
for i in range(5):
time.sleep(0.15)
subchannel.send(i*100)
channel2 = channel.receive()
thread.start_new_thread(producer, (channel2,))
del channel2
''')
subchannel = gw.newchannel()
subchannel.setcallback(l.append)
channel.send(subchannel)
if earlyfree:
subchannel = None
counter = 100
while len(l) < 5:
if subchannel and subchannel.isclosed():
break
counter -= 1
print(counter)
if not counter:
py.test.fail("timed out waiting for the answer[%d]" % len(l))
time.sleep(0.04) # busy-wait
assert l == [0, 100, 200, 300, 400]
return subchannel
def test_channel_callback_remote_freed(self, gw):
channel = self.check_channel_callback_stays_active(gw, earlyfree=False)
# freed automatically at the end of producer()
channel.waitclose(TESTTIMEOUT)
def test_channel_endmarker_callback(self, gw):
l = []
channel = gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
channel.setcallback(l.append, 999)
py.test.raises(IOError, channel.receive)
channel.waitclose(TESTTIMEOUT)
assert len(l) == 4
assert l[:2] == [42,13]
assert isinstance(l[2], channel.__class__)
assert l[3] == 999
def test_channel_endmarker_callback_error(self, gw):
q = queue.Queue()
channel = gw.remote_exec(source='''
raise ValueError()
''')
channel.setcallback(q.put, endmarker=999)
val = q.get(TESTTIMEOUT)
assert val == 999
err = channel._getremoteerror()
assert err
assert str(err).find("ValueError") != -1
@py.test.mark.xfail
def test_remote_redirect_stdout(self, gw):
out = py.io.TextIO()
handle = gw._remote_redirect(stdout=out)
c = gw.remote_exec("print 42")
c.waitclose(TESTTIMEOUT)
handle.close()
s = out.getvalue()
assert s.strip() == "42"
@py.test.mark.xfail
def test_remote_exec_redirect_multi(self, gw):
num = 3
l = [[] for x in range(num)]
channels = [gw.remote_exec("print %d" % i,
stdout=l[i].append)
for i in range(num)]
for x in channels:
x.waitclose(TESTTIMEOUT)
for i in range(num):
subl = l[i]
assert subl
s = subl[0]
assert s.strip() == str(i)
class TestChannelFile:
def test_channel_file_write(self, gw):
channel = gw.remote_exec("""
f = channel.makefile()
f.write("hello world\\n")
f.close()
channel.send(42)
""")
first = channel.receive()
assert first.strip() == 'hello world'
second = channel.receive()
assert second == 42
def test_channel_file_write_error(self, gw):
channel = gw.remote_exec("pass")
f = channel.makefile()
channel.waitclose(TESTTIMEOUT)
py.test.raises(IOError, f.write, 'hello')
def test_channel_file_proxyclose(self, gw):
channel = gw.remote_exec("""
f = channel.makefile(proxyclose=True)
f.write("hello world")
f.close()
channel.send(42)
""")
first = channel.receive()
assert first.strip() == 'hello world'
py.test.raises(EOFError, channel.receive)
def test_channel_file_read(self, gw):
channel = gw.remote_exec("""
f = channel.makefile(mode='r')
s = f.read(2)
channel.send(s)
s = f.read(5)
channel.send(s)
""")
channel.send("xyabcde")
s1 = channel.receive()
s2 = channel.receive()
assert s1 == "xy"
assert s2 == "abcde"
def test_channel_file_read_empty(self, gw):
channel = gw.remote_exec("pass")
f = channel.makefile(mode="r")
s = f.read(3)
assert s == ""
s = f.read(5)
assert s == ""
def test_channel_file_readline_remote(self, gw):
channel = gw.remote_exec("""
channel.send('123\\n45')
""")
channel.waitclose(TESTTIMEOUT)
f = channel.makefile(mode="r")
s = f.readline()
assert s == "123\n"
s = f.readline()
assert s == "45"
def test_channel_makefile_incompatmode(self, gw):
channel = gw.newchannel()
py.test.raises(ValueError, 'channel.makefile("rw")')
def test_confusion_from_os_write_stdout(self, gw):
channel = gw.remote_exec("""
import os
os.write(1, 'confusion!'.encode('ascii'))
channel.send(channel.receive() * 6)
channel.send(channel.receive() * 6)
""")
channel.send(3)
res = channel.receive()
assert res == 18
channel.send(7)
res = channel.receive()
assert res == 42
def test_confusion_from_os_write_stderr(self, gw):
channel = gw.remote_exec("""
import os
os.write(2, 'test'.encode('ascii'))
channel.send(channel.receive() * 6)
channel.send(channel.receive() * 6)
""")
channel.send(3)
res = channel.receive()
assert res == 18
channel.send(7)
res = channel.receive()
assert res == 42
def test__rinfo(self, gw):
rinfo = gw._rinfo()
assert rinfo.executable
assert rinfo.cwd
assert rinfo.version_info
s = repr(rinfo)
old = gw.remote_exec("""
import os.path
cwd = os.getcwd()
channel.send(os.path.basename(cwd))
os.chdir('..')
""").receive()
try:
rinfo2 = gw._rinfo()
assert rinfo2.cwd == rinfo.cwd
rinfo3 = gw._rinfo(update=True)
assert rinfo3.cwd != rinfo2.cwd
finally:
gw._cache_rinfo = rinfo
gw.remote_exec("import os ; os.chdir(%r)" % old).waitclose()
def test_join_blocked_execution_gateway():
gateway = py.execnet.PopenGateway()
channel = gateway.remote_exec("""
import time
time.sleep(5.0)
""")
def doit():
gateway.exit()
gateway.join(joinexec=True)
return 17
pool = py._thread.WorkerPool()
reply = pool.dispatch(doit)
x = reply.get(timeout=1.0)
assert x == 17
class TestPopenGateway:
gwtype = 'popen'
def test_chdir_separation(self, tmpdir):
old = tmpdir.chdir()
try:
gw = py.execnet.PopenGateway()
finally:
waschangedir = old.chdir()
c = gw.remote_exec("import os ; channel.send(os.getcwd())")
x = c.receive()
assert x == str(waschangedir)
def test_many_popen(self):
num = 4
l = []
for i in range(num):
l.append(py.execnet.PopenGateway())
channels = []
for gw in l:
channel = gw.remote_exec("""channel.send(42)""")
channels.append(channel)
## try:
## while channels:
## channel = channels.pop()
## try:
## ret = channel.receive()
## assert ret == 42
## finally:
## channel.gateway.exit()
## finally:
## for x in channels:
## x.gateway.exit()
while channels:
channel = channels.pop()
ret = channel.receive()
assert ret == 42
def test_rinfo_popen(self, gw):
rinfo = gw._rinfo()
assert rinfo.executable == py.std.sys.executable
assert rinfo.cwd == py.std.os.getcwd()
assert rinfo.version_info == py.std.sys.version_info
def test_gateway_init_event(self, _pytest):
rec = _pytest.gethookrecorder(gateway.ExecnetAPI)
gw = py.execnet.PopenGateway()
call = rec.popcall("pyexecnet_gateway_init")
assert call.gateway == gw
gw.exit()
call = rec.popcall("pyexecnet_gateway_exit")
assert call.gateway == gw
@py.test.mark.xfail # "fix needed: dying remote process does not cause waitclose() to fail"
def test_waitclose_on_remote_killed(self):
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import os
import time
channel.send(os.getpid())
while 1:
channel.send("#" * 100)
""")
remotepid = channel.receive()
py.process.kill(remotepid)
py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
py.test.raises(EOFError, channel.send, None)
py.test.raises(EOFError, channel.receive)
@py.test.mark.xfail
def test_endmarker_delivery_on_remote_killterm():
if not hasattr(py.std.os, 'kill'):
py.test.skip("no os.kill()")
gw = py.execnet.PopenGateway()
try:
q = queue.Queue()
channel = gw.remote_exec(source='''
import os
os.kill(os.getpid(), 15)
''')
channel.setcallback(q.put, endmarker=999)
val = q.get(TESTTIMEOUT)
assert val == 999
err = channel._getremoteerror()
finally:
gw.exit()
assert "killed" in str(err)
assert "15" in str(err)
def test_socket_gw_host_not_found(gw):
py.test.raises(py.execnet.HostNotFound,
'py.execnet.SocketGateway("qowieuqowe", 9000)'
)
class TestSshPopenGateway:
gwtype = "ssh"
def test_sshconfig_config_parsing(self, monkeypatch):
import subprocess
l = []
monkeypatch.setattr(subprocess, 'Popen',
lambda *args, **kwargs: l.append(args[0]))
py.test.raises(AttributeError,
"""py.execnet.SshGateway("xyz", ssh_config='qwe')""")
assert len(l) == 1
popen_args = l[0]
i = popen_args.index('-F')
assert popen_args[i+1] == "qwe"
def test_sshaddress(self, gw, specssh):
assert gw.remoteaddress == specssh.ssh
def test_host_not_found(self):
py.test.raises(py.execnet.HostNotFound,
"py.execnet.SshGateway('nowhere.codespeak.net')")
class TestThreads:
def test_threads(self):
gw = py.execnet.PopenGateway()
gw.remote_init_threads(3)
c1 = gw.remote_exec("channel.send(channel.receive())")
c2 = gw.remote_exec("channel.send(channel.receive())")
c2.send(1)
res = c2.receive()
assert res == 1
c1.send(42)
res = c1.receive()
assert res == 42
def test_threads_twice(self):
gw = py.execnet.PopenGateway()
gw.remote_init_threads(3)
py.test.raises(IOError, gw.remote_init_threads, 3)
def test_nodebug():
from py.__.execnet import gateway_base
assert not gateway_base.debug

View File

@ -1,58 +0,0 @@
"""
tests for
- multi channels and multi gateways
"""
import py
class TestMultiChannelAndGateway:
def test_multichannel_receive_each(self):
class pseudochannel:
def receive(self):
return 12
pc1 = pseudochannel()
pc2 = pseudochannel()
multichannel = py.execnet.MultiChannel([pc1, pc2])
l = multichannel.receive_each(withchannel=True)
assert len(l) == 2
assert l == [(pc1, 12), (pc2, 12)]
l = multichannel.receive_each(withchannel=False)
assert l == [12,12]
def test_multichannel_send_each(self):
l = [py.execnet.PopenGateway() for x in range(2)]
gm = py.execnet.MultiGateway(l)
mc = gm.remote_exec("""
import os
channel.send(channel.receive() + 1)
""")
mc.send_each(41)
l = mc.receive_each()
assert l == [42,42]
def test_multichannel_receive_queue_for_two_subprocesses(self):
l = [py.execnet.PopenGateway() for x in range(2)]
gm = py.execnet.MultiGateway(l)
mc = gm.remote_exec("""
import os
channel.send(os.getpid())
""")
queue = mc.make_receive_queue()
ch, item = queue.get(timeout=10)
ch2, item2 = queue.get(timeout=10)
assert ch != ch2
assert ch.gateway != ch2.gateway
assert item != item2
mc.waitclose()
def test_multichannel_waitclose(self):
l = []
class pseudochannel:
def waitclose(self):
l.append(0)
multichannel = py.execnet.MultiChannel([pseudochannel(), pseudochannel()])
multichannel.waitclose()
assert len(l) == 2

View File

@ -1,148 +0,0 @@
import py
from py.execnet import RSync
def pytest_funcarg__gw1(request):
return request.cached_setup(
setup=py.execnet.PopenGateway,
teardown=lambda val: val.exit(),
scope="module"
)
pytest_funcarg__gw2 = pytest_funcarg__gw1
def pytest_funcarg__dirs(request):
t = request.getfuncargvalue('tmpdir')
class dirs:
source = t.join("source")
dest1 = t.join("dest1")
dest2 = t.join("dest2")
return dirs
class TestRSync:
def test_notargets(self, dirs):
rsync = RSync(dirs.source)
py.test.raises(IOError, "rsync.send()")
assert rsync.send(raises=False) is None
def test_dirsync(self, dirs, gw1, gw2):
dest = dirs.dest1
dest2 = dirs.dest2
source = dirs.source
for s in ('content1', 'content2', 'content2-a-bit-longer'):
source.ensure('subdir', 'file1').write(s)
rsync = RSync(dirs.source)
rsync.add_target(gw1, dest)
rsync.add_target(gw2, dest2)
rsync.send()
assert dest.join('subdir').check(dir=1)
assert dest.join('subdir', 'file1').check(file=1)
assert dest.join('subdir', 'file1').read() == s
assert dest2.join('subdir').check(dir=1)
assert dest2.join('subdir', 'file1').check(file=1)
assert dest2.join('subdir', 'file1').read() == s
for x in dest, dest2:
fn = x.join("subdir", "file1")
fn.setmtime(0)
source.join('subdir').remove('file1')
rsync = RSync(source)
rsync.add_target(gw2, dest2)
rsync.add_target(gw1, dest)
rsync.send()
assert dest.join('subdir', 'file1').check(file=1)
assert dest2.join('subdir', 'file1').check(file=1)
rsync = RSync(source)
rsync.add_target(gw1, dest, delete=True)
rsync.add_target(gw2, dest2)
rsync.send()
assert not dest.join('subdir', 'file1').check()
assert dest2.join('subdir', 'file1').check()
def test_dirsync_twice(self, dirs, gw1, gw2):
source = dirs.source
source.ensure("hello")
rsync = RSync(source)
rsync.add_target(gw1, dirs.dest1)
rsync.send()
assert dirs.dest1.join('hello').check()
py.test.raises(IOError, "rsync.send()")
assert rsync.send(raises=False) is None
rsync.add_target(gw1, dirs.dest2)
rsync.send()
assert dirs.dest2.join('hello').check()
py.test.raises(IOError, "rsync.send()")
assert rsync.send(raises=False) is None
def test_rsync_default_reporting(self, capsys, dirs, gw1):
source = dirs.source
source.ensure("hello")
rsync = RSync(source)
rsync.add_target(gw1, dirs.dest1)
rsync.send()
out, err = capsys.readouterr()
assert out.find("hello") != -1
def test_rsync_non_verbose(self, capsys, dirs, gw1):
source = dirs.source
source.ensure("hello")
rsync = RSync(source, verbose=False)
rsync.add_target(gw1, dirs.dest1)
rsync.send()
out, err = capsys.readouterr()
assert not out
assert not err
def test_symlink_rsync(self, dirs, gw1):
if py.std.sys.platform == 'win32':
py.test.skip("symlinks are unsupported on Windows.")
source = dirs.source
dest = dirs.dest1
dirs.source.ensure("existant")
source.join("rellink").mksymlinkto(source.join("existant"), absolute=0)
source.join('abslink').mksymlinkto(source.join("existant"))
rsync = RSync(source)
rsync.add_target(gw1, dest)
rsync.send()
assert dest.join('rellink').readlink() == dest.join("existant")
assert dest.join('abslink').readlink() == dest.join("existant")
def test_callback(self, dirs, gw1):
dest = dirs.dest1
source = dirs.source
source.ensure("existant").write("a" * 100)
source.ensure("existant2").write("a" * 10)
total = {}
def callback(cmd, lgt, channel):
total[(cmd, lgt)] = True
rsync = RSync(source, callback=callback)
#rsync = RSync()
rsync.add_target(gw1, dest)
rsync.send()
assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True}
def test_file_disappearing(self, dirs, gw1):
dest = dirs.dest1
source = dirs.source
source.ensure("ex").write("a" * 100)
source.ensure("ex2").write("a" * 100)
class DRsync(RSync):
def filter(self, x):
assert x != source
if x.endswith("ex2"):
self.x = 1
source.join("ex2").remove()
return True
rsync = DRsync(source)
rsync.add_target(gw1, dest)
rsync.send()
assert rsync.x == 1
assert len(dest.listdir()) == 1
assert len(source.listdir()) == 1

View File

@ -1,179 +0,0 @@
# -*- coding: utf-8 -*-
import sys
import os
import tempfile
import subprocess
import py
from py.__.execnet import serializer
def _find_version(suffix=""):
name = "python" + suffix
executable = py.path.local.sysfind(name)
if executable is None:
py.test.skip("can't find a %r executable" % (name,))
return executable
def setup_module(mod):
mod.TEMPDIR = py.path.local(tempfile.mkdtemp())
if sys.version_info > (3, 0):
mod._py3_wrapper = PythonWrapper(py.path.local(sys.executable))
mod._py2_wrapper = PythonWrapper(_find_version())
else:
mod._py3_wrapper = PythonWrapper(_find_version("3"))
mod._py2_wrapper = PythonWrapper(py.path.local(sys.executable))
mod._old_pypath = os.environ.get("PYTHONPATH")
pylib = str(py.path.local(py.__file__).dirpath().join(".."))
os.environ["PYTHONPATH"] = pylib
def teardown_module(mod):
TEMPDIR.remove(True)
if _old_pypath is not None:
os.environ["PYTHONPATH"] = _old_pypath
class PythonWrapper(object):
def __init__(self, executable):
self.executable = executable
def dump(self, obj_rep):
script_file = TEMPDIR.join("dump.py")
script_file.write("""
from py.__.execnet import serializer
import sys
if sys.version_info > (3, 0): # Need binary output
sys.stdout = sys.stdout.detach()
saver = serializer.Serializer(sys.stdout)
saver.save(%s)""" % (obj_rep,))
return self.executable.sysexec(script_file)
def load(self, data, option_args=""):
script_file = TEMPDIR.join("load.py")
script_file.write(r"""
from py.__.execnet import serializer
import sys
if sys.version_info > (3, 0):
sys.stdin = sys.stdin.detach()
options = serializer.UnserializationOptions(%s)
loader = serializer.Unserializer(sys.stdin, options)
obj = loader.load()
sys.stdout.write(type(obj).__name__ + "\n")
sys.stdout.write(repr(obj))""" % (option_args,))
popen = subprocess.Popen([str(self.executable), str(script_file)],
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
stdout, stderr = popen.communicate(data.encode("latin-1"))
ret = popen.returncode
if ret:
raise py.process.cmdexec.Error(ret, ret, str(self.executable),
stdout, stderr)
return [s.decode("ascii") for s in stdout.splitlines()]
def __repr__(self):
return "<PythonWrapper for %s>" % (self.executable,)
def pytest_funcarg__py2(request):
return _py2_wrapper
def pytest_funcarg__py3(request):
return _py3_wrapper
def pytest_funcarg__dump(request):
py_dump = request.getfuncargvalue(request.param[0])
return py_dump.dump
def pytest_funcarg__load(request):
py_dump = request.getfuncargvalue(request.param[1])
return py_dump.load
def pytest_generate_tests(metafunc):
if 'dump' in metafunc.funcargnames and 'load' in metafunc.funcargnames:
pys = 'py2', 'py3'
for dump in pys:
for load in pys:
param = (dump, load)
conversion = '%s to %s'%param
if 'repr' not in metafunc.funcargnames:
metafunc.addcall(id=conversion, param=param)
else:
for tp, repr in simple_tests.items():
metafunc.addcall(
id='%s:%s'%(tp, conversion),
param=param,
funcargs={'tp_name':tp, 'repr':repr},
)
simple_tests = {
# type: expected before/after repr
'int': '4',
'float':'3.25',
'list': '[1, 2, 3]',
'tuple': '(1, 2, 3)',
'dict': '{6: 2, (1, 2, 3): 32}',
}
def test_simple(tp_name, repr, dump, load):
p = dump(repr)
tp , v = load(p)
assert tp == tp_name
assert v == repr
@py.test.mark.xfail
# I'm not sure if we need the complexity.
def test_recursive_list(py2, py3):
l = [1, 2, 3]
l.append(l)
p = py2.dump(l)
tp, rep = py2.load(l)
assert tp == "list"
def test_bigint_should_fail():
py.test.raises(serializer.SerializationError,
serializer.Serializer(py.io.BytesIO()).save,
123456678900)
def test_bytes(py2, py3):
p = py3.dump("b'hi'")
tp, v = py2.load(p)
assert tp == "str"
assert v == "'hi'"
tp, v = py3.load(p)
assert tp == "bytes"
assert v == "b'hi'"
def test_string(py2, py3):
p = py2.dump("'xyz'")
tp, s = py2.load(p)
assert tp == "str"
assert s == "'xyz'"
tp, s = py3.load(p)
assert tp == "bytes"
assert s == "b'xyz'"
tp, s = py3.load(p, "True")
assert tp == "str"
assert s == "'xyz'"
p = py3.dump("'xyz'")
tp, s = py2.load(p, True)
assert tp == "str"
assert s == "'xyz'"
def test_unicode(py2, py3):
p = py2.dump("u'hi'")
tp, s = py2.load(p)
assert tp == "unicode"
assert s == "u'hi'"
tp, s = py3.load(p)
assert tp == "str"
assert s == "'hi'"
p = py3.dump("'hi'")
tp, s = py3.load(p)
assert tp == "str"
assert s == "'hi'"
tp, s = py2.load(p)
assert tp == "unicode"
assert s == "u'hi'"

View File

@ -1,151 +0,0 @@
import py
XSpec = py.execnet.XSpec
class TestXSpec:
def test_norm_attributes(self):
spec = XSpec("socket=192.168.102.2:8888//python=c:/this/python2.5//chdir=d:\hello")
assert spec.socket == "192.168.102.2:8888"
assert spec.python == "c:/this/python2.5"
assert spec.chdir == "d:\hello"
assert spec.nice is None
assert not hasattr(spec, '_xyz')
py.test.raises(AttributeError, "spec._hello")
spec = XSpec("socket=192.168.102.2:8888//python=python2.5//nice=3")
assert spec.socket == "192.168.102.2:8888"
assert spec.python == "python2.5"
assert spec.chdir is None
assert spec.nice == "3"
spec = XSpec("ssh=user@host//chdir=/hello/this//python=/usr/bin/python2.5")
assert spec.ssh == "user@host"
assert spec.python == "/usr/bin/python2.5"
assert spec.chdir == "/hello/this"
spec = XSpec("popen")
assert spec.popen == True
def test__samefilesystem(self):
assert XSpec("popen")._samefilesystem()
assert XSpec("popen//python=123")._samefilesystem()
assert not XSpec("popen//chdir=hello")._samefilesystem()
def test__spec_spec(self):
for x in ("popen", "popen//python=this"):
assert XSpec(x)._spec == x
def test_samekeyword_twice_raises(self):
py.test.raises(ValueError, "XSpec('popen//popen')")
py.test.raises(ValueError, "XSpec('popen//popen=123')")
def test_unknown_keys_allowed(self):
xspec = XSpec("hello=3")
assert xspec.hello == '3'
def test_repr_and_string(self):
for x in ("popen", "popen//python=this"):
assert repr(XSpec(x)).find("popen") != -1
assert str(XSpec(x)) == x
def test_hash_equality(self):
assert XSpec("popen") == XSpec("popen")
assert hash(XSpec("popen")) == hash(XSpec("popen"))
assert XSpec("popen//python=123") != XSpec("popen")
assert hash(XSpec("socket=hello:8080")) != hash(XSpec("popen"))
class TestMakegateway:
def test_no_type(self):
py.test.raises(ValueError, "py.execnet.makegateway('hello')")
def test_popen(self):
gw = py.execnet.makegateway("popen")
assert gw.spec.python == None
rinfo = gw._rinfo()
assert rinfo.executable == py.std.sys.executable
assert rinfo.cwd == py.std.os.getcwd()
assert rinfo.version_info == py.std.sys.version_info
def test_popen_nice(self):
gw = py.execnet.makegateway("popen//nice=5")
remotenice = gw.remote_exec("""
import os
if hasattr(os, 'nice'):
channel.send(os.nice(0))
else:
channel.send(None)
""").receive()
if remotenice is not None:
assert remotenice == 5
def test_popen_explicit(self):
gw = py.execnet.makegateway("popen//python=%s" % py.std.sys.executable)
assert gw.spec.python == py.std.sys.executable
rinfo = gw._rinfo()
assert rinfo.executable == py.std.sys.executable
assert rinfo.cwd == py.std.os.getcwd()
assert rinfo.version_info == py.std.sys.version_info
def test_popen_cpython25(self):
for trypath in ('python2.5', r'C:\Python25\python.exe'):
cpython25 = py.path.local.sysfind(trypath)
if cpython25 is not None:
cpython25 = cpython25.realpath()
break
else:
py.test.skip("cpython2.5 not found")
gw = py.execnet.makegateway("popen//python=%s" % cpython25)
rinfo = gw._rinfo()
if py.std.sys.platform != "darwin": # it's confusing there
assert rinfo.executable == cpython25
assert rinfo.cwd == py.std.os.getcwd()
assert rinfo.version_info[:2] == (2,5)
def test_popen_cpython26(self):
for trypath in ('python2.6', r'C:\Python26\python.exe'):
cpython26 = py.path.local.sysfind(trypath)
if cpython26 is not None:
break
else:
py.test.skip("cpython2.6 not found")
gw = py.execnet.makegateway("popen//python=%s" % cpython26)
rinfo = gw._rinfo()
assert rinfo.executable == cpython26
assert rinfo.cwd == py.std.os.getcwd()
assert rinfo.version_info[:2] == (2,6)
def test_popen_chdir_absolute(self, testdir):
gw = py.execnet.makegateway("popen//chdir=%s" % testdir.tmpdir)
rinfo = gw._rinfo()
assert rinfo.cwd == str(testdir.tmpdir.realpath())
def test_popen_chdir_newsub(self, testdir):
testdir.chdir()
gw = py.execnet.makegateway("popen//chdir=hello")
rinfo = gw._rinfo()
assert rinfo.cwd == str(testdir.tmpdir.join("hello").realpath())
def test_ssh(self, specssh):
sshhost = specssh.ssh
gw = py.execnet.makegateway("ssh=%s" % sshhost)
rinfo = gw._rinfo()
gw2 = py.execnet.SshGateway(sshhost)
rinfo2 = gw2._rinfo()
assert rinfo.executable == rinfo2.executable
assert rinfo.cwd == rinfo2.cwd
assert rinfo.version_info == rinfo2.version_info
def test_socket(self, specsocket):
gw = py.execnet.makegateway("socket=%s" % specsocket.socket)
rinfo = gw._rinfo()
assert rinfo.executable
assert rinfo.cwd
assert rinfo.version_info
# we cannot instantiate a second gateway
#gw2 = py.execnet.SocketGateway(*specsocket.socket.split(":"))
#rinfo2 = gw2._rinfo()
#assert rinfo.executable == rinfo2.executable
#assert rinfo.cwd == rinfo2.cwd
#assert rinfo.version_info == rinfo2.version_info

View File

@ -1,8 +1,9 @@
from py.__.test.dist.dsession import DSession
from py.__.test import outcome
import py
import execnet
XSpec = py.execnet.XSpec
XSpec = execnet.XSpec
def run(item, node, excinfo=None):
runner = item.config.pluginmanager.getplugin("runner")

View File

@ -9,6 +9,7 @@ import py
import os
from py.__.test.dist.gwmanage import GatewayManager, HostRSync
from py.__.test.plugin import hookspec
import execnet
def pytest_funcarg__hookrecorder(request):
_pytest = request.getfuncargvalue('_pytest')
@ -35,7 +36,7 @@ class TestGatewayManagerPopen:
hm = GatewayManager(["popen"] * 2, hook)
hm.makegateways()
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
assert call.gateway.spec == py.execnet.XSpec("popen")
assert call.gateway.spec == execnet.XSpec("popen")
assert call.gateway.id == "[1]"
assert call.platinfo.executable == call.gateway._rinfo().executable
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
@ -149,7 +150,7 @@ class TestHRSync:
def test_hrsync_one_host(self, mysetup):
source, dest = mysetup.source, mysetup.dest
gw = py.execnet.makegateway("popen//chdir=%s" % dest)
gw = execnet.makegateway("popen//chdir=%s" % dest)
finished = []
rsync = HostRSync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))

View File

@ -1,6 +1,7 @@
import py
import sys
import execnet
Queue = py.builtin._tryimport('queue', 'Queue').Queue
@ -117,7 +118,7 @@ def test_self_memoize():
TESTTIMEOUT = 2.0
class TestPickleChannelFunctional:
def setup_class(cls):
cls.gw = py.execnet.PopenGateway()
cls.gw = execnet.PopenGateway()
cls.gw.remote_init_threads(5)
def test_popen_send_instance(self):

View File

@ -1,5 +1,6 @@
import py
import execnet
from py.__.test.dist.txnode import TXNode
queue = py.builtin._tryimport("queue", "Queue")
Queue = queue.Queue
@ -46,8 +47,8 @@ class MySetup:
config = py.test.config._reparse([])
self.config = config
self.queue = Queue()
self.xspec = py.execnet.XSpec("popen")
self.gateway = py.execnet.makegateway(self.xspec)
self.xspec = execnet.XSpec("popen")
self.gateway = execnet.makegateway(self.xspec)
self.id += 1
self.gateway.id = str(self.id)
self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)

View File

@ -1,12 +0,0 @@
def test_execnetplugin(testdir):
reprec = testdir.inline_runsource("""
import py
import sys
def test_hello():
sys._gw = py.execnet.PopenGateway()
def test_world():
assert hasattr(sys, '_gw')
assert sys._gw not in sys._gw._cleanup._activegateways
""", "-s", "--debug")
reprec.assertoutcome(passed=2)

View File

@ -105,6 +105,7 @@ class TestTerminal:
])
def test_gwmanage_events(self, testdir, linecomp):
execnet = py.test.importorskip("execnet")
modcol = testdir.getmodulecol("""
def test_one():
pass
@ -113,10 +114,10 @@ class TestTerminal:
rep = TerminalReporter(modcol.config, file=linecomp.stringio)
class gw1:
id = "X1"
spec = py.execnet.XSpec("popen")
spec = execnet.XSpec("popen")
class gw2:
id = "X2"
spec = py.execnet.XSpec("popen")
spec = execnet.XSpec("popen")
class rinfo:
version_info = (2, 5, 1, 'final', 0)
executable = "hello"

View File

@ -182,8 +182,9 @@ class TestConfigPickling:
old.chdir()
def test_config__setstate__wired_correctly_in_childprocess(testdir):
execnet = py.test.importorskip("execnet")
from py.__.test.dist.mypickle import PickleChannel
gw = py.execnet.PopenGateway()
gw = execnet.PopenGateway()
channel = gw.remote_exec("""
import py
from py.__.test.dist.mypickle import PickleChannel