[svn r39973] Add a (stolen almost directly from arigo/hack) network layer

based on top of greenlets. Needs some tweaking, so it's not exposed
as a py.xxx, but rather py.__.net

--HG--
branch : trunk
This commit is contained in:
fijal 2007-03-06 10:07:53 +01:00
parent 40b28ca0fe
commit f5664d4405
15 changed files with 1825 additions and 0 deletions

0
py/net/__init__.py Normal file
View File

203
py/net/greenexecnet.py Normal file
View File

@ -0,0 +1,203 @@
import sys, os, py, inspect
from py.__.net import greensock2
from py.__.net.msgstruct import message, decodemessage
MSG_REMOTE_EXEC = 'r'
MSG_OBJECT = 'o'
MSG_ERROR = 'e'
MSG_CHAN_CLOSE = 'c'
MSG_FATAL = 'f'
MSG_CHANNEL = 'n'
class Gateway(object):
def __init__(self, input, output, is_remote=False):
self.input = input
self.output = output
self.nextchannum = int(is_remote)
self.receivers = {}
self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote)
def remote_exec(self, remote_source):
remote_source = py.code.Source(remote_source)
chan = self.newchannel()
msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source))
self.output.sendall(msg)
return chan
def newchannel(self):
n = self.nextchannum
self.nextchannum += 2
return self.make_channel(n)
def make_channel(self, n):
giver, accepter = greensock2.meetingpoint()
assert n not in self.receivers
self.receivers[n] = giver
return Channel(self, n, accepter)
def serve_forever(self, is_remote=False):
try:
buffer = ""
while 1:
msg, buffer = decodemessage(buffer)
if msg is None:
buffer += self.input.recv(16384)
else:
handler = HANDLERS[msg[0]]
handler(self, *msg[1:])
except greensock2.greenlet.GreenletExit:
raise
except:
if is_remote:
msg = message(MSG_FATAL, format_error(*sys.exc_info()))
self.output.sendall(msg)
else:
raise
def msg_remote_exec(self, n, source):
def do_execute(channel):
try:
d = {'channel': channel}
exec source in d
except:
channel.report_error(*sys.exc_info())
else:
channel.close()
greensock2.autogreenlet(do_execute, self.make_channel(n))
def msg_object(self, n, objrepr):
obj = eval(objrepr)
if n in self.receivers:
self.receivers[n].give_queued(obj)
def msg_error(self, n, s):
if n in self.receivers:
self.receivers[n].give_queued(RemoteError(s))
self.receivers[n].close()
del self.receivers[n]
def msg_chan_close(self, n):
if n in self.receivers:
self.receivers[n].close()
del self.receivers[n]
def msg_channel(self, n, m):
if n in self.receivers:
self.receivers[n].give_queued(self.make_channel(m))
def msg_fatal(self, s):
raise RemoteError(s)
HANDLERS = {
MSG_REMOTE_EXEC: Gateway.msg_remote_exec,
MSG_OBJECT: Gateway.msg_object,
MSG_ERROR: Gateway.msg_error,
MSG_CHAN_CLOSE: Gateway.msg_chan_close,
MSG_CHANNEL: Gateway.msg_channel,
MSG_FATAL: Gateway.msg_fatal,
}
class Channel(object):
def __init__(self, gw, n, accepter):
self.gw = gw
self.n = n
self.accepter = accepter
def send(self, obj):
if isinstance(obj, Channel):
assert obj.gw is self.gw
msg = message(MSG_CHANNEL, self.n, obj.n)
else:
msg = message(MSG_OBJECT, self.n, repr(obj))
self.gw.output.sendall(msg)
def receive(self):
obj = self.accepter.accept()
if isinstance(obj, RemoteError):
raise obj
else:
return obj
def close(self):
try:
self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n))
except OSError:
pass
def report_error(self, exc_type, exc_value, exc_traceback=None):
s = format_error(exc_type, exc_value, exc_traceback)
try:
self.gw.output.sendall(message(MSG_ERROR, self.n, s))
except OSError:
pass
class RemoteError(Exception):
pass
def format_error(exc_type, exc_value, exc_traceback=None):
import traceback, StringIO
s = StringIO.StringIO()
traceback.print_exception(exc_type, exc_value, exc_traceback, file=s)
return s.getvalue()
class PopenCmdGateway(Gateway):
action = "exec input()"
def __init__(self, cmdline):
from py.__.net.pipe.fd import FDInput, FDOutput
child_in, child_out = os.popen2(cmdline, 't', 0)
fdin = FDInput(child_out.fileno(), child_out.close)
fdout = FDOutput(child_in.fileno(), child_in.close)
fdout.sendall(self.get_bootstrap_code())
super(PopenCmdGateway, self).__init__(input = fdin, output = fdout)
def get_bootstrap_code():
# XXX assumes that the py lib is installed on the remote side
src = []
src.append('from py.__.net import greenexecnet')
src.append('greenexecnet.PopenCmdGateway.run_server()')
src.append('')
return '%r\n' % ('\n'.join(src),)
get_bootstrap_code = staticmethod(get_bootstrap_code)
def run_server():
from py.__.net.pipe.fd import FDInput, FDOutput
gw = Gateway(input = FDInput(os.dup(0)),
output = FDOutput(os.dup(1)),
is_remote = True)
# for now, ignore normal I/O
fd = os.open('/dev/null', os.O_RDWR)
os.dup2(fd, 0)
os.dup2(fd, 1)
os.close(fd)
greensock2.wait(gw.greenlet)
run_server = staticmethod(run_server)
class PopenGateway(PopenCmdGateway):
def __init__(self, python=sys.executable):
cmdline = '"%s" -u -c "%s"' % (python, self.action)
super(PopenGateway, self).__init__(cmdline)
class SshGateway(PopenCmdGateway):
def __init__(self, sshaddress, remotepython='python', identity=None):
self.sshaddress = sshaddress
remotecmd = '%s -u -c "%s"' % (remotepython, self.action)
cmdline = [sshaddress, remotecmd]
# XXX Unix style quoting
for i in range(len(cmdline)):
cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
cmd = 'ssh -C'
if identity is not None:
cmd += ' -i %s' % (identity,)
cmdline.insert(0, cmd)
super(SshGateway, self).__init__(' '.join(cmdline))
##f = open('LOG', 'a')
##import os; print >> f, '[%d] READY' % (os.getpid(),)
##f.close()

