From f5664d4405807a0e103098d144e1ecada142893a Mon Sep 17 00:00:00 2001 From: fijal Date: Tue, 6 Mar 2007 10:07:53 +0100 Subject: [PATCH] [svn r39973] Add a (stolen almost directly from arigo/hack) network layer based on top of greenlets. Needs some tweaking, so it's not exposed as a py.xxx, but rather py.__.net --HG-- branch : trunk --- py/net/__init__.py | 0 py/net/greenexecnet.py | 203 ++++++++++++ py/net/greensock2.py | 526 +++++++++++++++++++++++++++++++ py/net/msgstruct.py | 29 ++ py/net/pipe/__init__.py | 0 py/net/pipe/common.py | 38 +++ py/net/pipe/fd.py | 69 ++++ py/net/pipe/gsocket.py | 114 +++++++ py/net/pipe/mp.py | 29 ++ py/net/pipelayer.py | 306 ++++++++++++++++++ py/net/server/__init__.py | 0 py/net/server/httpserver.py | 42 +++ py/net/test/test_greenexecnet.py | 41 +++ py/net/test/test_greensock2.py | 213 +++++++++++++ py/net/test/test_pipelayer.py | 215 +++++++++++++ 15 files changed, 1825 insertions(+) create mode 100644 py/net/__init__.py create mode 100644 py/net/greenexecnet.py create mode 100644 py/net/greensock2.py create mode 100644 py/net/msgstruct.py create mode 100644 py/net/pipe/__init__.py create mode 100644 py/net/pipe/common.py create mode 100644 py/net/pipe/fd.py create mode 100644 py/net/pipe/gsocket.py create mode 100644 py/net/pipe/mp.py create mode 100644 py/net/pipelayer.py create mode 100644 py/net/server/__init__.py create mode 100644 py/net/server/httpserver.py create mode 100644 py/net/test/test_greenexecnet.py create mode 100644 py/net/test/test_greensock2.py create mode 100644 py/net/test/test_pipelayer.py diff --git a/py/net/__init__.py b/py/net/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py/net/greenexecnet.py b/py/net/greenexecnet.py new file mode 100644 index 000000000..29686a0d0 --- /dev/null +++ b/py/net/greenexecnet.py @@ -0,0 +1,203 @@ +import sys, os, py, inspect +from py.__.net import greensock2 +from py.__.net.msgstruct import message, decodemessage + +MSG_REMOTE_EXEC = 'r' +MSG_OBJECT = 'o' +MSG_ERROR = 'e' +MSG_CHAN_CLOSE = 'c' +MSG_FATAL = 'f' +MSG_CHANNEL = 'n' + +class Gateway(object): + + def __init__(self, input, output, is_remote=False): + self.input = input + self.output = output + self.nextchannum = int(is_remote) + self.receivers = {} + self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote) + + def remote_exec(self, remote_source): + remote_source = py.code.Source(remote_source) + chan = self.newchannel() + msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source)) + self.output.sendall(msg) + return chan + + def newchannel(self): + n = self.nextchannum + self.nextchannum += 2 + return self.make_channel(n) + + def make_channel(self, n): + giver, accepter = greensock2.meetingpoint() + assert n not in self.receivers + self.receivers[n] = giver + return Channel(self, n, accepter) + + def serve_forever(self, is_remote=False): + try: + buffer = "" + while 1: + msg, buffer = decodemessage(buffer) + if msg is None: + buffer += self.input.recv(16384) + else: + handler = HANDLERS[msg[0]] + handler(self, *msg[1:]) + except greensock2.greenlet.GreenletExit: + raise + except: + if is_remote: + msg = message(MSG_FATAL, format_error(*sys.exc_info())) + self.output.sendall(msg) + else: + raise + + def msg_remote_exec(self, n, source): + def do_execute(channel): + try: + d = {'channel': channel} + exec source in d + except: + channel.report_error(*sys.exc_info()) + else: + channel.close() + greensock2.autogreenlet(do_execute, self.make_channel(n)) + + def msg_object(self, n, objrepr): + obj = eval(objrepr) + if n in self.receivers: + self.receivers[n].give_queued(obj) + + def msg_error(self, n, s): + if n in self.receivers: + self.receivers[n].give_queued(RemoteError(s)) + self.receivers[n].close() + del self.receivers[n] + + def msg_chan_close(self, n): + if n in self.receivers: + self.receivers[n].close() + del self.receivers[n] + + def msg_channel(self, n, m): + if n in self.receivers: + self.receivers[n].give_queued(self.make_channel(m)) + + def msg_fatal(self, s): + raise RemoteError(s) + +HANDLERS = { + MSG_REMOTE_EXEC: Gateway.msg_remote_exec, + MSG_OBJECT: Gateway.msg_object, + MSG_ERROR: Gateway.msg_error, + MSG_CHAN_CLOSE: Gateway.msg_chan_close, + MSG_CHANNEL: Gateway.msg_channel, + MSG_FATAL: Gateway.msg_fatal, + } + + +class Channel(object): + + def __init__(self, gw, n, accepter): + self.gw = gw + self.n = n + self.accepter = accepter + + def send(self, obj): + if isinstance(obj, Channel): + assert obj.gw is self.gw + msg = message(MSG_CHANNEL, self.n, obj.n) + else: + msg = message(MSG_OBJECT, self.n, repr(obj)) + self.gw.output.sendall(msg) + + def receive(self): + obj = self.accepter.accept() + if isinstance(obj, RemoteError): + raise obj + else: + return obj + + def close(self): + try: + self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n)) + except OSError: + pass + + def report_error(self, exc_type, exc_value, exc_traceback=None): + s = format_error(exc_type, exc_value, exc_traceback) + try: + self.gw.output.sendall(message(MSG_ERROR, self.n, s)) + except OSError: + pass + + +class RemoteError(Exception): + pass + +def format_error(exc_type, exc_value, exc_traceback=None): + import traceback, StringIO + s = StringIO.StringIO() + traceback.print_exception(exc_type, exc_value, exc_traceback, file=s) + return s.getvalue() + + +class PopenCmdGateway(Gateway): + action = "exec input()" + + def __init__(self, cmdline): + from py.__.net.pipe.fd import FDInput, FDOutput + child_in, child_out = os.popen2(cmdline, 't', 0) + fdin = FDInput(child_out.fileno(), child_out.close) + fdout = FDOutput(child_in.fileno(), child_in.close) + fdout.sendall(self.get_bootstrap_code()) + super(PopenCmdGateway, self).__init__(input = fdin, output = fdout) + + def get_bootstrap_code(): + # XXX assumes that the py lib is installed on the remote side + src = [] + src.append('from py.__.net import greenexecnet') + src.append('greenexecnet.PopenCmdGateway.run_server()') + src.append('') + return '%r\n' % ('\n'.join(src),) + get_bootstrap_code = staticmethod(get_bootstrap_code) + + def run_server(): + from py.__.net.pipe.fd import FDInput, FDOutput + gw = Gateway(input = FDInput(os.dup(0)), + output = FDOutput(os.dup(1)), + is_remote = True) + # for now, ignore normal I/O + fd = os.open('/dev/null', os.O_RDWR) + os.dup2(fd, 0) + os.dup2(fd, 1) + os.close(fd) + greensock2.wait(gw.greenlet) + run_server = staticmethod(run_server) + +class PopenGateway(PopenCmdGateway): + def __init__(self, python=sys.executable): + cmdline = '"%s" -u -c "%s"' % (python, self.action) + super(PopenGateway, self).__init__(cmdline) + +class SshGateway(PopenCmdGateway): + def __init__(self, sshaddress, remotepython='python', identity=None): + self.sshaddress = sshaddress + remotecmd = '%s -u -c "%s"' % (remotepython, self.action) + cmdline = [sshaddress, remotecmd] + # XXX Unix style quoting + for i in range(len(cmdline)): + cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'" + cmd = 'ssh -C' + if identity is not None: + cmd += ' -i %s' % (identity,) + cmdline.insert(0, cmd) + super(SshGateway, self).__init__(' '.join(cmdline)) + + +##f = open('LOG', 'a') +##import os; print >> f, '[%d] READY' % (os.getpid(),) +##f.close() diff --git a/py/net/greensock2.py b/py/net/greensock2.py new file mode 100644 index 000000000..a4e28e233 --- /dev/null +++ b/py/net/greensock2.py @@ -0,0 +1,526 @@ +import os, sys +try: + from stackless import greenlet +except ImportError: + import py + greenlet = py.magic.greenlet +from collections import deque +from select import select as _select +from time import time as _time +from heapq import heappush, heappop, heapify + +TRACE = True + +def meetingpoint(): + senders = deque() # list of senders, or [None] if Giver closed + receivers = deque() # list of receivers, or [None] if Receiver closed + return (MeetingPointGiver(senders, receivers), + MeetingPointAccepter(senders, receivers)) + +def producer(func, *args, **kwds): + iterable = func(*args, **kwds) + giver, accepter = meetingpoint() + def autoproducer(): + try: + giver.wait() + for obj in iterable: + giver.give(obj) + giver.wait() + finally: + giver.close() + autogreenlet(autoproducer) + return accepter + + +class MeetingPointBase(object): + + def __init__(self, senders, receivers): + self.senders = senders + self.receivers = receivers + self.g_active = g_active + + def close(self): + while self.senders: + if self.senders[0] is None: + break + packet = self.senders.popleft() + if packet.g_from is not None: + self.g_active.append(packet.g_from) + else: + self.senders.append(None) + while self.receivers: + if self.receivers[0] is None: + break + other = self.receivers.popleft() + self.g_active.append(other) + else: + self.receivers.append(None) + + __del__ = close + + def closed(self): + return self.receivers and self.receivers[0] is None + + +class MeetingPointGiver(MeetingPointBase): + + def give(self, obj): + if self.receivers: + if self.receivers[0] is None: + raise MeetingPointClosed + other = self.receivers.popleft() + g_active.append(g_getcurrent()) + packet = _Packet() + packet.payload = obj + other.switch(packet) + if not packet.accepted: + raise Interrupted("packet not accepted") + else: + packet = _Packet() + packet.g_from = g_getcurrent() + packet.payload = obj + try: + self.senders.append(packet) + g_dispatcher.switch() + if not packet.accepted: + raise Interrupted("packet not accepted") + except: + remove_by_id(self.senders, packet) + raise + + def give_queued(self, obj): + if self.receivers: + self.give(obj) + else: + packet = _Packet() + packet.g_from = None + packet.payload = obj + self.senders.append(packet) + + def ready(self): + return self.receivers and self.receivers[0] is not None + + def wait(self): + if self.receivers: + if self.receivers[0] is None: + raise MeetingPointClosed + else: + packet = _Packet() + packet.g_from = g_getcurrent() + packet.empty = True + self.senders.append(packet) + try: + g_dispatcher.switch() + if not packet.accepted: + raise Interrupted("no accepter found") + except: + remove_by_id(self.senders, packet) + raise + + def trigger(self): + if self.ready(): + self.give(None) + + +class MeetingPointAccepter(MeetingPointBase): + + def accept(self): + while self.senders: + if self.senders[0] is None: + raise MeetingPointClosed + packet = self.senders.popleft() + packet.accepted = True + if packet.g_from is not None: + g_active.append(packet.g_from) + if not packet.empty: + return packet.payload + g = g_getcurrent() + self.receivers.append(g) + try: + packet = g_dispatcher.switch() + except: + remove_by_id(self.receivers, g) + raise + if type(packet) is not _Packet: + remove_by_id(self.receivers, g) + raise Interrupted("no packet") + packet.accepted = True + return packet.payload + + def ready(self): + for packet in self.senders: + if packet is None: + return False + if not packet.empty: + return True + return False + + def wait_trigger(self, timeout=None, default=None): + if timeout is None: + return self.accept() + else: + timer = Timer(timeout) + try: + try: + return self.accept() + finally: + timer.stop() + except Interrupted: + if timer.finished: + return default + raise + + +class MeetingPointClosed(greenlet.GreenletExit): + pass + +class Interrupted(greenlet.GreenletExit): + pass + +class ConnexionClosed(greenlet.GreenletExit): + pass + +class _Packet(object): + empty = False + accepted = False + +def remove_by_id(d, obj): + lst = [x for x in d if x is not obj] + d.clear() + d.extend(lst) + +# ____________________________________________________________ + +##class Queue(object): + +## def __init__(self): +## self.giver, self.accepter = meetingpoint() +## self.pending = deque() + +## def put(self, item): # preserve the caller's atomicity +## self.pending.append(item) +## if self.accepter.ready(): +## self.accepter.accept() + +## def get(self, block=True): +## if self.pending: +## return self.pending.popleft() +## elif block: +## self.giver.give(None) +## return self.pending.popleft() +## else: +## raise Empty + +##class Empty(Interrupted): +## pass + +##class Event(object): + +## def __init__(self): +## self.giver, self.accepter = meetingpoint() + +## clear = __init__ + +## def isSet(self): +## return self.accepter is None + +## def set(self): # preserve the caller's atomicity +## if self.accepter is not None: +## accepter = self.accepter +## self.giver = self.accepter = None +## while accepter.ready(): # wake up all waiters +## accepter.accept() + +## def wait(self, timeout=None): +## if self.accepter is not None: +## if timeout is None: +## self.giver.give(None) +## else: +## timer = Timer(timeout) +## try: +## try: +## self.giver.give(None) +## except Interrupted: +## pass +## finally: +## timer.stop() + +##class Semaphore(object): + +## def __init__(self, value=1): +## self.giver, self.accepter = meetingpoint() +## for i in range(value): +## self.release() + +## def acquire(self, blocking=True): +## if blocking or self.accepter.ready(): +## return self.accepter.accept() +## else: +## return False + +## def release(self): +## autogreenlet(self.giver.put, True) + +# ____________________________________________________________ + +def wait_input(sock): + _register(g_iwtd, sock) + +def recv(sock, bufsize): + wait_input(sock) + buf = sock.recv(bufsize) + if not buf: + raise ConnexionClosed("inbound connexion closed") + return buf + +def recvall(sock, bufsize): + in_front = False + data = [] + while bufsize > 0: + _register(g_iwtd, sock, in_front=in_front) + buf = sock.recv(bufsize) + if not buf: + raise ConnexionClosed("inbound connexion closed") + data.append(buf) + bufsize -= len(buf) + in_front = True + return ''.join(data) + +def read(fd, bufsize): + assert fd >= 0 + wait_input(fd) + buf = os.read(fd, bufsize) + if not buf: + raise ConnexionClosed("inbound connexion closed") + return buf + +def readall(fd, bufsize): + assert fd >= 0 + in_front = False + data = [] + while bufsize > 0: + _register(g_iwtd, fd, in_front=in_front) + buf = os.read(fd, bufsize) + if not buf: + raise ConnexionClosed("inbound connexion closed") + data.append(buf) + bufsize -= len(buf) + in_front = True + return ''.join(data) + + +def wait_output(sock): + _register(g_owtd, sock) + +def sendall(sock, buffer): + in_front = False + while buffer: + _register(g_owtd, sock, in_front=in_front) + count = sock.send(buffer) + buffer = buffer[count:] + in_front = True + +def writeall(fd, buffer): + assert fd >= 0 + in_front = False + while buffer: + _register(g_owtd, fd, in_front=in_front) + count = os.write(fd, buffer) + if not count: + raise ConnexionClosed("outbound connexion closed") + buffer = buffer[count:] + in_front = True + + +def sleep(duration, *greenlets): + timer = Timer(duration) + try: + wait(*greenlets) + finally: + ok = timer.finished + timer.stop() + if not ok: + raise Interrupted + +def _wait(): + g_dispatcher.switch() + +def wait(*greenlets): + assert greenlets#, "should not wait without events to wait on" + current = g_getcurrent() + for g in greenlets: + if g in g_waiters: + g_waiters[g].append(current) + else: + g_waiters[g] = [current] + g_dispatcher.switch() + +class Timer(object): + started = False + finished = False + + def __init__(self, timeout): + self.g = g_getcurrent() + entry = (_time() + timeout, self) + if g_timers_mixed: + g_timers.append(entry) + else: + heappush(g_timers, entry) + + def stop(self): + global g_timers_mixed + if not self.finished: + for i, (activationtime, timer) in enumerate(g_timers): + if timer is self: + g_timers[i] = g_timers[-1] + g_timers.pop() + g_timers_mixed = True + break + self.finished = True + +# ____________________________________________________________ + +class autogreenlet(greenlet): + def __init__(self, function, *args, **kwds): + self.parent = g_dispatcher + self.function = function + self.args = args + self.kwds = kwds + g_active.append(self) + + def run(self): + self.trace("start") + try: + self.function(*self.args, **self.kwds) + except Exception, e: + self.trace("stop (%s%s)", e.__class__.__name__, + str(e) and (': '+str(e))) + raise + else: + self.trace("done") + + def __repr__(self): +## args = ', '.join([repr(s) for s in self.args] + +## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()]) +## return '' % (self.function.__name__, args) + return '' % (self.function.__name__, + hex(id(self))) + + def trace(self, msg, *args): + if TRACE: + print self, msg % args + + def interrupt(self): + self.throw(Interrupted) + + +g_active = deque() +g_iwtd = {} +g_owtd = {} +g_timers = [] +g_timers_mixed = False + +g_getcurrent = greenlet.getcurrent + +def _register(g_wtd, sock, in_front=False): + d = g_wtd.setdefault(sock, deque()) + g = g_getcurrent() + if in_front: + d.appendleft(g) + else: + d.append(g) + try: + if g_dispatcher.switch() is not g_wtd: + raise Interrupted + except: + remove_by_id(d, g) + raise + +##def _unregister_timer(): +## ... + + +def check_dead_greenlets(mapping): + to_remove = [i for i, v in mapping.items() if not v] + for k in to_remove: + del mapping[k] + +def check_waiters(active): + if active in g_waiters: + for g in g_waiters[active]: + g.switch() + del g_waiters[active] + + +def dispatcher_mainloop(): + global g_timers_mixed + while 1: + try: + while g_active: + print 'active:', g_active[0] + active = g_active.popleft() + active.switch() + if active.dead: + check_waiters(active) + del active + if g_timers: + if g_timers_mixed: + heapify(g_timers) + g_timers_mixed = False + activationtime, timer = g_timers[0] + delay = activationtime - _time() + if delay <= 0.0: + if timer.started: + heappop(g_timers) + #print 'timeout:', g + timer.finished = True + timer.g.switch() + if timer.g.dead: + check_waiters(timer.g) + continue + delay = 0.0 + timer.started = True + else: + check_dead_greenlets(g_iwtd) + check_dead_greenlets(g_owtd) + if not (g_iwtd or g_owtd): + # nothing to do, switch to the main greenlet + g_dispatcher.parent.switch() + continue + delay = None + + print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay + iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay) + print 'done' + for s in owtd: + if s in g_owtd: + d = g_owtd[s] + #print 'owtd:', d[0] + g = d.popleft() + if not d: + try: + del g_owtd[s] + except KeyError: + pass + g.switch(g_owtd) + if g.dead: + check_waiters(g) + for s in iwtd: + if s in g_iwtd: + d = g_iwtd[s] + #print 'iwtd:', d[0] + g = d.popleft() + if not d: + try: + del g_iwtd[s] + except KeyError: + pass + g.switch(g_iwtd) + if g.dead: + check_waiters(g) + except: + import sys + g_dispatcher.parent.throw(*sys.exc_info()) + +g_dispatcher = greenlet(dispatcher_mainloop) +g_waiters = {} diff --git a/py/net/msgstruct.py b/py/net/msgstruct.py new file mode 100644 index 000000000..be6e73b22 --- /dev/null +++ b/py/net/msgstruct.py @@ -0,0 +1,29 @@ +from struct import pack, unpack, calcsize + + +def message(tp, *values): + strtype = type('') + typecodes = [''] + for v in values: + if type(v) is strtype: + typecodes.append('%ds' % len(v)) + elif 0 <= v < 256: + typecodes.append('B') + else: + typecodes.append('l') + typecodes = ''.join(typecodes) + assert len(typecodes) < 256 + return pack(("!B%dsc" % len(typecodes)) + typecodes, + len(typecodes), typecodes, tp, *values) + +def decodemessage(data): + if data: + limit = ord(data[0]) + 1 + if len(data) >= limit: + typecodes = "!c" + data[1:limit] + end = limit + calcsize(typecodes) + if len(data) >= end: + return unpack(typecodes, data[limit:end]), data[end:] + #elif end > 1000000: + # raise OverflowError + return None, data diff --git a/py/net/pipe/__init__.py b/py/net/pipe/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py/net/pipe/common.py b/py/net/pipe/common.py new file mode 100644 index 000000000..7a17fb890 --- /dev/null +++ b/py/net/pipe/common.py @@ -0,0 +1,38 @@ +import greensock2 +from pypeers.tool import log + + +class BufferedInput(object): + in_buf = '' + + def recv(self, bufsize): + self.wait_input() + buf = self.in_buf[:bufsize] + self.in_buf = self.in_buf[bufsize:] + return buf + + def recvall(self, bufsize): + result = [] + while bufsize > 0: + buf = self.recv(bufsize) + result.append(buf) + bufsize -= len(buf) + return ''.join(result) + +# ____________________________________________________________ + +def forwardpipe(s1, s2): + try: + while 1: + s2.wait_output() + buffer = s1.recv(32768) + log('[%r -> %r] %r', s1, s2, buffer) + s2.sendall(buffer) + del buffer + finally: + s2.shutdown_wr() + s1.shutdown_rd() + +def linkpipes(s1, s2): + greensock2.autogreenlet(forwardpipe, s1, s2) + greensock2.autogreenlet(forwardpipe, s2, s1) diff --git a/py/net/pipe/fd.py b/py/net/pipe/fd.py new file mode 100644 index 000000000..28b3ec3c3 --- /dev/null +++ b/py/net/pipe/fd.py @@ -0,0 +1,69 @@ +import os +from py.__.net import greensock2 + + +class FDInput(object): + + def __init__(self, read_fd, close=True): + self.read_fd = read_fd + self._close = close # a flag or a callback + + def shutdown_rd(self): + fd = self.read_fd + if fd is not None: + self.read_fd = None + close = self._close + if close: + self._close = False + if close == True: + os.close(fd) + else: + close() + + __del__ = shutdown_rd + + def wait_input(self): + greensock2.wait_input(self.read_fd) + + def recv(self, bufsize): +## f = open('LOG', 'a') +## import os; print >> f, '[%d] RECV' % (os.getpid(),) +## f.close() + res = greensock2.read(self.read_fd, bufsize) +## f = open('LOG', 'a') +## import os; print >> f, '[%d] RECV %r' % (os.getpid(), res) +## f.close() + return res + + def recvall(self, bufsize): + return greensock2.readall(self.read_fd, bufsize) + + +class FDOutput(object): + + def __init__(self, write_fd, close=True): + self.write_fd = write_fd + self._close = close # a flag or a callback + + def shutdown_wr(self): + fd = self.write_fd + if fd is not None: + self.write_fd = None + close = self._close + if close: + self._close = False + if close == True: + os.close(fd) + else: + close() + + __del__ = shutdown_wr + + def wait_output(self): + greensock2.wait_output(self.write_fd) + + def sendall(self, buffer): +## f = open('LOG', 'a') +## import os; print >> f, '[%d] %r' % (os.getpid(), buffer) +## f.close() + greensock2.writeall(self.write_fd, buffer) diff --git a/py/net/pipe/gsocket.py b/py/net/pipe/gsocket.py new file mode 100644 index 000000000..4a2d9e05e --- /dev/null +++ b/py/net/pipe/gsocket.py @@ -0,0 +1,114 @@ +import greensock2 +import socket, errno, os + +error = socket.error + + +class _delegate(object): + def __init__(self, methname): + self.methname = methname + def __get__(self, obj, typ=None): + result = getattr(obj._s, self.methname) + setattr(obj, self.methname, result) + return result + + +class GreenSocket(object): + + def __init__(self, family = socket.AF_INET, + type = socket.SOCK_STREAM, + proto = 0): + self._s = socket.socket(family, type, proto) + self._s.setblocking(False) + + def fromsocket(cls, s): + if isinstance(s, GreenSocket): + s = s._s + result = GreenSocket.__new__(cls) + result._s = s + s.setblocking(False) + return result + fromsocket = classmethod(fromsocket) + + def accept(self): + while 1: + try: + s, addr = self._s.accept() + break + except error, e: + import pdb;pdb.set_trace() + if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK): + raise + self.wait_input() + return self.fromsocket(s), addr + + bind = _delegate("bind") + close = _delegate("close") + + def connect(self, addr): + err = self.connect_ex(addr) + if err: + raise error(err, os.strerror(err)) + + def connect_ex(self, addr): + err = self._s.connect_ex(addr) + if err == errno.EINPROGRESS: + greensock2.wait_output(self._s) + err = self._s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + return err + + #XXX dup + fileno = _delegate("fileno") + getpeername = _delegate("getpeername") + getsockname = _delegate("getsockname") + getsockopt = _delegate("getsockopt") + listen = _delegate("listen") + + def makefile(self, mode='r', bufsize=-1): + # hack, but reusing the internal socket._fileobject should just work + return socket._fileobject(self, mode, bufsize) + + def recv(self, bufsize): + return greensock2.recv(self._s, bufsize) + + def recvall(self, bufsize): + return greensock2.recvall(self._s, bufsize) + + def recvfrom(self, bufsize): + self.wait_input() + buf, addr = self._s.recvfrom(bufsize) + if not buf: + raise ConnexionClosed("inbound connexion closed") + return buf, addr + + def send(self, data): + self.wait_output() + return self._s.send(data) + + def sendto(self, data, addr): + self.wait_output() + return self._s.sendto(data, addr) + + def sendall(self, data): + greensock2.sendall(self._s, data) + + setsockopt = _delegate("setsockopt") + shutdown = _delegate("shutdown") + + def shutdown_rd(self): + try: + self._s.shutdown(socket.SHUT_RD) + except error: + pass + + def shutdown_wr(self): + try: + self._s.shutdown(socket.SHUT_WR) + except error: + pass + + def wait_input(self): + greensock2.wait_input(self._s) + + def wait_output(self): + greensock2.wait_output(self._s) diff --git a/py/net/pipe/mp.py b/py/net/pipe/mp.py new file mode 100644 index 000000000..cfc9c13c7 --- /dev/null +++ b/py/net/pipe/mp.py @@ -0,0 +1,29 @@ +from pypeers.stream.common import BufferedInput + + +class MeetingPointInput(BufferedInput): + + def __init__(self, accepter): + self.accepter = accepter + + def wait_input(self): + while not self.in_buf: + self.in_buf = self.accepter.accept() + + def shutdown_rd(self): + self.accepter.close() + + +class MeetingPointOutput(BufferedInput): + + def __init__(self, giver): + self.giver = giver + + def wait_output(self): + self.giver.wait() + + def sendall(self, buffer): + self.giver.give(buffer) + + def shutdown_wr(self): + self.giver.close() diff --git a/py/net/pipelayer.py b/py/net/pipelayer.py new file mode 100644 index 000000000..43726251f --- /dev/null +++ b/py/net/pipelayer.py @@ -0,0 +1,306 @@ +#import os +import struct +from collections import deque + + +class InvalidPacket(Exception): + pass + + +FLAG_NAK1 = 0xE0 +FLAG_NAK = 0xE1 +FLAG_REG = 0xE2 +FLAG_CFRM = 0xE3 + +FLAG_RANGE_START = 0xE0 +FLAG_RANGE_STOP = 0xE4 + +max_old_packets = 256 # must be <= 256 + + +class PipeLayer(object): + timeout = 1 + headersize = 4 + + def __init__(self): + #self.localid = os.urandom(4) + #self.remoteid = None + self.cur_time = 0 + self.out_queue = deque() + self.out_nextseqid = 0 + self.out_nextrepeattime = None + self.in_nextseqid = 0 + self.in_outoforder = {} + self.out_oldpackets = deque() + self.out_flags = FLAG_REG + self.out_resend = 0 + self.out_resend_skip = False + + def queue(self, data): + if data: + self.out_queue.appendleft(data) + + def queue_size(self): + total = 0 + for data in self.out_queue: + total += len(data) + return total + + def in_sync(self): + return not self.out_queue and self.out_nextrepeattime is None + + def settime(self, curtime): + self.cur_time = curtime + if self.out_queue: + if len(self.out_oldpackets) < max_old_packets: + return 0 # more data to send now + if self.out_nextrepeattime is not None: + return max(0, self.out_nextrepeattime - curtime) + else: + return None + + def encode(self, maxlength): + #print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue + if len(self.out_oldpackets) >= max_old_packets: + # congestion, stalling + payload = 0 + else: + payload = maxlength - 4 + if payload <= 0: + raise ValueError("encode(): buffer too small") + if (self.out_nextrepeattime is not None and + self.out_nextrepeattime <= self.cur_time): + # no ACK received so far, send a packet (possibly empty) + if not self.out_queue: + payload = 0 + else: + if not self.out_queue: # no more data to send + return None + if payload == 0: # congestion + return None + # prepare a packet + seqid = self.out_nextseqid + flags = self.out_flags + self.out_flags = FLAG_REG # clear out the flags for the next time + if payload > 0: + self.out_nextseqid = (seqid + 1) & 0xFFFF + data = self.out_queue.pop() + packetlength = len(data) + if self.out_resend > 0: + if packetlength > payload: + raise ValueError("XXX need constant buffer size for now") + self.out_resend -= 1 + if self.out_resend_skip: + if self.out_resend > 0: + self.out_queue.pop() + self.out_resend -= 1 + self.out_nextseqid = (seqid + 2) & 0xFFFF + self.out_resend_skip = False + packetpayload = data + else: + packet = [] + while packetlength <= payload: + packet.append(data) + if not self.out_queue: + break + data = self.out_queue.pop() + packetlength += len(data) + else: + rest = len(data) + payload - packetlength + packet.append(data[:rest]) + self.out_queue.append(data[rest:]) + packetpayload = ''.join(packet) + self.out_oldpackets.appendleft(packetpayload) + #print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets + else: + # a pure ACK packet, no payload + if self.out_oldpackets and flags == FLAG_REG: + flags = FLAG_CFRM + packetpayload = '' + packet = struct.pack("!BBH", flags, + self.in_nextseqid & 0xFF, + seqid) + packetpayload + if self.out_oldpackets: + self.out_nextrepeattime = self.cur_time + self.timeout + else: + self.out_nextrepeattime = None + #self.dump('OUT', packet) + return packet + + def decode(self, rawdata): + if len(rawdata) < 4: + raise InvalidPacket + #print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid) + #self.dump('IN ', rawdata) + in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4]) + if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP): + raise InvalidPacket + in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF + ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF + if in_diff >= max_old_packets: + return '' # invalid, but can occur as a late repetition + if ack_diff != len(self.out_oldpackets): + # forget all acknowledged packets + if ack_diff > len(self.out_oldpackets): + return '' # invalid, but can occur with packet reordering + while len(self.out_oldpackets) > ack_diff: + #print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1]) + self.out_oldpackets.pop() + if self.out_oldpackets: + self.out_nextrepeattime = self.cur_time + self.timeout + else: + self.out_nextrepeattime = None # all packets ACKed + if in_flags == FLAG_NAK or in_flags == FLAG_NAK1: + # this is a NAK: resend the old packets as far as they've not + # also been ACK'ed in the meantime (can occur with reordering) + while self.out_resend < len(self.out_oldpackets): + self.out_queue.append(self.out_oldpackets[self.out_resend]) + self.out_resend += 1 + self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF + #print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1]) + self.out_resend_skip = in_flags == FLAG_NAK1 + elif in_flags == FLAG_CFRM: + # this is a CFRM: request for confirmation + self.out_nextrepeattime = self.cur_time + # receive this packet's payload if it is the next in the sequence + if in_diff == 0: + if len(rawdata) > 4: + #print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:]) + self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF + result = [rawdata[4:]] + while self.in_nextseqid in self.in_outoforder: + result.append(self.in_outoforder.pop(self.in_nextseqid)) + self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF + return ''.join(result) + else: + # we missed at least one intermediate packet: send a NAK + if len(rawdata) > 4: + self.in_outoforder[in_seqid] = rawdata[4:] + if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder: + self.out_flags = FLAG_NAK1 + else: + self.out_flags = FLAG_NAK + self.out_nextrepeattime = self.cur_time + return '' + + _dump_indent = 0 + def dump(self, dir, rawdata): + in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4]) + print ' ' * self._dump_indent, dir, + if in_flags == FLAG_NAK: + print 'NAK', + elif in_flags == FLAG_NAK1: + print 'NAK1', + elif in_flags == FLAG_CFRM: + print 'CFRM', + #print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,) + print ack_seqid, in_seqid, repr(rawdata[4:]) + + +def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1, + timeout=1.0, inactivity_timeout=None): + """Example: send all data showing up in send_fd over the given UDP + socket, and write incoming data into recv_fd. The send_fd and + recv_fd are plain file descriptors. When an EOF is read from + send_fd, this function returns (after making sure that all data was + received by the remote side). + """ + import os + from select import select + from time import time + p = PipeLayer() + p.timeout = timeout + iwtdlist = [udpsock] + if send_fd >= 0: + iwtdlist.append(send_fd) + running = True + while running or not p.in_sync(): + delay = delay1 = p.settime(time()) + if delay is None: + delay = inactivity_timeout + iwtd, owtd, ewtd = select(iwtdlist, [], [], delay) + if iwtd: + if send_fd in iwtd: + data = os.read(send_fd, 1500 - p.headersize) + if not data: + # EOF + iwtdlist.remove(send_fd) + running = False + else: + #print 'queue', len(data) + p.queue(data) + if udpsock in iwtd: + packet = udpsock.recv(65535) + #print 'decode', len(packet) + p.settime(time()) + data = p.decode(packet) + i = 0 + while i < len(data): + i += os.write(recv_fd, data[i:]) + elif delay1 is None: + break # long inactivity + p.settime(time()) + packet = p.encode(1500) + if packet: + #print 'send', len(packet) + #if os.urandom(1) >= '\x08': # emulate packet losses + udpsock.send(packet) + + +class PipeOverUdp(object): + + def __init__(self, udpsock, timeout=1.0): + import thread, os + self.os = os + self.sendpipe = os.pipe() + self.recvpipe = os.pipe() + thread.start_new_thread(pipe_over_udp, (udpsock, + self.sendpipe[0], + self.recvpipe[1], + timeout)) + + def __del__(self): + os = self.os + if self.sendpipe: + os.close(self.sendpipe[0]) + os.close(self.sendpipe[1]) + self.sendpipe = None + if self.recvpipe: + os.close(self.recvpipe[0]) + os.close(self.recvpipe[1]) + self.recvpipe = None + + close = __del__ + + def send(self, data): + if not self.sendpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.os.write(self.sendpipe[1], data) + + def sendall(self, data): + i = 0 + while i < len(data): + i += self.send(data[i:]) + + def recv(self, bufsize): + if not self.recvpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.os.read(self.recvpipe[0], bufsize) + + def recvall(self, bufsize): + buf = [] + while bufsize > 0: + data = self.recv(bufsize) + buf.append(data) + bufsize -= len(data) + return ''.join(buf) + + def fileno(self): + if not self.recvpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.recvpipe[0] + + def ofileno(self): + if not self.sendpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.sendpipe[1] diff --git a/py/net/server/__init__.py b/py/net/server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py/net/server/httpserver.py b/py/net/server/httpserver.py new file mode 100644 index 000000000..e0f2dbf21 --- /dev/null +++ b/py/net/server/httpserver.py @@ -0,0 +1,42 @@ +import BaseHTTPServer +from pypeers import greensock2 +from pypeers.pipe.gsocket import GreenSocket + + +class GreenMixIn: + """Mix-in class to handle each request in a new greenlet.""" + + def process_request_greenlet(self, request, client_address): + """Same as in BaseServer but as a greenlet. + In addition, exception handling is done here. + """ + try: + self.finish_request(request, client_address) + self.close_request(request) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def process_request(self, request, client_address): + """Start a new greenlet to process the request.""" + greensock2.autogreenlet(self.process_request_greenlet, + request, client_address) + + +class GreenHTTPServer(GreenMixIn, BaseHTTPServer.HTTPServer): + protocol_version = "HTTP/1.1" + + def server_bind(self): + self.socket = GreenSocket.fromsocket(self.socket) + BaseHTTPServer.HTTPServer.server_bind(self) + + +def test_simple(handler_class=None): + if handler_class is None: + from SimpleHTTPServer import SimpleHTTPRequestHandler + handler_class = SimpleHTTPRequestHandler + server_address = ('', 8000) + httpd = GreenHTTPServer(server_address, handler_class) + sa = httpd.socket.getsockname() + print "Serving HTTP on", sa[0], "port", sa[1], "..." + httpd.serve_forever() diff --git a/py/net/test/test_greenexecnet.py b/py/net/test/test_greenexecnet.py new file mode 100644 index 000000000..bfac4e2e3 --- /dev/null +++ b/py/net/test/test_greenexecnet.py @@ -0,0 +1,41 @@ +import py +from py.__.net.greenexecnet import * + +def test_simple(): + gw = PopenGateway() + channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)") + channel.send(7) + res = channel.receive() + assert res == 42 + +def test_ssh(): + py.test.skip("Bootstrapping") + gw = SshGateway('codespeak.net') + channel = gw.remote_exec(""" + import socket + channel.send(socket.gethostname()) + """) + res = channel.receive() + assert res.endswith('codespeak.net') + +def test_remote_error(): + gw = PopenGateway() + channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)") + channel.send("hello") + py.test.raises(RemoteError, channel.receive) + +def test_invalid_object(): + class X(object): + pass + gw = PopenGateway() + channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)") + channel.send(X()) + py.test.raises(RemoteError, channel.receive) + +def test_channel_over_channel(): + gw = PopenGateway() + chan1 = gw.newchannel() + channel = gw.remote_exec("chan1 = channel.receive(); chan1.send(42)") + channel.send(chan1) + res = chan1.receive() + assert res == 42 diff --git a/py/net/test/test_greensock2.py b/py/net/test/test_greensock2.py new file mode 100644 index 000000000..08760205f --- /dev/null +++ b/py/net/test/test_greensock2.py @@ -0,0 +1,213 @@ +import py +from socket import * +from py.__.net.greensock2 import * + +def test_meetingpoint(): + giv1, acc1 = meetingpoint() + giv2, acc2 = meetingpoint() + giv3, acc3 = meetingpoint() + + lst = [] + + def g1(): + lst.append(0) + x = acc2.accept() + assert x == 'hello' + lst.append(2) + giv1.give('world') + lst.append(5) + x = acc3.accept() + assert x == 'middle' + lst.append(6) + giv3.give('done') + + def g2(): + lst.append(1) + giv2.give('hello') + lst.append(3) + y = acc1.accept() + assert y == 'world' + lst.append(4) + + autogreenlet(g1) + autogreenlet(g2) + giv3.give('middle') + tag = acc3.accept() + assert tag == 'done' + assert lst == [0, 1, 2, 3, 4, 5, 6] + + +def test_producer(): + lst = [] + + def prod(n): + lst.append(1) + yield n + lst.append(2) + yield 87 + lst.append(3) + + def cons(): + lst.append(4) + accepter = producer(prod, 145) + lst.append(5) + lst.append(accepter.accept()) + lst.append(6) + lst.append(accepter.accept()) + lst.append(7) + try: + accepter.accept() + except Interrupted: + lst.append(8) + + g = autogreenlet(cons) + wait(g) + assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8] + + +def test_timer(): + lst = [] + + def g1(): + sleep(0.1, g_1) + lst.append(1) + sleep(0.2, g_1) + lst.append(3) + + def g2(): + lst.append(0) + sleep(0.2, g_2) + lst.append(2) + sleep(0.2, g_2) + lst.append(4) + + g_1 = autogreenlet(g1) + g_2 = autogreenlet(g2) + wait(g_1) + wait(g_2) + assert lst == [0, 1, 2, 3, 4] + + +def test_socket(): + s1 = socket(AF_INET, SOCK_DGRAM) + s2 = socket(AF_INET, SOCK_DGRAM) + s1.bind(('', INADDR_ANY)) + s2.bind(('', INADDR_ANY)) + s1.connect(s2.getsockname()) + s2.connect(s1.getsockname()) + + lst = [] + + def g1(): + lst.append(0) + x = recv(s1, 5) + assert x == 'hello' + lst.append(3) + sendall(s1, 'world') + lst.append(4) + + def g2(): + lst.append(1) + sendall(s2, 'hello') + lst.append(2) + y = recv(s2, 5) + assert y == 'world' + lst.append(5) + + g_1 = autogreenlet(g1) + g_2 = autogreenlet(g2) + wait(g_1) + wait(g_2) + assert lst == [0, 1, 2, 3, 4, 5] + + +##def test_Queue(): + +## def g1(): +## lst.append(5) +## q.put(6) +## lst.append(7) +## q.put(8) +## lst.append(9) +## q.put(10) +## lst.append(11) +## q.put(12) # not used + +## def g2(): +## lst.append(1) +## lst.append(q.get()) +## lst.append(2) +## lst.append(q.get()) +## lst.append(3) +## lst.append(q.get()) +## lst.append(4) + +## q = Queue() +## lst = [] +## autogreenlet(g1) +## autogreenlet(g2) +## wait() +## assert lst == [5, 7, 9, 11, 1, 6, 2, 8, 3, 10, 4] + +## q = Queue() +## lst = [] +## autogreenlet(g2) +## autogreenlet(g1) +## wait() +## assert lst == [1, 5, 7, 9, 11, 6, 2, 8, 3, 10, 4] + + +##def test_Event(): + +## def g1(): +## assert not e.isSet() +## e.wait() +## assert not e.isSet() # no longer set +## lst.append(1) +## e.set() +## e.wait() +## lst.append(2) +## assert e.isSet() +## e.clear() +## assert not e.isSet() +## lst.append(0) +## e.set() +## lst.append(3) +## assert e.isSet() + +## def g2(): +## assert not e.isSet() +## lst.append(4) +## e.set() +## lst.append(7) +## e.clear() +## e.set() +## e.clear() +## assert not e.isSet() +## lst.append(5) +## e.wait() +## assert e.isSet() +## lst.append(6) + +## e = Event() +## lst = [] +## autogreenlet(g1) +## autogreenlet(g2) +## wait() +## assert lst == [4, 7, 5, 1, 2, 0, 3, 6] + + +##def test_Event_timeout(): +## def g1(): +## lst.append(5) +## e.wait(0.1) +## lst.append(e.isSet()) +## e.wait(60.0) +## lst.append(e.isSet()) +## lst = [] +## e = Event() +## autogreenlet(g1) +## sleep(0.5) +## e.set() +## wait() +## assert lst == [5, False, True] diff --git a/py/net/test/test_pipelayer.py b/py/net/test/test_pipelayer.py new file mode 100644 index 000000000..437f87d28 --- /dev/null +++ b/py/net/test/test_pipelayer.py @@ -0,0 +1,215 @@ +import os, random +from py.__.net.pipelayer import PipeLayer, pipe_over_udp, PipeOverUdp + +def test_simple(): + data1 = os.urandom(1000) + data2 = os.urandom(1000) + p1 = PipeLayer() + p2 = PipeLayer() + p2._dump_indent = 40 + p1.queue(data1) + p1.queue(data2) + recv = '' + while len(recv) < 2000: + raw = p1.encode(64) + assert raw is not None + res = p2.decode(raw) + assert res is not None + recv += res + assert recv == data1 + data2 + raw = p1.encode(64) + assert raw is None + +def test_stabilize(): + data1 = os.urandom(28) + p1 = PipeLayer() + p2 = PipeLayer() + p2._dump_indent = 40 + p1.queue(data1) + recv = '' + t = 0.0 + print + while True: + delay1 = p1.settime(t) + delay2 = p2.settime(t) + t += 0.100000001 + if delay1 is delay2 is None: + break + if delay1 == 0: + raw = p1.encode(10) + p1.dump('OUT', raw) + assert raw is not None + res = p2.decode(raw) + assert res is not None + recv += res + if delay2 == 0: + raw = p2.encode(10) + p2.dump('OUT', raw) + assert raw is not None + res = p1.decode(raw) + assert res == '' + assert recv == data1 + +def test_bidir(): + data1 = os.urandom(1000) + data2 = os.urandom(1000) + p1 = PipeLayer() + p2 = PipeLayer() + p2._dump_indent = 40 + p1.queue(data1) + p2.queue(data2) + recv = ['', ''] + while len(recv[0]) < 1000 or len(recv[1]) < 1000: + progress = False + for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]: + raw = me.encode(64) + if raw is not None: + res = other.decode(raw) + assert res is not None + recv[i] += res + if res: + progress = True + assert progress + assert recv[0] == data2 + assert recv[1] == data1 + raw = p1.encode(64) + assert raw is None + raw = p2.encode(64) + assert raw is None + +def test_with_loss(): + data1 = os.urandom(10000).encode('hex') + data2 = os.urandom(10000).encode('hex') + #data1 = '0123456789' + #data2 = 'ABCDEFGHIJ' + p1 = PipeLayer() + p2 = PipeLayer() + p2._dump_indent = 40 + p1.queue(data1) + p2.queue(data2) + recv = ['', ''] + time = 0 + active = 1 + while active: + active = 0 + time += 0.2 + #print '.' + exchange = [] + for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]: + to = me.settime(time) + packet = me.encode(12) + assert (packet is not None) == (to == 0) + if to is not None: + active = 1 + if to == 0: + exchange.append((packet, other, i)) + for (packet, other, i) in exchange: + if random.random() < 0.5: + pass # drop packet + else: + res = other.decode(packet) + assert res is not None + recv[i] += res + assert data2.startswith(recv[0]) + assert data1.startswith(recv[1]) + assert recv[0] == data2 + assert recv[1] == data1 + print time + +def test_massive_reordering(): + data1 = os.urandom(10000).encode('hex') + data2 = os.urandom(10000).encode('hex') + #data1 = '0123456789' + #data2 = 'ABCDEFGHIJ' + p1 = PipeLayer() + p2 = PipeLayer() + p2._dump_indent = 40 + p1.queue(data1) + p2.queue(data2) + recv = ['', ''] + time = 0 + active = 1 + exchange = [] + while active or exchange: + active = 0 + time += 0.2 + #print '.' + for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]: + to = me.settime(time) + packet = me.encode(12) + assert (packet is not None) == (to == 0) + if to is not None: + active = 1 + if to == 0: + exchange.append((packet, other, i)) + if random.random() < 0.02: + random.shuffle(exchange) + for (packet, other, i) in exchange: + res = other.decode(packet) + assert res is not None + recv[i] += res + exchange = [] + assert data2.startswith(recv[0]) + assert data1.startswith(recv[1]) + assert recv[0] == data2 + assert recv[1] == data1 + print time + +def udpsockpair(): + from socket import socket, AF_INET, SOCK_DGRAM, INADDR_ANY + s1 = socket(AF_INET, SOCK_DGRAM) + s2 = socket(AF_INET, SOCK_DGRAM) + s1.bind(('127.0.0.1', INADDR_ANY)) + s2.bind(('127.0.0.1', INADDR_ANY)) + s2.connect(s1.getsockname()) + s1.connect(s2.getsockname()) + return s1, s2 + +def test_pipe_over_udp(): + import thread + s1, s2 = udpsockpair() + + fd1 = os.open(__file__, os.O_RDONLY) + fd2 = os.open('test_pipelayer.py~copy', os.O_WRONLY|os.O_CREAT|os.O_TRUNC) + + thread.start_new_thread(pipe_over_udp, (s1, fd1)) + pipe_over_udp(s2, recv_fd=fd2, inactivity_timeout=2.5) + os.close(fd1) + os.close(fd2) + f = open(__file__, 'rb') + data1 = f.read() + f.close() + f = open('test_pipelayer.py~copy', 'rb') + data2 = f.read() + f.close() + assert data1 == data2 + os.unlink('test_pipelayer.py~copy') + +def test_PipeOverUdp(): + s1, s2 = udpsockpair() + p1 = PipeOverUdp(s1, timeout=0.2) + p2 = PipeOverUdp(s2, timeout=0.2) + p2.sendall('goodbye') + for k in range(10): + p1.sendall('hello world') + input = p2.recvall(11) + assert input == 'hello world' + input = p1.recvall(7) + assert input == 'goodbye' + + bigchunk1 = os.urandom(500000) + bigchunk2 = os.urandom(500000) + i1 = i2 = 0 + j1 = j2 = 0 + while j1 < len(bigchunk1) or j2 < len(bigchunk2): + i1 += p1.send(bigchunk1[i1:i1+512]) + i2 += p2.send(bigchunk2[i2:i2+512]) + data = p1.recv(512) + assert data == bigchunk2[j2:j2+len(data)] + j2 += len(data) + data = p2.recv(512) + assert data == bigchunk1[j1:j1+len(data)] + j1 += len(data) + #print i1, i2, j1, j2 + p1.close() + p2.close()