diff --git a/py/execnet/channel.py b/py/execnet/channel.py index 22473c5bd..cf25db967 100644 --- a/py/execnet/channel.py +++ b/py/execnet/channel.py @@ -37,10 +37,10 @@ class Channel(object): def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): queue = self._items - lock = self.gateway.channelfactory._receivelock + lock = self.gateway._channelfactory._receivelock lock.acquire() try: - _callbacks = self.gateway.channelfactory._callbacks + _callbacks = self.gateway._channelfactory._callbacks dictvalue = (callback, endmarker) if _callbacks.setdefault(self.id, dictvalue) != dictvalue: raise IOError("%r has callback already registered" %(self,)) @@ -58,7 +58,7 @@ class Channel(object): callback(olditem) if self._closed or self._receiveclosed.isSet(): # no need to keep a callback - self.gateway.channelfactory._close_callback(self.id) + self.gateway._channelfactory._close_callback(self.id) finally: lock.release() @@ -129,7 +129,7 @@ class Channel(object): queue = self._items if queue is not None: queue.put(ENDMARKER) - self.gateway.channelfactory._no_longer_opened(self.id) + self.gateway._channelfactory._no_longer_opened(self.id) def waitclose(self, timeout=None): """ wait until this channel is closed (or the remote side diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index 934dac083..35100368e 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -10,7 +10,7 @@ import __future__ # other modules) execute not only on the local side but # also on any gateway's remote side. On such remote sides # we cannot assume the py library to be there and -# InstallableGateway.remote_bootstrap_gateway() (located +# InstallableGateway._remote_bootstrap_gateway() (located # in register.py) will take care to send source fragments # to the other side. Yes, it is fragile but we have a # few tests that try to catch when we mess up. @@ -38,15 +38,15 @@ class Gateway(object): global registered_cleanup self._execpool = WorkerPool(maxthreads=maxthreads) ## self.running = True - self.io = io + self._io = io self._outgoing = Queue.Queue() - self.channelfactory = ChannelFactory(self, startcount) + self._channelfactory = ChannelFactory(self, startcount) ## self._exitlock = threading.Lock() if not registered_cleanup: atexit.register(cleanup_atexit) registered_cleanup = True _active_sendqueues[self._outgoing] = True - self.pool = NamedThreadPool(receiver = self._thread_receiver, + self._pool = NamedThreadPool(receiver = self._thread_receiver, sender = self._thread_sender) def __repr__(self): @@ -56,11 +56,11 @@ class Gateway(object): else: addr = '' try: - r = (len(self.pool.getstarted('receiver')) + r = (len(self._pool.getstarted('receiver')) and "receiving" or "not receiving") - s = (len(self.pool.getstarted('sender')) + s = (len(self._pool.getstarted('sender')) and "sending" or "not sending") - i = len(self.channelfactory.channels()) + i = len(self._channelfactory.channels()) except AttributeError: r = s = "uninitialized" i = "no" @@ -98,7 +98,7 @@ class Gateway(object): from sys import exc_info while 1: try: - msg = Message.readfrom(self.io) + msg = Message.readfrom(self._io) self._trace("received <- %r" % msg) msg.received(self) except sysex: @@ -110,7 +110,7 @@ class Gateway(object): break finally: self._outgoing.put(None) - self.channelfactory._finished_receiving() + self._channelfactory._finished_receiving() self._trace('leaving %r' % threading.currentThread()) def _thread_sender(self): @@ -121,9 +121,9 @@ class Gateway(object): msg = self._outgoing.get() try: if msg is None: - self.io.close_write() + self._io.close_write() break - msg.writeto(self.io) + msg.writeto(self._io) except: excinfo = exc_info() self._traceex(excinfo) @@ -140,7 +140,7 @@ class Gateway(object): l = [] for name, id in ('stdout', outid), ('stderr', errid): if id: - channel = self.channelfactory.new(outid) + channel = self._channelfactory.new(outid) out = self._ThreadOut(sys, name) out.setwritefunc(channel.send) l.append((out, channel)) @@ -196,7 +196,7 @@ class Gateway(object): # def newchannel(self): """ return new channel object. """ - return self.channelfactory.new() + return self._channelfactory.new() def remote_exec(self, source, stdout=None, stderr=None): """ return channel object for communicating with the asynchronously @@ -218,7 +218,7 @@ class Gateway(object): (source, outid, errid))) return channel - def remote_redirect(self, stdout=None, stderr=None): + def _remote_redirect(self, stdout=None, stderr=None): """ return a handle representing a redirection of a remote end's stdout to a local file object. with handle.close() the redirection will be reverted. @@ -262,7 +262,7 @@ class Gateway(object): ## try: ## if self.running: ## self.running = False -## if not self.pool.getstarted('sender'): +## if not self._pool.getstarted('sender'): ## raise IOError("sender thread not alive anymore!") ## self._outgoing.put(None) ## self._trace("exit procedure triggered, pid %d " % (os.getpid(),)) @@ -279,7 +279,7 @@ class Gateway(object): def join(self, joinexec=True): current = threading.currentThread() - for x in self.pool.getstarted(): + for x in self._pool.getstarted(): if x != current: self._trace("joining %s" % x) x.join() diff --git a/py/execnet/message.py b/py/execnet/message.py index 9cf0ea229..3a1c49e6d 100644 --- a/py/execnet/message.py +++ b/py/execnet/message.py @@ -62,32 +62,32 @@ def _setupmessages(): class CHANNEL_OPEN(Message): def received(self, gateway): - channel = gateway.channelfactory.new(self.channelid) + channel = gateway._channelfactory.new(self.channelid) gateway._local_schedulexec(channel=channel, sourcetask=self.data) class CHANNEL_NEW(Message): def received(self, gateway): """ receive a remotely created new (sub)channel. """ newid = self.data - newchannel = gateway.channelfactory.new(newid) - gateway.channelfactory._local_receive(self.channelid, newchannel) + newchannel = gateway._channelfactory.new(newid) + gateway._channelfactory._local_receive(self.channelid, newchannel) class CHANNEL_DATA(Message): def received(self, gateway): - gateway.channelfactory._local_receive(self.channelid, self.data) + gateway._channelfactory._local_receive(self.channelid, self.data) class CHANNEL_CLOSE(Message): def received(self, gateway): - gateway.channelfactory._local_close(self.channelid) + gateway._channelfactory._local_close(self.channelid) class CHANNEL_CLOSE_ERROR(Message): def received(self, gateway): - remote_error = gateway.channelfactory.RemoteError(self.data) - gateway.channelfactory._local_close(self.channelid, remote_error) + remote_error = gateway._channelfactory.RemoteError(self.data) + gateway._channelfactory._local_close(self.channelid, remote_error) class CHANNEL_LAST_MESSAGE(Message): def received(self, gateway): - gateway.channelfactory._local_last_message(self.channelid) + gateway._channelfactory._local_last_message(self.channelid) classes = [x for x in locals().values() if hasattr(x, '__bases__')] classes.sort(lambda x,y : cmp(x.__name__, y.__name__)) diff --git a/py/execnet/register.py b/py/execnet/register.py index b027f8c1c..e35cfcd12 100644 --- a/py/execnet/register.py +++ b/py/execnet/register.py @@ -29,10 +29,10 @@ from py.__.execnet import inputoutput, gateway class InstallableGateway(gateway.Gateway): """ initialize gateways on both sides of a inputoutput object. """ def __init__(self, io): - self.remote_bootstrap_gateway(io) + self._remote_bootstrap_gateway(io) super(InstallableGateway, self).__init__(io=io, startcount=1) - def remote_bootstrap_gateway(self, io, extra=''): + def _remote_bootstrap_gateway(self, io, extra=''): """ return Gateway with a asynchronously remotely initialized counterpart Gateway (which may or may not succeed). Note that the other sides gateways starts enumerating @@ -84,7 +84,7 @@ class PopenGateway(PopenCmdGateway): cmd = '%s -u -c "exec input()"' % python super(PopenGateway, self).__init__(cmd) - def remote_bootstrap_gateway(self, io, extra=''): + def _remote_bootstrap_gateway(self, io, extra=''): # XXX the following hack helps us to import the same version # of the py lib and other dependcies, but only works for # PopenGateways because we can assume to have access to @@ -126,7 +126,7 @@ class PopenGateway(PopenCmdGateway): """)), "" ]) - super(PopenGateway, self).remote_bootstrap_gateway(io, s) + super(PopenGateway, self)._remote_bootstrap_gateway(io, s) class SocketGateway(InstallableGateway): def __init__(self, host, port): diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index 52dc4edae..0b684193e 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -62,13 +62,13 @@ class PopenGatewayTestSetup: def setup_class(cls): cls.gw = py.execnet.PopenGateway() - def teardown_class(cls): - cls.gw.exit() + #def teardown_class(cls): + # cls.gw.exit() class BasicRemoteExecution: def test_correct_setup(self): for x in 'sender', 'receiver': - assert self.gw.pool.getstarted(x) + assert self.gw._pool.getstarted(x) def test_repr_doesnt_crash(self): assert isinstance(repr(self), str) @@ -127,13 +127,13 @@ class BasicRemoteExecution: py.test.raises(channel.RemoteError, channel.receive) def test_channel__local_close(self): - channel = self.gw.channelfactory.new() - self.gw.channelfactory._local_close(channel.id) + channel = self.gw._channelfactory.new() + self.gw._channelfactory._local_close(channel.id) channel.waitclose(0.1) def test_channel__local_close_error(self): - channel = self.gw.channelfactory.new() - self.gw.channelfactory._local_close(channel.id, + channel = self.gw._channelfactory.new() + self.gw._channelfactory._local_close(channel.id, channel.RemoteError("error")) py.test.raises(channel.RemoteError, channel.waitclose, 0.01) @@ -177,10 +177,10 @@ class BasicRemoteExecution: # check that the both sides previous channels are really gone channel.waitclose(0.3) - assert channel.id not in self.gw.channelfactory._channels - #assert c.id not in self.gw.channelfactory + assert channel.id not in self.gw._channelfactory._channels + #assert c.id not in self.gw._channelfactory newchan = self.gw.remote_exec(''' - assert %d not in channel.gateway.channelfactory._channels + assert %d not in channel.gateway._channelfactory._channels ''' % (channel.id)) newchan.waitclose(0.3) @@ -278,7 +278,7 @@ class BasicRemoteExecution: def test_remote_redirect_stdout(self): out = py.std.StringIO.StringIO() - handle = self.gw.remote_redirect(stdout=out) + handle = self.gw._remote_redirect(stdout=out) c = self.gw.remote_exec("print 42") c.waitclose(1.0) handle.close()