526
py/net/greensock2.py Normal file
View File

@ -0,0 +1,526 @@
import os, sys
try:
from stackless import greenlet
except ImportError:
import py
greenlet = py.magic.greenlet
from collections import deque
from select import select as _select
from time import time as _time
from heapq import heappush, heappop, heapify
TRACE = True
def meetingpoint():
senders = deque() # list of senders, or [None] if Giver closed
receivers = deque() # list of receivers, or [None] if Receiver closed
return (MeetingPointGiver(senders, receivers),
MeetingPointAccepter(senders, receivers))
def producer(func, *args, **kwds):
iterable = func(*args, **kwds)
giver, accepter = meetingpoint()
def autoproducer():
try:
giver.wait()
for obj in iterable:
giver.give(obj)
giver.wait()
finally:
giver.close()
autogreenlet(autoproducer)
return accepter
class MeetingPointBase(object):
def __init__(self, senders, receivers):
self.senders = senders
self.receivers = receivers
self.g_active = g_active
def close(self):
while self.senders:
if self.senders[0] is None:
break
packet = self.senders.popleft()
if packet.g_from is not None:
self.g_active.append(packet.g_from)
else:
self.senders.append(None)
while self.receivers:
if self.receivers[0] is None:
break
other = self.receivers.popleft()
self.g_active.append(other)
else:
self.receivers.append(None)
__del__ = close
def closed(self):
return self.receivers and self.receivers[0] is None
class MeetingPointGiver(MeetingPointBase):
def give(self, obj):
if self.receivers:
if self.receivers[0] is None:
raise MeetingPointClosed
other = self.receivers.popleft()
g_active.append(g_getcurrent())
packet = _Packet()
packet.payload = obj
other.switch(packet)
if not packet.accepted:
raise Interrupted("packet not accepted")
else:
packet = _Packet()
packet.g_from = g_getcurrent()
packet.payload = obj
try:
self.senders.append(packet)
g_dispatcher.switch()
if not packet.accepted:
raise Interrupted("packet not accepted")
except:
remove_by_id(self.senders, packet)
raise
def give_queued(self, obj):
if self.receivers:
self.give(obj)
else:
packet = _Packet()
packet.g_from = None
packet.payload = obj
self.senders.append(packet)
def ready(self):
return self.receivers and self.receivers[0] is not None
def wait(self):
if self.receivers:
if self.receivers[0] is None:
raise MeetingPointClosed
else:
packet = _Packet()
packet.g_from = g_getcurrent()
packet.empty = True
self.senders.append(packet)
try:
g_dispatcher.switch()
if not packet.accepted:
raise Interrupted("no accepter found")
except:
remove_by_id(self.senders, packet)
raise
def trigger(self):
if self.ready():
self.give(None)
class MeetingPointAccepter(MeetingPointBase):
def accept(self):
while self.senders:
if self.senders[0] is None:
raise MeetingPointClosed
packet = self.senders.popleft()
packet.accepted = True
if packet.g_from is not None:
g_active.append(packet.g_from)
if not packet.empty:
return packet.payload
g = g_getcurrent()
self.receivers.append(g)
try:
packet = g_dispatcher.switch()
except:
remove_by_id(self.receivers, g)
raise
if type(packet) is not _Packet:
remove_by_id(self.receivers, g)
raise Interrupted("no packet")
packet.accepted = True
return packet.payload
def ready(self):
for packet in self.senders:
if packet is None:
return False
if not packet.empty:
return True
return False
def wait_trigger(self, timeout=None, default=None):
if timeout is None:
return self.accept()
else:
timer = Timer(timeout)
try:
try:
return self.accept()
finally:
timer.stop()
except Interrupted:
if timer.finished:
return default
raise
class MeetingPointClosed(greenlet.GreenletExit):
pass
class Interrupted(greenlet.GreenletExit):
pass
class ConnexionClosed(greenlet.GreenletExit):
pass
class _Packet(object):
empty = False
accepted = False
def remove_by_id(d, obj):
lst = [x for x in d if x is not obj]
d.clear()
d.extend(lst)
# ____________________________________________________________
##class Queue(object):
## def __init__(self):
## self.giver, self.accepter = meetingpoint()
## self.pending = deque()
## def put(self, item): # preserve the caller's atomicity
## self.pending.append(item)
## if self.accepter.ready():
## self.accepter.accept()
## def get(self, block=True):
## if self.pending:
## return self.pending.popleft()
## elif block:
## self.giver.give(None)
## return self.pending.popleft()
## else:
## raise Empty
##class Empty(Interrupted):
## pass
##class Event(object):
## def __init__(self):
## self.giver, self.accepter = meetingpoint()
## clear = __init__
## def isSet(self):
## return self.accepter is None
## def set(self): # preserve the caller's atomicity
## if self.accepter is not None:
## accepter = self.accepter
## self.giver = self.accepter = None
## while accepter.ready(): # wake up all waiters
## accepter.accept()
## def wait(self, timeout=None):
## if self.accepter is not None:
## if timeout is None:
## self.giver.give(None)
## else:
## timer = Timer(timeout)
## try:
## try:
## self.giver.give(None)
## except Interrupted:
## pass
## finally:
## timer.stop()
##class Semaphore(object):
## def __init__(self, value=1):
## self.giver, self.accepter = meetingpoint()
## for i in range(value):
## self.release()
## def acquire(self, blocking=True):
## if blocking or self.accepter.ready():
## return self.accepter.accept()
## else:
## return False
## def release(self):
## autogreenlet(self.giver.put, True)
# ____________________________________________________________
def wait_input(sock):
_register(g_iwtd, sock)
def recv(sock, bufsize):
wait_input(sock)
buf = sock.recv(bufsize)
if not buf:
raise ConnexionClosed("inbound connexion closed")
return buf
def recvall(sock, bufsize):
in_front = False
data = []
while bufsize > 0:
_register(g_iwtd, sock, in_front=in_front)
buf = sock.recv(bufsize)
if not buf:
raise ConnexionClosed("inbound connexion closed")
data.append(buf)
bufsize -= len(buf)
in_front = True
return ''.join(data)
def read(fd, bufsize):
assert fd >= 0
wait_input(fd)
buf = os.read(fd, bufsize)
if not buf:
raise ConnexionClosed("inbound connexion closed")
return buf
def readall(fd, bufsize):
assert fd >= 0
in_front = False
data = []
while bufsize > 0:
_register(g_iwtd, fd, in_front=in_front)
buf = os.read(fd, bufsize)
if not buf:
raise ConnexionClosed("inbound connexion closed")
data.append(buf)
bufsize -= len(buf)
in_front = True
return ''.join(data)
def wait_output(sock):
_register(g_owtd, sock)
def sendall(sock, buffer):
in_front = False
while buffer:
_register(g_owtd, sock, in_front=in_front)
count = sock.send(buffer)
buffer = buffer[count:]
in_front = True
def writeall(fd, buffer):
assert fd >= 0
in_front = False
while buffer:
_register(g_owtd, fd, in_front=in_front)
count = os.write(fd, buffer)
if not count:
raise ConnexionClosed("outbound connexion closed")
buffer = buffer[count:]
in_front = True
def sleep(duration, *greenlets):
timer = Timer(duration)
try:
wait(*greenlets)
finally:
ok = timer.finished
timer.stop()
if not ok:
raise Interrupted
def _wait():
g_dispatcher.switch()
def wait(*greenlets):
assert greenlets#, "should not wait without events to wait on"
current = g_getcurrent()
for g in greenlets:
if g in g_waiters:
g_waiters[g].append(current)
else:
g_waiters[g] = [current]
g_dispatcher.switch()
class Timer(object):
started = False
finished = False
def __init__(self, timeout):
self.g = g_getcurrent()
entry = (_time() + timeout, self)
if g_timers_mixed:
g_timers.append(entry)
else:
heappush(g_timers, entry)
def stop(self):
global g_timers_mixed
if not self.finished:
for i, (activationtime, timer) in enumerate(g_timers):
if timer is self:
g_timers[i] = g_timers[-1]
g_timers.pop()
g_timers_mixed = True
break
self.finished = True
# ____________________________________________________________
class autogreenlet(greenlet):
def __init__(self, function, *args, **kwds):
self.parent = g_dispatcher
self.function = function
self.args = args
self.kwds = kwds
g_active.append(self)
def run(self):
self.trace("start")
try:
self.function(*self.args, **self.kwds)
except Exception, e:
self.trace("stop (%s%s)", e.__class__.__name__,
str(e) and (': '+str(e)))
raise
else:
self.trace("done")
def __repr__(self):
## args = ', '.join([repr(s) for s in self.args] +
## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
return '<autogreenlet %s at %s>' % (self.function.__name__,
hex(id(self)))
def trace(self, msg, *args):
if TRACE:
print self, msg % args
def interrupt(self):
self.throw(Interrupted)
g_active = deque()
g_iwtd = {}
g_owtd = {}
g_timers = []
g_timers_mixed = False
g_getcurrent = greenlet.getcurrent
def _register(g_wtd, sock, in_front=False):
d = g_wtd.setdefault(sock, deque())
g = g_getcurrent()
if in_front:
d.appendleft(g)
else:
d.append(g)
try:
if g_dispatcher.switch() is not g_wtd:
raise Interrupted
except:
remove_by_id(d, g)
raise
##def _unregister_timer():
## ...
def check_dead_greenlets(mapping):
to_remove = [i for i, v in mapping.items() if not v]
for k in to_remove:
del mapping[k]
def check_waiters(active):
if active in g_waiters:
for g in g_waiters[active]:
g.switch()
del g_waiters[active]
def dispatcher_mainloop():
global g_timers_mixed
while 1:
try:
while g_active:
print 'active:', g_active[0]
active = g_active.popleft()
active.switch()
if active.dead:
check_waiters(active)
del active
if g_timers:
if g_timers_mixed:
heapify(g_timers)
g_timers_mixed = False
activationtime, timer = g_timers[0]
delay = activationtime - _time()
if delay <= 0.0:
if timer.started:
heappop(g_timers)
#print 'timeout:', g
timer.finished = True
timer.g.switch()
if timer.g.dead:
check_waiters(timer.g)
continue
delay = 0.0
timer.started = True
else:
check_dead_greenlets(g_iwtd)
check_dead_greenlets(g_owtd)
if not (g_iwtd or g_owtd):
# nothing to do, switch to the main greenlet
g_dispatcher.parent.switch()
continue
delay = None
print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
print 'done'
for s in owtd:
if s in g_owtd:
d = g_owtd[s]
#print 'owtd:', d[0]
g = d.popleft()
if not d:
try:
del g_owtd[s]
except KeyError:
pass
g.switch(g_owtd)
if g.dead:
check_waiters(g)
for s in iwtd:
if s in g_iwtd:
d = g_iwtd[s]
#print 'iwtd:', d[0]
g = d.popleft()
if not d:
try:
del g_iwtd[s]
except KeyError:
pass
g.switch(g_iwtd)
if g.dead:
check_waiters(g)
except:
import sys
g_dispatcher.parent.throw(*sys.exc_info())
g_dispatcher = greenlet(dispatcher_mainloop)
g_waiters = {}

