145 lines
3.2 KiB
Python
145 lines
3.2 KiB
Python
|
import py
|
||
|
try:
|
||
|
from py.magic import greenlet
|
||
|
except (ImportError, RuntimeError), e:
|
||
|
py.test.skip(str(e))
|
||
|
|
||
|
import sys, gc
|
||
|
from py.test import raises
|
||
|
try:
|
||
|
import thread, threading
|
||
|
except ImportError:
|
||
|
thread = None
|
||
|
|
||
|
def test_simple():
|
||
|
lst = []
|
||
|
def f():
|
||
|
lst.append(1)
|
||
|
greenlet.getcurrent().parent.switch()
|
||
|
lst.append(3)
|
||
|
g = greenlet(f)
|
||
|
lst.append(0)
|
||
|
g.switch()
|
||
|
lst.append(2)
|
||
|
g.switch()
|
||
|
lst.append(4)
|
||
|
assert lst == range(5)
|
||
|
|
||
|
def test_threads():
|
||
|
if not thread:
|
||
|
py.test.skip("this is a test about thread")
|
||
|
success = []
|
||
|
def f():
|
||
|
test_simple()
|
||
|
success.append(True)
|
||
|
ths = [threading.Thread(target=f) for i in range(10)]
|
||
|
for th in ths:
|
||
|
th.start()
|
||
|
for th in ths:
|
||
|
th.join()
|
||
|
assert len(success) == len(ths)
|
||
|
|
||
|
|
||
|
class SomeError(Exception):
|
||
|
pass
|
||
|
|
||
|
def fmain(seen):
|
||
|
try:
|
||
|
greenlet.getcurrent().parent.switch()
|
||
|
except:
|
||
|
seen.append(sys.exc_info()[0])
|
||
|
raise
|
||
|
raise SomeError
|
||
|
|
||
|
def test_exception():
|
||
|
seen = []
|
||
|
g1 = greenlet(fmain)
|
||
|
g2 = greenlet(fmain)
|
||
|
g1.switch(seen)
|
||
|
g2.switch(seen)
|
||
|
g2.parent = g1
|
||
|
assert seen == []
|
||
|
raises(SomeError, g2.switch)
|
||
|
assert seen == [SomeError]
|
||
|
g2.switch()
|
||
|
assert seen == [SomeError]
|
||
|
|
||
|
def send_exception(g, exc):
|
||
|
# note: send_exception(g, exc) can be now done with g.throw(exc).
|
||
|
# the purpose of this test is to explicitely check the propagation rules.
|
||
|
def crasher(exc):
|
||
|
raise exc
|
||
|
g1 = greenlet(crasher, parent=g)
|
||
|
g1.switch(exc)
|
||
|
|
||
|
def test_send_exception():
|
||
|
seen = []
|
||
|
g1 = greenlet(fmain)
|
||
|
g1.switch(seen)
|
||
|
raises(KeyError, "send_exception(g1, KeyError)")
|
||
|
assert seen == [KeyError]
|
||
|
|
||
|
def test_dealloc():
|
||
|
seen = []
|
||
|
g1 = greenlet(fmain)
|
||
|
g2 = greenlet(fmain)
|
||
|
g1.switch(seen)
|
||
|
g2.switch(seen)
|
||
|
assert seen == []
|
||
|
del g1
|
||
|
gc.collect()
|
||
|
assert seen == [greenlet.GreenletExit]
|
||
|
del g2
|
||
|
gc.collect()
|
||
|
assert seen == [greenlet.GreenletExit, greenlet.GreenletExit]
|
||
|
|
||
|
def test_dealloc_other_thread():
|
||
|
if not thread:
|
||
|
py.test.skip("this is a test about thread")
|
||
|
seen = []
|
||
|
someref = []
|
||
|
lock = thread.allocate_lock()
|
||
|
lock.acquire()
|
||
|
lock2 = thread.allocate_lock()
|
||
|
lock2.acquire()
|
||
|
def f():
|
||
|
g1 = greenlet(fmain)
|
||
|
g1.switch(seen)
|
||
|
someref.append(g1)
|
||
|
del g1
|
||
|
gc.collect()
|
||
|
lock.release()
|
||
|
lock2.acquire()
|
||
|
greenlet() # trigger release
|
||
|
lock.release()
|
||
|
lock2.acquire()
|
||
|
t = threading.Thread(target=f)
|
||
|
t.start()
|
||
|
lock.acquire()
|
||
|
assert seen == []
|
||
|
assert len(someref) == 1
|
||
|
del someref[:]
|
||
|
gc.collect()
|
||
|
# g1 is not released immediately because it's from another thread
|
||
|
assert seen == []
|
||
|
lock2.release()
|
||
|
lock.acquire()
|
||
|
assert seen == [greenlet.GreenletExit]
|
||
|
lock2.release()
|
||
|
t.join()
|
||
|
|
||
|
def test_frame():
|
||
|
def f1():
|
||
|
f = sys._getframe(0)
|
||
|
assert f.f_back is None
|
||
|
greenlet.getcurrent().parent.switch(f)
|
||
|
return "meaning of life"
|
||
|
g = greenlet(f1)
|
||
|
frame = g.switch()
|
||
|
assert frame is g.gr_frame
|
||
|
assert g
|
||
|
next = g.switch()
|
||
|
assert not g
|
||
|
assert next == "meaning of life"
|
||
|
assert g.gr_frame is None
|