[svn r37785] privatizing some more attributes

--HG--
branch : trunk
This commit is contained in:
hpk 2007-02-02 02:02:55 +01:00
parent b4919c8102
commit 7a718ca2e7
5 changed files with 43 additions and 43 deletions

View File

@ -37,10 +37,10 @@ class Channel(object):
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
queue = self._items queue = self._items
lock = self.gateway.channelfactory._receivelock lock = self.gateway._channelfactory._receivelock
lock.acquire() lock.acquire()
try: try:
_callbacks = self.gateway.channelfactory._callbacks _callbacks = self.gateway._channelfactory._callbacks
dictvalue = (callback, endmarker) dictvalue = (callback, endmarker)
if _callbacks.setdefault(self.id, dictvalue) != dictvalue: if _callbacks.setdefault(self.id, dictvalue) != dictvalue:
raise IOError("%r has callback already registered" %(self,)) raise IOError("%r has callback already registered" %(self,))
@ -58,7 +58,7 @@ class Channel(object):
callback(olditem) callback(olditem)
if self._closed or self._receiveclosed.isSet(): if self._closed or self._receiveclosed.isSet():
# no need to keep a callback # no need to keep a callback
self.gateway.channelfactory._close_callback(self.id) self.gateway._channelfactory._close_callback(self.id)
finally: finally:
lock.release() lock.release()
@ -129,7 +129,7 @@ class Channel(object):
queue = self._items queue = self._items
if queue is not None: if queue is not None:
queue.put(ENDMARKER) queue.put(ENDMARKER)
self.gateway.channelfactory._no_longer_opened(self.id) self.gateway._channelfactory._no_longer_opened(self.id)
def waitclose(self, timeout=None): def waitclose(self, timeout=None):
""" wait until this channel is closed (or the remote side """ wait until this channel is closed (or the remote side

View File

@ -10,7 +10,7 @@ import __future__
# other modules) execute not only on the local side but # other modules) execute not only on the local side but
# also on any gateway's remote side. On such remote sides # also on any gateway's remote side. On such remote sides
# we cannot assume the py library to be there and # we cannot assume the py library to be there and
# InstallableGateway.remote_bootstrap_gateway() (located # InstallableGateway._remote_bootstrap_gateway() (located
# in register.py) will take care to send source fragments # in register.py) will take care to send source fragments
# to the other side. Yes, it is fragile but we have a # to the other side. Yes, it is fragile but we have a
# few tests that try to catch when we mess up. # few tests that try to catch when we mess up.
@ -38,15 +38,15 @@ class Gateway(object):
global registered_cleanup global registered_cleanup
self._execpool = WorkerPool(maxthreads=maxthreads) self._execpool = WorkerPool(maxthreads=maxthreads)
## self.running = True ## self.running = True
self.io = io self._io = io
self._outgoing = Queue.Queue() self._outgoing = Queue.Queue()
self.channelfactory = ChannelFactory(self, startcount) self._channelfactory = ChannelFactory(self, startcount)
## self._exitlock = threading.Lock() ## self._exitlock = threading.Lock()
if not registered_cleanup: if not registered_cleanup:
atexit.register(cleanup_atexit) atexit.register(cleanup_atexit)
registered_cleanup = True registered_cleanup = True
_active_sendqueues[self._outgoing] = True _active_sendqueues[self._outgoing] = True
self.pool = NamedThreadPool(receiver = self._thread_receiver, self._pool = NamedThreadPool(receiver = self._thread_receiver,
sender = self._thread_sender) sender = self._thread_sender)
def __repr__(self): def __repr__(self):
@ -56,11 +56,11 @@ class Gateway(object):
else: else:
addr = '' addr = ''
try: try:
r = (len(self.pool.getstarted('receiver')) r = (len(self._pool.getstarted('receiver'))
and "receiving" or "not receiving") and "receiving" or "not receiving")
s = (len(self.pool.getstarted('sender')) s = (len(self._pool.getstarted('sender'))
and "sending" or "not sending") and "sending" or "not sending")
i = len(self.channelfactory.channels()) i = len(self._channelfactory.channels())
except AttributeError: except AttributeError:
r = s = "uninitialized" r = s = "uninitialized"
i = "no" i = "no"
@ -98,7 +98,7 @@ class Gateway(object):
from sys import exc_info from sys import exc_info
while 1: while 1:
try: try:
msg = Message.readfrom(self.io) msg = Message.readfrom(self._io)
self._trace("received <- %r" % msg) self._trace("received <- %r" % msg)
msg.received(self) msg.received(self)
except sysex: except sysex:
@ -110,7 +110,7 @@ class Gateway(object):
break break
finally: finally:
self._outgoing.put(None) self._outgoing.put(None)
self.channelfactory._finished_receiving() self._channelfactory._finished_receiving()
self._trace('leaving %r' % threading.currentThread()) self._trace('leaving %r' % threading.currentThread())
def _thread_sender(self): def _thread_sender(self):
@ -121,9 +121,9 @@ class Gateway(object):
msg = self._outgoing.get() msg = self._outgoing.get()
try: try:
if msg is None: if msg is None:
self.io.close_write() self._io.close_write()
break break
msg.writeto(self.io) msg.writeto(self._io)
except: except:
excinfo = exc_info() excinfo = exc_info()
self._traceex(excinfo) self._traceex(excinfo)
@ -140,7 +140,7 @@ class Gateway(object):
l = [] l = []
for name, id in ('stdout', outid), ('stderr', errid): for name, id in ('stdout', outid), ('stderr', errid):
if id: if id:
channel = self.channelfactory.new(outid) channel = self._channelfactory.new(outid)
out = self._ThreadOut(sys, name) out = self._ThreadOut(sys, name)
out.setwritefunc(channel.send) out.setwritefunc(channel.send)
l.append((out, channel)) l.append((out, channel))
@ -196,7 +196,7 @@ class Gateway(object):
# #
def newchannel(self): def newchannel(self):
""" return new channel object. """ """ return new channel object. """
return self.channelfactory.new() return self._channelfactory.new()
def remote_exec(self, source, stdout=None, stderr=None): def remote_exec(self, source, stdout=None, stderr=None):
""" return channel object for communicating with the asynchronously """ return channel object for communicating with the asynchronously
@ -218,7 +218,7 @@ class Gateway(object):
(source, outid, errid))) (source, outid, errid)))
return channel return channel
def remote_redirect(self, stdout=None, stderr=None): def _remote_redirect(self, stdout=None, stderr=None):
""" return a handle representing a redirection of a remote """ return a handle representing a redirection of a remote
end's stdout to a local file object. with handle.close() end's stdout to a local file object. with handle.close()
the redirection will be reverted. the redirection will be reverted.
@ -262,7 +262,7 @@ class Gateway(object):
## try: ## try:
## if self.running: ## if self.running:
## self.running = False ## self.running = False
## if not self.pool.getstarted('sender'): ## if not self._pool.getstarted('sender'):
## raise IOError("sender thread not alive anymore!") ## raise IOError("sender thread not alive anymore!")
## self._outgoing.put(None) ## self._outgoing.put(None)
## self._trace("exit procedure triggered, pid %d " % (os.getpid(),)) ## self._trace("exit procedure triggered, pid %d " % (os.getpid(),))
@ -279,7 +279,7 @@ class Gateway(object):
def join(self, joinexec=True): def join(self, joinexec=True):
current = threading.currentThread() current = threading.currentThread()
for x in self.pool.getstarted(): for x in self._pool.getstarted():
if x != current: if x != current:
self._trace("joining %s" % x) self._trace("joining %s" % x)
x.join() x.join()