29
py/net/msgstruct.py Normal file
View File

@ -0,0 +1,29 @@
from struct import pack, unpack, calcsize
def message(tp, *values):
strtype = type('')
typecodes = ['']
for v in values:
if type(v) is strtype:
typecodes.append('%ds' % len(v))
elif 0 <= v < 256:
typecodes.append('B')
else:
typecodes.append('l')
typecodes = ''.join(typecodes)
assert len(typecodes) < 256
return pack(("!B%dsc" % len(typecodes)) + typecodes,
len(typecodes), typecodes, tp, *values)
def decodemessage(data):
if data:
limit = ord(data[0]) + 1
if len(data) >= limit:
typecodes = "!c" + data[1:limit]
end = limit + calcsize(typecodes)
if len(data) >= end:
return unpack(typecodes, data[limit:end]), data[end:]
#elif end > 1000000:
# raise OverflowError
return None, data

0
py/net/pipe/__init__.py Normal file
View File

38
py/net/pipe/common.py Normal file
View File

@ -0,0 +1,38 @@
import greensock2
from pypeers.tool import log
class BufferedInput(object):
in_buf = ''
def recv(self, bufsize):
self.wait_input()
buf = self.in_buf[:bufsize]
self.in_buf = self.in_buf[bufsize:]
return buf
def recvall(self, bufsize):
result = []
while bufsize > 0:
buf = self.recv(bufsize)
result.append(buf)
bufsize -= len(buf)
return ''.join(result)
# ____________________________________________________________
def forwardpipe(s1, s2):
try:
while 1:
s2.wait_output()
buffer = s1.recv(32768)
log('[%r -> %r] %r', s1, s2, buffer)
s2.sendall(buffer)
del buffer
finally:
s2.shutdown_wr()
s1.shutdown_rd()
def linkpipes(s1, s2):
greensock2.autogreenlet(forwardpipe, s1, s2)
greensock2.autogreenlet(forwardpipe, s2, s1)

