test: flush all records between pytest tests #199 (#205)

test: flush all records between pytest tests
This commit is contained in:
Shmulik Klein 2022-07-13 11:45:29 +02:00 committed by GitHub
parent d29425fb2e
commit d2b7987ac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 2 deletions

View File

@ -8,6 +8,7 @@ from threading import Thread
@pytest.fixture(scope="module")
def dragonfly_db():
""" Starts a single DragonflyDB process, runs only once. """
dragonfly_path = os.environ.get("DRAGONFLY_HOME", '../../build-dbg/dragonfly')
print("Starting DragonflyDB [{}]".format(dragonfly_path))
# TODO: parse arguments and pass them over
@ -19,7 +20,7 @@ def dragonfly_db():
yield
print("Terminating DragonflyDB process [{}]".format(p.id))
print("Terminating DragonflyDB process [{}]".format(p.pid))
try:
p.terminate()
outs, errs = p.communicate(timeout=15)
@ -31,12 +32,19 @@ def dragonfly_db():
@pytest.fixture(scope="module")
def client(dragonfly_db):
def connection(dragonfly_db):
pool = redis.ConnectionPool(decode_responses=True)
client = redis.Redis(connection_pool=pool)
return client
@pytest.fixture
def client(connection):
""" Flushes all the records, runs before each test. """
connection.flushall()
return connection
class BLPopWorkerThread:
def __init__(self):
self.result = None