test_ok2/py/thread/testing/test_pool.py

94 lines
2.3 KiB
Python

import py
import sys
WorkerPool = py._thread.WorkerPool
ThreadOut = py._thread.ThreadOut
def test_some():
pool = WorkerPool()
q = py.std.Queue.Queue()
num = 4
def f(i):
q.put(i)
while q.qsize():
py.std.time.sleep(0.01)
for i in range(num):
pool.dispatch(f, i)
for i in range(num):
q.get()
assert len(pool._alive) == 4
pool.shutdown()
# XXX I replaced the following join() with a time.sleep(1), which seems
# to fix the test on Windows, and doesn't break it on Linux... Completely
# unsure what the idea is, though, so it would be nice if someone with some
# more understanding of what happens here would either fix this better, or
# remove this comment...
# pool.join(timeout=1.0)
py.std.time.sleep(1)
assert len(pool._alive) == 0
assert len(pool._ready) == 0
def test_get():
pool = WorkerPool()
def f():
return 42
reply = pool.dispatch(f)
result = reply.get()
assert result == 42
def test_get_timeout():
pool = WorkerPool()
def f():
py.std.time.sleep(0.2)
return 42
reply = pool.dispatch(f)
py.test.raises(IOError, "reply.get(timeout=0.01)")
def test_get_excinfo():
pool = WorkerPool()
def f():
raise ValueError("42")
reply = pool.dispatch(f)
excinfo = py.test.raises(ValueError, "reply.get(1.0)")
py.test.raises(EOFError, "reply.get(1.0)")
def test_maxthreads():
pool = WorkerPool(maxthreads=1)
def f():
py.std.time.sleep(0.5)
try:
pool.dispatch(f)
py.test.raises(IOError, pool.dispatch, f)
finally:
pool.shutdown()
def test_join_timeout():
pool = WorkerPool()
q = py.std.Queue.Queue()
def f():
q.get()
reply = pool.dispatch(f)
pool.shutdown()
py.test.raises(IOError, pool.join, 0.01)
q.put(None)
reply.get(timeout=1.0)
pool.join(timeout=0.1)
def test_pool_clean_shutdown():
capture = py.io.OutErrCapture()
pool = WorkerPool()
def f():
pass
pool.dispatch(f)
pool.dispatch(f)
pool.shutdown()
pool.join(timeout=1.0)
assert not pool._alive
assert not pool._ready
out, err = capture.reset()
print out
print >>sys.stderr, err
assert err == ''