69
py/net/pipe/fd.py Normal file
View File

@ -0,0 +1,69 @@
import os
from py.__.net import greensock2
class FDInput(object):
def __init__(self, read_fd, close=True):
self.read_fd = read_fd
self._close = close # a flag or a callback
def shutdown_rd(self):
fd = self.read_fd
if fd is not None:
self.read_fd = None
close = self._close
if close:
self._close = False
if close == True:
os.close(fd)
else:
close()
__del__ = shutdown_rd
def wait_input(self):
greensock2.wait_input(self.read_fd)
def recv(self, bufsize):
## f = open('LOG', 'a')
## import os; print >> f, '[%d] RECV' % (os.getpid(),)
## f.close()
res = greensock2.read(self.read_fd, bufsize)
## f = open('LOG', 'a')
## import os; print >> f, '[%d] RECV %r' % (os.getpid(), res)
## f.close()
return res
def recvall(self, bufsize):
return greensock2.readall(self.read_fd, bufsize)
class FDOutput(object):
def __init__(self, write_fd, close=True):
self.write_fd = write_fd
self._close = close # a flag or a callback
def shutdown_wr(self):
fd = self.write_fd
if fd is not None:
self.write_fd = None
close = self._close
if close:
self._close = False
if close == True:
os.close(fd)
else:
close()
__del__ = shutdown_wr
def wait_output(self):
greensock2.wait_output(self.write_fd)
def sendall(self, buffer):
## f = open('LOG', 'a')
## import os; print >> f, '[%d] %r' % (os.getpid(), buffer)
## f.close()
greensock2.writeall(self.write_fd, buffer)

114
py/net/pipe/gsocket.py Normal file
View File

@ -0,0 +1,114 @@
import greensock2
import socket, errno, os
error = socket.error
class _delegate(object):
def __init__(self, methname):
self.methname = methname
def __get__(self, obj, typ=None):
result = getattr(obj._s, self.methname)
setattr(obj, self.methname, result)
return result
class GreenSocket(object):
def __init__(self, family = socket.AF_INET,
type = socket.SOCK_STREAM,
proto = 0):
self._s = socket.socket(family, type, proto)
self._s.setblocking(False)
def fromsocket(cls, s):
if isinstance(s, GreenSocket):
s = s._s
result = GreenSocket.__new__(cls)
result._s = s
s.setblocking(False)
return result
fromsocket = classmethod(fromsocket)
def accept(self):
while 1:
try:
s, addr = self._s.accept()
break
except error, e:
import pdb;pdb.set_trace()
if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
raise
self.wait_input()
return self.fromsocket(s), addr
bind = _delegate("bind")
close = _delegate("close")
def connect(self, addr):
err = self.connect_ex(addr)
if err:
raise error(err, os.strerror(err))
def connect_ex(self, addr):
err = self._s.connect_ex(addr)
if err == errno.EINPROGRESS:
greensock2.wait_output(self._s)
err = self._s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
return err
#XXX dup
fileno = _delegate("fileno")
getpeername = _delegate("getpeername")
getsockname = _delegate("getsockname")
getsockopt = _delegate("getsockopt")
listen = _delegate("listen")
def makefile(self, mode='r', bufsize=-1):
# hack, but reusing the internal socket._fileobject should just work
return socket._fileobject(self, mode, bufsize)
def recv(self, bufsize):
return greensock2.recv(self._s, bufsize)
def recvall(self, bufsize):
return greensock2.recvall(self._s, bufsize)
def recvfrom(self, bufsize):
self.wait_input()
buf, addr = self._s.recvfrom(bufsize)
if not buf:
raise ConnexionClosed("inbound connexion closed")
return buf, addr
def send(self, data):
self.wait_output()
return self._s.send(data)
def sendto(self, data, addr):
self.wait_output()
return self._s.sendto(data, addr)
def sendall(self, data):
greensock2.sendall(self._s, data)
setsockopt = _delegate("setsockopt")
shutdown = _delegate("shutdown")
def shutdown_rd(self):
try:
self._s.shutdown(socket.SHUT_RD)
except error:
pass
def shutdown_wr(self):
try:
self._s.shutdown(socket.SHUT_WR)
except error:
pass
def wait_input(self):
greensock2.wait_input(self._s)
def wait_output(self):
greensock2.wait_output(self._s)

