From 18a0dd8d091555a07645f0286cc8ba3250533974 Mon Sep 17 00:00:00 2001 From: hpk Date: Thu, 2 Apr 2009 15:43:26 +0200 Subject: [PATCH] [svn r63542] moving pygreen to greenlet contrib dir --HG-- branch : trunk --- contrib/pygreen/__init__.py | 0 contrib/pygreen/conftest.py | 8 - contrib/pygreen/greenexecnet.py | 217 ---------- contrib/pygreen/greensock2.py | 490 ---------------------- contrib/pygreen/msgstruct.py | 29 -- contrib/pygreen/pipe/__init__.py | 8 - contrib/pygreen/pipe/common.py | 46 -- contrib/pygreen/pipe/fd.py | 69 --- contrib/pygreen/pipe/gsocket.py | 113 ----- contrib/pygreen/pipe/mp.py | 29 -- contrib/pygreen/pipelayer.py | 306 -------------- contrib/pygreen/server/__init__.py | 0 contrib/pygreen/server/httpserver.py | 42 -- contrib/pygreen/test/__init__.py | 1 - contrib/pygreen/test/test_greenexecnet.py | 48 --- contrib/pygreen/test/test_greensock2.py | 221 ---------- contrib/pygreen/test/test_pipelayer.py | 223 ---------- 17 files changed, 1850 deletions(-) delete mode 100644 contrib/pygreen/__init__.py delete mode 100644 contrib/pygreen/conftest.py delete mode 100644 contrib/pygreen/greenexecnet.py delete mode 100644 contrib/pygreen/greensock2.py delete mode 100644 contrib/pygreen/msgstruct.py delete mode 100644 contrib/pygreen/pipe/__init__.py delete mode 100644 contrib/pygreen/pipe/common.py delete mode 100644 contrib/pygreen/pipe/fd.py delete mode 100644 contrib/pygreen/pipe/gsocket.py delete mode 100644 contrib/pygreen/pipe/mp.py delete mode 100644 contrib/pygreen/pipelayer.py delete mode 100644 contrib/pygreen/server/__init__.py delete mode 100644 contrib/pygreen/server/httpserver.py delete mode 100644 contrib/pygreen/test/__init__.py delete mode 100644 contrib/pygreen/test/test_greenexecnet.py delete mode 100644 contrib/pygreen/test/test_greensock2.py delete mode 100644 contrib/pygreen/test/test_pipelayer.py diff --git a/contrib/pygreen/__init__.py b/contrib/pygreen/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/contrib/pygreen/conftest.py b/contrib/pygreen/conftest.py deleted file mode 100644 index 8e89c9d06..000000000 --- a/contrib/pygreen/conftest.py +++ /dev/null @@ -1,8 +0,0 @@ -import py, os - -class Directory(py.test.collect.Directory): - def collect(self): - if os.name == 'nt': - py.test.skip("Cannot test green layer on windows") - else: - return super(Directory, self).collect() diff --git a/contrib/pygreen/greenexecnet.py b/contrib/pygreen/greenexecnet.py deleted file mode 100644 index 17c72fa47..000000000 --- a/contrib/pygreen/greenexecnet.py +++ /dev/null @@ -1,217 +0,0 @@ - -""" This is an implementation of an execnet protocol on top -of a transport layer provided by the greensock2 interface. - -It has the same semantics, but does not use threads at all -(which makes it suitable for specific enviroments, like pypy-c). - -There are some features lacking, most notable: -- callback support for channels -- socket gateway -- bootstrapping (there is assumption of pylib being available - on remote side, which is not always true) -""" - -import sys, os, py, inspect -from pygreen import greensock2 -from pygreen.msgstruct import message, decodemessage - -MSG_REMOTE_EXEC = 'r' -MSG_OBJECT = 'o' -MSG_ERROR = 'e' -MSG_CHAN_CLOSE = 'c' -MSG_FATAL = 'f' -MSG_CHANNEL = 'n' - -class Gateway(object): - - def __init__(self, input, output, is_remote=False): - self.input = input - self.output = output - self.nextchannum = int(is_remote) - self.receivers = {} - self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote) - - def remote_exec(self, remote_source): - remote_source = py.code.Source(remote_source) - chan = self.newchannel() - msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source)) - self.output.sendall(msg) - return chan - - def newchannel(self): - n = self.nextchannum - self.nextchannum += 2 - return self.make_channel(n) - - def make_channel(self, n): - giver, accepter = greensock2.meetingpoint() - assert n not in self.receivers - self.receivers[n] = giver - return Channel(self, n, accepter) - - def serve_forever(self, is_remote=False): - try: - buffer = "" - while 1: - msg, buffer = decodemessage(buffer) - if msg is None: - buffer += self.input.recv(16384) - else: - handler = HANDLERS[msg[0]] - handler(self, *msg[1:]) - except greensock2.greenlet.GreenletExit: - raise - except: - if is_remote: - msg = message(MSG_FATAL, format_error(*sys.exc_info())) - self.output.sendall(msg) - else: - raise - - def msg_remote_exec(self, n, source): - def do_execute(channel): - try: - d = {'channel': channel} - exec source in d - except: - channel.report_error(*sys.exc_info()) - else: - channel.close() - greensock2.autogreenlet(do_execute, self.make_channel(n)) - - def msg_object(self, n, objrepr): - obj = eval(objrepr) - if n in self.receivers: - self.receivers[n].give_queued(obj) - - def msg_error(self, n, s): - if n in self.receivers: - self.receivers[n].give_queued(RemoteError(s)) - self.receivers[n].close() - del self.receivers[n] - - def msg_chan_close(self, n): - if n in self.receivers: - self.receivers[n].close() - del self.receivers[n] - - def msg_channel(self, n, m): - if n in self.receivers: - self.receivers[n].give_queued(self.make_channel(m)) - - def msg_fatal(self, s): - raise RemoteError(s) - -HANDLERS = { - MSG_REMOTE_EXEC: Gateway.msg_remote_exec, - MSG_OBJECT: Gateway.msg_object, - MSG_ERROR: Gateway.msg_error, - MSG_CHAN_CLOSE: Gateway.msg_chan_close, - MSG_CHANNEL: Gateway.msg_channel, - MSG_FATAL: Gateway.msg_fatal, - } - - -class Channel(object): - - def __init__(self, gw, n, accepter): - self.gw = gw - self.n = n - self.accepter = accepter - - def send(self, obj): - if isinstance(obj, Channel): - assert obj.gw is self.gw - msg = message(MSG_CHANNEL, self.n, obj.n) - else: - msg = message(MSG_OBJECT, self.n, repr(obj)) - self.gw.output.sendall(msg) - - def receive(self): - obj = self.accepter.accept() - if isinstance(obj, RemoteError): - raise obj - else: - return obj - - def close(self): - try: - self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n)) - except OSError: - pass - - def report_error(self, exc_type, exc_value, exc_traceback=None): - s = format_error(exc_type, exc_value, exc_traceback) - try: - self.gw.output.sendall(message(MSG_ERROR, self.n, s)) - except OSError: - pass - - -class RemoteError(Exception): - pass - -def format_error(exc_type, exc_value, exc_traceback=None): - import traceback, StringIO - s = StringIO.StringIO() - traceback.print_exception(exc_type, exc_value, exc_traceback, file=s) - return s.getvalue() - - -class PopenCmdGateway(Gateway): - action = "exec input()" - - def __init__(self, cmdline): - from pygreen.pipe.fd import FDInput, FDOutput - child_in, child_out = os.popen2(cmdline, 't', 0) - fdin = FDInput(child_out.fileno(), child_out.close) - fdout = FDOutput(child_in.fileno(), child_in.close) - fdout.sendall(self.get_bootstrap_code()) - super(PopenCmdGateway, self).__init__(input = fdin, output = fdout) - - def get_bootstrap_code(): - # XXX assumes that the py lib is installed on the remote side - src = [] - src.append('from pygreen import greenexecnet') - src.append('greenexecnet.PopenCmdGateway.run_server()') - src.append('') - return '%r\n' % ('\n'.join(src),) - get_bootstrap_code = staticmethod(get_bootstrap_code) - - def run_server(): - from pygreen.pipe.fd import FDInput, FDOutput - gw = Gateway(input = FDInput(os.dup(0)), - output = FDOutput(os.dup(1)), - is_remote = True) - # for now, ignore normal I/O - fd = os.open('/dev/null', os.O_RDWR) - os.dup2(fd, 0) - os.dup2(fd, 1) - os.close(fd) - greensock2._suspend_forever() - run_server = staticmethod(run_server) - -class PopenGateway(PopenCmdGateway): - def __init__(self, python=sys.executable): - cmdline = '"%s" -u -c "%s"' % (python, self.action) - super(PopenGateway, self).__init__(cmdline) - -class SshGateway(PopenCmdGateway): - def __init__(self, sshaddress, remotepython='python', identity=None): - self.sshaddress = sshaddress - remotecmd = '%s -u -c "%s"' % (remotepython, self.action) - cmdline = [sshaddress, remotecmd] - # XXX Unix style quoting - for i in range(len(cmdline)): - cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'" - cmd = 'ssh -C' - if identity is not None: - cmd += ' -i %s' % (identity,) - cmdline.insert(0, cmd) - super(SshGateway, self).__init__(' '.join(cmdline)) - - -##f = open('LOG', 'a') -##import os; print >> f, '[%d] READY' % (os.getpid(),) -##f.close() diff --git a/contrib/pygreen/greensock2.py b/contrib/pygreen/greensock2.py deleted file mode 100644 index 74b26c2c8..000000000 --- a/contrib/pygreen/greensock2.py +++ /dev/null @@ -1,490 +0,0 @@ - -""" This is a base implementation of thread-like network programming -on top of greenlets. From API available here it's quite unlikely -that you would like to use anything except wait(). Higher level interface -is available in pipe directory -""" - -import os, sys -try: - from stackless import greenlet -except ImportError: - import py - greenlet = py.magic.greenlet -from collections import deque -from select import select as _select -from time import time as _time -from heapq import heappush, heappop, heapify - -TRACE = False - -def meetingpoint(): - senders = deque() # list of senders, or [None] if Giver closed - receivers = deque() # list of receivers, or [None] if Receiver closed - return (MeetingPointGiver(senders, receivers), - MeetingPointAccepter(senders, receivers)) - -def producer(func, *args, **kwds): - iterable = func(*args, **kwds) - giver, accepter = meetingpoint() - def autoproducer(): - try: - giver.wait() - for obj in iterable: - giver.give(obj) - giver.wait() - finally: - giver.close() - autogreenlet(autoproducer) - return accepter - - -class MeetingPointBase(object): - - def __init__(self, senders, receivers): - self.senders = senders - self.receivers = receivers - self.g_active = g_active - - def close(self): - while self.senders: - if self.senders[0] is None: - break - packet = self.senders.popleft() - if packet.g_from is not None: - self.g_active.append(packet.g_from) - else: - self.senders.append(None) - while self.receivers: - if self.receivers[0] is None: - break - other = self.receivers.popleft() - self.g_active.append(other) - else: - self.receivers.append(None) - - __del__ = close - - def closed(self): - return self.receivers and self.receivers[0] is None - - -class MeetingPointGiver(MeetingPointBase): - - def give(self, obj): - if self.receivers: - if self.receivers[0] is None: - raise MeetingPointClosed - other = self.receivers.popleft() - g_active.append(g_getcurrent()) - packet = _Packet() - packet.payload = obj - other.switch(packet) - if not packet.accepted: - raise Interrupted("packet not accepted") - else: - packet = _Packet() - packet.g_from = g_getcurrent() - packet.payload = obj - try: - self.senders.append(packet) - g_dispatcher.switch() - if not packet.accepted: - raise Interrupted("packet not accepted") - except: - remove_by_id(self.senders, packet) - raise - - def give_queued(self, obj): - if self.receivers: - self.give(obj) - else: - packet = _Packet() - packet.g_from = None - packet.payload = obj - self.senders.append(packet) - - def ready(self): - return self.receivers and self.receivers[0] is not None - - def wait(self): - if self.receivers: - if self.receivers[0] is None: - raise MeetingPointClosed - else: - packet = _Packet() - packet.g_from = g_getcurrent() - packet.empty = True - self.senders.append(packet) - try: - g_dispatcher.switch() - if not packet.accepted: - raise Interrupted("no accepter found") - except: - remove_by_id(self.senders, packet) - raise - - def trigger(self): - if self.ready(): - self.give(None) - - -class MeetingPointAccepter(MeetingPointBase): - - def accept(self): - while self.senders: - if self.senders[0] is None: - raise MeetingPointClosed - packet = self.senders.popleft() - packet.accepted = True - if packet.g_from is not None: - g_active.append(packet.g_from) - if not packet.empty: - return packet.payload - g = g_getcurrent() - self.receivers.append(g) - try: - packet = g_dispatcher.switch() - except: - remove_by_id(self.receivers, g) - raise - if type(packet) is not _Packet: - remove_by_id(self.receivers, g) - raise Interrupted("no packet") - packet.accepted = True - return packet.payload - - def ready(self): - for packet in self.senders: - if packet is None: - return False - if not packet.empty: - return True - return False - - def wait_trigger(self, timeout=None, default=None): - if timeout is None: - return self.accept() - else: - timer = Timer(timeout) - try: - try: - return self.accept() - finally: - timer.stop() - except Interrupted: - if timer.finished: - return default - raise - - -class MeetingPointClosed(greenlet.GreenletExit): - pass - -class Interrupted(greenlet.GreenletExit): - pass - -class ConnexionClosed(greenlet.GreenletExit): - pass - -class _Packet(object): - empty = False - accepted = False - -def remove_by_id(d, obj): - lst = [x for x in d if x is not obj] - d.clear() - d.extend(lst) - -def wait_input(sock): - _register(g_iwtd, sock) - -def recv(sock, bufsize): - wait_input(sock) - buf = sock.recv(bufsize) - if not buf: - raise ConnexionClosed("inbound connexion closed") - return buf - -def recvall(sock, bufsize): - in_front = False - data = [] - while bufsize > 0: - _register(g_iwtd, sock, in_front=in_front) - buf = sock.recv(bufsize) - if not buf: - raise ConnexionClosed("inbound connexion closed") - data.append(buf) - bufsize -= len(buf) - in_front = True - return ''.join(data) - -def read(fd, bufsize): - assert fd >= 0 - wait_input(fd) - buf = os.read(fd, bufsize) - if not buf: - raise ConnexionClosed("inbound connexion closed") - return buf - -def readall(fd, bufsize): - assert fd >= 0 - in_front = False - data = [] - while bufsize > 0: - _register(g_iwtd, fd, in_front=in_front) - buf = os.read(fd, bufsize) - if not buf: - raise ConnexionClosed("inbound connexion closed") - data.append(buf) - bufsize -= len(buf) - in_front = True - return ''.join(data) - - -def wait_output(sock): - _register(g_owtd, sock) - -def sendall(sock, buffer): - in_front = False - while buffer: - _register(g_owtd, sock, in_front=in_front) - count = sock.send(buffer) - buffer = buffer[count:] - in_front = True - -def writeall(fd, buffer): - assert fd >= 0 - in_front = False - while buffer: - _register(g_owtd, fd, in_front=in_front) - count = os.write(fd, buffer) - if not count: - raise ConnexionClosed("outbound connexion closed") - buffer = buffer[count:] - in_front = True - - -def sleep(duration): - timer = Timer(duration) - try: - _suspend_forever() - finally: - ok = timer.finished - timer.stop() - if not ok: - raise Interrupted - -def _suspend_forever(): - g_dispatcher.switch() - -def oneof(*callables): - assert callables - for c in callables: - assert callable(c) - greenlets = [tracinggreenlet(c) for c in callables] - g_active.extend(greenlets) - res = g_dispatcher.switch() - for g in greenlets: - g.interrupt() - return res - -def allof(*callables): - for c in callables: - assert callable(c) - greenlets = [tracinggreenlet(lambda i=i, c=c: (i, c())) - for i, c in enumerate(callables)] - g_active.extend(greenlets) - result = [None] * len(callables) - for _ in callables: - num, res = g_dispatcher.switch() - result[num] = res - return tuple(result) - -class Timer(object): - started = False - finished = False - - def __init__(self, timeout): - self.g = g_getcurrent() - entry = (_time() + timeout, self) - if g_timers_mixed: - g_timers.append(entry) - else: - heappush(g_timers, entry) - - def stop(self): - global g_timers_mixed - if not self.finished: - for i, (activationtime, timer) in enumerate(g_timers): - if timer is self: - g_timers[i] = g_timers[-1] - g_timers.pop() - g_timers_mixed = True - break - self.finished = True - -# ____________________________________________________________ - -class tracinggreenlet(greenlet): - def __init__(self, function, *args, **kwds): - self.function = function - self.args = args - self.kwds = kwds - - def __repr__(self): -## args = ', '.join([repr(s) for s in self.args] + -## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()]) -## return '' % (self.function.__name__, args) - return '<%s %s at %s>' % (self.__class__.__name__, - self.function.__name__, - hex(id(self))) - - def run(self): - self.trace("start") - try: - res = self.function(*self.args, **self.kwds) - except Exception, e: - self.trace("stop (%s%s)", e.__class__.__name__, - str(e) and (': '+str(e))) - raise - else: - self.trace("done") - return res - - def trace(self, msg, *args): - if TRACE: - print self, msg % args - - def interrupt(self): - self.throw(Interrupted) - -class autogreenlet(tracinggreenlet): - def __init__(self, *args, **kwargs): - super(autogreenlet, self).__init__(*args, **kwargs) - self.parent = g_dispatcher - g_active.append(self) - -g_active = deque() -g_iwtd = {} -g_owtd = {} -g_timers = [] -g_timers_mixed = False - -g_getcurrent = greenlet.getcurrent - -def _register(g_wtd, sock, in_front=False): - d = g_wtd.setdefault(sock, deque()) - g = g_getcurrent() - if in_front: - d.appendleft(g) - else: - d.append(g) - try: - if g_dispatcher.switch() is not g_wtd: - raise Interrupted - except: - remove_by_id(d, g) - raise - -##def _unregister_timer(): -## ... - - -def check_dead_greenlets(mapping): - to_remove = [i for i, v in mapping.items() if not v] - for k in to_remove: - del mapping[k] - -#def check_waiters(active): -# if active in g_waiters: -# for g in g_waiters[active]: -# g.switch() -# del g_waiters[active] - - -def dispatcher_mainloop(): - global g_timers_mixed - GreenletExit = greenlet.GreenletExit - while 1: - try: - while g_active: - #print 'active:', g_active[0] - g_active.popleft().switch() -# active.switch() -# if active.dead: -# check_waiters(active) -# del active - if g_timers: - if g_timers_mixed: - heapify(g_timers) - g_timers_mixed = False - activationtime, timer = g_timers[0] - delay = activationtime - _time() - if delay <= 0.0: - if timer.started: - heappop(g_timers) - #print 'timeout:', g - timer.finished = True - timer.g.switch() -# if timer.g.dead: -# check_waiters(timer.g) - continue - delay = 0.0 - timer.started = True - else: - check_dead_greenlets(g_iwtd) - check_dead_greenlets(g_owtd) - if not (g_iwtd or g_owtd): - # nothing to do, switch to the main greenlet - g_dispatcher.parent.switch() - continue - delay = None - - #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay - iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay) - #print 'done' - for s in owtd: - if s in g_owtd: - d = g_owtd[s] - #print 'owtd:', d[0] - # XXX: Check if d is non-empty - try: - g = d.popleft() - except IndexError: - g = None - if not d: - try: - del g_owtd[s] - except KeyError: - pass - if g: - g.switch(g_owtd) -# if g.dead: -# check_waiters(g) - for s in iwtd: - if s in g_iwtd: - d = g_iwtd[s] - #print 'iwtd:', d[0] - # XXX: Check if d is non-empty - try: - g = d.popleft() - except IndexError: - g = None - if not d: - try: - del g_iwtd[s] - except KeyError: - pass - if g: - g.switch(g_iwtd) -# if g.dead: -# check_waiters(g) - except GreenletExit: - raise - except: - import sys - g_dispatcher.parent.throw(*sys.exc_info()) - -g_dispatcher = greenlet(dispatcher_mainloop) -#g_waiters = {} diff --git a/contrib/pygreen/msgstruct.py b/contrib/pygreen/msgstruct.py deleted file mode 100644 index be6e73b22..000000000 --- a/contrib/pygreen/msgstruct.py +++ /dev/null @@ -1,29 +0,0 @@ -from struct import pack, unpack, calcsize - - -def message(tp, *values): - strtype = type('') - typecodes = [''] - for v in values: - if type(v) is strtype: - typecodes.append('%ds' % len(v)) - elif 0 <= v < 256: - typecodes.append('B') - else: - typecodes.append('l') - typecodes = ''.join(typecodes) - assert len(typecodes) < 256 - return pack(("!B%dsc" % len(typecodes)) + typecodes, - len(typecodes), typecodes, tp, *values) - -def decodemessage(data): - if data: - limit = ord(data[0]) + 1 - if len(data) >= limit: - typecodes = "!c" + data[1:limit] - end = limit + calcsize(typecodes) - if len(data) >= end: - return unpack(typecodes, data[limit:end]), data[end:] - #elif end > 1000000: - # raise OverflowError - return None, data diff --git a/contrib/pygreen/pipe/__init__.py b/contrib/pygreen/pipe/__init__.py deleted file mode 100644 index ac07348e1..000000000 --- a/contrib/pygreen/pipe/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ - -""" This is a higher level network interface based on top -of greensock2. Objects here are ready to use, specific examples -are listed in tests (test_pipelayer and test_greensock2). - -The limitation is that you're not supposed to use threads + blocking -I/O at all. -""" diff --git a/contrib/pygreen/pipe/common.py b/contrib/pygreen/pipe/common.py deleted file mode 100644 index 521c95752..000000000 --- a/contrib/pygreen/pipe/common.py +++ /dev/null @@ -1,46 +0,0 @@ -from pygreen import greensock2 - -VERBOSE = True - - -if VERBOSE: - def log(msg, *args): - print '*', msg % args -else: - def log(msg, *args): - pass - -class BufferedInput(object): - in_buf = '' - - def recv(self, bufsize): - self.wait_input() - buf = self.in_buf[:bufsize] - self.in_buf = self.in_buf[bufsize:] - return buf - - def recvall(self, bufsize): - result = [] - while bufsize > 0: - buf = self.recv(bufsize) - result.append(buf) - bufsize -= len(buf) - return ''.join(result) - -# ____________________________________________________________ - -def forwardpipe(s1, s2): - try: - while 1: - s2.wait_output() - buffer = s1.recv(32768) - log('[%r -> %r] %r', s1, s2, buffer) - s2.sendall(buffer) - del buffer - finally: - s2.shutdown_wr() - s1.shutdown_rd() - -def linkpipes(s1, s2): - greensock2.autogreenlet(forwardpipe, s1, s2) - greensock2.autogreenlet(forwardpipe, s2, s1) diff --git a/contrib/pygreen/pipe/fd.py b/contrib/pygreen/pipe/fd.py deleted file mode 100644 index 14f630443..000000000 --- a/contrib/pygreen/pipe/fd.py +++ /dev/null @@ -1,69 +0,0 @@ -import os -from pygreen import greensock2 - - -class FDInput(object): - - def __init__(self, read_fd, close=True): - self.read_fd = read_fd - self._close = close # a flag or a callback - - def shutdown_rd(self): - fd = self.read_fd - if fd is not None: - self.read_fd = None - close = self._close - if close: - self._close = False - if close == True: - os.close(fd) - else: - close() - - __del__ = shutdown_rd - - def wait_input(self): - greensock2.wait_input(self.read_fd) - - def recv(self, bufsize): -## f = open('LOG', 'a') -## import os; print >> f, '[%d] RECV' % (os.getpid(),) -## f.close() - res = greensock2.read(self.read_fd, bufsize) -## f = open('LOG', 'a') -## import os; print >> f, '[%d] RECV %r' % (os.getpid(), res) -## f.close() - return res - - def recvall(self, bufsize): - return greensock2.readall(self.read_fd, bufsize) - - -class FDOutput(object): - - def __init__(self, write_fd, close=True): - self.write_fd = write_fd - self._close = close # a flag or a callback - - def shutdown_wr(self): - fd = self.write_fd - if fd is not None: - self.write_fd = None - close = self._close - if close: - self._close = False - if close == True: - os.close(fd) - else: - close() - - __del__ = shutdown_wr - - def wait_output(self): - greensock2.wait_output(self.write_fd) - - def sendall(self, buffer): -## f = open('LOG', 'a') -## import os; print >> f, '[%d] %r' % (os.getpid(), buffer) -## f.close() - greensock2.writeall(self.write_fd, buffer) diff --git a/contrib/pygreen/pipe/gsocket.py b/contrib/pygreen/pipe/gsocket.py deleted file mode 100644 index 2d48eb198..000000000 --- a/contrib/pygreen/pipe/gsocket.py +++ /dev/null @@ -1,113 +0,0 @@ -from pygreen import greensock2 -import socket, errno, os - -error = socket.error - - -class _delegate(object): - def __init__(self, methname): - self.methname = methname - def __get__(self, obj, typ=None): - result = getattr(obj._s, self.methname) - setattr(obj, self.methname, result) - return result - - -class GreenSocket(object): - - def __init__(self, family = socket.AF_INET, - type = socket.SOCK_STREAM, - proto = 0): - self._s = socket.socket(family, type, proto) - self._s.setblocking(False) - - def fromsocket(cls, s): - if isinstance(s, GreenSocket): - s = s._s - result = GreenSocket.__new__(cls) - result._s = s - s.setblocking(False) - return result - fromsocket = classmethod(fromsocket) - - def accept(self): - while 1: - try: - s, addr = self._s.accept() - break - except error, e: - if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK): - raise - self.wait_input() - return self.fromsocket(s), addr - - bind = _delegate("bind") - close = _delegate("close") - - def connect(self, addr): - err = self.connect_ex(addr) - if err: - raise error(err, os.strerror(err)) - - def connect_ex(self, addr): - err = self._s.connect_ex(addr) - if err == errno.EINPROGRESS: - greensock2.wait_output(self._s) - err = self._s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - return err - - #XXX dup - fileno = _delegate("fileno") - getpeername = _delegate("getpeername") - getsockname = _delegate("getsockname") - getsockopt = _delegate("getsockopt") - listen = _delegate("listen") - - def makefile(self, mode='r', bufsize=-1): - # hack, but reusing the internal socket._fileobject should just work - return socket._fileobject(self, mode, bufsize) - - def recv(self, bufsize): - return greensock2.recv(self._s, bufsize) - - def recvall(self, bufsize): - return greensock2.recvall(self._s, bufsize) - - def recvfrom(self, bufsize): - self.wait_input() - buf, addr = self._s.recvfrom(bufsize) - if not buf: - raise ConnexionClosed("inbound connexion closed") - return buf, addr - - def send(self, data): - self.wait_output() - return self._s.send(data) - - def sendto(self, data, addr): - self.wait_output() - return self._s.sendto(data, addr) - - def sendall(self, data): - greensock2.sendall(self._s, data) - - setsockopt = _delegate("setsockopt") - shutdown = _delegate("shutdown") - - def shutdown_rd(self): - try: - self._s.shutdown(socket.SHUT_RD) - except error: - pass - - def shutdown_wr(self): - try: - self._s.shutdown(socket.SHUT_WR) - except error: - pass - - def wait_input(self): - greensock2.wait_input(self._s) - - def wait_output(self): - greensock2.wait_output(self._s) diff --git a/contrib/pygreen/pipe/mp.py b/contrib/pygreen/pipe/mp.py deleted file mode 100644 index d7e95e6a4..000000000 --- a/contrib/pygreen/pipe/mp.py +++ /dev/null @@ -1,29 +0,0 @@ -from pygreen.pipe.common import BufferedInput - - -class MeetingPointInput(BufferedInput): - - def __init__(self, accepter): - self.accepter = accepter - - def wait_input(self): - while not self.in_buf: - self.in_buf = self.accepter.accept() - - def shutdown_rd(self): - self.accepter.close() - - -class MeetingPointOutput(BufferedInput): - - def __init__(self, giver): - self.giver = giver - - def wait_output(self): - self.giver.wait() - - def sendall(self, buffer): - self.giver.give(buffer) - - def shutdown_wr(self): - self.giver.close() diff --git a/contrib/pygreen/pipelayer.py b/contrib/pygreen/pipelayer.py deleted file mode 100644 index 43726251f..000000000 --- a/contrib/pygreen/pipelayer.py +++ /dev/null @@ -1,306 +0,0 @@ -#import os -import struct -from collections import deque - - -class InvalidPacket(Exception): - pass - - -FLAG_NAK1 = 0xE0 -FLAG_NAK = 0xE1 -FLAG_REG = 0xE2 -FLAG_CFRM = 0xE3 - -FLAG_RANGE_START = 0xE0 -FLAG_RANGE_STOP = 0xE4 - -max_old_packets = 256 # must be <= 256 - - -class PipeLayer(object): - timeout = 1 - headersize = 4 - - def __init__(self): - #self.localid = os.urandom(4) - #self.remoteid = None - self.cur_time = 0 - self.out_queue = deque() - self.out_nextseqid = 0 - self.out_nextrepeattime = None - self.in_nextseqid = 0 - self.in_outoforder = {} - self.out_oldpackets = deque() - self.out_flags = FLAG_REG - self.out_resend = 0 - self.out_resend_skip = False - - def queue(self, data): - if data: - self.out_queue.appendleft(data) - - def queue_size(self): - total = 0 - for data in self.out_queue: - total += len(data) - return total - - def in_sync(self): - return not self.out_queue and self.out_nextrepeattime is None - - def settime(self, curtime): - self.cur_time = curtime - if self.out_queue: - if len(self.out_oldpackets) < max_old_packets: - return 0 # more data to send now - if self.out_nextrepeattime is not None: - return max(0, self.out_nextrepeattime - curtime) - else: - return None - - def encode(self, maxlength): - #print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue - if len(self.out_oldpackets) >= max_old_packets: - # congestion, stalling - payload = 0 - else: - payload = maxlength - 4 - if payload <= 0: - raise ValueError("encode(): buffer too small") - if (self.out_nextrepeattime is not None and - self.out_nextrepeattime <= self.cur_time): - # no ACK received so far, send a packet (possibly empty) - if not self.out_queue: - payload = 0 - else: - if not self.out_queue: # no more data to send - return None - if payload == 0: # congestion - return None - # prepare a packet - seqid = self.out_nextseqid - flags = self.out_flags - self.out_flags = FLAG_REG # clear out the flags for the next time - if payload > 0: - self.out_nextseqid = (seqid + 1) & 0xFFFF - data = self.out_queue.pop() - packetlength = len(data) - if self.out_resend > 0: - if packetlength > payload: - raise ValueError("XXX need constant buffer size for now") - self.out_resend -= 1 - if self.out_resend_skip: - if self.out_resend > 0: - self.out_queue.pop() - self.out_resend -= 1 - self.out_nextseqid = (seqid + 2) & 0xFFFF - self.out_resend_skip = False - packetpayload = data - else: - packet = [] - while packetlength <= payload: - packet.append(data) - if not self.out_queue: - break - data = self.out_queue.pop() - packetlength += len(data) - else: - rest = len(data) + payload - packetlength - packet.append(data[:rest]) - self.out_queue.append(data[rest:]) - packetpayload = ''.join(packet) - self.out_oldpackets.appendleft(packetpayload) - #print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets - else: - # a pure ACK packet, no payload - if self.out_oldpackets and flags == FLAG_REG: - flags = FLAG_CFRM - packetpayload = '' - packet = struct.pack("!BBH", flags, - self.in_nextseqid & 0xFF, - seqid) + packetpayload - if self.out_oldpackets: - self.out_nextrepeattime = self.cur_time + self.timeout - else: - self.out_nextrepeattime = None - #self.dump('OUT', packet) - return packet - - def decode(self, rawdata): - if len(rawdata) < 4: - raise InvalidPacket - #print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid) - #self.dump('IN ', rawdata) - in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4]) - if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP): - raise InvalidPacket - in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF - ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF - if in_diff >= max_old_packets: - return '' # invalid, but can occur as a late repetition - if ack_diff != len(self.out_oldpackets): - # forget all acknowledged packets - if ack_diff > len(self.out_oldpackets): - return '' # invalid, but can occur with packet reordering - while len(self.out_oldpackets) > ack_diff: - #print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1]) - self.out_oldpackets.pop() - if self.out_oldpackets: - self.out_nextrepeattime = self.cur_time + self.timeout - else: - self.out_nextrepeattime = None # all packets ACKed - if in_flags == FLAG_NAK or in_flags == FLAG_NAK1: - # this is a NAK: resend the old packets as far as they've not - # also been ACK'ed in the meantime (can occur with reordering) - while self.out_resend < len(self.out_oldpackets): - self.out_queue.append(self.out_oldpackets[self.out_resend]) - self.out_resend += 1 - self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF - #print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1]) - self.out_resend_skip = in_flags == FLAG_NAK1 - elif in_flags == FLAG_CFRM: - # this is a CFRM: request for confirmation - self.out_nextrepeattime = self.cur_time - # receive this packet's payload if it is the next in the sequence - if in_diff == 0: - if len(rawdata) > 4: - #print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:]) - self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF - result = [rawdata[4:]] - while self.in_nextseqid in self.in_outoforder: - result.append(self.in_outoforder.pop(self.in_nextseqid)) - self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF - return ''.join(result) - else: - # we missed at least one intermediate packet: send a NAK - if len(rawdata) > 4: - self.in_outoforder[in_seqid] = rawdata[4:] - if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder: - self.out_flags = FLAG_NAK1 - else: - self.out_flags = FLAG_NAK - self.out_nextrepeattime = self.cur_time - return '' - - _dump_indent = 0 - def dump(self, dir, rawdata): - in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4]) - print ' ' * self._dump_indent, dir, - if in_flags == FLAG_NAK: - print 'NAK', - elif in_flags == FLAG_NAK1: - print 'NAK1', - elif in_flags == FLAG_CFRM: - print 'CFRM', - #print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,) - print ack_seqid, in_seqid, repr(rawdata[4:]) - - -def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1, - timeout=1.0, inactivity_timeout=None): - """Example: send all data showing up in send_fd over the given UDP - socket, and write incoming data into recv_fd. The send_fd and - recv_fd are plain file descriptors. When an EOF is read from - send_fd, this function returns (after making sure that all data was - received by the remote side). - """ - import os - from select import select - from time import time - p = PipeLayer() - p.timeout = timeout - iwtdlist = [udpsock] - if send_fd >= 0: - iwtdlist.append(send_fd) - running = True - while running or not p.in_sync(): - delay = delay1 = p.settime(time()) - if delay is None: - delay = inactivity_timeout - iwtd, owtd, ewtd = select(iwtdlist, [], [], delay) - if iwtd: - if send_fd in iwtd: - data = os.read(send_fd, 1500 - p.headersize) - if not data: - # EOF - iwtdlist.remove(send_fd) - running = False - else: - #print 'queue', len(data) - p.queue(data) - if udpsock in iwtd: - packet = udpsock.recv(65535) - #print 'decode', len(packet) - p.settime(time()) - data = p.decode(packet) - i = 0 - while i < len(data): - i += os.write(recv_fd, data[i:]) - elif delay1 is None: - break # long inactivity - p.settime(time()) - packet = p.encode(1500) - if packet: - #print 'send', len(packet) - #if os.urandom(1) >= '\x08': # emulate packet losses - udpsock.send(packet) - - -class PipeOverUdp(object): - - def __init__(self, udpsock, timeout=1.0): - import thread, os - self.os = os - self.sendpipe = os.pipe() - self.recvpipe = os.pipe() - thread.start_new_thread(pipe_over_udp, (udpsock, - self.sendpipe[0], - self.recvpipe[1], - timeout)) - - def __del__(self): - os = self.os - if self.sendpipe: - os.close(self.sendpipe[0]) - os.close(self.sendpipe[1]) - self.sendpipe = None - if self.recvpipe: - os.close(self.recvpipe[0]) - os.close(self.recvpipe[1]) - self.recvpipe = None - - close = __del__ - - def send(self, data): - if not self.sendpipe: - raise IOError("I/O operation on a closed PipeOverUdp") - return self.os.write(self.sendpipe[1], data) - - def sendall(self, data): - i = 0 - while i < len(data): - i += self.send(data[i:]) - - def recv(self, bufsize): - if not self.recvpipe: - raise IOError("I/O operation on a closed PipeOverUdp") - return self.os.read(self.recvpipe[0], bufsize) - - def recvall(self, bufsize): - buf = [] - while bufsize > 0: - data = self.recv(bufsize) - buf.append(data) - bufsize -= len(data) - return ''.join(buf) - - def fileno(self): - if not self.recvpipe: - raise IOError("I/O operation on a closed PipeOverUdp") - return self.recvpipe[0] - - def ofileno(self): - if not self.sendpipe: - raise IOError("I/O operation on a closed PipeOverUdp") - return self.sendpipe[1] diff --git a/contrib/pygreen/server/__init__.py b/contrib/pygreen/server/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/contrib/pygreen/server/httpserver.py b/contrib/pygreen/server/httpserver.py deleted file mode 100644 index 2f4377c05..000000000 --- a/contrib/pygreen/server/httpserver.py +++ /dev/null @@ -1,42 +0,0 @@ -import BaseHTTPServer -from pygreen import greensock2 -from pygreen.pipe.gsocket import GreenSocket - - -class GreenMixIn: - """Mix-in class to handle each request in a new greenlet.""" - - def process_request_greenlet(self, request, client_address): - """Same as in BaseServer but as a greenlet. - In addition, exception handling is done here. - """ - try: - self.finish_request(request, client_address) - self.close_request(request) - except: - self.handle_error(request, client_address) - self.close_request(request) - - def process_request(self, request, client_address): - """Start a new greenlet to process the request.""" - greensock2.autogreenlet(self.process_request_greenlet, - request, client_address) - - -class GreenHTTPServer(GreenMixIn, BaseHTTPServer.HTTPServer): - protocol_version = "HTTP/1.1" - - def server_bind(self): - self.socket = GreenSocket.fromsocket(self.socket) - BaseHTTPServer.HTTPServer.server_bind(self) - - -def test_simple(handler_class=None): - if handler_class is None: - from SimpleHTTPServer import SimpleHTTPRequestHandler - handler_class = SimpleHTTPRequestHandler - server_address = ('', 8000) - httpd = GreenHTTPServer(server_address, handler_class) - sa = httpd.socket.getsockname() - print "Serving HTTP on", sa[0], "port", sa[1], "..." - httpd.serve_forever() diff --git a/contrib/pygreen/test/__init__.py b/contrib/pygreen/test/__init__.py deleted file mode 100644 index 792d60054..000000000 --- a/contrib/pygreen/test/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/contrib/pygreen/test/test_greenexecnet.py b/contrib/pygreen/test/test_greenexecnet.py deleted file mode 100644 index b378d193a..000000000 --- a/contrib/pygreen/test/test_greenexecnet.py +++ /dev/null @@ -1,48 +0,0 @@ -import py -from pygreen.greenexecnet import * -import pygreen - -def setup_module(mod): - os.environ["PYTHONPATH"] = "%s:%s" %( - py.path.local(pygreen.__file__).dirpath().dirpath(), os.environ['PYTHONPATH']) - #py.test.skip("need to fix PYTHONPATH/sys.path handling for sub processes so " - # "that they find the pygreen package.") - -def test_simple(): - gw = PopenGateway() - channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)") - channel.send(7) - res = channel.receive() - assert res == 42 - -def test_ssh(): - py.test.skip("Bootstrapping") - gw = SshGateway('codespeak.net') - channel = gw.remote_exec(""" - import socket - channel.send(socket.gethostname()) - """) - res = channel.receive() - assert res.endswith('codespeak.net') - -def test_remote_error(): - gw = PopenGateway() - channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)") - channel.send("hello") - py.test.raises(RemoteError, channel.receive) - -def test_invalid_object(): - class X(object): - pass - gw = PopenGateway() - channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)") - channel.send(X()) - py.test.raises(RemoteError, channel.receive) - -def test_channel_over_channel(): - gw = PopenGateway() - chan1 = gw.newchannel() - channel = gw.remote_exec("chan1 = channel.receive(); chan1.send(42)") - channel.send(chan1) - res = chan1.receive() - assert res == 42 diff --git a/contrib/pygreen/test/test_greensock2.py b/contrib/pygreen/test/test_greensock2.py deleted file mode 100644 index 1d48799aa..000000000 --- a/contrib/pygreen/test/test_greensock2.py +++ /dev/null @@ -1,221 +0,0 @@ -import py -from socket import * -from pygreen.greensock2 import * - -def test_meetingpoint(): - giv1, acc1 = meetingpoint() - giv2, acc2 = meetingpoint() - giv3, acc3 = meetingpoint() - - lst = [] - - def g1(): - lst.append(0) - x = acc2.accept() - assert x == 'hello' - lst.append(2) - giv1.give('world') - lst.append(5) - x = acc3.accept() - assert x == 'middle' - lst.append(6) - giv3.give('done') - - def g2(): - lst.append(1) - giv2.give('hello') - lst.append(3) - y = acc1.accept() - assert y == 'world' - lst.append(4) - - autogreenlet(g1) - autogreenlet(g2) - giv3.give('middle') - tag = acc3.accept() - assert tag == 'done' - assert lst == [0, 1, 2, 3, 4, 5, 6] - - -def test_producer(): - lst = [] - - def prod(n): - lst.append(1) - yield n - lst.append(2) - yield 87 - lst.append(3) - - def cons(): - lst.append(4) - accepter = producer(prod, 145) - lst.append(5) - lst.append(accepter.accept()) - lst.append(6) - lst.append(accepter.accept()) - lst.append(7) - try: - accepter.accept() - except Interrupted: - lst.append(8) - - oneof(cons) - assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8] - - -def test_timer(): - lst = [] - - def g1(): - sleep(0.1) - lst.append(1) - sleep(0.2) - lst.append(3) - - def g2(): - lst.append(0) - sleep(0.2) - lst.append(2) - sleep(0.2) - lst.append(4) - - oneof(g1, g2) - assert lst == [0, 1, 2, 3] - -def test_kill_other(): - - def g1(): - sleep(.1) - return 1 - - def g2(): - sleep(.2) - return 2 - - res = oneof(g1, g2) - assert res == 1 - -def test_socket(): - s1 = socket(AF_INET, SOCK_DGRAM) - s2 = socket(AF_INET, SOCK_DGRAM) - s1.bind(('', INADDR_ANY)) - s2.bind(('', INADDR_ANY)) - s1.connect(s2.getsockname()) - s2.connect(s1.getsockname()) - - lst = [] - - def g1(): - lst.append(0) - x = recv(s1, 5) - assert x == 'hello' - lst.append(3) - sendall(s1, 'world') - lst.append(4) - return 1 - - def g2(): - lst.append(1) - sendall(s2, 'hello') - lst.append(2) - y = recv(s2, 5) - assert y == 'world' - lst.append(5) - return 2 - - one, two = allof(g1, g2) - assert lst == [0, 1, 2, 3, 4, 5] - assert one == 1 - assert two == 2 - -##def test_Queue(): - -## def g1(): -## lst.append(5) -## q.put(6) -## lst.append(7) -## q.put(8) -## lst.append(9) -## q.put(10) -## lst.append(11) -## q.put(12) # not used - -## def g2(): -## lst.append(1) -## lst.append(q.get()) -## lst.append(2) -## lst.append(q.get()) -## lst.append(3) -## lst.append(q.get()) -## lst.append(4) - -## q = Queue() -## lst = [] -## autogreenlet(g1) -## autogreenlet(g2) -## wait() -## assert lst == [5, 7, 9, 11, 1, 6, 2, 8, 3, 10, 4] - -## q = Queue() -## lst = [] -## autogreenlet(g2) -## autogreenlet(g1) -## wait() -## assert lst == [1, 5, 7, 9, 11, 6, 2, 8, 3, 10, 4] - - -##def test_Event(): - -## def g1(): -## assert not e.isSet() -## e.wait() -## assert not e.isSet() # no longer set -## lst.append(1) -## e.set() -## e.wait() -## lst.append(2) -## assert e.isSet() -## e.clear() -## assert not e.isSet() -## lst.append(0) -## e.set() -## lst.append(3) -## assert e.isSet() - -## def g2(): -## assert not e.isSet() -## lst.append(4) -## e.set() -## lst.append(7) -## e.clear() -## e.set() -## e.clear() -## assert not e.isSet() -## lst.append(5) -## e.wait() -## assert e.isSet() -## lst.append(6) - -## e = Event() -## lst = [] -## autogreenlet(g1) -## autogreenlet(g2) -## wait() -## assert lst == [4, 7, 5, 1, 2, 0, 3, 6] - - -##def test_Event_timeout(): -## def g1(): -## lst.append(5) -## e.wait(0.1) -## lst.append(e.isSet()) -## e.wait(60.0) -## lst.append(e.isSet()) -## lst = [] -## e = Event() -## autogreenlet(g1) -## sleep(0.5) -## e.set() -## wait() -## assert lst == [5, False, True] diff --git a/contrib/pygreen/test/test_pipelayer.py b/contrib/pygreen/test/test_pipelayer.py deleted file mode 100644 index 3917486a9..000000000 --- a/contrib/pygreen/test/test_pipelayer.py +++ /dev/null @@ -1,223 +0,0 @@ -import os, random -from pygreen.pipelayer import PipeLayer, pipe_over_udp, PipeOverUdp -import py - -def test_simple(): - data1 = os.urandom(1000) - data2 = os.urandom(1000) - p1 = PipeLayer() - p2 = PipeLayer() - p2._dump_indent = 40 - p1.queue(data1) - p1.queue(data2) - recv = '' - while len(recv) < 2000: - raw = p1.encode(64) - assert raw is not None - res = p2.decode(raw) - assert res is not None - recv += res - assert recv == data1 + data2 - raw = p1.encode(64) - assert raw is None - -def test_stabilize(): - data1 = os.urandom(28) - p1 = PipeLayer() - p2 = PipeLayer() - p2._dump_indent = 40 - p1.queue(data1) - recv = '' - t = 0.0 - print - while True: - delay1 = p1.settime(t) - delay2 = p2.settime(t) - t += 0.100000001 - if delay1 is delay2 is None: - break - if delay1 == 0: - raw = p1.encode(10) - p1.dump('OUT', raw) - assert raw is not None - res = p2.decode(raw) - assert res is not None - recv += res - if delay2 == 0: - raw = p2.encode(10) - p2.dump('OUT', raw) - assert raw is not None - res = p1.decode(raw) - assert res == '' - assert recv == data1 - -def test_bidir(): - data1 = os.urandom(1000) - data2 = os.urandom(1000) - p1 = PipeLayer() - p2 = PipeLayer() - p2._dump_indent = 40 - p1.queue(data1) - p2.queue(data2) - recv = ['', ''] - while len(recv[0]) < 1000 or len(recv[1]) < 1000: - progress = False - for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]: - raw = me.encode(64) - if raw is not None: - res = other.decode(raw) - assert res is not None - recv[i] += res - if res: - progress = True - assert progress - assert recv[0] == data2 - assert recv[1] == data1 - raw = p1.encode(64) - assert raw is None - raw = p2.encode(64) - assert raw is None - -def test_with_loss(): - data1 = os.urandom(10000).encode('hex') - data2 = os.urandom(10000).encode('hex') - #data1 = '0123456789' - #data2 = 'ABCDEFGHIJ' - p1 = PipeLayer() - p2 = PipeLayer() - p2._dump_indent = 40 - p1.queue(data1) - p2.queue(data2) - recv = ['', ''] - time = 0 - active = 1 - while active: - active = 0 - time += 0.2 - #print '.' - exchange = [] - for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]: - to = me.settime(time) - packet = me.encode(12) - assert (packet is not None) == (to == 0) - if to is not None: - active = 1 - if to == 0: - exchange.append((packet, other, i)) - for (packet, other, i) in exchange: - if random.random() < 0.5: - pass # drop packet - else: - res = other.decode(packet) - assert res is not None - recv[i] += res - assert data2.startswith(recv[0]) - assert data1.startswith(recv[1]) - assert recv[0] == data2 - assert recv[1] == data1 - print time - -def test_massive_reordering(): - data1 = os.urandom(10000).encode('hex') - data2 = os.urandom(10000).encode('hex') - #data1 = '0123456789' - #data2 = 'ABCDEFGHIJ' - p1 = PipeLayer() - p2 = PipeLayer() - p2._dump_indent = 40 - p1.queue(data1) - p2.queue(data2) - recv = ['', ''] - time = 0 - active = 1 - exchange = [] - while active or exchange: - active = 0 - time += 0.2 - #print '.' - for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]: - to = me.settime(time) - packet = me.encode(12) - assert (packet is not None) == (to == 0) - if to is not None: - active = 1 - if to == 0: - exchange.append((packet, other, i)) - if random.random() < 0.02: - random.shuffle(exchange) - for (packet, other, i) in exchange: - res = other.decode(packet) - assert res is not None - recv[i] += res - exchange = [] - assert data2.startswith(recv[0]) - assert data1.startswith(recv[1]) - assert recv[0] == data2 - assert recv[1] == data1 - print time - -def udpsockpair(): - from socket import socket, AF_INET, SOCK_DGRAM, INADDR_ANY - s1 = socket(AF_INET, SOCK_DGRAM) - s2 = socket(AF_INET, SOCK_DGRAM) - s1.bind(('127.0.0.1', INADDR_ANY)) - s2.bind(('127.0.0.1', INADDR_ANY)) - s2.connect(s1.getsockname()) - s1.connect(s2.getsockname()) - return s1, s2 - -def test_pipe_over_udp(): - import thread - s1, s2 = udpsockpair() - - tmp = py.test.ensuretemp("pipeoverudp") - p = py.path.local(__file__) - p.copy(tmp.join(p.basename)) - old = tmp.chdir() - try: - fd1 = os.open(__file__, os.O_RDONLY) - fd2 = os.open('test_pipelayer.py~copy', os.O_WRONLY|os.O_CREAT|os.O_TRUNC) - - thread.start_new_thread(pipe_over_udp, (s1, fd1)) - pipe_over_udp(s2, recv_fd=fd2, inactivity_timeout=2.5) - os.close(fd1) - os.close(fd2) - f = open(__file__, 'rb') - data1 = f.read() - f.close() - f = open('test_pipelayer.py~copy', 'rb') - data2 = f.read() - f.close() - assert data1 == data2 - os.unlink('test_pipelayer.py~copy') - finally: - old.chdir() - -def test_PipeOverUdp(): - s1, s2 = udpsockpair() - p1 = PipeOverUdp(s1, timeout=0.2) - p2 = PipeOverUdp(s2, timeout=0.2) - p2.sendall('goodbye') - for k in range(10): - p1.sendall('hello world') - input = p2.recvall(11) - assert input == 'hello world' - input = p1.recvall(7) - assert input == 'goodbye' - - bigchunk1 = os.urandom(500000) - bigchunk2 = os.urandom(500000) - i1 = i2 = 0 - j1 = j2 = 0 - while j1 < len(bigchunk1) or j2 < len(bigchunk2): - i1 += p1.send(bigchunk1[i1:i1+512]) - i2 += p2.send(bigchunk2[i2:i2+512]) - data = p1.recv(512) - assert data == bigchunk2[j2:j2+len(data)] - j2 += len(data) - data = p2.recv(512) - assert data == bigchunk1[j1:j1+len(data)] - j1 += len(data) - #print i1, i2, j1, j2 - p1.close() - p2.close()