[svn r39983] (arigo, fijal) -

* oneof(functions) runs all functions in parallel
  until one returns a value, then returns that value

* allof(functions) same as previous, but returns all
  return values in a tuple

--HG--
branch : trunk
This commit is contained in:
fijal 2007-03-06 14:54:33 +01:00
parent 8bf738614e
commit 0ef18c50bd
3 changed files with 88 additions and 133 deletions

View File

@ -189,7 +189,7 @@ class PopenCmdGateway(Gateway):
os.dup2(fd, 0) os.dup2(fd, 0)
os.dup2(fd, 1) os.dup2(fd, 1)
os.close(fd) os.close(fd)
greensock2.wait(gw.greenlet) greensock2._suspend_forever()
run_server = staticmethod(run_server) run_server = staticmethod(run_server)
class PopenGateway(PopenCmdGateway): class PopenGateway(PopenCmdGateway):

View File

@ -196,80 +196,6 @@ def remove_by_id(d, obj):
d.clear() d.clear()
d.extend(lst) d.extend(lst)
# ____________________________________________________________
##class Queue(object):
## def __init__(self):
## self.giver, self.accepter = meetingpoint()
## self.pending = deque()
## def put(self, item): # preserve the caller's atomicity
## self.pending.append(item)
## if self.accepter.ready():
## self.accepter.accept()
## def get(self, block=True):
## if self.pending:
## return self.pending.popleft()
## elif block:
## self.giver.give(None)
## return self.pending.popleft()
## else:
## raise Empty
##class Empty(Interrupted):
## pass
##class Event(object):
## def __init__(self):
## self.giver, self.accepter = meetingpoint()
## clear = __init__
## def isSet(self):
## return self.accepter is None
## def set(self): # preserve the caller's atomicity
## if self.accepter is not None:
## accepter = self.accepter
## self.giver = self.accepter = None
## while accepter.ready(): # wake up all waiters
## accepter.accept()
## def wait(self, timeout=None):
## if self.accepter is not None:
## if timeout is None:
## self.giver.give(None)
## else:
## timer = Timer(timeout)
## try:
## try:
## self.giver.give(None)
## except Interrupted:
## pass
## finally:
## timer.stop()
##class Semaphore(object):
## def __init__(self, value=1):
## self.giver, self.accepter = meetingpoint()
## for i in range(value):
## self.release()
## def acquire(self, blocking=True):
## if blocking or self.accepter.ready():
## return self.accepter.accept()
## else:
## return False
## def release(self):
## autogreenlet(self.giver.put, True)
# ____________________________________________________________
def wait_input(sock): def wait_input(sock):
_register(g_iwtd, sock) _register(g_iwtd, sock)
@ -339,28 +265,41 @@ def writeall(fd, buffer):
in_front = True in_front = True
def sleep(duration, *greenlets): def sleep(duration):
timer = Timer(duration) timer = Timer(duration)
try: try:
wait(*greenlets) _suspend_forever()
finally: finally:
ok = timer.finished ok = timer.finished
timer.stop() timer.stop()
if not ok: if not ok:
raise Interrupted raise Interrupted
def _wait(): def _suspend_forever():
g_dispatcher.switch() g_dispatcher.switch()
def wait(*greenlets): def oneof(*callables):
assert greenlets#, "should not wait without events to wait on" assert callables
current = g_getcurrent() for c in callables:
assert callable(c)
greenlets = [tracinggreenlet(c) for c in callables]
g_active.extend(greenlets)
res = g_dispatcher.switch()
for g in greenlets: for g in greenlets:
if g in g_waiters: g.interrupt()
g_waiters[g].append(current) return res
else:
g_waiters[g] = [current] def allof(*callables):
g_dispatcher.switch() for c in callables:
assert callable(c)
greenlets = [tracinggreenlet(lambda i=i, c=c: (i, c()))
for i, c in enumerate(callables)]
g_active.extend(greenlets)
result = [None] * len(callables)
for _ in callables:
num, res = g_dispatcher.switch()
result[num] = res
return tuple(result)
class Timer(object): class Timer(object):
started = False started = False
@ -387,31 +326,31 @@ class Timer(object):
# ____________________________________________________________ # ____________________________________________________________
class autogreenlet(greenlet): class tracinggreenlet(greenlet):
def __init__(self, function, *args, **kwds): def __init__(self, function, *args, **kwds):
self.parent = g_dispatcher
self.function = function self.function = function
self.args = args self.args = args
self.kwds = kwds self.kwds = kwds
g_active.append(self)
def __repr__(self):
## args = ', '.join([repr(s) for s in self.args] +
## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
return '<%s %s at %s>' % (self.__class__.__name__,
self.function.__name__,
hex(id(self)))
def run(self): def run(self):
self.trace("start") self.trace("start")
try: try:
self.function(*self.args, **self.kwds) res = self.function(*self.args, **self.kwds)
except Exception, e: except Exception, e:
self.trace("stop (%s%s)", e.__class__.__name__, self.trace("stop (%s%s)", e.__class__.__name__,
str(e) and (': '+str(e))) str(e) and (': '+str(e)))
raise raise
else: else:
self.trace("done") self.trace("done")
return res
def __repr__(self):
## args = ', '.join([repr(s) for s in self.args] +
## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
return '<autogreenlet %s at %s>' % (self.function.__name__,
hex(id(self)))
def trace(self, msg, *args): def trace(self, msg, *args):
if TRACE: if TRACE:
@ -420,6 +359,11 @@ class autogreenlet(greenlet):
def interrupt(self): def interrupt(self):
self.throw(Interrupted) self.throw(Interrupted)
class autogreenlet(tracinggreenlet):
def __init__(self, *args, **kwargs):
super(autogreenlet, self).__init__(*args, **kwargs)
self.parent = g_dispatcher
g_active.append(self)
g_active = deque() g_active = deque()
g_iwtd = {} g_iwtd = {}
@ -452,24 +396,25 @@ def check_dead_greenlets(mapping):
for k in to_remove: for k in to_remove:
del mapping[k] del mapping[k]
def check_waiters(active): #def check_waiters(active):
if active in g_waiters: # if active in g_waiters:
for g in g_waiters[active]: # for g in g_waiters[active]:
g.switch() # g.switch()
del g_waiters[active] # del g_waiters[active]
def dispatcher_mainloop(): def dispatcher_mainloop():
global g_timers_mixed global g_timers_mixed
GreenletExit = greenlet.GreenletExit
while 1: while 1:
try: try:
while g_active: while g_active:
print 'active:', g_active[0] #print 'active:', g_active[0]
active = g_active.popleft() g_active.popleft().switch()
active.switch() # active.switch()
if active.dead: # if active.dead:
check_waiters(active) # check_waiters(active)
del active # del active
if g_timers: if g_timers:
if g_timers_mixed: if g_timers_mixed:
heapify(g_timers) heapify(g_timers)
@ -482,8 +427,8 @@ def dispatcher_mainloop():
#print 'timeout:', g #print 'timeout:', g
timer.finished = True timer.finished = True
timer.g.switch() timer.g.switch()
if timer.g.dead: # if timer.g.dead:
check_waiters(timer.g) # check_waiters(timer.g)
continue continue
delay = 0.0 delay = 0.0
timer.started = True timer.started = True
@ -496,9 +441,9 @@ def dispatcher_mainloop():
continue continue
delay = None delay = None
print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay) iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
print 'done' #print 'done'
for s in owtd: for s in owtd:
if s in g_owtd: if s in g_owtd:
d = g_owtd[s] d = g_owtd[s]
@ -510,8 +455,8 @@ def dispatcher_mainloop():
except KeyError: except KeyError:
pass pass
g.switch(g_owtd) g.switch(g_owtd)
if g.dead: # if g.dead:
check_waiters(g) # check_waiters(g)
for s in iwtd: for s in iwtd:
if s in g_iwtd: if s in g_iwtd:
d = g_iwtd[s] d = g_iwtd[s]
@ -523,11 +468,13 @@ def dispatcher_mainloop():
except KeyError: except KeyError:
pass pass
g.switch(g_iwtd) g.switch(g_iwtd)
if g.dead: # if g.dead:
check_waiters(g) # check_waiters(g)
except GreenletExit:
raise
except: except:
import sys import sys
g_dispatcher.parent.throw(*sys.exc_info()) g_dispatcher.parent.throw(*sys.exc_info())
g_dispatcher = greenlet(dispatcher_mainloop) g_dispatcher = greenlet(dispatcher_mainloop)
g_waiters = {} #g_waiters = {}