29
py/net/pipe/mp.py Normal file
View File

@ -0,0 +1,29 @@
from pypeers.stream.common import BufferedInput
class MeetingPointInput(BufferedInput):
def __init__(self, accepter):
self.accepter = accepter
def wait_input(self):
while not self.in_buf:
self.in_buf = self.accepter.accept()
def shutdown_rd(self):
self.accepter.close()
class MeetingPointOutput(BufferedInput):
def __init__(self, giver):
self.giver = giver
def wait_output(self):
self.giver.wait()
def sendall(self, buffer):
self.giver.give(buffer)
def shutdown_wr(self):
self.giver.close()

306
py/net/pipelayer.py Normal file
View File

@ -0,0 +1,306 @@
#import os
import struct
from collections import deque
class InvalidPacket(Exception):
pass
FLAG_NAK1 = 0xE0
FLAG_NAK = 0xE1
FLAG_REG = 0xE2
FLAG_CFRM = 0xE3
FLAG_RANGE_START = 0xE0
FLAG_RANGE_STOP = 0xE4
max_old_packets = 256 # must be <= 256
class PipeLayer(object):
timeout = 1
headersize = 4
def __init__(self):
#self.localid = os.urandom(4)
#self.remoteid = None
self.cur_time = 0
self.out_queue = deque()
self.out_nextseqid = 0
self.out_nextrepeattime = None
self.in_nextseqid = 0
self.in_outoforder = {}
self.out_oldpackets = deque()
self.out_flags = FLAG_REG
self.out_resend = 0
self.out_resend_skip = False
def queue(self, data):
if data:
self.out_queue.appendleft(data)
def queue_size(self):
total = 0
for data in self.out_queue:
total += len(data)
return total
def in_sync(self):
return not self.out_queue and self.out_nextrepeattime is None
def settime(self, curtime):
self.cur_time = curtime
if self.out_queue:
if len(self.out_oldpackets) < max_old_packets:
return 0 # more data to send now
if self.out_nextrepeattime is not None:
return max(0, self.out_nextrepeattime - curtime)
else:
return None
def encode(self, maxlength):
#print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue
if len(self.out_oldpackets) >= max_old_packets:
# congestion, stalling
payload = 0
else:
payload = maxlength - 4
if payload <= 0:
raise ValueError("encode(): buffer too small")
if (self.out_nextrepeattime is not None and
self.out_nextrepeattime <= self.cur_time):
# no ACK received so far, send a packet (possibly empty)
if not self.out_queue:
payload = 0
else:
if not self.out_queue: # no more data to send
return None
if payload == 0: # congestion
return None
# prepare a packet
seqid = self.out_nextseqid
flags = self.out_flags
self.out_flags = FLAG_REG # clear out the flags for the next time
if payload > 0:
self.out_nextseqid = (seqid + 1) & 0xFFFF
data = self.out_queue.pop()
packetlength = len(data)
if self.out_resend > 0:
if packetlength > payload:
raise ValueError("XXX need constant buffer size for now")
self.out_resend -= 1
if self.out_resend_skip:
if self.out_resend > 0:
self.out_queue.pop()
self.out_resend -= 1
self.out_nextseqid = (seqid + 2) & 0xFFFF
self.out_resend_skip = False
packetpayload = data
else:
packet = []
while packetlength <= payload:
packet.append(data)
if not self.out_queue:
break
data = self.out_queue.pop()
packetlength += len(data)
else:
rest = len(data) + payload - packetlength
packet.append(data[:rest])
self.out_queue.append(data[rest:])
packetpayload = ''.join(packet)
self.out_oldpackets.appendleft(packetpayload)
#print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets
else:
# a pure ACK packet, no payload
if self.out_oldpackets and flags == FLAG_REG:
flags = FLAG_CFRM
packetpayload = ''
packet = struct.pack("!BBH", flags,
self.in_nextseqid & 0xFF,
seqid) + packetpayload
if self.out_oldpackets:
self.out_nextrepeattime = self.cur_time + self.timeout
else:
self.out_nextrepeattime = None
#self.dump('OUT', packet)
return packet
def decode(self, rawdata):
if len(rawdata) < 4:
raise InvalidPacket
#print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid)
#self.dump('IN ', rawdata)
in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP):
raise InvalidPacket
in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF
ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF
if in_diff >= max_old_packets:
return '' # invalid, but can occur as a late repetition
if ack_diff != len(self.out_oldpackets):
# forget all acknowledged packets
if ack_diff > len(self.out_oldpackets):
return '' # invalid, but can occur with packet reordering
while len(self.out_oldpackets) > ack_diff:
#print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1])
self.out_oldpackets.pop()
if self.out_oldpackets:
self.out_nextrepeattime = self.cur_time + self.timeout
else:
self.out_nextrepeattime = None # all packets ACKed
if in_flags == FLAG_NAK or in_flags == FLAG_NAK1:
# this is a NAK: resend the old packets as far as they've not
# also been ACK'ed in the meantime (can occur with reordering)
while self.out_resend < len(self.out_oldpackets):
self.out_queue.append(self.out_oldpackets[self.out_resend])
self.out_resend += 1
self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF
#print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1])
self.out_resend_skip = in_flags == FLAG_NAK1
elif in_flags == FLAG_CFRM:
# this is a CFRM: request for confirmation
self.out_nextrepeattime = self.cur_time
# receive this packet's payload if it is the next in the sequence
if in_diff == 0:
if len(rawdata) > 4:
#print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:])
self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
result = [rawdata[4:]]
while self.in_nextseqid in self.in_outoforder:
result.append(self.in_outoforder.pop(self.in_nextseqid))
self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
return ''.join(result)
else:
# we missed at least one intermediate packet: send a NAK
if len(rawdata) > 4:
self.in_outoforder[in_seqid] = rawdata[4:]
if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder:
self.out_flags = FLAG_NAK1
else:
self.out_flags = FLAG_NAK
self.out_nextrepeattime = self.cur_time
return ''
_dump_indent = 0
def dump(self, dir, rawdata):
in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
print ' ' * self._dump_indent, dir,
if in_flags == FLAG_NAK:
print 'NAK',
elif in_flags == FLAG_NAK1:
print 'NAK1',
elif in_flags == FLAG_CFRM:
print 'CFRM',
#print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,)
print ack_seqid, in_seqid, repr(rawdata[4:])
def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1,
timeout=1.0, inactivity_timeout=None):
"""Example: send all data showing up in send_fd over the given UDP
socket, and write incoming data into recv_fd. The send_fd and
recv_fd are plain file descriptors. When an EOF is read from
send_fd, this function returns (after making sure that all data was
received by the remote side).
"""
import os
from select import select
from time import time
p = PipeLayer()
p.timeout = timeout
iwtdlist = [udpsock]
if send_fd >= 0:
iwtdlist.append(send_fd)
running = True
while running or not p.in_sync():
delay = delay1 = p.settime(time())
if delay is None:
delay = inactivity_timeout
iwtd, owtd, ewtd = select(iwtdlist, [], [], delay)
if iwtd:
if send_fd in iwtd:
data = os.read(send_fd, 1500 - p.headersize)
if not data:
# EOF
iwtdlist.remove(send_fd)
running = False
else:
#print 'queue', len(data)
p.queue(data)
if udpsock in iwtd:
packet = udpsock.recv(65535)
#print 'decode', len(packet)
p.settime(time())
data = p.decode(packet)
i = 0
while i < len(data):
i += os.write(recv_fd, data[i:])
elif delay1 is None:
break # long inactivity
p.settime(time())
packet = p.encode(1500)
if packet:
#print 'send', len(packet)
#if os.urandom(1) >= '\x08': # emulate packet losses
udpsock.send(packet)
class PipeOverUdp(object):
def __init__(self, udpsock, timeout=1.0):
import thread, os
self.os = os
self.sendpipe = os.pipe()
self.recvpipe = os.pipe()
thread.start_new_thread(pipe_over_udp, (udpsock,
self.sendpipe[0],
self.recvpipe[1],
timeout))
def __del__(self):
os = self.os
if self.sendpipe:
os.close(self.sendpipe[0])
os.close(self.sendpipe[1])
self.sendpipe = None
if self.recvpipe:
os.close(self.recvpipe[0])
os.close(self.recvpipe[1])
self.recvpipe = None
close = __del__
def send(self, data):
if not self.sendpipe:
raise IOError("I/O operation on a closed PipeOverUdp")
return self.os.write(self.sendpipe[1], data)
def sendall(self, data):
i = 0
while i < len(data):
i += self.send(data[i:])
def recv(self, bufsize):
if not self.recvpipe:
raise IOError("I/O operation on a closed PipeOverUdp")
return self.os.read(self.recvpipe[0], bufsize)
def recvall(self, bufsize):
buf = []
while bufsize > 0:
data = self.recv(bufsize)
buf.append(data)
bufsize -= len(data)
return ''.join(buf)
def fileno(self):
if not self.recvpipe:
raise IOError("I/O operation on a closed PipeOverUdp")
return self.recvpipe[0]
def ofileno(self):
if not self.sendpipe:
raise IOError("I/O operation on a closed PipeOverUdp")
return self.sendpipe[1]