View File

@ -62,32 +62,32 @@ def _setupmessages():
class CHANNEL_OPEN(Message): class CHANNEL_OPEN(Message):
def received(self, gateway): def received(self, gateway):
channel = gateway.channelfactory.new(self.channelid) channel = gateway._channelfactory.new(self.channelid)
gateway._local_schedulexec(channel=channel, sourcetask=self.data) gateway._local_schedulexec(channel=channel, sourcetask=self.data)
class CHANNEL_NEW(Message): class CHANNEL_NEW(Message):
def received(self, gateway): def received(self, gateway):
""" receive a remotely created new (sub)channel. """ """ receive a remotely created new (sub)channel. """
newid = self.data newid = self.data
newchannel = gateway.channelfactory.new(newid) newchannel = gateway._channelfactory.new(newid)
gateway.channelfactory._local_receive(self.channelid, newchannel) gateway._channelfactory._local_receive(self.channelid, newchannel)
class CHANNEL_DATA(Message): class CHANNEL_DATA(Message):
def received(self, gateway): def received(self, gateway):
gateway.channelfactory._local_receive(self.channelid, self.data) gateway._channelfactory._local_receive(self.channelid, self.data)
class CHANNEL_CLOSE(Message): class CHANNEL_CLOSE(Message):
def received(self, gateway): def received(self, gateway):
gateway.channelfactory._local_close(self.channelid) gateway._channelfactory._local_close(self.channelid)
class CHANNEL_CLOSE_ERROR(Message): class CHANNEL_CLOSE_ERROR(Message):
def received(self, gateway): def received(self, gateway):
remote_error = gateway.channelfactory.RemoteError(self.data) remote_error = gateway._channelfactory.RemoteError(self.data)
gateway.channelfactory._local_close(self.channelid, remote_error) gateway._channelfactory._local_close(self.channelid, remote_error)
class CHANNEL_LAST_MESSAGE(Message): class CHANNEL_LAST_MESSAGE(Message):
def received(self, gateway): def received(self, gateway):
gateway.channelfactory._local_last_message(self.channelid) gateway._channelfactory._local_last_message(self.channelid)
classes = [x for x in locals().values() if hasattr(x, '__bases__')] classes = [x for x in locals().values() if hasattr(x, '__bases__')]
classes.sort(lambda x,y : cmp(x.__name__, y.__name__)) classes.sort(lambda x,y : cmp(x.__name__, y.__name__))