View File

@ -60,8 +60,7 @@ def test_producer():
except Interrupted: except Interrupted:
lst.append(8) lst.append(8)
g = autogreenlet(cons) oneof(cons)
wait(g)
assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8] assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
@ -69,24 +68,33 @@ def test_timer():
lst = [] lst = []
def g1(): def g1():
sleep(0.1, g_1) sleep(0.1)
lst.append(1) lst.append(1)
sleep(0.2, g_1) sleep(0.2)
lst.append(3) lst.append(3)
def g2(): def g2():
lst.append(0) lst.append(0)
sleep(0.2, g_2) sleep(0.2)
lst.append(2) lst.append(2)
sleep(0.2, g_2) sleep(0.2)
lst.append(4) lst.append(4)
g_1 = autogreenlet(g1) oneof(g1, g2)
g_2 = autogreenlet(g2) assert lst == [0, 1, 2, 3]
wait(g_1)
wait(g_2)
assert lst == [0, 1, 2, 3, 4]
def test_kill_other():
def g1():
sleep(.1)
return 1
def g2():
sleep(.2)
return 2
res = oneof(g1, g2)
assert res == 1
def test_socket(): def test_socket():
s1 = socket(AF_INET, SOCK_DGRAM) s1 = socket(AF_INET, SOCK_DGRAM)
@ -105,6 +113,7 @@ def test_socket():
lst.append(3) lst.append(3)
sendall(s1, 'world') sendall(s1, 'world')
lst.append(4) lst.append(4)
return 1
def g2(): def g2():
lst.append(1) lst.append(1)
@ -113,13 +122,12 @@ def test_socket():
y = recv(s2, 5) y = recv(s2, 5)
assert y == 'world' assert y == 'world'
lst.append(5) lst.append(5)
return 2
g_1 = autogreenlet(g1) one, two = allof(g1, g2)
g_2 = autogreenlet(g2)
wait(g_1)
wait(g_2)
assert lst == [0, 1, 2, 3, 4, 5] assert lst == [0, 1, 2, 3, 4, 5]
assert one == 1
assert two == 2
##def test_Queue(): ##def test_Queue():