Fixes #41.
1. Found dangling transaction pointers that where left in the watch queue. Fix the state machine there. 2. Improved transaction code a bit, merged duplicated code into RunInShard function, got rid of RunNoop. 3. Improved BPopper::Run flow. 4. Added 'DEBUG WATCH' command. Also 'DEBUG OBJECT' now returns shard id and the lock status of the object.
This commit is contained in:
parent
3a38576bbb
commit
114e8bec5d
|
@ -114,20 +114,14 @@ void BlockingController::RunStep(Transaction* completed_t) {
|
|||
DbWatchTable& wt = *dbit->second;
|
||||
for (auto key : wt.awakened_keys) {
|
||||
string_view sv_key = static_cast<string_view>(key);
|
||||
DVLOG(1) << "Processing awakened key " << sv_key;
|
||||
|
||||
// Double verify we still got the item.
|
||||
auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key);
|
||||
if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block.
|
||||
continue;
|
||||
|
||||
auto w_it = wt.queue_map.find(sv_key);
|
||||
CHECK(w_it != wt.queue_map.end());
|
||||
DVLOG(1) << "NotifyWatchQueue " << key;
|
||||
WatchQueue* wq = w_it->second.get();
|
||||
NotifyWatchQueue(wq);
|
||||
if (wq->items.empty()) {
|
||||
wt.queue_map.erase(w_it);
|
||||
}
|
||||
NotifyWatchQueue(sv_key, &wt.queue_map);
|
||||
}
|
||||
wt.awakened_keys.clear();
|
||||
|
||||
|
@ -155,33 +149,42 @@ void BlockingController::AddWatched(Transaction* trans) {
|
|||
res->second.reset(new WatchQueue);
|
||||
}
|
||||
|
||||
if (!res->second->items.empty()) {
|
||||
Transaction* last = res->second->items.back().get();
|
||||
DCHECK_GT(last->use_count(), 0u);
|
||||
|
||||
// Duplicate keys case. We push only once per key.
|
||||
if (last == trans)
|
||||
continue;
|
||||
}
|
||||
DVLOG(2) << "Emplace " << trans << " " << trans->DebugId() << " to watch " << key;
|
||||
res->second->items.emplace_back(trans);
|
||||
}
|
||||
}
|
||||
|
||||
// Runs in O(N) complexity.
|
||||
// Runs in O(N) complexity in the worst case.
|
||||
void BlockingController::RemoveWatched(Transaction* trans) {
|
||||
VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId();
|
||||
|
||||
auto dbit = watched_dbs_.find(trans->db_index());
|
||||
CHECK(dbit != watched_dbs_.end());
|
||||
if (dbit == watched_dbs_.end())
|
||||
return;
|
||||
|
||||
DbWatchTable& wt = *dbit->second;
|
||||
auto args = trans->ShardArgsInShard(owner_->shard_id());
|
||||
for (auto key : args) {
|
||||
auto watch_it = wt.queue_map.find(key);
|
||||
CHECK(watch_it != wt.queue_map.end());
|
||||
if (watch_it == wt.queue_map.end())
|
||||
continue; // that can happen in case of duplicate keys
|
||||
|
||||
WatchQueue& wq = *watch_it->second;
|
||||
bool erased = false;
|
||||
for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) {
|
||||
if (items_it->trans == trans) {
|
||||
wq.items.erase(items_it);
|
||||
erased = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
CHECK(erased);
|
||||
// again, we may not find trans if we searched for the same key several times.
|
||||
|
||||
if (wq.items.empty()) {
|
||||
wt.RemoveEntry(watch_it);
|
||||
|
@ -208,13 +211,18 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
|
|||
|
||||
if (wt.AddAwakeEvent(WatchQueue::SUSPENDED, db_key)) {
|
||||
awakened_indices_.insert(db_index);
|
||||
} else {
|
||||
DVLOG(1) << "Skipped awakening " << db_index;
|
||||
}
|
||||
}
|
||||
|
||||
// Internal function called from ProcessAwakened().
|
||||
// Internal function called from RunStep().
|
||||
// Marks the queue as active and notifies the first transaction in the queue.
|
||||
void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
|
||||
VLOG(1) << "Notify WQ: [" << owner_->shard_id() << "]";
|
||||
void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm) {
|
||||
auto w_it = wqm->find(key);
|
||||
CHECK(w_it != wqm->end());
|
||||
DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key;
|
||||
WatchQueue* wq = w_it->second.get();
|
||||
|
||||
wq->state = WatchQueue::ACTIVE;
|
||||
|
||||
|
@ -224,6 +232,7 @@ void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
|
|||
do {
|
||||
WatchItem& wi = queue.front();
|
||||
Transaction* head = wi.get();
|
||||
DVLOG(2) << "Pop " << head << " from key " << key;
|
||||
|
||||
queue.pop_front();
|
||||
|
||||
|
@ -233,6 +242,10 @@ void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
|
|||
break;
|
||||
}
|
||||
} while (!queue.empty());
|
||||
|
||||
if (wq->items.empty()) {
|
||||
wqm->erase(w_it);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -288,4 +301,17 @@ size_t BlockingController::NumWatched(DbIndex db_indx) const {
|
|||
return it->second->queue_map.size();
|
||||
}
|
||||
|
||||
vector<string> BlockingController::GetWatchedKeys(DbIndex db_indx) const {
|
||||
vector<string> res;
|
||||
auto it = watched_dbs_.find(db_indx);
|
||||
|
||||
if (it != watched_dbs_.end()) {
|
||||
for (const auto& k_v : it->second->queue_map) {
|
||||
res.push_back(k_v.first);
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -44,11 +44,9 @@ class BlockingController {
|
|||
// Called from operations that create keys like lpush, rename etc.
|
||||
void AwakeWatched(DbIndex db_index, std::string_view db_key);
|
||||
|
||||
// void OnTxFinish();
|
||||
|
||||
// void RegisterAwaitForConverge(Transaction* t);
|
||||
|
||||
// Used in tests and debugging functions.
|
||||
size_t NumWatched(DbIndex db_indx) const;
|
||||
std::vector<std::string> GetWatchedKeys(DbIndex db_indx) const;
|
||||
|
||||
private:
|
||||
struct WatchQueue;
|
||||
|
@ -56,9 +54,7 @@ class BlockingController {
|
|||
|
||||
using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>;
|
||||
|
||||
/// Returns the notified transaction,
|
||||
/// or null if all transactions in the queue have expired..
|
||||
void NotifyWatchQueue(WatchQueue* wq);
|
||||
void NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm);
|
||||
|
||||
// void NotifyConvergence(Transaction* tx);
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include <filesystem>
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "server/blocking_controller.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/main_service.h"
|
||||
|
@ -47,13 +48,11 @@ struct ObjInfo {
|
|||
unsigned bucket_id = 0;
|
||||
unsigned slot_id = 0;
|
||||
|
||||
enum LockStatus { NONE, S, X } lock_status = NONE;
|
||||
|
||||
int64_t ttl = INT64_MAX;
|
||||
bool has_sec_precision = false;
|
||||
|
||||
ObjInfo(unsigned e, unsigned bid) : encoding(e), bucket_id(bid) {
|
||||
}
|
||||
|
||||
ObjInfo() = default;
|
||||
bool found = false;
|
||||
};
|
||||
|
||||
void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params,
|
||||
|
@ -88,6 +87,7 @@ void DebugCmd::Run(CmdArgList args) {
|
|||
" Examples:",
|
||||
" * DEBUG RELOAD NOSAVE: replace the current database with the contents of an",
|
||||
" existing RDB file.",
|
||||
"WATCHED",
|
||||
"POPULATE <count> [<prefix>] [<size>]",
|
||||
" Create <count> string keys named key:<num>. If <prefix> is specified then",
|
||||
" it is used instead of the 'key' prefix.",
|
||||
|
@ -106,6 +106,9 @@ void DebugCmd::Run(CmdArgList args) {
|
|||
if (subcmd == "RELOAD") {
|
||||
return Reload(args);
|
||||
}
|
||||
if (subcmd == "WATCHED") {
|
||||
return Watched();
|
||||
}
|
||||
|
||||
if (subcmd == "LOAD" && args.size() == 3) {
|
||||
return Load(ArgS(args, 2));
|
||||
|
@ -283,43 +286,72 @@ void DebugCmd::Inspect(string_view key) {
|
|||
EngineShardSet& ess = *shard_set;
|
||||
ShardId sid = Shard(key, ess.size());
|
||||
|
||||
auto cb = [&]() -> facade::OpResult<ObjInfo> {
|
||||
auto cb = [&]() -> ObjInfo {
|
||||
auto& db_slice = EngineShard::tlocal()->db_slice();
|
||||
auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index());
|
||||
|
||||
PrimeIterator it = pt->Find(key);
|
||||
if (!IsValid(it)) {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
ObjInfo oinfo;
|
||||
if (IsValid(it)) {
|
||||
oinfo.found = true;
|
||||
oinfo.encoding = it->second.Encoding();
|
||||
oinfo.bucket_id = it.bucket_id();
|
||||
oinfo.slot_id = it.slot_id();
|
||||
if (it->second.HasExpire()) {
|
||||
ExpireIterator exp_it = exp_t->Find(it->first);
|
||||
CHECK(!exp_it.is_done());
|
||||
|
||||
time_t exp_time = db_slice.ExpireTime(exp_it);
|
||||
oinfo.ttl = exp_time - db_slice.Now();
|
||||
oinfo.has_sec_precision = exp_it->second.is_second_precision();
|
||||
}
|
||||
}
|
||||
|
||||
ObjInfo oinfo(it->second.Encoding(), it.bucket_id());
|
||||
oinfo.slot_id = it.slot_id();
|
||||
|
||||
if (it->second.HasExpire()) {
|
||||
ExpireIterator exp_it = exp_t->Find(it->first);
|
||||
CHECK(!exp_it.is_done());
|
||||
|
||||
time_t exp_time = db_slice.ExpireTime(exp_it);
|
||||
oinfo.ttl = exp_time - db_slice.Now();
|
||||
oinfo.has_sec_precision = exp_it->second.is_second_precision();
|
||||
KeyLockArgs lock_args;
|
||||
lock_args.args = ArgSlice{&key, 1};
|
||||
lock_args.db_index = cntx_->db_index();
|
||||
if (!db_slice.CheckLock(IntentLock::EXCLUSIVE, lock_args)) {
|
||||
oinfo.lock_status =
|
||||
db_slice.CheckLock(IntentLock::SHARED, lock_args) ? ObjInfo::S : ObjInfo::X;
|
||||
}
|
||||
|
||||
return oinfo;
|
||||
};
|
||||
|
||||
OpResult<ObjInfo> res = ess.Await(sid, cb);
|
||||
if (res) {
|
||||
string resp;
|
||||
StrAppend(&resp, "encoding:", strEncoding(res->encoding), " bucket_id:", res->bucket_id);
|
||||
StrAppend(&resp, " slot:", res->slot_id);
|
||||
ObjInfo res = ess.Await(sid, cb);
|
||||
string resp;
|
||||
|
||||
if (res->ttl != INT64_MAX) {
|
||||
StrAppend(&resp, " ttl:", res->ttl, res->has_sec_precision ? "s" : "ms");
|
||||
if (res.found) {
|
||||
StrAppend(&resp, "encoding:", strEncoding(res.encoding), " bucket_id:", res.bucket_id);
|
||||
StrAppend(&resp, " slot:", res.slot_id, " shard:", sid);
|
||||
|
||||
if (res.ttl != INT64_MAX) {
|
||||
StrAppend(&resp, " ttl:", res.ttl, res.has_sec_precision ? "s" : "ms");
|
||||
}
|
||||
(*cntx_)->SendSimpleString(resp);
|
||||
} else {
|
||||
(*cntx_)->SendError(res.status());
|
||||
}
|
||||
|
||||
if (res.lock_status != ObjInfo::NONE) {
|
||||
StrAppend(&resp, " lock:", res.lock_status == ObjInfo::X ? "x" : "s");
|
||||
}
|
||||
(*cntx_)->SendSimpleString(resp);
|
||||
}
|
||||
|
||||
void DebugCmd::Watched() {
|
||||
vector<string> watched_keys;
|
||||
boost::fibers::mutex mu;
|
||||
|
||||
auto cb = [&](EngineShard* shard) {
|
||||
auto* bc = shard->blocking_controller();
|
||||
if (bc) {
|
||||
auto keys = bc->GetWatchedKeys(cntx_->db_index());
|
||||
|
||||
lock_guard lk(mu);
|
||||
watched_keys.insert(watched_keys.end(), keys.begin(), keys.end());
|
||||
}
|
||||
};
|
||||
|
||||
shard_set->RunBlockingInParallel(cb);
|
||||
(*cntx_)->SendStringArr(watched_keys);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -23,6 +23,7 @@ class DebugCmd {
|
|||
void Reload(CmdArgList args);
|
||||
void Load(std::string_view filename);
|
||||
void Inspect(std::string_view key);
|
||||
void Watched();
|
||||
|
||||
ServerFamily& sf_;
|
||||
ConnectionContext* cntx_;
|
||||
|
|
|
@ -127,7 +127,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
|
|||
if (trans_mask & Transaction::AWAKED_Q) {
|
||||
DCHECK(continuation_trans_ == nullptr);
|
||||
|
||||
CHECK_EQ(committed_txid_, trans->notify_txid()) << "TBD";
|
||||
CHECK_EQ(committed_txid_, trans->notify_txid());
|
||||
bool keep = trans->RunInShard(this);
|
||||
if (keep)
|
||||
return;
|
||||
|
@ -198,21 +198,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
|
|||
DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
|
||||
}
|
||||
|
||||
// For SUSPENDED_Q - if transaction has not been notified, it will still be
|
||||
// in the watch queue. We need to unlock an Execute by running a noop.
|
||||
if (trans_mask & Transaction::SUSPENDED_Q) {
|
||||
// This case happens when some other shard notified the transaction and now it
|
||||
// runs FindFirst on all shards.
|
||||
// TxId notify_txid = trans->notify_txid();
|
||||
// DCHECK(HasResultConverged(notify_txid));
|
||||
trans->RunNoop(this);
|
||||
return;
|
||||
}
|
||||
// we need to run trans if it's OOO or when trans is blocked in this shard and should
|
||||
// be treated here as noop.
|
||||
// trans is OOO, it it locked keys that previous transactions have not locked yet.
|
||||
bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q);
|
||||
|
||||
// If trans is out of order, i.e. locks keys that previous transactions have not locked.
|
||||
// It may be that there are other transactions that touch those keys but they necessary ordered
|
||||
// after trans in the queue, hence it's safe to run trans out of order.
|
||||
if (trans && (trans_mask & Transaction::OUT_OF_ORDER)) {
|
||||
if (trans && should_run) {
|
||||
DCHECK(trans != head);
|
||||
DCHECK(!trans->IsMulti()); // multi, global transactions can not be OOO.
|
||||
DCHECK(trans_mask & Transaction::ARMED);
|
||||
|
@ -226,9 +219,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
|
|||
|
||||
bool keep = trans->RunInShard(this);
|
||||
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
|
||||
|
||||
// Should be enforced via Schedule(). TODO: to remove the check once the code is mature.
|
||||
CHECK(!keep) << "multi-hop transactions can not be OOO.";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ OpResult<ShardFFResult> FindFirst(Transaction* trans) {
|
|||
|
||||
auto cb = [&find_res](auto* t, EngineShard* shard) {
|
||||
auto args = t->ShardArgsInShard(shard->shard_id());
|
||||
|
||||
OpResult<pair<PrimeIterator, unsigned>> ff_res =
|
||||
shard->db_slice().FindFirst(t->db_index(), args);
|
||||
|
||||
|
@ -245,12 +246,16 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
|
|||
result = FindFirst(t); // retry - must find something.
|
||||
}
|
||||
|
||||
// We got here
|
||||
if (!result) {
|
||||
t->UnregisterWatch();
|
||||
// cleanups, locks removal etc.
|
||||
auto cb = [this](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
t->Execute(std::move(cb), true);
|
||||
|
||||
return result.status();
|
||||
}
|
||||
|
||||
VLOG(1) << "Popping an element";
|
||||
VLOG(1) << "Popping an element " << t->DebugId();
|
||||
ff_result_ = move(result.value());
|
||||
|
||||
auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); };
|
||||
|
|
|
@ -30,6 +30,12 @@ class ListFamilyTest : public BaseFamilyTest {
|
|||
ListFamilyTest() {
|
||||
num_threads_ = 4;
|
||||
}
|
||||
|
||||
void WaitForLocked(string_view key) {
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, key));
|
||||
}
|
||||
};
|
||||
|
||||
const char kKey1[] = "x";
|
||||
|
@ -181,9 +187,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
|
|||
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
|
||||
for (unsigned i = 0; i < 100; ++i) {
|
||||
|
@ -221,9 +225,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
|
|||
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
LOG(INFO) << "Starting multi";
|
||||
|
||||
|
@ -293,9 +295,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
|
|||
blpop_resp = Run({"blpop", kKey1, "0"});
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
|
||||
Run({"multi"});
|
||||
|
@ -320,11 +320,11 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
|
|||
|
||||
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
|
||||
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
|
||||
auto watched = Run({"debug", "watched"});
|
||||
ASSERT_THAT(watched, ArrLen(0));
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); });
|
||||
pop_fb.join();
|
||||
|
@ -336,9 +336,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
|
|||
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); });
|
||||
pop_fb.join();
|
||||
|
@ -347,6 +345,28 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
|
|||
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar"));
|
||||
}
|
||||
|
||||
TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
|
||||
Run({"exists", "x", "y"});
|
||||
ASSERT_EQ(1, GetDebugInfo().shards_count);
|
||||
RespExpr blpop_resp;
|
||||
|
||||
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
|
||||
blpop_resp = Run({"blpop", "x", "y", "0"});
|
||||
auto watched = Run({"debug", "watched"});
|
||||
|
||||
EXPECT_FALSE(IsLocked(0, "y"));
|
||||
ASSERT_THAT(watched, ArrLen(0));
|
||||
});
|
||||
|
||||
WaitForLocked("x");
|
||||
|
||||
pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); });
|
||||
pop_fb.join();
|
||||
|
||||
ASSERT_THAT(blpop_resp, ArrLen(2));
|
||||
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar"));
|
||||
}
|
||||
|
||||
TEST_F(ListFamilyTest, BPopRename) {
|
||||
RespExpr blpop_resp;
|
||||
|
||||
|
@ -357,9 +377,7 @@ TEST_F(ListFamilyTest, BPopRename) {
|
|||
blpop_resp = Run({"blpop", kKey1, "0"});
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
pp_->at(1)->Await([&] {
|
||||
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
|
||||
|
@ -377,9 +395,7 @@ TEST_F(ListFamilyTest, BPopFlush) {
|
|||
blpop_resp = Run({"blpop", kKey1, "0"});
|
||||
});
|
||||
|
||||
do {
|
||||
this_fiber::sleep_for(30us);
|
||||
} while (!IsLocked(0, kKey1));
|
||||
WaitForLocked(kKey1);
|
||||
|
||||
pp_->at(1)->Await([&] {
|
||||
Run({"flushdb"});
|
||||
|
|
|
@ -284,6 +284,8 @@ void Transaction::SetExecCmd(const CommandId* cid) {
|
|||
}
|
||||
|
||||
string Transaction::DebugId() const {
|
||||
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
|
||||
|
||||
return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
|
||||
}
|
||||
|
||||
|
@ -305,8 +307,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
DCHECK(sd.local_mask & ARMED);
|
||||
sd.local_mask &= ~ARMED;
|
||||
|
||||
DCHECK_EQ(sd.local_mask & (SUSPENDED_Q | EXPIRED_Q), 0);
|
||||
|
||||
bool was_suspended = sd.local_mask & SUSPENDED_Q;
|
||||
bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
|
||||
bool incremental_lock = multi_ && multi_->incremental;
|
||||
|
||||
|
@ -332,7 +333,8 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
/*************************************************************************/
|
||||
// Actually running the callback.
|
||||
try {
|
||||
OpStatus status = cb_(this, shard);
|
||||
// if transaction is suspended (blocked in watched queue), then it's a noop.
|
||||
OpStatus status = was_suspended ? OpStatus::OK : cb_(this, shard);
|
||||
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
cb_ = nullptr; // We can do it because only a single thread runs the callback.
|
||||
|
@ -366,20 +368,25 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
|
||||
// If it's a final hop we should release the locks.
|
||||
if (should_release) {
|
||||
bool is_suspended = sd.local_mask & SUSPENDED_Q;
|
||||
bool become_suspended = sd.local_mask & SUSPENDED_Q;
|
||||
|
||||
if (IsGlobal()) {
|
||||
DCHECK(!awaked_prerun && !is_suspended); // Global transactions can not be blocking.
|
||||
DCHECK(!awaked_prerun && !become_suspended); // Global transactions can not be blocking.
|
||||
shard->shard_lock()->Release(Mode());
|
||||
} else { // not global.
|
||||
KeyLockArgs largs = GetLockArgs(idx);
|
||||
DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
|
||||
|
||||
// If a transaction has been suspended, we keep the lock so that future transaction
|
||||
// touching those keys will be ordered via TxQueue. It's necessary because we preserve
|
||||
// the atomicity of awaked transactions by halting the TxQueue.
|
||||
if (!is_suspended) {
|
||||
if (was_suspended || !become_suspended) {
|
||||
shard->db_slice().Release(mode, largs);
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
|
||||
if (was_suspended || (sd.local_mask & AWAKED_Q)) {
|
||||
shard->blocking_controller()->RemoveWatched(this);
|
||||
}
|
||||
}
|
||||
sd.local_mask &= ~OUT_OF_ORDER;
|
||||
|
||||
|
@ -398,37 +405,6 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
return !should_release; // keep
|
||||
}
|
||||
|
||||
void Transaction::RunNoop(EngineShard* shard) {
|
||||
DVLOG(1) << "RunNoop " << DebugId();
|
||||
|
||||
unsigned idx = SidToId(shard->shard_id());
|
||||
auto& sd = shard_data_[idx];
|
||||
DCHECK(sd.local_mask & ARMED);
|
||||
DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
|
||||
DCHECK(!multi_);
|
||||
DCHECK(!IsGlobal());
|
||||
|
||||
sd.local_mask &= ~ARMED;
|
||||
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
cb_ = nullptr;
|
||||
local_result_ = OpStatus::OK;
|
||||
}
|
||||
|
||||
if (coordinator_state_ & COORD_EXEC_CONCLUDING) {
|
||||
KeyLockArgs largs = GetLockArgs(idx);
|
||||
shard->db_slice().Release(Mode(), largs);
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
|
||||
if (sd.local_mask & SUSPENDED_Q) {
|
||||
sd.local_mask |= EXPIRED_Q;
|
||||
shard->blocking_controller()->RemoveWatched(this);
|
||||
}
|
||||
}
|
||||
// Decrease run count after we update all the data in the transaction object.
|
||||
CHECK_GE(DecreaseRunCnt(), 1u);
|
||||
}
|
||||
|
||||
void Transaction::ScheduleInternal() {
|
||||
DCHECK(!shard_data_.empty());
|
||||
DCHECK_EQ(0u, txid_);
|
||||
|
@ -939,7 +915,8 @@ bool Transaction::WaitOnWatch(const time_point& tp) {
|
|||
DVLOG(1) << "WaitOnWatch AfterWait";
|
||||
} else {
|
||||
DVLOG(1) << "WaitOnWatch TimeWait for "
|
||||
<< duration_cast<milliseconds>(tp - time_point::clock::now()).count() << " ms";
|
||||
<< duration_cast<milliseconds>(tp - time_point::clock::now()).count() << " ms "
|
||||
<< DebugId();
|
||||
|
||||
status = blocking_ec_.await_until(move(wake_cb), tp);
|
||||
|
||||
|
@ -982,14 +959,6 @@ bool Transaction::WaitOnWatch(const time_point& tp) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void Transaction::UnregisterWatch() {
|
||||
auto cb = [](Transaction* t, EngineShard* shard) {
|
||||
t->RemoveFromWatchedShardCb(shard);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
Execute(std::move(cb), true);
|
||||
}
|
||||
|
||||
// Runs only in the shard thread.
|
||||
OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
|
||||
ShardId idx = SidToId(shard->shard_id());
|
||||
|
@ -1005,25 +974,6 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
// Runs only in the shard thread.
|
||||
// Quadratic complexity in number of arguments and queue length.
|
||||
bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) {
|
||||
ShardId idx = SidToId(shard->shard_id());
|
||||
auto& sd = shard_data_[idx];
|
||||
|
||||
constexpr uint16_t kQueueMask =
|
||||
Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q;
|
||||
|
||||
if ((sd.local_mask & kQueueMask) == 0)
|
||||
return false;
|
||||
|
||||
sd.local_mask &= ~kQueueMask;
|
||||
|
||||
shard->blocking_controller()->RemoveWatched(this);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Transaction::ExpireShardCb(EngineShard* shard) {
|
||||
auto lock_args = GetLockArgs(shard->shard_id());
|
||||
shard->db_slice().Release(Mode(), lock_args);
|
||||
|
@ -1171,7 +1121,6 @@ void Transaction::BreakOnClose() {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
||||
DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS);
|
||||
|
||||
|
|
|
@ -21,14 +21,15 @@
|
|||
|
||||
namespace dfly {
|
||||
|
||||
class DbSlice;
|
||||
class EngineShardSet;
|
||||
class EngineShard;
|
||||
class BlockingController;
|
||||
|
||||
using facade::OpStatus;
|
||||
using facade::OpResult;
|
||||
|
||||
class Transaction {
|
||||
friend class BlockingController;
|
||||
|
||||
Transaction(const Transaction&);
|
||||
void operator=(const Transaction&) = delete;
|
||||
|
||||
|
@ -161,7 +162,6 @@ class Transaction {
|
|||
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
|
||||
// Returns false if timeout ocurred, true if was notified by one of the keys.
|
||||
bool WaitOnWatch(const time_point& tp);
|
||||
void UnregisterWatch();
|
||||
|
||||
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the
|
||||
// blocking queue. NotifySuspended may be called from (multiple) shard threads and
|
||||
|
@ -175,8 +175,6 @@ class Transaction {
|
|||
// Returns true if transaction should be kept in the queue.
|
||||
bool RunInShard(EngineShard* shard);
|
||||
|
||||
void RunNoop(EngineShard* shard);
|
||||
|
||||
//! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
|
||||
//! Runs in the shard thread.
|
||||
KeyLockArgs GetLockArgs(ShardId sid) const;
|
||||
|
@ -215,7 +213,7 @@ class Transaction {
|
|||
|
||||
// Shard callbacks used within Execute calls
|
||||
OpStatus AddToWatchedShardCb(EngineShard* shard);
|
||||
bool RemoveFromWatchedShardCb(EngineShard* shard);
|
||||
|
||||
void ExpireShardCb(EngineShard* shard);
|
||||
void UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard);
|
||||
|
||||
|
|
|
@ -1,19 +1,18 @@
|
|||
|
||||
import pytest
|
||||
import redis
|
||||
from threading import Thread
|
||||
|
||||
|
||||
from redis.client import NEVER_DECODE
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
pool = redis.ConnectionPool(decode_responses=True)
|
||||
return redis.Redis(connection_pool=pool)
|
||||
|
||||
client = redis.Redis(connection_pool=pool)
|
||||
return client
|
||||
|
||||
class BLPopWorkerThread:
|
||||
def __init__(self):
|
||||
self.result = None
|
||||
self.thread = None
|
||||
|
||||
def async_blpop(self, client: redis.Redis):
|
||||
self.result = None
|
||||
|
@ -22,22 +21,26 @@ class BLPopWorkerThread:
|
|||
self.result = client.blpop(
|
||||
['list1{t}', 'list2{t}', 'list2{t}', 'list1{t}'], 0.5)
|
||||
|
||||
result = Thread(target=blpop_task, args=(self, client))
|
||||
result.start()
|
||||
return result
|
||||
self.thread = Thread(target=blpop_task, args=(self, client))
|
||||
self.thread.start()
|
||||
|
||||
def wait(self, timeout):
|
||||
self.thread.join(timeout)
|
||||
return not self.thread.is_alive()
|
||||
|
||||
|
||||
@pytest.mark.parametrize('execution_number', range(5))
|
||||
def test_blpop_multiple_keys(client, execution_number):
|
||||
@pytest.mark.parametrize('index', range(50))
|
||||
def test_blpop_multiple_keys(client : redis.Redis, index):
|
||||
wt_blpop = BLPopWorkerThread()
|
||||
thread = wt_blpop.async_blpop(client)
|
||||
client.lpush('list1{t}', 'a')
|
||||
thread.join(timeout=2)
|
||||
assert not thread.is_alive()
|
||||
assert wt_blpop.result[1] == 'a'
|
||||
wt_blpop.async_blpop(client)
|
||||
|
||||
thread = wt_blpop.async_blpop(client)
|
||||
client.lpush('list1{t}', 'a')
|
||||
assert wt_blpop.wait(2)
|
||||
assert wt_blpop.result[1] == 'a'
|
||||
watched = client.execute_command('DEBUG WATCHED')
|
||||
assert watched == []
|
||||
|
||||
wt_blpop.async_blpop(client)
|
||||
client.lpush('list2{t}', 'b')
|
||||
thread.join(timeout=2)
|
||||
assert not thread.is_alive()
|
||||
assert wt_blpop.wait(2)
|
||||
assert wt_blpop.result[1] == 'b'
|
||||
|
|
Loading…
Reference in New Issue