[svn r63542] moving pygreen to greenlet contrib dir
--HG-- branch : trunk
This commit is contained in:
parent
fbf70a35a2
commit
18a0dd8d09
|
@ -1,8 +0,0 @@
|
||||||
import py, os
|
|
||||||
|
|
||||||
class Directory(py.test.collect.Directory):
|
|
||||||
def collect(self):
|
|
||||||
if os.name == 'nt':
|
|
||||||
py.test.skip("Cannot test green layer on windows")
|
|
||||||
else:
|
|
||||||
return super(Directory, self).collect()
|
|
|
@ -1,217 +0,0 @@
|
||||||
|
|
||||||
""" This is an implementation of an execnet protocol on top
|
|
||||||
of a transport layer provided by the greensock2 interface.
|
|
||||||
|
|
||||||
It has the same semantics, but does not use threads at all
|
|
||||||
(which makes it suitable for specific enviroments, like pypy-c).
|
|
||||||
|
|
||||||
There are some features lacking, most notable:
|
|
||||||
- callback support for channels
|
|
||||||
- socket gateway
|
|
||||||
- bootstrapping (there is assumption of pylib being available
|
|
||||||
on remote side, which is not always true)
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys, os, py, inspect
|
|
||||||
from pygreen import greensock2
|
|
||||||
from pygreen.msgstruct import message, decodemessage
|
|
||||||
|
|
||||||
MSG_REMOTE_EXEC = 'r'
|
|
||||||
MSG_OBJECT = 'o'
|
|
||||||
MSG_ERROR = 'e'
|
|
||||||
MSG_CHAN_CLOSE = 'c'
|
|
||||||
MSG_FATAL = 'f'
|
|
||||||
MSG_CHANNEL = 'n'
|
|
||||||
|
|
||||||
class Gateway(object):
|
|
||||||
|
|
||||||
def __init__(self, input, output, is_remote=False):
|
|
||||||
self.input = input
|
|
||||||
self.output = output
|
|
||||||
self.nextchannum = int(is_remote)
|
|
||||||
self.receivers = {}
|
|
||||||
self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote)
|
|
||||||
|
|
||||||
def remote_exec(self, remote_source):
|
|
||||||
remote_source = py.code.Source(remote_source)
|
|
||||||
chan = self.newchannel()
|
|
||||||
msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source))
|
|
||||||
self.output.sendall(msg)
|
|
||||||
return chan
|
|
||||||
|
|
||||||
def newchannel(self):
|
|
||||||
n = self.nextchannum
|
|
||||||
self.nextchannum += 2
|
|
||||||
return self.make_channel(n)
|
|
||||||
|
|
||||||
def make_channel(self, n):
|
|
||||||
giver, accepter = greensock2.meetingpoint()
|
|
||||||
assert n not in self.receivers
|
|
||||||
self.receivers[n] = giver
|
|
||||||
return Channel(self, n, accepter)
|
|
||||||
|
|
||||||
def serve_forever(self, is_remote=False):
|
|
||||||
try:
|
|
||||||
buffer = ""
|
|
||||||
while 1:
|
|
||||||
msg, buffer = decodemessage(buffer)
|
|
||||||
if msg is None:
|
|
||||||
buffer += self.input.recv(16384)
|
|
||||||
else:
|
|
||||||
handler = HANDLERS[msg[0]]
|
|
||||||
handler(self, *msg[1:])
|
|
||||||
except greensock2.greenlet.GreenletExit:
|
|
||||||
raise
|
|
||||||
except:
|
|
||||||
if is_remote:
|
|
||||||
msg = message(MSG_FATAL, format_error(*sys.exc_info()))
|
|
||||||
self.output.sendall(msg)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
def msg_remote_exec(self, n, source):
|
|
||||||
def do_execute(channel):
|
|
||||||
try:
|
|
||||||
d = {'channel': channel}
|
|
||||||
exec source in d
|
|
||||||
except:
|
|
||||||
channel.report_error(*sys.exc_info())
|
|
||||||
else:
|
|
||||||
channel.close()
|
|
||||||
greensock2.autogreenlet(do_execute, self.make_channel(n))
|
|
||||||
|
|
||||||
def msg_object(self, n, objrepr):
|
|
||||||
obj = eval(objrepr)
|
|
||||||
if n in self.receivers:
|
|
||||||
self.receivers[n].give_queued(obj)
|
|
||||||
|
|
||||||
def msg_error(self, n, s):
|
|
||||||
if n in self.receivers:
|
|
||||||
self.receivers[n].give_queued(RemoteError(s))
|
|
||||||
self.receivers[n].close()
|
|
||||||
del self.receivers[n]
|
|
||||||
|
|
||||||
def msg_chan_close(self, n):
|
|
||||||
if n in self.receivers:
|
|
||||||
self.receivers[n].close()
|
|
||||||
del self.receivers[n]
|
|
||||||
|
|
||||||
def msg_channel(self, n, m):
|
|
||||||
if n in self.receivers:
|
|
||||||
self.receivers[n].give_queued(self.make_channel(m))
|
|
||||||
|
|
||||||
def msg_fatal(self, s):
|
|
||||||
raise RemoteError(s)
|
|
||||||
|
|
||||||
HANDLERS = {
|
|
||||||
MSG_REMOTE_EXEC: Gateway.msg_remote_exec,
|
|
||||||
MSG_OBJECT: Gateway.msg_object,
|
|
||||||
MSG_ERROR: Gateway.msg_error,
|
|
||||||
MSG_CHAN_CLOSE: Gateway.msg_chan_close,
|
|
||||||
MSG_CHANNEL: Gateway.msg_channel,
|
|
||||||
MSG_FATAL: Gateway.msg_fatal,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class Channel(object):
|
|
||||||
|
|
||||||
def __init__(self, gw, n, accepter):
|
|
||||||
self.gw = gw
|
|
||||||
self.n = n
|
|
||||||
self.accepter = accepter
|
|
||||||
|
|
||||||
def send(self, obj):
|
|
||||||
if isinstance(obj, Channel):
|
|
||||||
assert obj.gw is self.gw
|
|
||||||
msg = message(MSG_CHANNEL, self.n, obj.n)
|
|
||||||
else:
|
|
||||||
msg = message(MSG_OBJECT, self.n, repr(obj))
|
|
||||||
self.gw.output.sendall(msg)
|
|
||||||
|
|
||||||
def receive(self):
|
|
||||||
obj = self.accepter.accept()
|
|
||||||
if isinstance(obj, RemoteError):
|
|
||||||
raise obj
|
|
||||||
else:
|
|
||||||
return obj
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
try:
|
|
||||||
self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n))
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def report_error(self, exc_type, exc_value, exc_traceback=None):
|
|
||||||
s = format_error(exc_type, exc_value, exc_traceback)
|
|
||||||
try:
|
|
||||||
self.gw.output.sendall(message(MSG_ERROR, self.n, s))
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class RemoteError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def format_error(exc_type, exc_value, exc_traceback=None):
|
|
||||||
import traceback, StringIO
|
|
||||||
s = StringIO.StringIO()
|
|
||||||
traceback.print_exception(exc_type, exc_value, exc_traceback, file=s)
|
|
||||||
return s.getvalue()
|
|
||||||
|
|
||||||
|
|
||||||
class PopenCmdGateway(Gateway):
|
|
||||||
action = "exec input()"
|
|
||||||
|
|
||||||
def __init__(self, cmdline):
|
|
||||||
from pygreen.pipe.fd import FDInput, FDOutput
|
|
||||||
child_in, child_out = os.popen2(cmdline, 't', 0)
|
|
||||||
fdin = FDInput(child_out.fileno(), child_out.close)
|
|
||||||
fdout = FDOutput(child_in.fileno(), child_in.close)
|
|
||||||
fdout.sendall(self.get_bootstrap_code())
|
|
||||||
super(PopenCmdGateway, self).__init__(input = fdin, output = fdout)
|
|
||||||
|
|
||||||
def get_bootstrap_code():
|
|
||||||
# XXX assumes that the py lib is installed on the remote side
|
|
||||||
src = []
|
|
||||||
src.append('from pygreen import greenexecnet')
|
|
||||||
src.append('greenexecnet.PopenCmdGateway.run_server()')
|
|
||||||
src.append('')
|
|
||||||
return '%r\n' % ('\n'.join(src),)
|
|
||||||
get_bootstrap_code = staticmethod(get_bootstrap_code)
|
|
||||||
|
|
||||||
def run_server():
|
|
||||||
from pygreen.pipe.fd import FDInput, FDOutput
|
|
||||||
gw = Gateway(input = FDInput(os.dup(0)),
|
|
||||||
output = FDOutput(os.dup(1)),
|
|
||||||
is_remote = True)
|
|
||||||
# for now, ignore normal I/O
|
|
||||||
fd = os.open('/dev/null', os.O_RDWR)
|
|
||||||
os.dup2(fd, 0)
|
|
||||||
os.dup2(fd, 1)
|
|
||||||
os.close(fd)
|
|
||||||
greensock2._suspend_forever()
|
|
||||||
run_server = staticmethod(run_server)
|
|
||||||
|
|
||||||
class PopenGateway(PopenCmdGateway):
|
|
||||||
def __init__(self, python=sys.executable):
|
|
||||||
cmdline = '"%s" -u -c "%s"' % (python, self.action)
|
|
||||||
super(PopenGateway, self).__init__(cmdline)
|
|
||||||
|
|
||||||
class SshGateway(PopenCmdGateway):
|
|
||||||
def __init__(self, sshaddress, remotepython='python', identity=None):
|
|
||||||
self.sshaddress = sshaddress
|
|
||||||
remotecmd = '%s -u -c "%s"' % (remotepython, self.action)
|
|
||||||
cmdline = [sshaddress, remotecmd]
|
|
||||||
# XXX Unix style quoting
|
|
||||||
for i in range(len(cmdline)):
|
|
||||||
cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
|
|
||||||
cmd = 'ssh -C'
|
|
||||||
if identity is not None:
|
|
||||||
cmd += ' -i %s' % (identity,)
|
|
||||||
cmdline.insert(0, cmd)
|
|
||||||
super(SshGateway, self).__init__(' '.join(cmdline))
|
|
||||||
|
|
||||||
|
|
||||||
##f = open('LOG', 'a')
|
|
||||||
##import os; print >> f, '[%d] READY' % (os.getpid(),)
|
|
||||||
##f.close()
|
|
|
@ -1,490 +0,0 @@
|
||||||
|
|
||||||
""" This is a base implementation of thread-like network programming
|
|
||||||
on top of greenlets. From API available here it's quite unlikely
|
|
||||||
that you would like to use anything except wait(). Higher level interface
|
|
||||||
is available in pipe directory
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os, sys
|
|
||||||
try:
|
|
||||||
from stackless import greenlet
|
|
||||||
except ImportError:
|
|
||||||
import py
|
|
||||||
greenlet = py.magic.greenlet
|
|
||||||
from collections import deque
|
|
||||||
from select import select as _select
|
|
||||||
from time import time as _time
|
|
||||||
from heapq import heappush, heappop, heapify
|
|
||||||
|
|
||||||
TRACE = False
|
|
||||||
|
|
||||||
def meetingpoint():
|
|
||||||
senders = deque() # list of senders, or [None] if Giver closed
|
|
||||||
receivers = deque() # list of receivers, or [None] if Receiver closed
|
|
||||||
return (MeetingPointGiver(senders, receivers),
|
|
||||||
MeetingPointAccepter(senders, receivers))
|
|
||||||
|
|
||||||
def producer(func, *args, **kwds):
|
|
||||||
iterable = func(*args, **kwds)
|
|
||||||
giver, accepter = meetingpoint()
|
|
||||||
def autoproducer():
|
|
||||||
try:
|
|
||||||
giver.wait()
|
|
||||||
for obj in iterable:
|
|
||||||
giver.give(obj)
|
|
||||||
giver.wait()
|
|
||||||
finally:
|
|
||||||
giver.close()
|
|
||||||
autogreenlet(autoproducer)
|
|
||||||
return accepter
|
|
||||||
|
|
||||||
|
|
||||||
class MeetingPointBase(object):
|
|
||||||
|
|
||||||
def __init__(self, senders, receivers):
|
|
||||||
self.senders = senders
|
|
||||||
self.receivers = receivers
|
|
||||||
self.g_active = g_active
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
while self.senders:
|
|
||||||
if self.senders[0] is None:
|
|
||||||
break
|
|
||||||
packet = self.senders.popleft()
|
|
||||||
if packet.g_from is not None:
|
|
||||||
self.g_active.append(packet.g_from)
|
|
||||||
else:
|
|
||||||
self.senders.append(None)
|
|
||||||
while self.receivers:
|
|
||||||
if self.receivers[0] is None:
|
|
||||||
break
|
|
||||||
other = self.receivers.popleft()
|
|
||||||
self.g_active.append(other)
|
|
||||||
else:
|
|
||||||
self.receivers.append(None)
|
|
||||||
|
|
||||||
__del__ = close
|
|
||||||
|
|
||||||
def closed(self):
|
|
||||||
return self.receivers and self.receivers[0] is None
|
|
||||||
|
|
||||||
|
|
||||||
class MeetingPointGiver(MeetingPointBase):
|
|
||||||
|
|
||||||
def give(self, obj):
|
|
||||||
if self.receivers:
|
|
||||||
if self.receivers[0] is None:
|
|
||||||
raise MeetingPointClosed
|
|
||||||
other = self.receivers.popleft()
|
|
||||||
g_active.append(g_getcurrent())
|
|
||||||
packet = _Packet()
|
|
||||||
packet.payload = obj
|
|
||||||
other.switch(packet)
|
|
||||||
if not packet.accepted:
|
|
||||||
raise Interrupted("packet not accepted")
|
|
||||||
else:
|
|
||||||
packet = _Packet()
|
|
||||||
packet.g_from = g_getcurrent()
|
|
||||||
packet.payload = obj
|
|
||||||
try:
|
|
||||||
self.senders.append(packet)
|
|
||||||
g_dispatcher.switch()
|
|
||||||
if not packet.accepted:
|
|
||||||
raise Interrupted("packet not accepted")
|
|
||||||
except:
|
|
||||||
remove_by_id(self.senders, packet)
|
|
||||||
raise
|
|
||||||
|
|
||||||
def give_queued(self, obj):
|
|
||||||
if self.receivers:
|
|
||||||
self.give(obj)
|
|
||||||
else:
|
|
||||||
packet = _Packet()
|
|
||||||
packet.g_from = None
|
|
||||||
packet.payload = obj
|
|
||||||
self.senders.append(packet)
|
|
||||||
|
|
||||||
def ready(self):
|
|
||||||
return self.receivers and self.receivers[0] is not None
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
if self.receivers:
|
|
||||||
if self.receivers[0] is None:
|
|
||||||
raise MeetingPointClosed
|
|
||||||
else:
|
|
||||||
packet = _Packet()
|
|
||||||
packet.g_from = g_getcurrent()
|
|
||||||
packet.empty = True
|
|
||||||
self.senders.append(packet)
|
|
||||||
try:
|
|
||||||
g_dispatcher.switch()
|
|
||||||
if not packet.accepted:
|
|
||||||
raise Interrupted("no accepter found")
|
|
||||||
except:
|
|
||||||
remove_by_id(self.senders, packet)
|
|
||||||
raise
|
|
||||||
|
|
||||||
def trigger(self):
|
|
||||||
if self.ready():
|
|
||||||
self.give(None)
|
|
||||||
|
|
||||||
|
|
||||||
class MeetingPointAccepter(MeetingPointBase):
|
|
||||||
|
|
||||||
def accept(self):
|
|
||||||
while self.senders:
|
|
||||||
if self.senders[0] is None:
|
|
||||||
raise MeetingPointClosed
|
|
||||||
packet = self.senders.popleft()
|
|
||||||
packet.accepted = True
|
|
||||||
if packet.g_from is not None:
|
|
||||||
g_active.append(packet.g_from)
|
|
||||||
if not packet.empty:
|
|
||||||
return packet.payload
|
|
||||||
g = g_getcurrent()
|
|
||||||
self.receivers.append(g)
|
|
||||||
try:
|
|
||||||
packet = g_dispatcher.switch()
|
|
||||||
except:
|
|
||||||
remove_by_id(self.receivers, g)
|
|
||||||
raise
|
|
||||||
if type(packet) is not _Packet:
|
|
||||||
remove_by_id(self.receivers, g)
|
|
||||||
raise Interrupted("no packet")
|
|
||||||
packet.accepted = True
|
|
||||||
return packet.payload
|
|
||||||
|
|
||||||
def ready(self):
|
|
||||||
for packet in self.senders:
|
|
||||||
if packet is None:
|
|
||||||
return False
|
|
||||||
if not packet.empty:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def wait_trigger(self, timeout=None, default=None):
|
|
||||||
if timeout is None:
|
|
||||||
return self.accept()
|
|
||||||
else:
|
|
||||||
timer = Timer(timeout)
|
|
||||||
try:
|
|
||||||
try:
|
|
||||||
return self.accept()
|
|
||||||
finally:
|
|
||||||
timer.stop()
|
|
||||||
except Interrupted:
|
|
||||||
if timer.finished:
|
|
||||||
return default
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
class MeetingPointClosed(greenlet.GreenletExit):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class Interrupted(greenlet.GreenletExit):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ConnexionClosed(greenlet.GreenletExit):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class _Packet(object):
|
|
||||||
empty = False
|
|
||||||
accepted = False
|
|
||||||
|
|
||||||
def remove_by_id(d, obj):
|
|
||||||
lst = [x for x in d if x is not obj]
|
|
||||||
d.clear()
|
|
||||||
d.extend(lst)
|
|
||||||
|
|
||||||
def wait_input(sock):
|
|
||||||
_register(g_iwtd, sock)
|
|
||||||
|
|
||||||
def recv(sock, bufsize):
|
|
||||||
wait_input(sock)
|
|
||||||
buf = sock.recv(bufsize)
|
|
||||||
if not buf:
|
|
||||||
raise ConnexionClosed("inbound connexion closed")
|
|
||||||
return buf
|
|
||||||
|
|
||||||
def recvall(sock, bufsize):
|
|
||||||
in_front = False
|
|
||||||
data = []
|
|
||||||
while bufsize > 0:
|
|
||||||
_register(g_iwtd, sock, in_front=in_front)
|
|
||||||
buf = sock.recv(bufsize)
|
|
||||||
if not buf:
|
|
||||||
raise ConnexionClosed("inbound connexion closed")
|
|
||||||
data.append(buf)
|
|
||||||
bufsize -= len(buf)
|
|
||||||
in_front = True
|
|
||||||
return ''.join(data)
|
|
||||||
|
|
||||||
def read(fd, bufsize):
|
|
||||||
assert fd >= 0
|
|
||||||
wait_input(fd)
|
|
||||||
buf = os.read(fd, bufsize)
|
|
||||||
if not buf:
|
|
||||||
raise ConnexionClosed("inbound connexion closed")
|
|
||||||
return buf
|
|
||||||
|
|
||||||
def readall(fd, bufsize):
|
|
||||||
assert fd >= 0
|
|
||||||
in_front = False
|
|
||||||
data = []
|
|
||||||
while bufsize > 0:
|
|
||||||
_register(g_iwtd, fd, in_front=in_front)
|
|
||||||
buf = os.read(fd, bufsize)
|
|
||||||
if not buf:
|
|
||||||
raise ConnexionClosed("inbound connexion closed")
|
|
||||||
data.append(buf)
|
|
||||||
bufsize -= len(buf)
|
|
||||||
in_front = True
|
|
||||||
return ''.join(data)
|
|
||||||
|
|
||||||
|
|
||||||
def wait_output(sock):
|
|
||||||
_register(g_owtd, sock)
|
|
||||||
|
|
||||||
def sendall(sock, buffer):
|
|
||||||
in_front = False
|
|
||||||
while buffer:
|
|
||||||
_register(g_owtd, sock, in_front=in_front)
|
|
||||||
count = sock.send(buffer)
|
|
||||||
buffer = buffer[count:]
|
|
||||||
in_front = True
|
|
||||||
|
|
||||||
def writeall(fd, buffer):
|
|
||||||
assert fd >= 0
|
|
||||||
in_front = False
|
|
||||||
while buffer:
|
|
||||||
_register(g_owtd, fd, in_front=in_front)
|
|
||||||
count = os.write(fd, buffer)
|
|
||||||
if not count:
|
|
||||||
raise ConnexionClosed("outbound connexion closed")
|
|
||||||
buffer = buffer[count:]
|
|
||||||
in_front = True
|
|
||||||
|
|
||||||
|
|
||||||
def sleep(duration):
|
|
||||||
timer = Timer(duration)
|
|
||||||
try:
|
|
||||||
_suspend_forever()
|
|
||||||
finally:
|
|
||||||
ok = timer.finished
|
|
||||||
timer.stop()
|
|
||||||
if not ok:
|
|
||||||
raise Interrupted
|
|
||||||
|
|
||||||
def _suspend_forever():
|
|
||||||
g_dispatcher.switch()
|
|
||||||
|
|
||||||
def oneof(*callables):
|
|
||||||
assert callables
|
|
||||||
for c in callables:
|
|
||||||
assert callable(c)
|
|
||||||
greenlets = [tracinggreenlet(c) for c in callables]
|
|
||||||
g_active.extend(greenlets)
|
|
||||||
res = g_dispatcher.switch()
|
|
||||||
for g in greenlets:
|
|
||||||
g.interrupt()
|
|
||||||
return res
|
|
||||||
|
|
||||||
def allof(*callables):
|
|
||||||
for c in callables:
|
|
||||||
assert callable(c)
|
|
||||||
greenlets = [tracinggreenlet(lambda i=i, c=c: (i, c()))
|
|
||||||
for i, c in enumerate(callables)]
|
|
||||||
g_active.extend(greenlets)
|
|
||||||
result = [None] * len(callables)
|
|
||||||
for _ in callables:
|
|
||||||
num, res = g_dispatcher.switch()
|
|
||||||
result[num] = res
|
|
||||||
return tuple(result)
|
|
||||||
|
|
||||||
class Timer(object):
|
|
||||||
started = False
|
|
||||||
finished = False
|
|
||||||
|
|
||||||
def __init__(self, timeout):
|
|
||||||
self.g = g_getcurrent()
|
|
||||||
entry = (_time() + timeout, self)
|
|
||||||
if g_timers_mixed:
|
|
||||||
g_timers.append(entry)
|
|
||||||
else:
|
|
||||||
heappush(g_timers, entry)
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
global g_timers_mixed
|
|
||||||
if not self.finished:
|
|
||||||
for i, (activationtime, timer) in enumerate(g_timers):
|
|
||||||
if timer is self:
|
|
||||||
g_timers[i] = g_timers[-1]
|
|
||||||
g_timers.pop()
|
|
||||||
g_timers_mixed = True
|
|
||||||
break
|
|
||||||
self.finished = True
|
|
||||||
|
|
||||||
# ____________________________________________________________
|
|
||||||
|
|
||||||
class tracinggreenlet(greenlet):
|
|
||||||
def __init__(self, function, *args, **kwds):
|
|
||||||
self.function = function
|
|
||||||
self.args = args
|
|
||||||
self.kwds = kwds
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
## args = ', '.join([repr(s) for s in self.args] +
|
|
||||||
## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
|
|
||||||
## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
|
|
||||||
return '<%s %s at %s>' % (self.__class__.__name__,
|
|
||||||
self.function.__name__,
|
|
||||||
hex(id(self)))
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
self.trace("start")
|
|
||||||
try:
|
|
||||||
res = self.function(*self.args, **self.kwds)
|
|
||||||
except Exception, e:
|
|
||||||
self.trace("stop (%s%s)", e.__class__.__name__,
|
|
||||||
str(e) and (': '+str(e)))
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
self.trace("done")
|
|
||||||
return res
|
|
||||||
|
|
||||||
def trace(self, msg, *args):
|
|
||||||
if TRACE:
|
|
||||||
print self, msg % args
|
|
||||||
|
|
||||||
def interrupt(self):
|
|
||||||
self.throw(Interrupted)
|
|
||||||
|
|
||||||
class autogreenlet(tracinggreenlet):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(autogreenlet, self).__init__(*args, **kwargs)
|
|
||||||
self.parent = g_dispatcher
|
|
||||||
g_active.append(self)
|
|
||||||
|
|
||||||
g_active = deque()
|
|
||||||
g_iwtd = {}
|
|
||||||
g_owtd = {}
|
|
||||||
g_timers = []
|
|
||||||
g_timers_mixed = False
|
|
||||||
|
|
||||||
g_getcurrent = greenlet.getcurrent
|
|
||||||
|
|
||||||
def _register(g_wtd, sock, in_front=False):
|
|
||||||
d = g_wtd.setdefault(sock, deque())
|
|
||||||
g = g_getcurrent()
|
|
||||||
if in_front:
|
|
||||||
d.appendleft(g)
|
|
||||||
else:
|
|
||||||
d.append(g)
|
|
||||||
try:
|
|
||||||
if g_dispatcher.switch() is not g_wtd:
|
|
||||||
raise Interrupted
|
|
||||||
except:
|
|
||||||
remove_by_id(d, g)
|
|
||||||
raise
|
|
||||||
|
|
||||||
##def _unregister_timer():
|
|
||||||
## ...
|
|
||||||
|
|
||||||
|
|
||||||
def check_dead_greenlets(mapping):
|
|
||||||
to_remove = [i for i, v in mapping.items() if not v]
|
|
||||||
for k in to_remove:
|
|
||||||
del mapping[k]
|
|
||||||
|
|
||||||
#def check_waiters(active):
|
|
||||||
# if active in g_waiters:
|
|
||||||
# for g in g_waiters[active]:
|
|
||||||
# g.switch()
|
|
||||||
# del g_waiters[active]
|
|
||||||
|
|
||||||
|
|
||||||
def dispatcher_mainloop():
|
|
||||||
global g_timers_mixed
|
|
||||||
GreenletExit = greenlet.GreenletExit
|
|
||||||
while 1:
|
|
||||||
try:
|
|
||||||
while g_active:
|
|
||||||
#print 'active:', g_active[0]
|
|
||||||
g_active.popleft().switch()
|
|
||||||
# active.switch()
|
|
||||||
# if active.dead:
|
|
||||||
# check_waiters(active)
|
|
||||||
# del active
|
|
||||||
if g_timers:
|
|
||||||
if g_timers_mixed:
|
|
||||||
heapify(g_timers)
|
|
||||||
g_timers_mixed = False
|
|
||||||
activationtime, timer = g_timers[0]
|
|
||||||
delay = activationtime - _time()
|
|
||||||
if delay <= 0.0:
|
|
||||||
if timer.started:
|
|
||||||
heappop(g_timers)
|
|
||||||
#print 'timeout:', g
|
|
||||||
timer.finished = True
|
|
||||||
timer.g.switch()
|
|
||||||
# if timer.g.dead:
|
|
||||||
# check_waiters(timer.g)
|
|
||||||
continue
|
|
||||||
delay = 0.0
|
|
||||||
timer.started = True
|
|
||||||
else:
|
|
||||||
check_dead_greenlets(g_iwtd)
|
|
||||||
check_dead_greenlets(g_owtd)
|
|
||||||
if not (g_iwtd or g_owtd):
|
|
||||||
# nothing to do, switch to the main greenlet
|
|
||||||
g_dispatcher.parent.switch()
|
|
||||||
continue
|
|
||||||
delay = None
|
|
||||||
|
|
||||||
#print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
|
|
||||||
iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
|
|
||||||
#print 'done'
|
|
||||||
for s in owtd:
|
|
||||||
if s in g_owtd:
|
|
||||||
d = g_owtd[s]
|
|
||||||
#print 'owtd:', d[0]
|
|
||||||
# XXX: Check if d is non-empty
|
|
||||||
try:
|
|
||||||
g = d.popleft()
|
|
||||||
except IndexError:
|
|
||||||
g = None
|
|
||||||
if not d:
|
|
||||||
try:
|
|
||||||
del g_owtd[s]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
if g:
|
|
||||||
g.switch(g_owtd)
|
|
||||||
# if g.dead:
|
|
||||||
# check_waiters(g)
|
|
||||||
for s in iwtd:
|
|
||||||
if s in g_iwtd:
|
|
||||||
d = g_iwtd[s]
|
|
||||||
#print 'iwtd:', d[0]
|
|
||||||
# XXX: Check if d is non-empty
|
|
||||||
try:
|
|
||||||
g = d.popleft()
|
|
||||||
except IndexError:
|
|
||||||
g = None
|
|
||||||
if not d:
|
|
||||||
try:
|
|
||||||
del g_iwtd[s]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
if g:
|
|
||||||
g.switch(g_iwtd)
|
|
||||||
# if g.dead:
|
|
||||||
# check_waiters(g)
|
|
||||||
except GreenletExit:
|
|
||||||
raise
|
|
||||||
except:
|
|
||||||
import sys
|
|
||||||
g_dispatcher.parent.throw(*sys.exc_info())
|
|
||||||
|
|
||||||
g_dispatcher = greenlet(dispatcher_mainloop)
|
|
||||||
#g_waiters = {}
|
|
|
@ -1,29 +0,0 @@
|
||||||
from struct import pack, unpack, calcsize
|
|
||||||
|
|
||||||
|
|
||||||
def message(tp, *values):
|
|
||||||
strtype = type('')
|
|
||||||
typecodes = ['']
|
|
||||||
for v in values:
|
|
||||||
if type(v) is strtype:
|
|
||||||
typecodes.append('%ds' % len(v))
|
|
||||||
elif 0 <= v < 256:
|
|
||||||
typecodes.append('B')
|
|
||||||
else:
|
|
||||||
typecodes.append('l')
|
|
||||||
typecodes = ''.join(typecodes)
|
|
||||||
assert len(typecodes) < 256
|
|
||||||
return pack(("!B%dsc" % len(typecodes)) + typecodes,
|
|
||||||
len(typecodes), typecodes, tp, *values)
|
|
||||||
|
|
||||||
def decodemessage(data):
|
|
||||||
if data:
|
|
||||||
limit = ord(data[0]) + 1
|
|
||||||
if len(data) >= limit:
|
|
||||||
typecodes = "!c" + data[1:limit]
|
|
||||||
end = limit + calcsize(typecodes)
|
|
||||||
if len(data) >= end:
|
|
||||||
return unpack(typecodes, data[limit:end]), data[end:]
|
|
||||||
#elif end > 1000000:
|
|
||||||
# raise OverflowError
|
|
||||||
return None, data
|
|
|
@ -1,8 +0,0 @@
|
||||||
|
|
||||||
""" This is a higher level network interface based on top
|
|
||||||
of greensock2. Objects here are ready to use, specific examples
|
|
||||||
are listed in tests (test_pipelayer and test_greensock2).
|
|
||||||
|
|
||||||
The limitation is that you're not supposed to use threads + blocking
|
|
||||||
I/O at all.
|
|
||||||
"""
|
|
|
@ -1,46 +0,0 @@
|
||||||
from pygreen import greensock2
|
|
||||||
|
|
||||||
VERBOSE = True
|
|
||||||
|
|
||||||
|
|
||||||
if VERBOSE:
|
|
||||||
def log(msg, *args):
|
|
||||||
print '*', msg % args
|
|
||||||
else:
|
|
||||||
def log(msg, *args):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class BufferedInput(object):
|
|
||||||
in_buf = ''
|
|
||||||
|
|
||||||
def recv(self, bufsize):
|
|
||||||
self.wait_input()
|
|
||||||
buf = self.in_buf[:bufsize]
|
|
||||||
self.in_buf = self.in_buf[bufsize:]
|
|
||||||
return buf
|
|
||||||
|
|
||||||
def recvall(self, bufsize):
|
|
||||||
result = []
|
|
||||||
while bufsize > 0:
|
|
||||||
buf = self.recv(bufsize)
|
|
||||||
result.append(buf)
|
|
||||||
bufsize -= len(buf)
|
|
||||||
return ''.join(result)
|
|
||||||
|
|
||||||
# ____________________________________________________________
|
|
||||||
|
|
||||||
def forwardpipe(s1, s2):
|
|
||||||
try:
|
|
||||||
while 1:
|
|
||||||
s2.wait_output()
|
|
||||||
buffer = s1.recv(32768)
|
|
||||||
log('[%r -> %r] %r', s1, s2, buffer)
|
|
||||||
s2.sendall(buffer)
|
|
||||||
del buffer
|
|
||||||
finally:
|
|
||||||
s2.shutdown_wr()
|
|
||||||
s1.shutdown_rd()
|
|
||||||
|
|
||||||
def linkpipes(s1, s2):
|
|
||||||
greensock2.autogreenlet(forwardpipe, s1, s2)
|
|
||||||
greensock2.autogreenlet(forwardpipe, s2, s1)
|
|
|
@ -1,69 +0,0 @@
|
||||||
import os
|
|
||||||
from pygreen import greensock2
|
|
||||||
|
|
||||||
|
|
||||||
class FDInput(object):
|
|
||||||
|
|
||||||
def __init__(self, read_fd, close=True):
|
|
||||||
self.read_fd = read_fd
|
|
||||||
self._close = close # a flag or a callback
|
|
||||||
|
|
||||||
def shutdown_rd(self):
|
|
||||||
fd = self.read_fd
|
|
||||||
if fd is not None:
|
|
||||||
self.read_fd = None
|
|
||||||
close = self._close
|
|
||||||
if close:
|
|
||||||
self._close = False
|
|
||||||
if close == True:
|
|
||||||
os.close(fd)
|
|
||||||
else:
|
|
||||||
close()
|
|
||||||
|
|
||||||
__del__ = shutdown_rd
|
|
||||||
|
|
||||||
def wait_input(self):
|
|
||||||
greensock2.wait_input(self.read_fd)
|
|
||||||
|
|
||||||
def recv(self, bufsize):
|
|
||||||
## f = open('LOG', 'a')
|
|
||||||
## import os; print >> f, '[%d] RECV' % (os.getpid(),)
|
|
||||||
## f.close()
|
|
||||||
res = greensock2.read(self.read_fd, bufsize)
|
|
||||||
## f = open('LOG', 'a')
|
|
||||||
## import os; print >> f, '[%d] RECV %r' % (os.getpid(), res)
|
|
||||||
## f.close()
|
|
||||||
return res
|
|
||||||
|
|
||||||
def recvall(self, bufsize):
|
|
||||||
return greensock2.readall(self.read_fd, bufsize)
|
|
||||||
|
|
||||||
|
|
||||||
class FDOutput(object):
|
|
||||||
|
|
||||||
def __init__(self, write_fd, close=True):
|
|
||||||
self.write_fd = write_fd
|
|
||||||
self._close = close # a flag or a callback
|
|
||||||
|
|
||||||
def shutdown_wr(self):
|
|
||||||
fd = self.write_fd
|
|
||||||
if fd is not None:
|
|
||||||
self.write_fd = None
|
|
||||||
close = self._close
|
|
||||||
if close:
|
|
||||||
self._close = False
|
|
||||||
if close == True:
|
|
||||||
os.close(fd)
|
|
||||||
else:
|
|
||||||
close()
|
|
||||||
|
|
||||||
__del__ = shutdown_wr
|
|
||||||
|
|
||||||
def wait_output(self):
|
|
||||||
greensock2.wait_output(self.write_fd)
|
|
||||||
|
|
||||||
def sendall(self, buffer):
|
|
||||||
## f = open('LOG', 'a')
|
|
||||||
## import os; print >> f, '[%d] %r' % (os.getpid(), buffer)
|
|
||||||
## f.close()
|
|
||||||
greensock2.writeall(self.write_fd, buffer)
|
|
|
@ -1,113 +0,0 @@
|
||||||
from pygreen import greensock2
|
|
||||||
import socket, errno, os
|
|
||||||
|
|
||||||
error = socket.error
|
|
||||||
|
|
||||||
|
|
||||||
class _delegate(object):
|
|
||||||
def __init__(self, methname):
|
|
||||||
self.methname = methname
|
|
||||||
def __get__(self, obj, typ=None):
|
|
||||||
result = getattr(obj._s, self.methname)
|
|
||||||
setattr(obj, self.methname, result)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
class GreenSocket(object):
|
|
||||||
|
|
||||||
def __init__(self, family = socket.AF_INET,
|
|
||||||
type = socket.SOCK_STREAM,
|
|
||||||
proto = 0):
|
|
||||||
self._s = socket.socket(family, type, proto)
|
|
||||||
self._s.setblocking(False)
|
|
||||||
|
|
||||||
def fromsocket(cls, s):
|
|
||||||
if isinstance(s, GreenSocket):
|
|
||||||
s = s._s
|
|
||||||
result = GreenSocket.__new__(cls)
|
|
||||||
result._s = s
|
|
||||||
s.setblocking(False)
|
|
||||||
return result
|
|
||||||
fromsocket = classmethod(fromsocket)
|
|
||||||
|
|
||||||
def accept(self):
|
|
||||||
while 1:
|
|
||||||
try:
|
|
||||||
s, addr = self._s.accept()
|
|
||||||
break
|
|
||||||
except error, e:
|
|
||||||
if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
|
|
||||||
raise
|
|
||||||
self.wait_input()
|
|
||||||
return self.fromsocket(s), addr
|
|
||||||
|
|
||||||
bind = _delegate("bind")
|
|
||||||
close = _delegate("close")
|
|
||||||
|
|
||||||
def connect(self, addr):
|
|
||||||
err = self.connect_ex(addr)
|
|
||||||
if err:
|
|
||||||
raise error(err, os.strerror(err))
|
|
||||||
|
|
||||||
def connect_ex(self, addr):
|
|
||||||
err = self._s.connect_ex(addr)
|
|
||||||
if err == errno.EINPROGRESS:
|
|
||||||
greensock2.wait_output(self._s)
|
|
||||||
err = self._s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
|
|
||||||
return err
|
|
||||||
|
|
||||||
#XXX dup
|
|
||||||
fileno = _delegate("fileno")
|
|
||||||
getpeername = _delegate("getpeername")
|
|
||||||
getsockname = _delegate("getsockname")
|
|
||||||
getsockopt = _delegate("getsockopt")
|
|
||||||
listen = _delegate("listen")
|
|
||||||
|
|
||||||
def makefile(self, mode='r', bufsize=-1):
|
|
||||||
# hack, but reusing the internal socket._fileobject should just work
|
|
||||||
return socket._fileobject(self, mode, bufsize)
|
|
||||||
|
|
||||||
def recv(self, bufsize):
|
|
||||||
return greensock2.recv(self._s, bufsize)
|
|
||||||
|
|
||||||
def recvall(self, bufsize):
|
|
||||||
return greensock2.recvall(self._s, bufsize)
|
|
||||||
|
|
||||||
def recvfrom(self, bufsize):
|
|
||||||
self.wait_input()
|
|
||||||
buf, addr = self._s.recvfrom(bufsize)
|
|
||||||
if not buf:
|
|
||||||
raise ConnexionClosed("inbound connexion closed")
|
|
||||||
return buf, addr
|
|
||||||
|
|
||||||
def send(self, data):
|
|
||||||
self.wait_output()
|
|
||||||
return self._s.send(data)
|
|
||||||
|
|
||||||
def sendto(self, data, addr):
|
|
||||||
self.wait_output()
|
|
||||||
return self._s.sendto(data, addr)
|
|
||||||
|
|
||||||
def sendall(self, data):
|
|
||||||
greensock2.sendall(self._s, data)
|
|
||||||
|
|
||||||
setsockopt = _delegate("setsockopt")
|
|
||||||
shutdown = _delegate("shutdown")
|
|
||||||
|
|
||||||
def shutdown_rd(self):
|
|
||||||
try:
|
|
||||||
self._s.shutdown(socket.SHUT_RD)
|
|
||||||
except error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def shutdown_wr(self):
|
|
||||||
try:
|
|
||||||
self._s.shutdown(socket.SHUT_WR)
|
|
||||||
except error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def wait_input(self):
|
|
||||||
greensock2.wait_input(self._s)
|
|
||||||
|
|
||||||
def wait_output(self):
|
|
||||||
greensock2.wait_output(self._s)
|
|
|
@ -1,29 +0,0 @@
|
||||||
from pygreen.pipe.common import BufferedInput
|
|
||||||
|
|
||||||
|
|
||||||
class MeetingPointInput(BufferedInput):
|
|
||||||
|
|
||||||
def __init__(self, accepter):
|
|
||||||
self.accepter = accepter
|
|
||||||
|
|
||||||
def wait_input(self):
|
|
||||||
while not self.in_buf:
|
|
||||||
self.in_buf = self.accepter.accept()
|
|
||||||
|
|
||||||
def shutdown_rd(self):
|
|
||||||
self.accepter.close()
|
|
||||||
|
|
||||||
|
|
||||||
class MeetingPointOutput(BufferedInput):
|
|
||||||
|
|
||||||
def __init__(self, giver):
|
|
||||||
self.giver = giver
|
|
||||||
|
|
||||||
def wait_output(self):
|
|
||||||
self.giver.wait()
|
|
||||||
|
|
||||||
def sendall(self, buffer):
|
|
||||||
self.giver.give(buffer)
|
|
||||||
|
|
||||||
def shutdown_wr(self):
|
|
||||||
self.giver.close()
|
|
|
@ -1,306 +0,0 @@
|
||||||
#import os
|
|
||||||
import struct
|
|
||||||
from collections import deque
|
|
||||||
|
|
||||||
|
|
||||||
class InvalidPacket(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
FLAG_NAK1 = 0xE0
|
|
||||||
FLAG_NAK = 0xE1
|
|
||||||
FLAG_REG = 0xE2
|
|
||||||
FLAG_CFRM = 0xE3
|
|
||||||
|
|
||||||
FLAG_RANGE_START = 0xE0
|
|
||||||
FLAG_RANGE_STOP = 0xE4
|
|
||||||
|
|
||||||
max_old_packets = 256 # must be <= 256
|
|
||||||
|
|
||||||
|
|
||||||
class PipeLayer(object):
|
|
||||||
timeout = 1
|
|
||||||
headersize = 4
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
#self.localid = os.urandom(4)
|
|
||||||
#self.remoteid = None
|
|
||||||
self.cur_time = 0
|
|
||||||
self.out_queue = deque()
|
|
||||||
self.out_nextseqid = 0
|
|
||||||
self.out_nextrepeattime = None
|
|
||||||
self.in_nextseqid = 0
|
|
||||||
self.in_outoforder = {}
|
|
||||||
self.out_oldpackets = deque()
|
|
||||||
self.out_flags = FLAG_REG
|
|
||||||
self.out_resend = 0
|
|
||||||
self.out_resend_skip = False
|
|
||||||
|
|
||||||
def queue(self, data):
|
|
||||||
if data:
|
|
||||||
self.out_queue.appendleft(data)
|
|
||||||
|
|
||||||
def queue_size(self):
|
|
||||||
total = 0
|
|
||||||
for data in self.out_queue:
|
|
||||||
total += len(data)
|
|
||||||
return total
|
|
||||||
|
|
||||||
def in_sync(self):
|
|
||||||
return not self.out_queue and self.out_nextrepeattime is None
|
|
||||||
|
|
||||||
def settime(self, curtime):
|
|
||||||
self.cur_time = curtime
|
|
||||||
if self.out_queue:
|
|
||||||
if len(self.out_oldpackets) < max_old_packets:
|
|
||||||
return 0 # more data to send now
|
|
||||||
if self.out_nextrepeattime is not None:
|
|
||||||
return max(0, self.out_nextrepeattime - curtime)
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def encode(self, maxlength):
|
|
||||||
#print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue
|
|
||||||
if len(self.out_oldpackets) >= max_old_packets:
|
|
||||||
# congestion, stalling
|
|
||||||
payload = 0
|
|
||||||
else:
|
|
||||||
payload = maxlength - 4
|
|
||||||
if payload <= 0:
|
|
||||||
raise ValueError("encode(): buffer too small")
|
|
||||||
if (self.out_nextrepeattime is not None and
|
|
||||||
self.out_nextrepeattime <= self.cur_time):
|
|
||||||
# no ACK received so far, send a packet (possibly empty)
|
|
||||||
if not self.out_queue:
|
|
||||||
payload = 0
|
|
||||||
else:
|
|
||||||
if not self.out_queue: # no more data to send
|
|
||||||
return None
|
|
||||||
if payload == 0: # congestion
|
|
||||||
return None
|
|
||||||
# prepare a packet
|
|
||||||
seqid = self.out_nextseqid
|
|
||||||
flags = self.out_flags
|
|
||||||
self.out_flags = FLAG_REG # clear out the flags for the next time
|
|
||||||
if payload > 0:
|
|
||||||
self.out_nextseqid = (seqid + 1) & 0xFFFF
|
|
||||||
data = self.out_queue.pop()
|
|
||||||
packetlength = len(data)
|
|
||||||
if self.out_resend > 0:
|
|
||||||
if packetlength > payload:
|
|
||||||
raise ValueError("XXX need constant buffer size for now")
|
|
||||||
self.out_resend -= 1
|
|
||||||
if self.out_resend_skip:
|
|
||||||
if self.out_resend > 0:
|
|
||||||
self.out_queue.pop()
|
|
||||||
self.out_resend -= 1
|
|
||||||
self.out_nextseqid = (seqid + 2) & 0xFFFF
|
|
||||||
self.out_resend_skip = False
|
|
||||||
packetpayload = data
|
|
||||||
else:
|
|
||||||
packet = []
|
|
||||||
while packetlength <= payload:
|
|
||||||
packet.append(data)
|
|
||||||
if not self.out_queue:
|
|
||||||
break
|
|
||||||
data = self.out_queue.pop()
|
|
||||||
packetlength += len(data)
|
|
||||||
else:
|
|
||||||
rest = len(data) + payload - packetlength
|
|
||||||
packet.append(data[:rest])
|
|
||||||
self.out_queue.append(data[rest:])
|
|
||||||
packetpayload = ''.join(packet)
|
|
||||||
self.out_oldpackets.appendleft(packetpayload)
|
|
||||||
#print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets
|
|
||||||
else:
|
|
||||||
# a pure ACK packet, no payload
|
|
||||||
if self.out_oldpackets and flags == FLAG_REG:
|
|
||||||
flags = FLAG_CFRM
|
|
||||||
packetpayload = ''
|
|
||||||
packet = struct.pack("!BBH", flags,
|
|
||||||
self.in_nextseqid & 0xFF,
|
|
||||||
seqid) + packetpayload
|
|
||||||
if self.out_oldpackets:
|
|
||||||
self.out_nextrepeattime = self.cur_time + self.timeout
|
|
||||||
else:
|
|
||||||
self.out_nextrepeattime = None
|
|
||||||
#self.dump('OUT', packet)
|
|
||||||
return packet
|
|
||||||
|
|
||||||
def decode(self, rawdata):
|
|
||||||
if len(rawdata) < 4:
|
|
||||||
raise InvalidPacket
|
|
||||||
#print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid)
|
|
||||||
#self.dump('IN ', rawdata)
|
|
||||||
in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
|
|
||||||
if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP):
|
|
||||||
raise InvalidPacket
|
|
||||||
in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF
|
|
||||||
ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF
|
|
||||||
if in_diff >= max_old_packets:
|
|
||||||
return '' # invalid, but can occur as a late repetition
|
|
||||||
if ack_diff != len(self.out_oldpackets):
|
|
||||||
# forget all acknowledged packets
|
|
||||||
if ack_diff > len(self.out_oldpackets):
|
|
||||||
return '' # invalid, but can occur with packet reordering
|
|
||||||
while len(self.out_oldpackets) > ack_diff:
|
|
||||||
#print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1])
|
|
||||||
self.out_oldpackets.pop()
|
|
||||||
if self.out_oldpackets:
|
|
||||||
self.out_nextrepeattime = self.cur_time + self.timeout
|
|
||||||
else:
|
|
||||||
self.out_nextrepeattime = None # all packets ACKed
|
|
||||||
if in_flags == FLAG_NAK or in_flags == FLAG_NAK1:
|
|
||||||
# this is a NAK: resend the old packets as far as they've not
|
|
||||||
# also been ACK'ed in the meantime (can occur with reordering)
|
|
||||||
while self.out_resend < len(self.out_oldpackets):
|
|
||||||
self.out_queue.append(self.out_oldpackets[self.out_resend])
|
|
||||||
self.out_resend += 1
|
|
||||||
self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF
|
|
||||||
#print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1])
|
|
||||||
self.out_resend_skip = in_flags == FLAG_NAK1
|
|
||||||
elif in_flags == FLAG_CFRM:
|
|
||||||
# this is a CFRM: request for confirmation
|
|
||||||
self.out_nextrepeattime = self.cur_time
|
|
||||||
# receive this packet's payload if it is the next in the sequence
|
|
||||||
if in_diff == 0:
|
|
||||||
if len(rawdata) > 4:
|
|
||||||
#print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:])
|
|
||||||
self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
|
|
||||||
result = [rawdata[4:]]
|
|
||||||
while self.in_nextseqid in self.in_outoforder:
|
|
||||||
result.append(self.in_outoforder.pop(self.in_nextseqid))
|
|
||||||
self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
|
|
||||||
return ''.join(result)
|
|
||||||
else:
|
|
||||||
# we missed at least one intermediate packet: send a NAK
|
|
||||||
if len(rawdata) > 4:
|
|
||||||
self.in_outoforder[in_seqid] = rawdata[4:]
|
|
||||||
if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder:
|
|
||||||
self.out_flags = FLAG_NAK1
|
|
||||||
else:
|
|
||||||
self.out_flags = FLAG_NAK
|
|
||||||
self.out_nextrepeattime = self.cur_time
|
|
||||||
return ''
|
|
||||||
|
|
||||||
_dump_indent = 0
|
|
||||||
def dump(self, dir, rawdata):
|
|
||||||
in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
|
|
||||||
print ' ' * self._dump_indent, dir,
|
|
||||||
if in_flags == FLAG_NAK:
|
|
||||||
print 'NAK',
|
|
||||||
elif in_flags == FLAG_NAK1:
|
|
||||||
print 'NAK1',
|
|
||||||
elif in_flags == FLAG_CFRM:
|
|
||||||
print 'CFRM',
|
|
||||||
#print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,)
|
|
||||||
print ack_seqid, in_seqid, repr(rawdata[4:])
|
|
||||||
|
|
||||||
|
|
||||||
def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1,
|
|
||||||
timeout=1.0, inactivity_timeout=None):
|
|
||||||
"""Example: send all data showing up in send_fd over the given UDP
|
|
||||||
socket, and write incoming data into recv_fd. The send_fd and
|
|
||||||
recv_fd are plain file descriptors. When an EOF is read from
|
|
||||||
send_fd, this function returns (after making sure that all data was
|
|
||||||
received by the remote side).
|
|
||||||
"""
|
|
||||||
import os
|
|
||||||
from select import select
|
|
||||||
from time import time
|
|
||||||
p = PipeLayer()
|
|
||||||
p.timeout = timeout
|
|
||||||
iwtdlist = [udpsock]
|
|
||||||
if send_fd >= 0:
|
|
||||||
iwtdlist.append(send_fd)
|
|
||||||
running = True
|
|
||||||
while running or not p.in_sync():
|
|
||||||
delay = delay1 = p.settime(time())
|
|
||||||
if delay is None:
|
|
||||||
delay = inactivity_timeout
|
|
||||||
iwtd, owtd, ewtd = select(iwtdlist, [], [], delay)
|
|
||||||
if iwtd:
|
|
||||||
if send_fd in iwtd:
|
|
||||||
data = os.read(send_fd, 1500 - p.headersize)
|
|
||||||
if not data:
|
|
||||||
# EOF
|
|
||||||
iwtdlist.remove(send_fd)
|
|
||||||
running = False
|
|
||||||
else:
|
|
||||||
#print 'queue', len(data)
|
|
||||||
p.queue(data)
|
|
||||||
if udpsock in iwtd:
|
|
||||||
packet = udpsock.recv(65535)
|
|
||||||
#print 'decode', len(packet)
|
|
||||||
p.settime(time())
|
|
||||||
data = p.decode(packet)
|
|
||||||
i = 0
|
|
||||||
while i < len(data):
|
|
||||||
i += os.write(recv_fd, data[i:])
|
|
||||||
elif delay1 is None:
|
|
||||||
break # long inactivity
|
|
||||||
p.settime(time())
|
|
||||||
packet = p.encode(1500)
|
|
||||||
if packet:
|
|
||||||
#print 'send', len(packet)
|
|
||||||
#if os.urandom(1) >= '\x08': # emulate packet losses
|
|
||||||
udpsock.send(packet)
|
|
||||||
|
|
||||||
|
|
||||||
class PipeOverUdp(object):
|
|
||||||
|
|
||||||
def __init__(self, udpsock, timeout=1.0):
|
|
||||||
import thread, os
|
|
||||||
self.os = os
|
|
||||||
self.sendpipe = os.pipe()
|
|
||||||
self.recvpipe = os.pipe()
|
|
||||||
thread.start_new_thread(pipe_over_udp, (udpsock,
|
|
||||||
self.sendpipe[0],
|
|
||||||
self.recvpipe[1],
|
|
||||||
timeout))
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
os = self.os
|
|
||||||
if self.sendpipe:
|
|
||||||
os.close(self.sendpipe[0])
|
|
||||||
os.close(self.sendpipe[1])
|
|
||||||
self.sendpipe = None
|
|
||||||
if self.recvpipe:
|
|
||||||
os.close(self.recvpipe[0])
|
|
||||||
os.close(self.recvpipe[1])
|
|
||||||
self.recvpipe = None
|
|
||||||
|
|
||||||
close = __del__
|
|
||||||
|
|
||||||
def send(self, data):
|
|
||||||
if not self.sendpipe:
|
|
||||||
raise IOError("I/O operation on a closed PipeOverUdp")
|
|
||||||
return self.os.write(self.sendpipe[1], data)
|
|
||||||
|
|
||||||
def sendall(self, data):
|
|
||||||
i = 0
|
|
||||||
while i < len(data):
|
|
||||||
i += self.send(data[i:])
|
|
||||||
|
|
||||||
def recv(self, bufsize):
|
|
||||||
if not self.recvpipe:
|
|
||||||
raise IOError("I/O operation on a closed PipeOverUdp")
|
|
||||||
return self.os.read(self.recvpipe[0], bufsize)
|
|
||||||
|
|
||||||
def recvall(self, bufsize):
|
|
||||||
buf = []
|
|
||||||
while bufsize > 0:
|
|
||||||
data = self.recv(bufsize)
|
|
||||||
buf.append(data)
|
|
||||||
bufsize -= len(data)
|
|
||||||
return ''.join(buf)
|
|
||||||
|
|
||||||
def fileno(self):
|
|
||||||
if not self.recvpipe:
|
|
||||||
raise IOError("I/O operation on a closed PipeOverUdp")
|
|
||||||
return self.recvpipe[0]
|
|
||||||
|
|
||||||
def ofileno(self):
|
|
||||||
if not self.sendpipe:
|
|
||||||
raise IOError("I/O operation on a closed PipeOverUdp")
|
|
||||||
return self.sendpipe[1]
|
|
|
@ -1,42 +0,0 @@
|
||||||
import BaseHTTPServer
|
|
||||||
from pygreen import greensock2
|
|
||||||
from pygreen.pipe.gsocket import GreenSocket
|
|
||||||
|
|
||||||
|
|
||||||
class GreenMixIn:
|
|
||||||
"""Mix-in class to handle each request in a new greenlet."""
|
|
||||||
|
|
||||||
def process_request_greenlet(self, request, client_address):
|
|
||||||
"""Same as in BaseServer but as a greenlet.
|
|
||||||
In addition, exception handling is done here.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
self.finish_request(request, client_address)
|
|
||||||
self.close_request(request)
|
|
||||||
except:
|
|
||||||
self.handle_error(request, client_address)
|
|
||||||
self.close_request(request)
|
|
||||||
|
|
||||||
def process_request(self, request, client_address):
|
|
||||||
"""Start a new greenlet to process the request."""
|
|
||||||
greensock2.autogreenlet(self.process_request_greenlet,
|
|
||||||
request, client_address)
|
|
||||||
|
|
||||||
|
|
||||||
class GreenHTTPServer(GreenMixIn, BaseHTTPServer.HTTPServer):
|
|
||||||
protocol_version = "HTTP/1.1"
|
|
||||||
|
|
||||||
def server_bind(self):
|
|
||||||
self.socket = GreenSocket.fromsocket(self.socket)
|
|
||||||
BaseHTTPServer.HTTPServer.server_bind(self)
|
|
||||||
|
|
||||||
|
|
||||||
def test_simple(handler_class=None):
|
|
||||||
if handler_class is None:
|
|
||||||
from SimpleHTTPServer import SimpleHTTPRequestHandler
|
|
||||||
handler_class = SimpleHTTPRequestHandler
|
|
||||||
server_address = ('', 8000)
|
|
||||||
httpd = GreenHTTPServer(server_address, handler_class)
|
|
||||||
sa = httpd.socket.getsockname()
|
|
||||||
print "Serving HTTP on", sa[0], "port", sa[1], "..."
|
|
||||||
httpd.serve_forever()
|
|
|
@ -1 +0,0 @@
|
||||||
#
|
|
|
@ -1,48 +0,0 @@
|
||||||
import py
|
|
||||||
from pygreen.greenexecnet import *
|
|
||||||
import pygreen
|
|
||||||
|
|
||||||
def setup_module(mod):
|
|
||||||
os.environ["PYTHONPATH"] = "%s:%s" %(
|
|
||||||
py.path.local(pygreen.__file__).dirpath().dirpath(), os.environ['PYTHONPATH'])
|
|
||||||
#py.test.skip("need to fix PYTHONPATH/sys.path handling for sub processes so "
|
|
||||||
# "that they find the pygreen package.")
|
|
||||||
|
|
||||||
def test_simple():
|
|
||||||
gw = PopenGateway()
|
|
||||||
channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)")
|
|
||||||
channel.send(7)
|
|
||||||
res = channel.receive()
|
|
||||||
assert res == 42
|
|
||||||
|
|
||||||
def test_ssh():
|
|
||||||
py.test.skip("Bootstrapping")
|
|
||||||
gw = SshGateway('codespeak.net')
|
|
||||||
channel = gw.remote_exec("""
|
|
||||||
import socket
|
|
||||||
channel.send(socket.gethostname())
|
|
||||||
""")
|
|
||||||
res = channel.receive()
|
|
||||||
assert res.endswith('codespeak.net')
|
|
||||||
|
|
||||||
def test_remote_error():
|
|
||||||
gw = PopenGateway()
|
|
||||||
channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
|
|
||||||
channel.send("hello")
|
|
||||||
py.test.raises(RemoteError, channel.receive)
|
|
||||||
|
|
||||||
def test_invalid_object():
|
|
||||||
class X(object):
|
|
||||||
pass
|
|
||||||
gw = PopenGateway()
|
|
||||||
channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
|
|
||||||
channel.send(X())
|
|
||||||
py.test.raises(RemoteError, channel.receive)
|
|
||||||
|
|
||||||
def test_channel_over_channel():
|
|
||||||
gw = PopenGateway()
|
|
||||||
chan1 = gw.newchannel()
|
|
||||||
channel = gw.remote_exec("chan1 = channel.receive(); chan1.send(42)")
|
|
||||||
channel.send(chan1)
|
|
||||||
res = chan1.receive()
|
|
||||||
assert res == 42
|
|
|
@ -1,221 +0,0 @@
|
||||||
import py
|
|
||||||
from socket import *
|
|
||||||
from pygreen.greensock2 import *
|
|
||||||
|
|
||||||
def test_meetingpoint():
|
|
||||||
giv1, acc1 = meetingpoint()
|
|
||||||
giv2, acc2 = meetingpoint()
|
|
||||||
giv3, acc3 = meetingpoint()
|
|
||||||
|
|
||||||
lst = []
|
|
||||||
|
|
||||||
def g1():
|
|
||||||
lst.append(0)
|
|
||||||
x = acc2.accept()
|
|
||||||
assert x == 'hello'
|
|
||||||
lst.append(2)
|
|
||||||
giv1.give('world')
|
|
||||||
lst.append(5)
|
|
||||||
x = acc3.accept()
|
|
||||||
assert x == 'middle'
|
|
||||||
lst.append(6)
|
|
||||||
giv3.give('done')
|
|
||||||
|
|
||||||
def g2():
|
|
||||||
lst.append(1)
|
|
||||||
giv2.give('hello')
|
|
||||||
lst.append(3)
|
|
||||||
y = acc1.accept()
|
|
||||||
assert y == 'world'
|
|
||||||
lst.append(4)
|
|
||||||
|
|
||||||
autogreenlet(g1)
|
|
||||||
autogreenlet(g2)
|
|
||||||
giv3.give('middle')
|
|
||||||
tag = acc3.accept()
|
|
||||||
assert tag == 'done'
|
|
||||||
assert lst == [0, 1, 2, 3, 4, 5, 6]
|
|
||||||
|
|
||||||
|
|
||||||
def test_producer():
|
|
||||||
lst = []
|
|
||||||
|
|
||||||
def prod(n):
|
|
||||||
lst.append(1)
|
|
||||||
yield n
|
|
||||||
lst.append(2)
|
|
||||||
yield 87
|
|
||||||
lst.append(3)
|
|
||||||
|
|
||||||
def cons():
|
|
||||||
lst.append(4)
|
|
||||||
accepter = producer(prod, 145)
|
|
||||||
lst.append(5)
|
|
||||||
lst.append(accepter.accept())
|
|
||||||
lst.append(6)
|
|
||||||
lst.append(accepter.accept())
|
|
||||||
lst.append(7)
|
|
||||||
try:
|
|
||||||
accepter.accept()
|
|
||||||
except Interrupted:
|
|
||||||
lst.append(8)
|
|
||||||
|
|
||||||
oneof(cons)
|
|
||||||
assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
|
|
||||||
|
|
||||||
|
|
||||||
def test_timer():
|
|
||||||
lst = []
|
|
||||||
|
|
||||||
def g1():
|
|
||||||
sleep(0.1)
|
|
||||||
lst.append(1)
|
|
||||||
sleep(0.2)
|
|
||||||
lst.append(3)
|
|
||||||
|
|
||||||
def g2():
|
|
||||||
lst.append(0)
|
|
||||||
sleep(0.2)
|
|
||||||
lst.append(2)
|
|
||||||
sleep(0.2)
|
|
||||||
lst.append(4)
|
|
||||||
|
|
||||||
oneof(g1, g2)
|
|
||||||
assert lst == [0, 1, 2, 3]
|
|
||||||
|
|
||||||
def test_kill_other():
|
|
||||||
|
|
||||||
def g1():
|
|
||||||
sleep(.1)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
def g2():
|
|
||||||
sleep(.2)
|
|
||||||
return 2
|
|
||||||
|
|
||||||
res = oneof(g1, g2)
|
|
||||||
assert res == 1
|
|
||||||
|
|
||||||
def test_socket():
|
|
||||||
s1 = socket(AF_INET, SOCK_DGRAM)
|
|
||||||
s2 = socket(AF_INET, SOCK_DGRAM)
|
|
||||||
s1.bind(('', INADDR_ANY))
|
|
||||||
s2.bind(('', INADDR_ANY))
|
|
||||||
s1.connect(s2.getsockname())
|
|
||||||
s2.connect(s1.getsockname())
|
|
||||||
|
|
||||||
lst = []
|
|
||||||
|
|
||||||
def g1():
|
|
||||||
lst.append(0)
|
|
||||||
x = recv(s1, 5)
|
|
||||||
assert x == 'hello'
|
|
||||||
lst.append(3)
|
|
||||||
sendall(s1, 'world')
|
|
||||||
lst.append(4)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
def g2():
|
|
||||||
lst.append(1)
|
|
||||||
sendall(s2, 'hello')
|
|
||||||
lst.append(2)
|
|
||||||
y = recv(s2, 5)
|
|
||||||
assert y == 'world'
|
|
||||||
lst.append(5)
|
|
||||||
return 2
|
|
||||||
|
|
||||||
one, two = allof(g1, g2)
|
|
||||||
assert lst == [0, 1, 2, 3, 4, 5]
|
|
||||||
assert one == 1
|
|
||||||
assert two == 2
|
|
||||||
|
|
||||||
##def test_Queue():
|
|
||||||
|
|
||||||
## def g1():
|
|
||||||
## lst.append(5)
|
|
||||||
## q.put(6)
|
|
||||||
## lst.append(7)
|
|
||||||
## q.put(8)
|
|
||||||
## lst.append(9)
|
|
||||||
## q.put(10)
|
|
||||||
## lst.append(11)
|
|
||||||
## q.put(12) # not used
|
|
||||||
|
|
||||||
## def g2():
|
|
||||||
## lst.append(1)
|
|
||||||
## lst.append(q.get())
|
|
||||||
## lst.append(2)
|
|
||||||
## lst.append(q.get())
|
|
||||||
## lst.append(3)
|
|
||||||
## lst.append(q.get())
|
|
||||||
## lst.append(4)
|
|
||||||
|
|
||||||
## q = Queue()
|
|
||||||
## lst = []
|
|
||||||
## autogreenlet(g1)
|
|
||||||
## autogreenlet(g2)
|
|
||||||
## wait()
|
|
||||||
## assert lst == [5, 7, 9, 11, 1, 6, 2, 8, 3, 10, 4]
|
|
||||||
|
|
||||||
## q = Queue()
|
|
||||||
## lst = []
|
|
||||||
## autogreenlet(g2)
|
|
||||||
## autogreenlet(g1)
|
|
||||||
## wait()
|
|
||||||
## assert lst == [1, 5, 7, 9, 11, 6, 2, 8, 3, 10, 4]
|
|
||||||
|
|
||||||
|
|
||||||
##def test_Event():
|
|
||||||
|
|
||||||
## def g1():
|
|
||||||
## assert not e.isSet()
|
|
||||||
## e.wait()
|
|
||||||
## assert not e.isSet() # no longer set
|
|
||||||
## lst.append(1)
|
|
||||||
## e.set()
|
|
||||||
## e.wait()
|
|
||||||
## lst.append(2)
|
|
||||||
## assert e.isSet()
|
|
||||||
## e.clear()
|
|
||||||
## assert not e.isSet()
|
|
||||||
## lst.append(0)
|
|
||||||
## e.set()
|
|
||||||
## lst.append(3)
|
|
||||||
## assert e.isSet()
|
|
||||||
|
|
||||||
## def g2():
|
|
||||||
## assert not e.isSet()
|
|
||||||
## lst.append(4)
|
|
||||||
## e.set()
|
|
||||||
## lst.append(7)
|
|
||||||
## e.clear()
|
|
||||||
## e.set()
|
|
||||||
## e.clear()
|
|
||||||
## assert not e.isSet()
|
|
||||||
## lst.append(5)
|
|
||||||
## e.wait()
|
|
||||||
## assert e.isSet()
|
|
||||||
## lst.append(6)
|
|
||||||
|
|
||||||
## e = Event()
|
|
||||||
## lst = []
|
|
||||||
## autogreenlet(g1)
|
|
||||||
## autogreenlet(g2)
|
|
||||||
## wait()
|
|
||||||
## assert lst == [4, 7, 5, 1, 2, 0, 3, 6]
|
|
||||||
|
|
||||||
|
|
||||||
##def test_Event_timeout():
|
|
||||||
## def g1():
|
|
||||||
## lst.append(5)
|
|
||||||
## e.wait(0.1)
|
|
||||||
## lst.append(e.isSet())
|
|
||||||
## e.wait(60.0)
|
|
||||||
## lst.append(e.isSet())
|
|
||||||
## lst = []
|
|
||||||
## e = Event()
|
|
||||||
## autogreenlet(g1)
|
|
||||||
## sleep(0.5)
|
|
||||||
## e.set()
|
|
||||||
## wait()
|
|
||||||
## assert lst == [5, False, True]
|
|
|
@ -1,223 +0,0 @@
|
||||||
import os, random
|
|
||||||
from pygreen.pipelayer import PipeLayer, pipe_over_udp, PipeOverUdp
|
|
||||||
import py
|
|
||||||
|
|
||||||
def test_simple():
|
|
||||||
data1 = os.urandom(1000)
|
|
||||||
data2 = os.urandom(1000)
|
|
||||||
p1 = PipeLayer()
|
|
||||||
p2 = PipeLayer()
|
|
||||||
p2._dump_indent = 40
|
|
||||||
p1.queue(data1)
|
|
||||||
p1.queue(data2)
|
|
||||||
recv = ''
|
|
||||||
while len(recv) < 2000:
|
|
||||||
raw = p1.encode(64)
|
|
||||||
assert raw is not None
|
|
||||||
res = p2.decode(raw)
|
|
||||||
assert res is not None
|
|
||||||
recv += res
|
|
||||||
assert recv == data1 + data2
|
|
||||||
raw = p1.encode(64)
|
|
||||||
assert raw is None
|
|
||||||
|
|
||||||
def test_stabilize():
|
|
||||||
data1 = os.urandom(28)
|
|
||||||
p1 = PipeLayer()
|
|
||||||
p2 = PipeLayer()
|
|
||||||
p2._dump_indent = 40
|
|
||||||
p1.queue(data1)
|
|
||||||
recv = ''
|
|
||||||
t = 0.0
|
|
||||||
print
|
|
||||||
while True:
|
|
||||||
delay1 = p1.settime(t)
|
|
||||||
delay2 = p2.settime(t)
|
|
||||||
t += 0.100000001
|
|
||||||
if delay1 is delay2 is None:
|
|
||||||
break
|
|
||||||
if delay1 == 0:
|
|
||||||
raw = p1.encode(10)
|
|
||||||
p1.dump('OUT', raw)
|
|
||||||
assert raw is not None
|
|
||||||
res = p2.decode(raw)
|
|
||||||
assert res is not None
|
|
||||||
recv += res
|
|
||||||
if delay2 == 0:
|
|
||||||
raw = p2.encode(10)
|
|
||||||
p2.dump('OUT', raw)
|
|
||||||
assert raw is not None
|
|
||||||
res = p1.decode(raw)
|
|
||||||
assert res == ''
|
|
||||||
assert recv == data1
|
|
||||||
|
|
||||||
def test_bidir():
|
|
||||||
data1 = os.urandom(1000)
|
|
||||||
data2 = os.urandom(1000)
|
|
||||||
p1 = PipeLayer()
|
|
||||||
p2 = PipeLayer()
|
|
||||||
p2._dump_indent = 40
|
|
||||||
p1.queue(data1)
|
|
||||||
p2.queue(data2)
|
|
||||||
recv = ['', '']
|
|
||||||
while len(recv[0]) < 1000 or len(recv[1]) < 1000:
|
|
||||||
progress = False
|
|
||||||
for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
|
|
||||||
raw = me.encode(64)
|
|
||||||
if raw is not None:
|
|
||||||
res = other.decode(raw)
|
|
||||||
assert res is not None
|
|
||||||
recv[i] += res
|
|
||||||
if res:
|
|
||||||
progress = True
|
|
||||||
assert progress
|
|
||||||
assert recv[0] == data2
|
|
||||||
assert recv[1] == data1
|
|
||||||
raw = p1.encode(64)
|
|
||||||
assert raw is None
|
|
||||||
raw = p2.encode(64)
|
|
||||||
assert raw is None
|
|
||||||
|
|
||||||
def test_with_loss():
|
|
||||||
data1 = os.urandom(10000).encode('hex')
|
|
||||||
data2 = os.urandom(10000).encode('hex')
|
|
||||||
#data1 = '0123456789'
|
|
||||||
#data2 = 'ABCDEFGHIJ'
|
|
||||||
p1 = PipeLayer()
|
|
||||||
p2 = PipeLayer()
|
|
||||||
p2._dump_indent = 40
|
|
||||||
p1.queue(data1)
|
|
||||||
p2.queue(data2)
|
|
||||||
recv = ['', '']
|
|
||||||
time = 0
|
|
||||||
active = 1
|
|
||||||
while active:
|
|
||||||
active = 0
|
|
||||||
time += 0.2
|
|
||||||
#print '.'
|
|
||||||
exchange = []
|
|
||||||
for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
|
|
||||||
to = me.settime(time)
|
|
||||||
packet = me.encode(12)
|
|
||||||
assert (packet is not None) == (to == 0)
|
|
||||||
if to is not None:
|
|
||||||
active = 1
|
|
||||||
if to == 0:
|
|
||||||
exchange.append((packet, other, i))
|
|
||||||
for (packet, other, i) in exchange:
|
|
||||||
if random.random() < 0.5:
|
|
||||||
pass # drop packet
|
|
||||||
else:
|
|
||||||
res = other.decode(packet)
|
|
||||||
assert res is not None
|
|
||||||
recv[i] += res
|
|
||||||
assert data2.startswith(recv[0])
|
|
||||||
assert data1.startswith(recv[1])
|
|
||||||
assert recv[0] == data2
|
|
||||||
assert recv[1] == data1
|
|
||||||
print time
|
|
||||||
|
|
||||||
def test_massive_reordering():
|
|
||||||
data1 = os.urandom(10000).encode('hex')
|
|
||||||
data2 = os.urandom(10000).encode('hex')
|
|
||||||
#data1 = '0123456789'
|
|
||||||
#data2 = 'ABCDEFGHIJ'
|
|
||||||
p1 = PipeLayer()
|
|
||||||
p2 = PipeLayer()
|
|
||||||
p2._dump_indent = 40
|
|
||||||
p1.queue(data1)
|
|
||||||
p2.queue(data2)
|
|
||||||
recv = ['', '']
|
|
||||||
time = 0
|
|
||||||
active = 1
|
|
||||||
exchange = []
|
|
||||||
while active or exchange:
|
|
||||||
active = 0
|
|
||||||
time += 0.2
|
|
||||||
#print '.'
|
|
||||||
for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
|
|
||||||
to = me.settime(time)
|
|
||||||
packet = me.encode(12)
|
|
||||||
assert (packet is not None) == (to == 0)
|
|
||||||
if to is not None:
|
|
||||||
active = 1
|
|
||||||
if to == 0:
|
|
||||||
exchange.append((packet, other, i))
|
|
||||||
if random.random() < 0.02:
|
|
||||||
random.shuffle(exchange)
|
|
||||||
for (packet, other, i) in exchange:
|
|
||||||
res = other.decode(packet)
|
|
||||||
assert res is not None
|
|
||||||
recv[i] += res
|
|
||||||
exchange = []
|
|
||||||
assert data2.startswith(recv[0])
|
|
||||||
assert data1.startswith(recv[1])
|
|
||||||
assert recv[0] == data2
|
|
||||||
assert recv[1] == data1
|
|
||||||
print time
|
|
||||||
|
|
||||||
def udpsockpair():
|
|
||||||
from socket import socket, AF_INET, SOCK_DGRAM, INADDR_ANY
|
|
||||||
s1 = socket(AF_INET, SOCK_DGRAM)
|
|
||||||
s2 = socket(AF_INET, SOCK_DGRAM)
|
|
||||||
s1.bind(('127.0.0.1', INADDR_ANY))
|
|
||||||
s2.bind(('127.0.0.1', INADDR_ANY))
|
|
||||||
s2.connect(s1.getsockname())
|
|
||||||
s1.connect(s2.getsockname())
|
|
||||||
return s1, s2
|
|
||||||
|
|
||||||
def test_pipe_over_udp():
|
|
||||||
import thread
|
|
||||||
s1, s2 = udpsockpair()
|
|
||||||
|
|
||||||
tmp = py.test.ensuretemp("pipeoverudp")
|
|
||||||
p = py.path.local(__file__)
|
|
||||||
p.copy(tmp.join(p.basename))
|
|
||||||
old = tmp.chdir()
|
|
||||||
try:
|
|
||||||
fd1 = os.open(__file__, os.O_RDONLY)
|
|
||||||
fd2 = os.open('test_pipelayer.py~copy', os.O_WRONLY|os.O_CREAT|os.O_TRUNC)
|
|
||||||
|
|
||||||
thread.start_new_thread(pipe_over_udp, (s1, fd1))
|
|
||||||
pipe_over_udp(s2, recv_fd=fd2, inactivity_timeout=2.5)
|
|
||||||
os.close(fd1)
|
|
||||||
os.close(fd2)
|
|
||||||
f = open(__file__, 'rb')
|
|
||||||
data1 = f.read()
|
|
||||||
f.close()
|
|
||||||
f = open('test_pipelayer.py~copy', 'rb')
|
|
||||||
data2 = f.read()
|
|
||||||
f.close()
|
|
||||||
assert data1 == data2
|
|
||||||
os.unlink('test_pipelayer.py~copy')
|
|
||||||
finally:
|
|
||||||
old.chdir()
|
|
||||||
|
|
||||||
def test_PipeOverUdp():
|
|
||||||
s1, s2 = udpsockpair()
|
|
||||||
p1 = PipeOverUdp(s1, timeout=0.2)
|
|
||||||
p2 = PipeOverUdp(s2, timeout=0.2)
|
|
||||||
p2.sendall('goodbye')
|
|
||||||
for k in range(10):
|
|
||||||
p1.sendall('hello world')
|
|
||||||
input = p2.recvall(11)
|
|
||||||
assert input == 'hello world'
|
|
||||||
input = p1.recvall(7)
|
|
||||||
assert input == 'goodbye'
|
|
||||||
|
|
||||||
bigchunk1 = os.urandom(500000)
|
|
||||||
bigchunk2 = os.urandom(500000)
|
|
||||||
i1 = i2 = 0
|
|
||||||
j1 = j2 = 0
|
|
||||||
while j1 < len(bigchunk1) or j2 < len(bigchunk2):
|
|
||||||
i1 += p1.send(bigchunk1[i1:i1+512])
|
|
||||||
i2 += p2.send(bigchunk2[i2:i2+512])
|
|
||||||
data = p1.recv(512)
|
|
||||||
assert data == bigchunk2[j2:j2+len(data)]
|
|
||||||
j2 += len(data)
|
|
||||||
data = p2.recv(512)
|
|
||||||
assert data == bigchunk1[j1:j1+len(data)]
|
|
||||||
j1 += len(data)
|
|
||||||
#print i1, i2, j1, j2
|
|
||||||
p1.close()
|
|
||||||
p2.close()
|
|
Loading…
Reference in New Issue