View File

View File

@ -0,0 +1,42 @@
import BaseHTTPServer
from pypeers import greensock2
from pypeers.pipe.gsocket import GreenSocket
class GreenMixIn:
"""Mix-in class to handle each request in a new greenlet."""
def process_request_greenlet(self, request, client_address):
"""Same as in BaseServer but as a greenlet.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address)
self.close_request(request)
except:
self.handle_error(request, client_address)
self.close_request(request)
def process_request(self, request, client_address):
"""Start a new greenlet to process the request."""
greensock2.autogreenlet(self.process_request_greenlet,
request, client_address)
class GreenHTTPServer(GreenMixIn, BaseHTTPServer.HTTPServer):
protocol_version = "HTTP/1.1"
def server_bind(self):
self.socket = GreenSocket.fromsocket(self.socket)
BaseHTTPServer.HTTPServer.server_bind(self)
def test_simple(handler_class=None):
if handler_class is None:
from SimpleHTTPServer import SimpleHTTPRequestHandler
handler_class = SimpleHTTPRequestHandler
server_address = ('', 8000)
httpd = GreenHTTPServer(server_address, handler_class)
sa = httpd.socket.getsockname()
print "Serving HTTP on", sa[0], "port", sa[1], "..."
httpd.serve_forever()

View File

@ -0,0 +1,41 @@
import py
from py.__.net.greenexecnet import *
def test_simple():
gw = PopenGateway()
channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)")
channel.send(7)
res = channel.receive()
assert res == 42
def test_ssh():
py.test.skip("Bootstrapping")
gw = SshGateway('codespeak.net')
channel = gw.remote_exec("""
import socket
channel.send(socket.gethostname())
""")
res = channel.receive()
assert res.endswith('codespeak.net')
def test_remote_error():
gw = PopenGateway()
channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
channel.send("hello")
py.test.raises(RemoteError, channel.receive)
def test_invalid_object():
class X(object):
pass
gw = PopenGateway()
channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
channel.send(X())
py.test.raises(RemoteError, channel.receive)
def test_channel_over_channel():
gw = PopenGateway()
chan1 = gw.newchannel()
channel = gw.remote_exec("chan1 = channel.receive(); chan1.send(42)")
channel.send(chan1)
res = chan1.receive()
assert res == 42

View File

@ -0,0 +1,213 @@
import py
from socket import *
from py.__.net.greensock2 import *
def test_meetingpoint():
giv1, acc1 = meetingpoint()
giv2, acc2 = meetingpoint()
giv3, acc3 = meetingpoint()
lst = []
def g1():
lst.append(0)
x = acc2.accept()
assert x == 'hello'
lst.append(2)
giv1.give('world')
lst.append(5)
x = acc3.accept()
assert x == 'middle'
lst.append(6)
giv3.give('done')
def g2():
lst.append(1)
giv2.give('hello')
lst.append(3)
y = acc1.accept()
assert y == 'world'
lst.append(4)
autogreenlet(g1)
autogreenlet(g2)
giv3.give('middle')
tag = acc3.accept()
assert tag == 'done'
assert lst == [0, 1, 2, 3, 4, 5, 6]
def test_producer():
lst = []
def prod(n):
lst.append(1)
yield n
lst.append(2)
yield 87
lst.append(3)
def cons():
lst.append(4)
accepter = producer(prod, 145)
lst.append(5)
lst.append(accepter.accept())
lst.append(6)
lst.append(accepter.accept())
lst.append(7)
try:
accepter.accept()
except Interrupted:
lst.append(8)
g = autogreenlet(cons)
wait(g)
assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
def test_timer():
lst = []
def g1():
sleep(0.1, g_1)
lst.append(1)
sleep(0.2, g_1)
lst.append(3)
def g2():
lst.append(0)
sleep(0.2, g_2)
lst.append(2)
sleep(0.2, g_2)
lst.append(4)
g_1 = autogreenlet(g1)
g_2 = autogreenlet(g2)
wait(g_1)
wait(g_2)
assert lst == [0, 1, 2, 3, 4]
def test_socket():
s1 = socket(AF_INET, SOCK_DGRAM)
s2 = socket(AF_INET, SOCK_DGRAM)
s1.bind(('', INADDR_ANY))
s2.bind(('', INADDR_ANY))
s1.connect(s2.getsockname())
s2.connect(s1.getsockname())
lst = []
def g1():
lst.append(0)
x = recv(s1, 5)
assert x == 'hello'
lst.append(3)
sendall(s1, 'world')
lst.append(4)
def g2():
lst.append(1)
sendall(s2, 'hello')
lst.append(2)
y = recv(s2, 5)
assert y == 'world'
lst.append(5)
g_1 = autogreenlet(g1)
g_2 = autogreenlet(g2)
wait(g_1)
wait(g_2)
assert lst == [0, 1, 2, 3, 4, 5]
##def test_Queue():
## def g1():
## lst.append(5)
## q.put(6)
## lst.append(7)
## q.put(8)
## lst.append(9)
## q.put(10)
## lst.append(11)
## q.put(12) # not used
## def g2():
## lst.append(1)
## lst.append(q.get())
## lst.append(2)
## lst.append(q.get())
## lst.append(3)
## lst.append(q.get())
## lst.append(4)
## q = Queue()
## lst = []
## autogreenlet(g1)
## autogreenlet(g2)
## wait()
## assert lst == [5, 7, 9, 11, 1, 6, 2, 8, 3, 10, 4]
## q = Queue()
## lst = []
## autogreenlet(g2)
## autogreenlet(g1)
## wait()
## assert lst == [1, 5, 7, 9, 11, 6, 2, 8, 3, 10, 4]
##def test_Event():
## def g1():
## assert not e.isSet()
## e.wait()
## assert not e.isSet() # no longer set
## lst.append(1)
## e.set()
## e.wait()
## lst.append(2)
## assert e.isSet()
## e.clear()
## assert not e.isSet()
## lst.append(0)
## e.set()
## lst.append(3)
## assert e.isSet()
## def g2():
## assert not e.isSet()
## lst.append(4)
## e.set()
## lst.append(7)
## e.clear()
## e.set()
## e.clear()
## assert not e.isSet()
## lst.append(5)
## e.wait()
## assert e.isSet()
## lst.append(6)
## e = Event()
## lst = []
## autogreenlet(g1)
## autogreenlet(g2)
## wait()
## assert lst == [4, 7, 5, 1, 2, 0, 3, 6]
##def test_Event_timeout():
## def g1():
## lst.append(5)
## e.wait(0.1)
## lst.append(e.isSet())
## e.wait(60.0)
## lst.append(e.isSet())
## lst = []
## e = Event()
## autogreenlet(g1)
## sleep(0.5)
## e.set()
## wait()
## assert lst == [5, False, True]

View File

@ -0,0 +1,215 @@
import os, random
from py.__.net.pipelayer import PipeLayer, pipe_over_udp, PipeOverUdp
def test_simple():
data1 = os.urandom(1000)
data2 = os.urandom(1000)
p1 = PipeLayer()
p2 = PipeLayer()
p2._dump_indent = 40
p1.queue(data1)
p1.queue(data2)
recv = ''
while len(recv) < 2000:
raw = p1.encode(64)
assert raw is not None
res = p2.decode(raw)
assert res is not None
recv += res
assert recv == data1 + data2
raw = p1.encode(64)
assert raw is None
def test_stabilize():
data1 = os.urandom(28)
p1 = PipeLayer()
p2 = PipeLayer()
p2._dump_indent = 40
p1.queue(data1)
recv = ''
t = 0.0
print
while True:
delay1 = p1.settime(t)
delay2 = p2.settime(t)
t += 0.100000001
if delay1 is delay2 is None:
break
if delay1 == 0:
raw = p1.encode(10)
p1.dump('OUT', raw)
assert raw is not None
res = p2.decode(raw)
assert res is not None
recv += res
if delay2 == 0:
raw = p2.encode(10)
p2.dump('OUT', raw)
assert raw is not None
res = p1.decode(raw)
assert res == ''
assert recv == data1
def test_bidir():
data1 = os.urandom(1000)
data2 = os.urandom(1000)
p1 = PipeLayer()
p2 = PipeLayer()
p2._dump_indent = 40
p1.queue(data1)
p2.queue(data2)
recv = ['', '']
while len(recv[0]) < 1000 or len(recv[1]) < 1000:
progress = False
for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
raw = me.encode(64)
if raw is not None:
res = other.decode(raw)
assert res is not None
recv[i] += res
if res:
progress = True
assert progress
assert recv[0] == data2
assert recv[1] == data1
raw = p1.encode(64)
assert raw is None
raw = p2.encode(64)
assert raw is None
def test_with_loss():
data1 = os.urandom(10000).encode('hex')
data2 = os.urandom(10000).encode('hex')
#data1 = '0123456789'
#data2 = 'ABCDEFGHIJ'
p1 = PipeLayer()
p2 = PipeLayer()
p2._dump_indent = 40
p1.queue(data1)
p2.queue(data2)
recv = ['', '']
time = 0
active = 1
while active:
active = 0
time += 0.2
#print '.'
exchange = []
for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
to = me.settime(time)
packet = me.encode(12)
assert (packet is not None) == (to == 0)
if to is not None:
active = 1
if to == 0:
exchange.append((packet, other, i))
for (packet, other, i) in exchange:
if random.random() < 0.5:
pass # drop packet
else:
res = other.decode(packet)
assert res is not None
recv[i] += res
assert data2.startswith(recv[0])
assert data1.startswith(recv[1])
assert recv[0] == data2
assert recv[1] == data1
print time
def test_massive_reordering():
data1 = os.urandom(10000).encode('hex')
data2 = os.urandom(10000).encode('hex')
#data1 = '0123456789'
#data2 = 'ABCDEFGHIJ'
p1 = PipeLayer()
p2 = PipeLayer()
p2._dump_indent = 40
p1.queue(data1)
p2.queue(data2)
recv = ['', '']
time = 0
active = 1
exchange = []
while active or exchange:
active = 0
time += 0.2
#print '.'
for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
to = me.settime(time)
packet = me.encode(12)
assert (packet is not None) == (to == 0)
if to is not None:
active = 1
if to == 0:
exchange.append((packet, other, i))
if random.random() < 0.02:
random.shuffle(exchange)
for (packet, other, i) in exchange:
res = other.decode(packet)
assert res is not None
recv[i] += res
exchange = []
assert data2.startswith(recv[0])
assert data1.startswith(recv[1])
assert recv[0] == data2
assert recv[1] == data1
print time
def udpsockpair():
from socket import socket, AF_INET, SOCK_DGRAM, INADDR_ANY
s1 = socket(AF_INET, SOCK_DGRAM)
s2 = socket(AF_INET, SOCK_DGRAM)
s1.bind(('127.0.0.1', INADDR_ANY))
s2.bind(('127.0.0.1', INADDR_ANY))
s2.connect(s1.getsockname())
s1.connect(s2.getsockname())
return s1, s2
def test_pipe_over_udp():
import thread
s1, s2 = udpsockpair()
fd1 = os.open(__file__, os.O_RDONLY)
fd2 = os.open('test_pipelayer.py~copy', os.O_WRONLY|os.O_CREAT|os.O_TRUNC)
thread.start_new_thread(pipe_over_udp, (s1, fd1))
pipe_over_udp(s2, recv_fd=fd2, inactivity_timeout=2.5)
os.close(fd1)
os.close(fd2)
f = open(__file__, 'rb')
data1 = f.read()
f.close()
f = open('test_pipelayer.py~copy', 'rb')
data2 = f.read()
f.close()
assert data1 == data2
os.unlink('test_pipelayer.py~copy')
def test_PipeOverUdp():
s1, s2 = udpsockpair()
p1 = PipeOverUdp(s1, timeout=0.2)
p2 = PipeOverUdp(s2, timeout=0.2)
p2.sendall('goodbye')
for k in range(10):
p1.sendall('hello world')
input = p2.recvall(11)
assert input == 'hello world'
input = p1.recvall(7)
assert input == 'goodbye'
bigchunk1 = os.urandom(500000)
bigchunk2 = os.urandom(500000)
i1 = i2 = 0
j1 = j2 = 0
while j1 < len(bigchunk1) or j2 < len(bigchunk2):
i1 += p1.send(bigchunk1[i1:i1+512])
i2 += p2.send(bigchunk2[i2:i2+512])
data = p1.recv(512)
assert data == bigchunk2[j2:j2+len(data)]
j2 += len(data)
data = p2.recv(512)
assert data == bigchunk1[j1:j1+len(data)]
j1 += len(data)
#print i1, i2, j1, j2
p1.close()
p2.close()