feat(server): Implemented periodic snapshotting (#161) (#250)

* feat(server): Implemented periodic snapshotting (#161)

* feat(test): Added the ability to specify dragonfly cli parameters on a test basis (#199)

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>

* feat(server): Implemented periodic snapshotting (#161)

Code cleanup & CONTRIBUTORS.md modifcation

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>

* feat(server): Implemented periodic snapshotting (#161)

Parsing and race condition fixes. Improved pytests

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>

* feat(test): Cleaned up pytest code & added documentation (#199)

- Moved tests into their own file
- Renamed test namespace to avoid naming conflicts with pytest
- Updated requirements.txt to make test environment reproducible
- Added documentation to write tests

feat(server): Updated helio submodule

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>
Co-authored-by: Braydn <braydnmoore3@gmail.com>
This commit is contained in:
Braydn 2022-08-26 06:54:38 -04:00 committed by GitHub
parent c49e88899b
commit b7f85e59a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 482 additions and 89 deletions

View File

@ -2,6 +2,7 @@
* **[Philipp Born](https://github.com/tamcore)**
* Helm Chart
* **[Braydn Moore](https://github.com/braydnm)**
* **[Ryan Russell](https://github.com/ryanrussell)**
* Docs & Code Readability
* **[Ali-Akber Saifee](https://github.com/alisaifee)**

2
helio

@ -1 +1 @@
Subproject commit d03ea0cae7d21de11391d2146e46f31457c7cef2
Subproject commit 17fdc10f97c8c28eb9a0544ca65fb7e60cfc575a

View File

@ -30,6 +30,7 @@ cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rd
testdata/redis6_stream.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
cxx_test(snapshot_test dragonfly_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)

View File

@ -11,7 +11,9 @@
#include <mimalloc-types.h>
#include <sys/resource.h>
#include <chrono>
#include <filesystem>
#include <optional>
extern "C" {
#include "redis/redis_aux.h"
@ -47,6 +49,8 @@ using namespace std;
ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "", "password for AUTH authentication");
ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
@ -147,6 +151,80 @@ class LinuxWriteWrapper : public io::WriteFile {
} // namespace
bool IsValidSaveScheduleNibble(string_view time, unsigned int max) {
/*
* a nibble is valid iff there exists one time that matches the pattern
* and that time is <= max. For any wildcard the minimum value is 0.
* Therefore the minimum time the pattern can match is the time with
* all *s replaced with 0s. If this time is > max all other times that
* match the pattern are > max and the pattern is invalid. Otherwise
* there exists at least one valid nibble specified by this pattern
*
* Note the edge case of "*" is equivalent to "**". While using this
* approach "*" and "**" both map to 0.
*/
unsigned int min_match = 0;
for (size_t i = 0; i < time.size(); ++i) {
// check for valid characters
if (time[i] != '*' && (time[i] < '0' || time[i] > '9')) {
return false;
}
min_match *= 10;
min_match += time[i] == '*' ? 0 : time[i] - '0';
}
return min_match <= max;
}
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
if (time.length() < 3 || time.length() > 5) {
return std::nullopt;
}
size_t separator_idx = time.find(':');
// the time cannot start with ':' and it must be present in the first 3 characters of any time
if (separator_idx == 0 || separator_idx >= 3) {
return std::nullopt;
}
SnapshotSpec spec{string(time.substr(0, separator_idx)), string(time.substr(separator_idx + 1))};
// a minute should be 2 digits as it is zero padded, unless it is a '*' in which case this
// greedily can make up both digits
if (spec.minute_spec != "*" && spec.minute_spec.length() != 2) {
return std::nullopt;
}
return IsValidSaveScheduleNibble(spec.hour_spec, 23) &&
IsValidSaveScheduleNibble(spec.minute_spec, 59)
? std::optional<SnapshotSpec>(spec)
: std::nullopt;
}
bool DoesTimeNibbleMatchSpecifier(string_view time_spec, unsigned int current_time) {
// single greedy wildcard matches everything
if (time_spec == "*") {
return true;
}
for (int i = time_spec.length() - 1; i >= 0; --i) {
// if the current digit is not a wildcard and it does not match the digit in the current time it
// does not match
if (time_spec[i] != '*' && (current_time % 10) != (time_spec[i] - '0')) {
return false;
}
current_time /= 10;
}
return current_time == 0;
}
bool DoesTimeMatchSpecifier(const SnapshotSpec& spec, time_t now) {
unsigned hour = (now / 3600) % 24;
unsigned min = (now / 60) % 60;
return DoesTimeNibbleMatchSpecifier(spec.hour_spec, hour) &&
DoesTimeNibbleMatchSpecifier(spec.minute_spec, min);
}
ServerFamily::ServerFamily(Service* service) : service_(*service) {
start_time_ = time(NULL);
lsinfo_ = make_shared<LastSaveInfo>();
@ -199,6 +277,19 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
if (!load_path.empty()) {
Load(load_path);
}
string save_time = GetFlag(FLAGS_save_schedule);
if (!save_time.empty()) {
std::optional<SnapshotSpec> spec = ParseSaveSchedule(save_time);
if (spec) {
snapshot_fiber_ = service_.proactor_pool().GetNextProactor()->LaunchFiber(
[save_spec = std::move(spec.value()), this] {
SnapshotScheduling(std::move(save_spec));
});
} else {
LOG(WARNING) << "Invalid snapshot time specifier " << save_time;
}
}
}
void ServerFamily::Shutdown() {
@ -207,6 +298,11 @@ void ServerFamily::Shutdown() {
if (load_fiber_.joinable())
load_fiber_.join();
is_snapshot_done_.Notify();
if (snapshot_fiber_.joinable()) {
snapshot_fiber_.join();
}
pb_task_->Await([this] {
pb_task_->CancelPeriodic(stats_caching_task_);
stats_caching_task_ = 0;
@ -264,6 +360,44 @@ void ServerFamily::Load(const std::string& load_path) {
});
}
void ServerFamily::SnapshotScheduling(const SnapshotSpec&& spec) {
const auto loop_sleep_time = std::chrono::seconds(20);
while (true) {
if (is_snapshot_done_.WaitFor(loop_sleep_time)) {
break;
}
time_t now = std::time(NULL);
if (!DoesTimeMatchSpecifier(spec, now)) {
continue;
}
// if it matches check the last save time, if it is the same minute don't save another snapshot
time_t last_save;
{
lock_guard lk(save_mu_);
last_save = lsinfo_->save_time;
}
if ((last_save / 60) == (now / 60)) {
continue;
}
// do the save
string err_details;
error_code ec;
const CommandId* cid = service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
trans->InitByArgs(0, {});
ec = DoSave(trans.get(), &err_details);
if (ec) {
LOG(WARNING) << "Failed to perform snapshot " << err_details;
}
}
}
error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec;

View File

@ -51,6 +51,11 @@ struct LastSaveInfo {
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
};
struct SnapshotSpec {
std::string hour_spec;
std::string minute_spec;
};
class ServerFamily {
public:
ServerFamily(Service* service);
@ -117,7 +122,9 @@ class ServerFamily {
void Load(const std::string& file_name);
boost::fibers::fiber load_fiber_;
void SnapshotScheduling(const SnapshotSpec &&time);
boost::fibers::fiber load_fiber_, snapshot_fiber_;
uint32_t stats_caching_task_ = 0;
Service& service_;
@ -137,6 +144,8 @@ class ServerFamily {
std::shared_ptr<LastSaveInfo> lsinfo_; // protected by save_mu_;
std::atomic_bool is_saving_{false};
util::fibers_ext::Done is_snapshot_done_;
};
} // namespace dfly

122
src/server/snapshot_test.cc Normal file
View File

@ -0,0 +1,122 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include "base/gtest.h"
#include "server/test_utils.h"
using namespace testing;
using namespace std;
using namespace util;
using namespace facade;
using absl::StrCat;
namespace dfly {
class SnapshotTest : public Test {
protected:
};
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time);
bool DoesTimeMatchSpecifier(const SnapshotSpec&, time_t);
bool DoesTimeMatchSpecifier(string_view time_spec, unsigned int hour, unsigned int min) {
auto spec = ParseSaveSchedule(time_spec);
if (!spec) {
return false;
}
time_t now = ((hour * 60) + min) * 60;
return DoesTimeMatchSpecifier(spec.value(), now);
}
TEST_F(SnapshotTest, InvalidTimes) {
EXPECT_FALSE(ParseSaveSchedule("24:00"));
EXPECT_FALSE(ParseSaveSchedule("00:60"));
EXPECT_FALSE(ParseSaveSchedule("100:00"));
EXPECT_FALSE(ParseSaveSchedule("00:100"));
// invalid times with regex
EXPECT_FALSE(ParseSaveSchedule("23:6*"));
// Minutes must be zero padded
EXPECT_FALSE(ParseSaveSchedule("00:9"));
// No separators or start with separator
EXPECT_FALSE(ParseSaveSchedule(":12"));
EXPECT_FALSE(ParseSaveSchedule("1234"));
EXPECT_FALSE(ParseSaveSchedule("1"));
// Negative numbers / non numeric characters
EXPECT_FALSE(ParseSaveSchedule("-1:-2"));
EXPECT_FALSE(ParseSaveSchedule("12:34b"));
EXPECT_FALSE(ParseSaveSchedule("0;:1="));
// Wildcards for full times
EXPECT_FALSE(ParseSaveSchedule("12*:09"));
EXPECT_FALSE(ParseSaveSchedule("23:45*"));
}
TEST_F(SnapshotTest, ValidTimes) {
// Test endpoints
EXPECT_TRUE(ParseSaveSchedule("23:59"));
EXPECT_TRUE(ParseSaveSchedule("00:00"));
// hours don't need to be zero padded
EXPECT_TRUE(ParseSaveSchedule("0:00"));
// wildcard checks
EXPECT_TRUE(ParseSaveSchedule("1*:09"));
EXPECT_TRUE(ParseSaveSchedule("*9:23"));
EXPECT_TRUE(ParseSaveSchedule("23:*1"));
EXPECT_TRUE(ParseSaveSchedule("18:1*"));
// Greedy wildcards
EXPECT_TRUE(ParseSaveSchedule("*:12"));
EXPECT_TRUE(ParseSaveSchedule("9:*"));
EXPECT_TRUE(ParseSaveSchedule("09:*"));
EXPECT_TRUE(ParseSaveSchedule("*:*"));
}
TEST_F(SnapshotTest, TimeMatches) {
EXPECT_TRUE(DoesTimeMatchSpecifier("12:34", 12, 34));
EXPECT_TRUE(DoesTimeMatchSpecifier("2:34", 2, 34));
EXPECT_TRUE(DoesTimeMatchSpecifier("2:04", 2, 4));
EXPECT_FALSE(DoesTimeMatchSpecifier("12:34", 2, 4));
EXPECT_FALSE(DoesTimeMatchSpecifier("12:34", 2, 34));
EXPECT_FALSE(DoesTimeMatchSpecifier("2:34", 12, 34));
EXPECT_FALSE(DoesTimeMatchSpecifier("2:34", 3, 34));
EXPECT_FALSE(DoesTimeMatchSpecifier("2:04", 3, 5));
// Check wildcard for one slot
for (int i = 0; i < 9; ++i)
EXPECT_TRUE(DoesTimeMatchSpecifier("1*:34", 10 + i, 34));
EXPECT_TRUE(DoesTimeMatchSpecifier("*3:04", 13, 4));
EXPECT_TRUE(DoesTimeMatchSpecifier("*3:04", 23, 4));
// do the same checks for the minutes
for (int i = 0; i < 9; ++i)
EXPECT_TRUE(DoesTimeMatchSpecifier("10:3*", 10, 30 + i));
for (int i = 0; i < 6; ++i)
EXPECT_TRUE(DoesTimeMatchSpecifier("13:*4", 13, (10 * i) + 4));
// check greedy wildcards
for (int i = 0; i < 24; ++i)
EXPECT_TRUE(DoesTimeMatchSpecifier("*:12", i, 12));
for (int i = 0; i < 60; ++i)
EXPECT_TRUE(DoesTimeMatchSpecifier("3:*", 3, i));
for (int i = 0; i < 24; ++i)
for (int j = 0; j < 60; ++j)
EXPECT_TRUE(DoesTimeMatchSpecifier("*:*", i, j));
}
} // namespace dfly

View File

@ -7,8 +7,38 @@ The tests assume you have the "dragonfly" binary in `<root>/build-dbg` directory
You can override the location of the binary using `DRAGONFLY_HOME` environment var.
to run pytest, run:
`pytest -xv pytest`
`pytest -xv dragonfly`
## Writing tests
The [Getting Started](https://docs.pytest.org/en/7.1.x/getting-started.html) guide is a great resource to become familiar with writing pytest test cases.
Pytest will recursively search the `tests/dragonfly` directory for files matching the patterns `test_*.py` or `*_test.py` for functions matching these [rules](https://docs.pytest.org/en/7.1.x/explanation/goodpractices.html#conventions-for-python-test-discovery):
- Functions or methods outside of a class prefixed by `test`
- Functions or methods prefixed by `test` inside a class prefixed by `Test` (without an `__init__` method)
**Note**: When making a new directory in `tests/dragonfly` be sure to create an `__init__.py` file to avoid [name conflicts](https://docs.pytest.org/en/7.1.x/explanation/goodpractices.html#tests-outside-application-code)
### Interacting with Dragonfly
Pytest allows for parameters with a specific name to be automatically resolved through [fixtures](https://docs.pytest.org/en/7.1.x/explanation/fixtures.html) for any test function. The following fixtures are to be used to interact with Dragonfly when writing a test:
| Name | Type | [Scope](https://docs.pytest.org/en/7.1.x/how-to/fixtures.html?highlight=scope#scope-sharing-fixtures-across-classes-modules-packages-or-session) | Description
| ----- | ---- | ----- | ----------- |
| tmp_dir | [pathlib.Path](https://docs.python.org/3/library/pathlib.html) | Session | The temporary directory the Dragonfly binary will be running in. The environment variable `DRAGONFLY_TMP` is also set to this value |
| test_env | `dict` | Session | The environment variables used when running Dragonfly as a dictionary |
| client | [redis.Redis](https://redis-py.readthedocs.io/en/stable/connections.html#generic-client) | Class | The redis client to interact with the Dragonfly instance |
To avoid the overhead of spawning a Dragonfly process for every test the `client` provided fixture has a `Class` scope which means that all test functions in the same class will interact with the same Dragonfly instance.
### Passing CLI commands to Dragonfly
To pass custom flags to the Dragonfly executable two class decorators have been created. `@dfly_args` allows you to pass a list of parameters to the Dragonfly executable, similarly `@dfly_multi_test_args` allows you to specify multiple parameter configurations to test with a given test class.
In the case of `@dfly_multi_test_args` each parameter configuration will create one Dragonfly instance which each test will receive a client to as described in the [above section](#interacting-with-dragonfly)
Parameters can use environmental variables with a formatted string where `"{<VAR>}"` will be replaced with the value of the `<VAR>` environment variable. Due to [current pytest limtations](https://github.com/pytest-dev/pytest/issues/349) fixtures cannot be passed to either of these decorators, this is currently the provided way to pass the temporary directory path in a CLI parameter.
### Test Examples
- **[blpop_test](./dragonfly/blpop_test.py)**: Simple test case interacting with Dragonfly
- **[snapshot_test](./dragonfly/snapshot_test.py)**: Example test using `@dfly_args`, environment variables and pre-test setup
- **[key_limt_test](./dragonfly/key_limit_test.py)**: Example test using `@dfly_multi_test_args`
# Integration tests
To simplify running integration test each package should have its own Dockerfile. The Dockerfile should contain everything needed in order to test the package against Drafongly. Docker can assume Dragonfly is running on localhost:6379.

View File

@ -0,0 +1,11 @@
import pytest
def dfly_args(*args):
""" used to define a singular set of arguments for dragonfly test """
return pytest.mark.parametrize("df_server", [args], indirect=True)
def dfly_multi_test_args(*args):
""" used to define multiple sets of arguments to test multiple dragonfly configurations """
return pytest.mark.parametrize("df_server", args, indirect=True)

View File

@ -0,0 +1,42 @@
from threading import Thread
import pytest
import redis
class BLPopWorkerThread:
def __init__(self):
self.result = None
self.thread = None
def async_blpop(self, client: redis.Redis):
self.result = None
def blpop_task(self, client):
self.result = client.blpop(
['list1{t}', 'list2{t}', 'list2{t}', 'list1{t}'], 0.5)
self.thread = Thread(target=blpop_task, args=(self, client))
self.thread.start()
def wait(self, timeout):
self.thread.join(timeout)
return not self.thread.is_alive()
@pytest.mark.parametrize('index', range(50))
class TestBlPop:
def test_blpop_multiple_keys(self, client, index):
wt_blpop = BLPopWorkerThread()
wt_blpop.async_blpop(client)
client.lpush('list1{t}', 'a')
assert wt_blpop.wait(2)
assert wt_blpop.result[1] == 'a'
watched = client.execute_command('DEBUG WATCHED')
assert watched == []
wt_blpop.async_blpop(client)
client.lpush('list2{t}', 'b')
assert wt_blpop.wait(2)
assert wt_blpop.result[1] == 'b'

View File

@ -0,0 +1,82 @@
"""
Pytest fixtures to be provided for all tests without import
"""
from pathlib import Path
from tempfile import TemporaryDirectory
import os
import subprocess
import time
import pytest
import redis
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DRAGONFLY_PATH = os.environ.get("DRAGONFLY_HOME", os.path.join(
SCRIPT_DIR, '../../build-dbg/dragonfly'))
@pytest.fixture(scope="session")
def tmp_dir():
"""
Pytest fixture to provide the test temporary directory for the session
where the Dragonfly executable will be run and where all test data
should be stored. The directory will be cleaned up at the end of a session
"""
tmp = TemporaryDirectory()
yield Path(tmp.name)
tmp.cleanup()
@pytest.fixture(scope="session")
def test_env(tmp_dir: Path):
"""
Provide the environment the Dragonfly executable is running in as a
python dictionary
"""
env = os.environ.copy()
env["DRAGONFLY_TMP"] = str(tmp_dir)
return env
@pytest.fixture(scope="class", params=[[]])
def df_server(request, tmp_dir: Path, test_env):
""" Starts a single DragonflyDB process, runs once per test class. """
print(f"Starting DragonflyDB [{DRAGONFLY_PATH}]")
arguments = [arg.format(**test_env) for arg in request.param]
dfly_proc = subprocess.Popen([DRAGONFLY_PATH, *arguments],
env=test_env, cwd=str(tmp_dir))
time.sleep(0.1)
return_code = dfly_proc.poll()
if return_code is not None:
dfly_proc.terminate()
pytest.exit(f"Failed to start DragonflyDB [{DRAGONFLY_PATH}]")
yield
print(f"Terminating DragonflyDB process [{dfly_proc.pid}]")
try:
dfly_proc.terminate()
outs, errs = dfly_proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
print("Unable to terminate DragonflyDB gracefully, it was killed")
outs, errs = dfly_proc.communicate()
print(outs)
print(errs)
@pytest.fixture(scope="class")
def connection(df_server):
""" Creates the Redis client to interact with the Dragonfly instance """
pool = redis.ConnectionPool(decode_responses=True)
client = redis.Redis(connection_pool=pool)
return client
@pytest.fixture
def client(connection):
""" Flushes all the records, runs before each test. """
connection.flushall()
return connection

View File

@ -0,0 +1,10 @@
from dragonfly import dfly_multi_test_args
@dfly_multi_test_args(["--keys_output_limit", "512"], ["--keys_output_limit", "1024"])
class TestKeys:
def test_max_keys(self, client):
for x in range(8192):
client.set(str(x), str(x))
keys = client.keys()
assert len(keys) in [513, 1025]

View File

@ -0,0 +1,12 @@
async-timeout==4.0.2
attrs==22.1.0
Deprecated==1.2.13
iniconfig==1.1.1
packaging==21.3
pluggy==1.0.0
py==1.11.0
pyparsing==3.0.9
pytest==7.1.2
redis==4.3.4
tomli==2.0.1
wrapt==1.14.1

View File

@ -0,0 +1,25 @@
import time
import pytest
import redis
from pathlib import Path
from dragonfly import dfly_args
@dfly_args("--alsologtostderr", "--dbfilename", "test.rdb",
"--save_schedule", "*:*",
"--dir", "{DRAGONFLY_TMP}/")
class TestSnapshot:
@pytest.fixture(autouse=True)
def setup(self, tmp_dir: Path):
self.rdb_out = tmp_dir / "test.rdb"
if self.rdb_out.exists():
self.rdb_out.unlink()
def test_snapshot(self, client: redis.Redis):
client.set("test", "test")
time.sleep(60)
assert self.rdb_out.exists()

View File

@ -1,2 +0,0 @@
pytest
redis

View File

@ -1,84 +0,0 @@
import pytest
import redis
import os
import subprocess
import time
from threading import Thread
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
@pytest.fixture(scope="module")
def df_server():
""" Starts a single DragonflyDB process, runs only once. """
dragonfly_path = os.environ.get("DRAGONFLY_HOME", os.path.join(
SCRIPT_DIR, '../../build-dbg/dragonfly'))
print("Starting DragonflyDB [{}]".format(dragonfly_path))
# TODO: parse arguments and pass them over
p = subprocess.Popen([dragonfly_path])
time.sleep(0.1)
return_code = p.poll()
if return_code is not None:
pytest.exit("Failed to start DragonflyDB [{}]".format(dragonfly_path))
yield
print("Terminating DragonflyDB process [{}]".format(p.pid))
try:
p.terminate()
outs, errs = p.communicate(timeout=15)
except subprocess.TimeoutExpired:
print("Unable to terminate DragonflyDB gracefully, it was killed")
outs, errs = p.communicate()
print(outs)
print(errs)
@pytest.fixture(scope="module")
def connection(df_server):
pool = redis.ConnectionPool(decode_responses=True)
client = redis.Redis(connection_pool=pool)
return client
@pytest.fixture
def client(connection):
""" Flushes all the records, runs before each test. """
connection.flushall()
return connection
class BLPopWorkerThread:
def __init__(self):
self.result = None
self.thread = None
def async_blpop(self, client: redis.Redis):
self.result = None
def blpop_task(self, client):
self.result = client.blpop(
['list1{t}', 'list2{t}', 'list2{t}', 'list1{t}'], 0.5)
self.thread = Thread(target=blpop_task, args=(self, client))
self.thread.start()
def wait(self, timeout):
self.thread.join(timeout)
return not self.thread.is_alive()
@pytest.mark.parametrize('index', range(50))
def test_blpop_multiple_keys(client: redis.Redis, index):
wt_blpop = BLPopWorkerThread()
wt_blpop.async_blpop(client)
client.lpush('list1{t}', 'a')
assert wt_blpop.wait(2)
assert wt_blpop.result[1] == 'a'
watched = client.execute_command('DEBUG WATCHED')
assert watched == []
wt_blpop.async_blpop(client)
client.lpush('list2{t}', 'b')
assert wt_blpop.wait(2)
assert wt_blpop.result[1] == 'b'