View File

@ -29,10 +29,10 @@ from py.__.execnet import inputoutput, gateway
class InstallableGateway(gateway.Gateway): class InstallableGateway(gateway.Gateway):
""" initialize gateways on both sides of a inputoutput object. """ """ initialize gateways on both sides of a inputoutput object. """
def __init__(self, io): def __init__(self, io):
self.remote_bootstrap_gateway(io) self._remote_bootstrap_gateway(io)
super(InstallableGateway, self).__init__(io=io, startcount=1) super(InstallableGateway, self).__init__(io=io, startcount=1)
def remote_bootstrap_gateway(self, io, extra=''): def _remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely """ return Gateway with a asynchronously remotely
initialized counterpart Gateway (which may or may not succeed). initialized counterpart Gateway (which may or may not succeed).
Note that the other sides gateways starts enumerating Note that the other sides gateways starts enumerating
@ -84,7 +84,7 @@ class PopenGateway(PopenCmdGateway):
cmd = '%s -u -c "exec input()"' % python cmd = '%s -u -c "exec input()"' % python
super(PopenGateway, self).__init__(cmd) super(PopenGateway, self).__init__(cmd)
def remote_bootstrap_gateway(self, io, extra=''): def _remote_bootstrap_gateway(self, io, extra=''):
# XXX the following hack helps us to import the same version # XXX the following hack helps us to import the same version
# of the py lib and other dependcies, but only works for # of the py lib and other dependcies, but only works for
# PopenGateways because we can assume to have access to # PopenGateways because we can assume to have access to
@ -126,7 +126,7 @@ class PopenGateway(PopenCmdGateway):
""")), """)),
"" ""
]) ])
super(PopenGateway, self).remote_bootstrap_gateway(io, s) super(PopenGateway, self)._remote_bootstrap_gateway(io, s)
class SocketGateway(InstallableGateway): class SocketGateway(InstallableGateway):
def __init__(self, host, port): def __init__(self, host, port):

View File

@ -62,13 +62,13 @@ class PopenGatewayTestSetup:
def setup_class(cls): def setup_class(cls):
cls.gw = py.execnet.PopenGateway() cls.gw = py.execnet.PopenGateway()
def teardown_class(cls): #def teardown_class(cls):
cls.gw.exit() # cls.gw.exit()
class BasicRemoteExecution: class BasicRemoteExecution:
def test_correct_setup(self): def test_correct_setup(self):
for x in 'sender', 'receiver': for x in 'sender', 'receiver':
assert self.gw.pool.getstarted(x) assert self.gw._pool.getstarted(x)
def test_repr_doesnt_crash(self): def test_repr_doesnt_crash(self):
assert isinstance(repr(self), str) assert isinstance(repr(self), str)
@ -127,13 +127,13 @@ class BasicRemoteExecution:
py.test.raises(channel.RemoteError, channel.receive) py.test.raises(channel.RemoteError, channel.receive)
def test_channel__local_close(self): def test_channel__local_close(self):
channel = self.gw.channelfactory.new() channel = self.gw._channelfactory.new()
self.gw.channelfactory._local_close(channel.id) self.gw._channelfactory._local_close(channel.id)
channel.waitclose(0.1) channel.waitclose(0.1)
def test_channel__local_close_error(self): def test_channel__local_close_error(self):
channel = self.gw.channelfactory.new() channel = self.gw._channelfactory.new()
self.gw.channelfactory._local_close(channel.id, self.gw._channelfactory._local_close(channel.id,
channel.RemoteError("error")) channel.RemoteError("error"))
py.test.raises(channel.RemoteError, channel.waitclose, 0.01) py.test.raises(channel.RemoteError, channel.waitclose, 0.01)
@ -177,10 +177,10 @@ class BasicRemoteExecution:
# check that the both sides previous channels are really gone # check that the both sides previous channels are really gone
channel.waitclose(0.3) channel.waitclose(0.3)
assert channel.id not in self.gw.channelfactory._channels assert channel.id not in self.gw._channelfactory._channels
#assert c.id not in self.gw.channelfactory #assert c.id not in self.gw._channelfactory
newchan = self.gw.remote_exec(''' newchan = self.gw.remote_exec('''
assert %d not in channel.gateway.channelfactory._channels assert %d not in channel.gateway._channelfactory._channels
''' % (channel.id)) ''' % (channel.id))
newchan.waitclose(0.3) newchan.waitclose(0.3)
@ -278,7 +278,7 @@ class BasicRemoteExecution:
def test_remote_redirect_stdout(self): def test_remote_redirect_stdout(self):
out = py.std.StringIO.StringIO() out = py.std.StringIO.StringIO()
handle = self.gw.remote_redirect(stdout=out) handle = self.gw._remote_redirect(stdout=out)
c = self.gw.remote_exec("print 42") c = self.gw.remote_exec("print 42")
c.waitclose(1.0) c.waitclose(1.0)
handle.close() handle.close()