diff --git a/py/net/greenexecnet.py b/py/net/greenexecnet.py index 765e12a4a..7c5e2d882 100644 --- a/py/net/greenexecnet.py +++ b/py/net/greenexecnet.py @@ -189,7 +189,7 @@ class PopenCmdGateway(Gateway): os.dup2(fd, 0) os.dup2(fd, 1) os.close(fd) - greensock2.wait(gw.greenlet) + greensock2._suspend_forever() run_server = staticmethod(run_server) class PopenGateway(PopenCmdGateway): diff --git a/py/net/greensock2.py b/py/net/greensock2.py index 0525f4694..c52a634a0 100644 --- a/py/net/greensock2.py +++ b/py/net/greensock2.py @@ -196,80 +196,6 @@ def remove_by_id(d, obj): d.clear() d.extend(lst) -# ____________________________________________________________ - -##class Queue(object): - -## def __init__(self): -## self.giver, self.accepter = meetingpoint() -## self.pending = deque() - -## def put(self, item): # preserve the caller's atomicity -## self.pending.append(item) -## if self.accepter.ready(): -## self.accepter.accept() - -## def get(self, block=True): -## if self.pending: -## return self.pending.popleft() -## elif block: -## self.giver.give(None) -## return self.pending.popleft() -## else: -## raise Empty - -##class Empty(Interrupted): -## pass - -##class Event(object): - -## def __init__(self): -## self.giver, self.accepter = meetingpoint() - -## clear = __init__ - -## def isSet(self): -## return self.accepter is None - -## def set(self): # preserve the caller's atomicity -## if self.accepter is not None: -## accepter = self.accepter -## self.giver = self.accepter = None -## while accepter.ready(): # wake up all waiters -## accepter.accept() - -## def wait(self, timeout=None): -## if self.accepter is not None: -## if timeout is None: -## self.giver.give(None) -## else: -## timer = Timer(timeout) -## try: -## try: -## self.giver.give(None) -## except Interrupted: -## pass -## finally: -## timer.stop() - -##class Semaphore(object): - -## def __init__(self, value=1): -## self.giver, self.accepter = meetingpoint() -## for i in range(value): -## self.release() - -## def acquire(self, blocking=True): -## if blocking or self.accepter.ready(): -## return self.accepter.accept() -## else: -## return False - -## def release(self): -## autogreenlet(self.giver.put, True) - -# ____________________________________________________________ - def wait_input(sock): _register(g_iwtd, sock) @@ -339,28 +265,41 @@ def writeall(fd, buffer): in_front = True -def sleep(duration, *greenlets): +def sleep(duration): timer = Timer(duration) try: - wait(*greenlets) + _suspend_forever() finally: ok = timer.finished timer.stop() if not ok: raise Interrupted -def _wait(): +def _suspend_forever(): g_dispatcher.switch() -def wait(*greenlets): - assert greenlets#, "should not wait without events to wait on" - current = g_getcurrent() +def oneof(*callables): + assert callables + for c in callables: + assert callable(c) + greenlets = [tracinggreenlet(c) for c in callables] + g_active.extend(greenlets) + res = g_dispatcher.switch() for g in greenlets: - if g in g_waiters: - g_waiters[g].append(current) - else: - g_waiters[g] = [current] - g_dispatcher.switch() + g.interrupt() + return res + +def allof(*callables): + for c in callables: + assert callable(c) + greenlets = [tracinggreenlet(lambda i=i, c=c: (i, c())) + for i, c in enumerate(callables)] + g_active.extend(greenlets) + result = [None] * len(callables) + for _ in callables: + num, res = g_dispatcher.switch() + result[num] = res + return tuple(result) class Timer(object): started = False @@ -387,31 +326,31 @@ class Timer(object): # ____________________________________________________________ -class autogreenlet(greenlet): +class tracinggreenlet(greenlet): def __init__(self, function, *args, **kwds): - self.parent = g_dispatcher self.function = function self.args = args self.kwds = kwds - g_active.append(self) + + def __repr__(self): +## args = ', '.join([repr(s) for s in self.args] + +## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()]) +## return '' % (self.function.__name__, args) + return '<%s %s at %s>' % (self.__class__.__name__, + self.function.__name__, + hex(id(self))) def run(self): self.trace("start") try: - self.function(*self.args, **self.kwds) + res = self.function(*self.args, **self.kwds) except Exception, e: self.trace("stop (%s%s)", e.__class__.__name__, str(e) and (': '+str(e))) raise else: self.trace("done") - - def __repr__(self): -## args = ', '.join([repr(s) for s in self.args] + -## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()]) -## return '' % (self.function.__name__, args) - return '' % (self.function.__name__, - hex(id(self))) + return res def trace(self, msg, *args): if TRACE: @@ -420,6 +359,11 @@ class autogreenlet(greenlet): def interrupt(self): self.throw(Interrupted) +class autogreenlet(tracinggreenlet): + def __init__(self, *args, **kwargs): + super(autogreenlet, self).__init__(*args, **kwargs) + self.parent = g_dispatcher + g_active.append(self) g_active = deque() g_iwtd = {} @@ -452,24 +396,25 @@ def check_dead_greenlets(mapping): for k in to_remove: del mapping[k] -def check_waiters(active): - if active in g_waiters: - for g in g_waiters[active]: - g.switch() - del g_waiters[active] +#def check_waiters(active): +# if active in g_waiters: +# for g in g_waiters[active]: +# g.switch() +# del g_waiters[active] def dispatcher_mainloop(): global g_timers_mixed + GreenletExit = greenlet.GreenletExit while 1: try: while g_active: - print 'active:', g_active[0] - active = g_active.popleft() - active.switch() - if active.dead: - check_waiters(active) - del active + #print 'active:', g_active[0] + g_active.popleft().switch() +# active.switch() +# if active.dead: +# check_waiters(active) +# del active if g_timers: if g_timers_mixed: heapify(g_timers) @@ -482,8 +427,8 @@ def dispatcher_mainloop(): #print 'timeout:', g timer.finished = True timer.g.switch() - if timer.g.dead: - check_waiters(timer.g) +# if timer.g.dead: +# check_waiters(timer.g) continue delay = 0.0 timer.started = True @@ -496,9 +441,9 @@ def dispatcher_mainloop(): continue delay = None - print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay + #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay) - print 'done' + #print 'done' for s in owtd: if s in g_owtd: d = g_owtd[s] @@ -510,8 +455,8 @@ def dispatcher_mainloop(): except KeyError: pass g.switch(g_owtd) - if g.dead: - check_waiters(g) +# if g.dead: +# check_waiters(g) for s in iwtd: if s in g_iwtd: d = g_iwtd[s] @@ -523,11 +468,13 @@ def dispatcher_mainloop(): except KeyError: pass g.switch(g_iwtd) - if g.dead: - check_waiters(g) +# if g.dead: +# check_waiters(g) + except GreenletExit: + raise except: import sys g_dispatcher.parent.throw(*sys.exc_info()) g_dispatcher = greenlet(dispatcher_mainloop) -g_waiters = {} +#g_waiters = {} diff --git a/py/net/test/test_greensock2.py b/py/net/test/test_greensock2.py index 08760205f..f11529b77 100644 --- a/py/net/test/test_greensock2.py +++ b/py/net/test/test_greensock2.py @@ -60,8 +60,7 @@ def test_producer(): except Interrupted: lst.append(8) - g = autogreenlet(cons) - wait(g) + oneof(cons) assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8] @@ -69,24 +68,33 @@ def test_timer(): lst = [] def g1(): - sleep(0.1, g_1) + sleep(0.1) lst.append(1) - sleep(0.2, g_1) + sleep(0.2) lst.append(3) def g2(): lst.append(0) - sleep(0.2, g_2) + sleep(0.2) lst.append(2) - sleep(0.2, g_2) + sleep(0.2) lst.append(4) - g_1 = autogreenlet(g1) - g_2 = autogreenlet(g2) - wait(g_1) - wait(g_2) - assert lst == [0, 1, 2, 3, 4] + oneof(g1, g2) + assert lst == [0, 1, 2, 3] +def test_kill_other(): + + def g1(): + sleep(.1) + return 1 + + def g2(): + sleep(.2) + return 2 + + res = oneof(g1, g2) + assert res == 1 def test_socket(): s1 = socket(AF_INET, SOCK_DGRAM) @@ -105,6 +113,7 @@ def test_socket(): lst.append(3) sendall(s1, 'world') lst.append(4) + return 1 def g2(): lst.append(1) @@ -113,13 +122,12 @@ def test_socket(): y = recv(s2, 5) assert y == 'world' lst.append(5) + return 2 - g_1 = autogreenlet(g1) - g_2 = autogreenlet(g2) - wait(g_1) - wait(g_2) + one, two = allof(g1, g2) assert lst == [0, 1, 2, 3, 4, 5] - + assert one == 1 + assert two == 2 ##def test_Queue():