""" base execnet gateway code, a quick overview. the code of this module is sent to the "other side" as a means of bootstrapping a Gateway object capable of receiving and executing code, and routing data through channels. Gateways operate on InputOutput objects offering a write and a read(n) method. Once bootstrapped a higher level protocol based on Messages is used. Messages are serialized to and from InputOutput objects. The details of this protocol are locally defined in this module. There is no need for standardizing or versioning the protocol. After bootstrapping the BaseGateway opens a receiver thread which accepts encoded messages and triggers actions to interpret them. Sending of channel data items happens directly through write operations to InputOutput objects so there is no separate thread. Code execution messages are put into an execqueue from which they will be taken for execution. gateway.serve() will take and execute such items, one by one. This means that by incoming default execution is single-threaded. The receiver thread terminates if the remote side sends a gateway termination message or if the IO-connection drops. It puts an end symbol into the execqueue so that serve() can cleanly finish as well. (C) 2004-2009 Holger Krekel, Armin Rigo and others """ import sys, os, weakref import threading, traceback, socket, struct try: import queue except ImportError: import Queue as queue if sys.version_info > (3, 0): exec("""def do_exec(co, loc): exec(co, loc)""") unicode = str else: exec("""def do_exec(co, loc): exec co in loc""") bytes = str def str(*args): raise EnvironmentError( "use unicode or bytes, not cross-python ambigous 'str'") default_encoding = "UTF-8" sysex = (KeyboardInterrupt, SystemExit) debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w') # ___________________________________________________________________________ # # input output classes # ___________________________________________________________________________ class SocketIO: server_stmt = "io = SocketIO(clientsock)" error = (socket.error, EOFError) def __init__(self, sock): self.sock = sock try: sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY except socket.error: e = sys.exc_info()[1] sys.stderr.write("WARNING: cannot set socketoption") self.readable = self.writeable = True def read(self, numbytes): "Read exactly 'bytes' bytes from the socket." buf = bytes() while len(buf) < numbytes: t = self.sock.recv(numbytes - len(buf)) if not t: raise EOFError buf += t return buf def write(self, data): assert isinstance(data, bytes) self.sock.sendall(data) def close_read(self): if self.readable: try: self.sock.shutdown(0) except socket.error: pass self.readable = None def close_write(self): if self.writeable: try: self.sock.shutdown(1) except socket.error: pass self.writeable = None class Popen2IO: server_stmt = """ import os, sys, tempfile io = Popen2IO(sys.stdout, sys.stdin) sys.stdout = tempfile.TemporaryFile('w') sys.stdin = tempfile.TemporaryFile('r') """ error = (IOError, OSError, EOFError) def __init__(self, outfile, infile): # we need raw byte streams self.outfile, self.infile = outfile, infile if sys.platform == "win32": import msvcrt msvcrt.setmode(infile.fileno(), os.O_BINARY) msvcrt.setmode(outfile.fileno(), os.O_BINARY) self.readable = self.writeable = True def read(self, numbytes): """Read exactly 'numbytes' bytes from the pipe. """ try: data = self.infile.buffer.read(numbytes) except AttributeError: data = self.infile.read(numbytes) if len(data) < numbytes: raise EOFError return data def write(self, data): """write out all data bytes. """ assert isinstance(data, bytes) try: self.outfile.buffer.write(data) except AttributeError: self.outfile.write(data) self.outfile.flush() def close_read(self): if self.readable: self.infile.close() self.readable = None def close_write(self): try: self.outfile.close() except EnvironmentError: pass self.writeable = None # ___________________________________________________________________________ # # Messages # ___________________________________________________________________________ # the header format HDR_FORMAT = "!hhii" HDR_SIZE = struct.calcsize(HDR_FORMAT) is3k = sys.version_info >= (3,0) class Message: """ encapsulates Messages and their wire protocol. """ _types = {} def __init__(self, channelid=0, data=''): self.channelid = channelid self.data = data def writeto(self, io): # XXX marshal.dumps doesn't work for exchanging data across Python # version :-((( XXX check this statement wrt python2.4 through 3.1 data = self.data if isinstance(data, bytes): dataformat = 1 + int(is3k) else: if isinstance(data, unicode): dataformat = 3 else: data = repr(self.data) # argh dataformat = 4 data = data.encode(default_encoding) header = struct.pack(HDR_FORMAT, self.msgtype, dataformat, self.channelid, len(data)) io.write(header + data) def readfrom(cls, io): header = io.read(HDR_SIZE) (msgtype, dataformat, senderid, stringlen) = struct.unpack(HDR_FORMAT, header) data = io.read(stringlen) if dataformat == 1: if is3k: # remote was python2-str, we are 3k-text data = data.decode(default_encoding) elif dataformat == 2: # remote was python3-bytes pass else: data = data.decode(default_encoding) if dataformat == 3: pass elif dataformat == 4: data = eval(data, {}) # reversed argh else: raise ValueError("bad data format") return cls._types[msgtype](senderid, data) readfrom = classmethod(readfrom) def __repr__(self): r = repr(self.data) if len(r) > 50: return "" %(self.__class__.__name__, self.channelid, len(r)) else: return "" %(self.__class__.__name__, self.channelid, self.data) def _setupmessages(): class CHANNEL_OPEN(Message): def received(self, gateway): channel = gateway._channelfactory.new(self.channelid) gateway._local_schedulexec(channel=channel, sourcetask=self.data) class CHANNEL_NEW(Message): def received(self, gateway): """ receive a remotely created new (sub)channel. """ newid = self.data newchannel = gateway._channelfactory.new(newid) gateway._channelfactory._local_receive(self.channelid, newchannel) class CHANNEL_DATA(Message): def received(self, gateway): gateway._channelfactory._local_receive(self.channelid, self.data) class CHANNEL_CLOSE(Message): def received(self, gateway): gateway._channelfactory._local_close(self.channelid) class CHANNEL_CLOSE_ERROR(Message): def received(self, gateway): remote_error = gateway._channelfactory.RemoteError(self.data) gateway._channelfactory._local_close(self.channelid, remote_error) class CHANNEL_LAST_MESSAGE(Message): def received(self, gateway): gateway._channelfactory._local_close(self.channelid, sendonly=True) classes = [CHANNEL_OPEN, CHANNEL_NEW, CHANNEL_DATA, CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE] for i, cls in enumerate(classes): Message._types[i] = cls cls.msgtype = i setattr(Message, cls.__name__, cls) _setupmessages() def geterrortext(excinfo): try: l = traceback.format_exception(*excinfo) errortext = "".join(l) except sysex: raise except: errortext = '%s: %s' % (excinfo[0].__name__, excinfo[1]) return errortext class RemoteError(EOFError): """ Contains an Exceptions from the other side. """ def __init__(self, formatted): self.formatted = formatted EOFError.__init__(self) def __str__(self): return self.formatted def __repr__(self): return "%s: %s" %(self.__class__.__name__, self.formatted) def warn(self): # XXX do this better sys.stderr.write("Warning: unhandled %r\n" % (self,)) NO_ENDMARKER_WANTED = object() class Channel(object): """Communication channel between two possibly remote threads of code. """ RemoteError = RemoteError def __init__(self, gateway, id): assert isinstance(id, int) self.gateway = gateway self.id = id self._items = queue.Queue() self._closed = False self._receiveclosed = threading.Event() self._remoteerrors = [] def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): # we first execute the callback on all already received # items. We need to hold the receivelock to prevent # race conditions with newly arriving items. # after having cleared the queue we register # the callback only if the channel is not closed already. _callbacks = self.gateway._channelfactory._callbacks _receivelock = self.gateway._receivelock _receivelock.acquire() try: if self._items is None: raise IOError("%r has callback already registered" %(self,)) items = self._items self._items = None while 1: try: olditem = items.get(block=False) except queue.Empty: if not (self._closed or self._receiveclosed.isSet()): _callbacks[self.id] = (callback, endmarker) break else: if olditem is ENDMARKER: items.put(olditem) # for other receivers if endmarker is not NO_ENDMARKER_WANTED: callback(endmarker) break else: callback(olditem) finally: _receivelock.release() def __repr__(self): flag = self.isclosed() and "closed" or "open" return "" % (self.id, flag) def __del__(self): if self.gateway is None: # can be None in tests return self.gateway._trace("Channel(%d).__del__" % self.id) # no multithreading issues here, because we have the last ref to 'self' if self._closed: # state transition "closed" --> "deleted" for error in self._remoteerrors: error.warn() elif self._receiveclosed.isSet(): # state transition "sendonly" --> "deleted" # the remote channel is already in "deleted" state, nothing to do pass else: # state transition "opened" --> "deleted" if self._items is None: # has_callback Msg = Message.CHANNEL_LAST_MESSAGE else: Msg = Message.CHANNEL_CLOSE self.gateway._send(Msg(self.id)) def _getremoteerror(self): try: return self._remoteerrors.pop(0) except IndexError: return None # # public API for channel objects # def isclosed(self): """ return True if the channel is closed. A closed channel may still hold items. """ return self._closed def makefile(self, mode='w', proxyclose=False): """ return a file-like object. mode: 'w' for writes, 'r' for reads proxyclose: if true file.close() will trigger a channel.close() call. """ if mode == "w": return ChannelFileWrite(channel=self, proxyclose=proxyclose) elif mode == "r": return ChannelFileRead(channel=self, proxyclose=proxyclose) raise ValueError("mode %r not availabe" %(mode,)) def close(self, error=None): """ close down this channel on both sides. """ if not self._closed: # state transition "opened/sendonly" --> "closed" # threads warning: the channel might be closed under our feet, # but it's never damaging to send too many CHANNEL_CLOSE messages put = self.gateway._send if error is not None: put(Message.CHANNEL_CLOSE_ERROR(self.id, error)) else: put(Message.CHANNEL_CLOSE(self.id)) if isinstance(error, RemoteError): self._remoteerrors.append(error) self._closed = True # --> "closed" self._receiveclosed.set() queue = self._items if queue is not None: queue.put(ENDMARKER) self.gateway._channelfactory._no_longer_opened(self.id) def waitclose(self, timeout=None): """ wait until this channel is closed (or the remote side otherwise signalled that no more data was being sent). The channel may still hold receiveable items, but not receive more. waitclose() reraises exceptions from executing code on the other side as channel.RemoteErrors containing a a textual representation of the remote traceback. """ self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state if not self._receiveclosed.isSet(): raise IOError("Timeout") error = self._getremoteerror() if error: raise error def send(self, item): """sends the given item to the other side of the channel, possibly blocking if the sender queue is full. Note that an item needs to be marshallable. """ if self.isclosed(): raise IOError("cannot send to %r" %(self,)) if isinstance(item, Channel): data = Message.CHANNEL_NEW(self.id, item.id) else: data = Message.CHANNEL_DATA(self.id, item) self.gateway._send(data) def receive(self): """receives an item that was sent from the other side, possibly blocking if there is none. Note that exceptions from the other side will be reraised as channel.RemoteError exceptions containing a textual representation of the remote traceback. """ queue = self._items if queue is None: raise IOError("calling receive() on channel with receiver callback") x = queue.get() if x is ENDMARKER: queue.put(x) # for other receivers raise self._getremoteerror() or EOFError() else: return x def __iter__(self): return self def next(self): try: return self.receive() except EOFError: raise StopIteration __next__ = next ENDMARKER = object() class ChannelFactory(object): RemoteError = RemoteError def __init__(self, gateway, startcount=1): self._channels = weakref.WeakValueDictionary() self._callbacks = {} self._writelock = threading.Lock() self.gateway = gateway self.count = startcount self.finished = False def new(self, id=None): """ create a new Channel with 'id' (or create new id if None). """ self._writelock.acquire() try: if self.finished: raise IOError("connexion already closed: %s" % (self.gateway,)) if id is None: id = self.count self.count += 2 channel = Channel(self.gateway, id) self._channels[id] = channel return channel finally: self._writelock.release() def channels(self): return list(self._channels.values()) # # internal methods, called from the receiver thread # def _no_longer_opened(self, id): try: del self._channels[id] except KeyError: pass try: callback, endmarker = self._callbacks.pop(id) except KeyError: pass else: if endmarker is not NO_ENDMARKER_WANTED: callback(endmarker) def _local_close(self, id, remoteerror=None, sendonly=False): channel = self._channels.get(id) if channel is None: # channel already in "deleted" state if remoteerror: remoteerror.warn() else: # state transition to "closed" state if remoteerror: channel._remoteerrors.append(remoteerror) if not sendonly: # otherwise #--> "sendonly" channel._closed = True # --> "closed" channel._receiveclosed.set() queue = channel._items if queue is not None: queue.put(ENDMARKER) self._no_longer_opened(id) def _local_receive(self, id, data): # executes in receiver thread try: callback, endmarker = self._callbacks[id] except KeyError: channel = self._channels.get(id) queue = channel and channel._items if queue is None: pass # drop data else: queue.put(data) else: callback(data) # even if channel may be already closed def _finished_receiving(self): self._writelock.acquire() try: self.finished = True finally: self._writelock.release() for id in list(self._channels): self._local_close(id, sendonly=True) for id in list(self._callbacks): self._no_longer_opened(id) class ChannelFile(object): def __init__(self, channel, proxyclose=True): self.channel = channel self._proxyclose = proxyclose def close(self): if self._proxyclose: self.channel.close() def __repr__(self): state = self.channel.isclosed() and 'closed' or 'open' return '' %(self.channel.id, state) class ChannelFileWrite(ChannelFile): def write(self, out): self.channel.send(out) def flush(self): pass class ChannelFileRead(ChannelFile): def __init__(self, channel, proxyclose=True): super(ChannelFileRead, self).__init__(channel, proxyclose) self._buffer = "" def read(self, n): while len(self._buffer) < n: try: self._buffer += self.channel.receive() except EOFError: self.close() break ret = self._buffer[:n] self._buffer = self._buffer[n:] return ret def readline(self): i = self._buffer.find("\n") if i != -1: return self.read(i+1) line = self.read(len(self._buffer)+1) while line and line[-1] != "\n": c = self.read(1) if not c: break line += c return line class BaseGateway(object): exc_info = sys.exc_info class _StopExecLoop(Exception): pass def __init__(self, io, _startcount=2): """ initialize core gateway, using the given inputoutput object. """ self._io = io self._channelfactory = ChannelFactory(self, _startcount) self._receivelock = threading.RLock() def _initreceive(self): self._receiverthread = threading.Thread(name="receiver", target=self._thread_receiver) self._receiverthread.setDaemon(1) self._receiverthread.start() def _trace(self, msg): if debug: try: debug.write(unicode(msg) + "\n") debug.flush() except sysex: raise except: sys.stderr.write("exception during tracing\n") def _thread_receiver(self): """ thread to read and handle Messages half-sync-half-async. """ self._trace("starting to receive") try: while 1: try: msg = Message.readfrom(self._io) self._trace("received <- %r" % msg) _receivelock = self._receivelock _receivelock.acquire() try: msg.received(self) finally: _receivelock.release() except sysex: break except EOFError: break except: self._trace(geterrortext(self.exc_info())) break finally: # XXX we need to signal fatal error states to # channels/callbacks, particularly ones # where the other side just died. self._stopexec() try: self._stopsend() except IOError: self._trace('IOError on _stopsend()') self._channelfactory._finished_receiving() if threading: # might be None during shutdown/finalization self._trace('leaving %r' % threading.currentThread()) def _send(self, msg): if msg is None: self._io.close_write() else: try: msg.writeto(self._io) except: excinfo = self.exc_info() self._trace(geterrortext(excinfo)) else: self._trace('sent -> %r' % msg) def _stopsend(self): self._send(None) def _stopexec(self): pass def _local_schedulexec(self, channel, sourcetask): channel.close("execution disallowed") # _____________________________________________________________________ # # High Level Interface # _____________________________________________________________________ # def newchannel(self): """ return new channel object. """ return self._channelfactory.new() def join(self, joinexec=True): """ Wait for all IO (and by default all execution activity) to stop. the joinexec parameter is obsolete. """ current = threading.currentThread() if self._receiverthread.isAlive(): self._trace("joining receiver thread") self._receiverthread.join() class SlaveGateway(BaseGateway): def _stopexec(self): self._execqueue.put(None) def _local_schedulexec(self, channel, sourcetask): self._execqueue.put((channel, sourcetask)) def serve(self, joining=True): self._execqueue = queue.Queue() self._initreceive() try: while 1: item = self._execqueue.get() if item is None: self._stopsend() break try: self.executetask(item) except self._StopExecLoop: break finally: self._trace("serve") if joining: self.join() def executetask(self, item): """ execute channel/source items. """ channel, source = item try: loc = { 'channel' : channel, '__name__': '__channelexec__'} #open("task.py", 'w').write(source) self._trace("execution starts: %s" % repr(source)[:50]) try: co = compile(source+'\n', '', 'exec') do_exec(co, loc) finally: self._trace("execution finished") except sysex: pass except self._StopExecLoop: channel.close() raise except: excinfo = self.exc_info() self._trace("got exception %s" % excinfo[1]) errortext = geterrortext(excinfo) channel.close(errortext) else: channel.close()