From 5d2504df0a29a5d15dd7f9469668f007d642ad6e Mon Sep 17 00:00:00 2001 From: holger krekel Date: Wed, 2 Sep 2009 15:45:59 +0200 Subject: [PATCH] * simplify lock acquiration for received messages, review code * try to fix seldomly occuring race condition with setcallback/receive and closing of channel --HG-- branch : trunk --- py/execnet/gateway_base.py | 85 +++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 47 deletions(-) diff --git a/py/execnet/gateway_base.py b/py/execnet/gateway_base.py index 0a1d1a14e..20d71e0db 100644 --- a/py/execnet/gateway_base.py +++ b/py/execnet/gateway_base.py @@ -223,7 +223,7 @@ def _setupmessages(): class CHANNEL_LAST_MESSAGE(Message): def received(self, gateway): - gateway._channelfactory._local_last_message(self.channelid) + gateway._channelfactory._local_close(self.channelid, sendonly=True) classes = [CHANNEL_OPEN, CHANNEL_NEW, CHANNEL_DATA, CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE] @@ -269,31 +269,36 @@ class Channel(object): self._remoteerrors = [] def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): - items = self._items - lock = self.gateway._channelfactory._receivelock - lock.acquire() + # we first execute the callback on all already received + # items. We need to hold the receivelock to prevent + # race conditions with newly arriving items. + # after having cleared the queue we register + # the callback only if the channel is not closed already. + _callbacks = self.gateway._channelfactory._callbacks + _receivelock = self.gateway._channelfactory._receivelock + _receivelock.acquire() try: - _callbacks = self.gateway._channelfactory._callbacks - dictvalue = (callback, endmarker) - if _callbacks.setdefault(self.id, dictvalue) != dictvalue: + if self._items is None: raise IOError("%r has callback already registered" %(self,)) + items = self._items self._items = None while 1: try: olditem = items.get(block=False) except queue.Empty: + if not (self._closed or self._receiveclosed.isSet()): + _callbacks[self.id] = (callback, endmarker) break else: if olditem is ENDMARKER: - items.put(olditem) + items.put(olditem) # for other receivers + if endmarker is not NO_ENDMARKER_WANTED: + callback(endmarker) break else: callback(olditem) - if self._closed or self._receiveclosed.isSet(): - # no need to keep a callback - self.gateway._channelfactory._close_callback(self.id) finally: - lock.release() + _receivelock.release() def __repr__(self): flag = self.isclosed() and "closed" or "open" @@ -462,9 +467,6 @@ class ChannelFactory(object): del self._channels[id] except KeyError: pass - self._close_callback(id) - - def _close_callback(self, id): try: callback, endmarker = self._callbacks.pop(id) except KeyError: @@ -473,7 +475,7 @@ class ChannelFactory(object): if endmarker is not NO_ENDMARKER_WANTED: callback(endmarker) - def _local_close(self, id, remoteerror=None): + def _local_close(self, id, remoteerror=None, sendonly=False): channel = self._channels.get(id) if channel is None: # channel already in "deleted" state @@ -483,20 +485,8 @@ class ChannelFactory(object): # state transition to "closed" state if remoteerror: channel._remoteerrors.append(remoteerror) - channel._closed = True # --> "closed" - channel._receiveclosed.set() - queue = channel._items - if queue is not None: - queue.put(ENDMARKER) - self._no_longer_opened(id) - - def _local_last_message(self, id): - channel = self._channels.get(id) - if channel is None: - # channel already in "deleted" state - pass - else: - # state transition: if "opened", change to "sendonly" + if not sendonly: # otherwise #--> "sendonly" + channel._closed = True # --> "closed" channel._receiveclosed.set() queue = channel._items if queue is not None: @@ -505,21 +495,17 @@ class ChannelFactory(object): def _local_receive(self, id, data): # executes in receiver thread - self._receivelock.acquire() try: - try: - callback, endmarker = self._callbacks[id] - except KeyError: - channel = self._channels.get(id) - queue = channel and channel._items - if queue is None: - pass # drop data - else: - queue.put(data) + callback, endmarker = self._callbacks[id] + except KeyError: + channel = self._channels.get(id) + queue = channel and channel._items + if queue is None: + pass # drop data else: - callback(data) # even if channel may be already closed - finally: - self._receivelock.release() + queue.put(data) + else: + callback(data) # even if channel may be already closed def _finished_receiving(self): self._writelock.acquire() @@ -528,9 +514,9 @@ class ChannelFactory(object): finally: self._writelock.release() for id in list(self._channels): - self._local_last_message(id) + self._local_close(id, sendonly=True) for id in list(self._callbacks): - self._close_callback(id) + self._no_longer_opened(id) class ChannelFile(object): def __init__(self, channel, proxyclose=True): @@ -648,7 +634,12 @@ class BaseGateway(object): try: msg = Message.readfrom(self._io) self._trace("received <- %r" % msg) - msg.received(self) + _receivelock = self._channelfactory._receivelock + _receivelock.acquire() + try: + msg.received(self) + finally: + _receivelock.release() except sysex: break except EOFError: @@ -736,7 +727,7 @@ class BaseGateway(object): finally: close() self._trace("execution finished:", repr(source)[:50]) - except (KeyboardInterrupt, SystemExit): + except sysex: pass except self._StopExecLoop: channel.close()