diff --git a/py/execnet/channel.py b/py/execnet/channel.py index cf25db967..04e4b1224 100644 --- a/py/execnet/channel.py +++ b/py/execnet/channel.py @@ -85,7 +85,7 @@ class Channel(object): Msg = Message.CHANNEL_LAST_MESSAGE else: Msg = Message.CHANNEL_CLOSE - self.gateway._outgoing.put(Msg(self.id)) + self.gateway._send(Msg(self.id)) def _getremoteerror(self): try: @@ -117,7 +117,7 @@ class Channel(object): # state transition "opened/sendonly" --> "closed" # threads warning: the channel might be closed under our feet, # but it's never damaging to send too many CHANNEL_CLOSE messages - put = self.gateway._outgoing.put + put = self.gateway._send if error is not None: put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error))) else: @@ -157,7 +157,7 @@ class Channel(object): data = Message.CHANNEL_NEW(self.id, item.id) else: data = Message.CHANNEL_DATA(self.id, item) - self.gateway._outgoing.put(data) + self.gateway._send(data) def receive(self): """receives an item that was sent from the other side, diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index d53f8b93c..a85b3df3d 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -111,10 +111,13 @@ class Gateway(object): self._traceex(exc_info()) break finally: - self._outgoing.put(None) + self._send(None) self._channelfactory._finished_receiving() self._trace('leaving %r' % threading.currentThread()) + def _send(self, msg): + self._outgoing.put(msg) + def _thread_sender(self): """ thread to send Messages over the wire. """ try: @@ -219,8 +222,8 @@ class Gateway(object): channel = self.newchannel() outid = self._newredirectchannelid(stdout) errid = self._newredirectchannelid(stderr) - self._outgoing.put(Message.CHANNEL_OPEN(channel.id, - (source, outid, errid))) + self._send(Message.CHANNEL_OPEN( + channel.id, (source, outid, errid))) return channel def _remote_redirect(self, stdout=None, stderr=None): @@ -260,7 +263,7 @@ class Gateway(object): except KeyError: pass else: - self._outgoing.put(None) + self._send(None) def join(self, joinexec=True): """ Wait for all IO (and by default all execution activity)