test_ok1/py/c-extension/greenlet/test_greenlet.py

159 lines
3.6 KiB
Python

import py
try:
from py.magic import greenlet
except (ImportError, RuntimeError), e:
py.test.skip(str(e))
import sys, gc
from py.test import raises
try:
import thread, threading
except ImportError:
thread = None
def test_simple():
lst = []
def f():
lst.append(1)
greenlet.getcurrent().parent.switch()
lst.append(3)
g = greenlet(f)
lst.append(0)
g.switch()
lst.append(2)
g.switch()
lst.append(4)
assert lst == range(5)
def test_threads():
if not thread:
py.test.skip("this is a test about thread")
success = []
def f():
test_simple()
success.append(True)
ths = [threading.Thread(target=f) for i in range(10)]
for th in ths:
th.start()
for th in ths:
th.join()
assert len(success) == len(ths)
class SomeError(Exception):
pass
def fmain(seen):
try:
greenlet.getcurrent().parent.switch()
except:
seen.append(sys.exc_info()[0])
raise
raise SomeError
def test_exception():
seen = []
g1 = greenlet(fmain)
g2 = greenlet(fmain)
g1.switch(seen)
g2.switch(seen)
g2.parent = g1
assert seen == []
raises(SomeError, g2.switch)
assert seen == [SomeError]
g2.switch()
assert seen == [SomeError]
def send_exception(g, exc):
# note: send_exception(g, exc) can be now done with g.throw(exc).
# the purpose of this test is to explicitely check the propagation rules.
def crasher(exc):
raise exc
g1 = greenlet(crasher, parent=g)
g1.switch(exc)
def test_send_exception():
seen = []
g1 = greenlet(fmain)
g1.switch(seen)
raises(KeyError, "send_exception(g1, KeyError)")
assert seen == [KeyError]
def test_dealloc():
seen = []
g1 = greenlet(fmain)
g2 = greenlet(fmain)
g1.switch(seen)
g2.switch(seen)
assert seen == []
del g1
gc.collect()
assert seen == [greenlet.GreenletExit]
del g2
gc.collect()
assert seen == [greenlet.GreenletExit, greenlet.GreenletExit]
def test_dealloc_other_thread():
if not thread:
py.test.skip("this is a test about thread")
seen = []
someref = []
lock = thread.allocate_lock()
lock.acquire()
lock2 = thread.allocate_lock()
lock2.acquire()
def f():
g1 = greenlet(fmain)
g1.switch(seen)
someref.append(g1)
del g1
gc.collect()
lock.release()
lock2.acquire()
greenlet() # trigger release
lock.release()
lock2.acquire()
t = threading.Thread(target=f)
t.start()
lock.acquire()
assert seen == []
assert len(someref) == 1
del someref[:]
gc.collect()
# g1 is not released immediately because it's from another thread
assert seen == []
lock2.release()
lock.acquire()
assert seen == [greenlet.GreenletExit]
lock2.release()
t.join()
def test_frame():
def f1():
f = sys._getframe(0)
assert f.f_back is None
greenlet.getcurrent().parent.switch(f)
return "meaning of life"
g = greenlet(f1)
frame = g.switch()
assert frame is g.gr_frame
assert g
next = g.switch()
assert not g
assert next == "meaning of life"
assert g.gr_frame is None
def test_thread_bug():
if not thread:
py.test.skip("this is a test about thread")
import time
def runner(x):
g = greenlet(lambda: time.sleep(x))
g.switch()
t1 = threading.Thread(target=runner, args=(0.2,))
t2 = threading.Thread(target=runner, args=(0.3,))
t1.start()
t2.start()
t1.join()
t2.join()