[svn r39982] introduce gateway._send and have all places route

their sending of Messages (or None's) through that method.

--HG--
branch : trunk
This commit is contained in:
hpk 2007-03-06 13:51:18 +01:00
parent 9bbef25ec1
commit 8bf738614e
2 changed files with 10 additions and 7 deletions

View File

@ -85,7 +85,7 @@ class Channel(object):
Msg = Message.CHANNEL_LAST_MESSAGE Msg = Message.CHANNEL_LAST_MESSAGE
else: else:
Msg = Message.CHANNEL_CLOSE Msg = Message.CHANNEL_CLOSE
self.gateway._outgoing.put(Msg(self.id)) self.gateway._send(Msg(self.id))
def _getremoteerror(self): def _getremoteerror(self):
try: try:
@ -117,7 +117,7 @@ class Channel(object):
# state transition "opened/sendonly" --> "closed" # state transition "opened/sendonly" --> "closed"
# threads warning: the channel might be closed under our feet, # threads warning: the channel might be closed under our feet,
# but it's never damaging to send too many CHANNEL_CLOSE messages # but it's never damaging to send too many CHANNEL_CLOSE messages
put = self.gateway._outgoing.put put = self.gateway._send
if error is not None: if error is not None:
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error))) put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
else: else:
@ -157,7 +157,7 @@ class Channel(object):
data = Message.CHANNEL_NEW(self.id, item.id) data = Message.CHANNEL_NEW(self.id, item.id)
else: else:
data = Message.CHANNEL_DATA(self.id, item) data = Message.CHANNEL_DATA(self.id, item)
self.gateway._outgoing.put(data) self.gateway._send(data)
def receive(self): def receive(self):
"""receives an item that was sent from the other side, """receives an item that was sent from the other side,

View File

@ -111,10 +111,13 @@ class Gateway(object):
self._traceex(exc_info()) self._traceex(exc_info())
break break
finally: finally:
self._outgoing.put(None) self._send(None)
self._channelfactory._finished_receiving() self._channelfactory._finished_receiving()
self._trace('leaving %r' % threading.currentThread()) self._trace('leaving %r' % threading.currentThread())
def _send(self, msg):
self._outgoing.put(msg)
def _thread_sender(self): def _thread_sender(self):
""" thread to send Messages over the wire. """ """ thread to send Messages over the wire. """
try: try:
@ -219,8 +222,8 @@ class Gateway(object):
channel = self.newchannel() channel = self.newchannel()
outid = self._newredirectchannelid(stdout) outid = self._newredirectchannelid(stdout)
errid = self._newredirectchannelid(stderr) errid = self._newredirectchannelid(stderr)
self._outgoing.put(Message.CHANNEL_OPEN(channel.id, self._send(Message.CHANNEL_OPEN(
(source, outid, errid))) channel.id, (source, outid, errid)))
return channel return channel
def _remote_redirect(self, stdout=None, stderr=None): def _remote_redirect(self, stdout=None, stderr=None):
@ -260,7 +263,7 @@ class Gateway(object):
except KeyError: except KeyError:
pass pass
else: else:
self._outgoing.put(None) self._send(None)
def join(self, joinexec=True): def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity) """ Wait for all IO (and by default all execution activity)