Identify multi-blpop interaction bug.

Now it fails on missing functionality without deadlocking.
This commit is contained in:
Roman Gershman 2022-01-12 10:39:06 +02:00
parent fc63eec1b6
commit ea399e3e5a
5 changed files with 50 additions and 34 deletions

View File

@ -44,4 +44,7 @@ jobs:
working-directory: ${{github.workspace}}/build
run: |
ninja core/all server/all
CTEST_OUTPUT_ON_FAILURE=1 ninja core/test server/test
# GLOG_logtostderr=1 ctest -V -R list_family_test
GLOG_logtostderr=1 ctest -V -L DFLY
GLOG_logtostderr=1 GLOG_vmodule=transaction=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test

View File

@ -86,8 +86,8 @@ void EngineShard::DestroyThreadLocal() {
// Is called by Transaction::ExecuteAsync in order to run transaction tasks.
// Only runs in its own thread.
void EngineShard::PollExecution(Transaction* trans) {
DVLOG(1) << "PollExecution " << (trans ? trans->DebugId() : "");
void EngineShard::PollExecution(const char* context, Transaction* trans) {
DVLOG(1) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "");
ShardId sid = shard_id();
uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0;
@ -161,7 +161,9 @@ void EngineShard::PollExecution(Transaction* trans) {
OnTxFinish();
} // while(!txq_.Empty())
} // if (!has_awaked_trans)
} else { // if (continuation_trans_ == nullptr && !has_awaked_trans)
DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
}
// For SUSPENDED_Q - if transaction has not been notified, it will still be
// in the watch queue. We need to unlock an Execute by running a noop.
@ -222,18 +224,20 @@ Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) {
// awakened to eliminate false positives.
void EngineShard::ProcessAwakened(Transaction* completed_t) {
for (DbIndex index : awakened_indices_) {
WatchTable& wt = watch_map_[index];
DbWatchTable& wt = watched_dbs_[index];
for (auto key : wt.awakened_keys) {
string_view sv_key = static_cast<string_view>(key);
auto [it, exp_it] = db_slice_.FindExt(index, sv_key); // Double verify we still got the item.
if (IsValid(it)) {
auto w_it = wt.queue_map.find(sv_key);
CHECK(w_it != wt.queue_map.end());
DVLOG(1) << "NotifyWatchQueue " << key;
Transaction* t2 = NotifyWatchQueue(w_it->second.get());
if (t2) {
awakened_transactions_.insert(t2);
}
if (!IsValid(it))
continue;
auto w_it = wt.queue_map.find(sv_key);
CHECK(w_it != wt.queue_map.end());
DVLOG(1) << "NotifyWatchQueue " << key;
Transaction* t2 = NotifyWatchQueue(w_it->second.get());
if (t2) {
awakened_transactions_.insert(t2);
}
}
wt.awakened_keys.clear();
@ -243,7 +247,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
if (!completed_t)
return;
auto& wt = watch_map_[completed_t->db_index()];
auto& wt = watched_dbs_[completed_t->db_index()];
KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id());
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
@ -251,9 +255,11 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
if (w_it == wt.queue_map.end() || w_it->second->state != WatchQueue::ACTIVE)
continue;
WatchQueue& wq = *w_it->second;
DCHECK_LE(wq.notify_txid, committed_txid_);
auto& queue = wq.items;
DCHECK(!queue.empty()); // since it's active
if (queue.front().trans == completed_t) {
@ -269,6 +275,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
}
if (queue.empty()) {
DVLOG(1) << "Erasing watchqueue key " << key;
wt.queue_map.erase(w_it);
}
}
@ -277,7 +284,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
}
void EngineShard::AddWatched(string_view key, Transaction* me) {
WatchTable& wt = watch_map_[me->db_index()];
DbWatchTable& wt = watched_dbs_[me->db_index()];
auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
if (inserted) {
res->second.reset(new WatchQueue);
@ -288,7 +295,7 @@ void EngineShard::AddWatched(string_view key, Transaction* me) {
// Runs in O(N) complexity.
bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
WatchTable& wt = watch_map_[me->db_index()];
DbWatchTable& wt = watched_dbs_[me->db_index()];
auto watch_it = wt.queue_map.find(key);
CHECK(watch_it != wt.queue_map.end());
@ -297,6 +304,7 @@ bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
if (j->trans == me) {
wq.items.erase(j);
if (wq.items.empty()) {
DVLOG(1) << "Erasing watchqueue key " << key;
wt.queue_map.erase(watch_it);
}
return true;
@ -309,10 +317,10 @@ bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
}
void EngineShard::GCWatched(const KeyLockArgs& largs) {
auto& queue_map = watch_map_[largs.db_index].queue_map;
auto& queue_map = watched_dbs_[largs.db_index].queue_map;
for (size_t i = 0; i < largs.args.size(); i += largs.key_step) {
auto key = largs.args[i];
string_view key = largs.args[i];
auto watch_it = queue_map.find(key);
CHECK(watch_it != queue_map.end());
WatchQueue& wq = *watch_it->second;
@ -326,6 +334,7 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) {
} while (!wq.items.empty());
if (wq.items.empty()) {
DVLOG(1) << "Erasing watchqueue key " << key;
queue_map.erase(watch_it);
}
}
@ -333,11 +342,11 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) {
// Called from commands like lpush.
void EngineShard::AwakeWatched(DbIndex db_index, const MainIterator& main_it) {
auto it = watch_map_.find(db_index);
if (it == watch_map_.end())
auto it = watched_dbs_.find(db_index);
if (it == watched_dbs_.end())
return;
WatchTable& wt = it->second;
DbWatchTable& wt = it->second;
if (wt.queue_map.empty()) { /// No blocked transactions.
return;
}
@ -363,10 +372,11 @@ void EngineShard::ShutdownMulti(Transaction* multi) {
if (continuation_trans_ == multi) {
continuation_trans_ = nullptr;
}
OnTxFinish();
}
void EngineShard::WaitForConvergence(TxId notifyid, Transaction* t) {
DVLOG(1) << "ConvergeNotification " << shard_id();
DVLOG(1) << "ConvergeNotification " << t->DebugId() << " at notify " << notifyid;
waiting_convergence_.emplace(notifyid, t);
}

View File

@ -59,7 +59,7 @@ class EngineShard {
// Processes TxQueue, blocked transactions or any other execution state related to that
// shard. Tries executing the passed transaction if possible (does not guarantee though).
void PollExecution(Transaction* trans);
void PollExecution(const char* context, Transaction* trans);
// Returns transaction queue.
TxQueue* txq() {
@ -128,7 +128,8 @@ class EngineShard {
/// or null if all transactions in the queue have expired..
Transaction* NotifyWatchQueue(WatchQueue* wq);
struct WatchTable {
// Watch state per db slice.
struct DbWatchTable {
absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>> queue_map;
// awakened keys that point to blocked entries that can potentially be unblocked.
@ -136,7 +137,7 @@ class EngineShard {
absl::flat_hash_set<base::string_view_sso> awakened_keys;
};
absl::flat_hash_map<DbIndex, WatchTable> watch_map_;
absl::flat_hash_map<DbIndex, DbWatchTable> watched_dbs_;
absl::flat_hash_set<DbIndex> awakened_indices_;
absl::flat_hash_set<Transaction*> awakened_transactions_;

View File

@ -31,9 +31,9 @@ class ListFamilyTest : public BaseFamilyTest {
}
};
const char* kKey1 = "x";
const char* kKey2 = "b";
const char* kKey3 = "c";
const char kKey1[] = "x";
const char kKey2[] = "b";
const char kKey3[] = "c";
TEST_F(ListFamilyTest, Basic) {
auto resp = Run({"lpush", kKey1, "1"});

View File

@ -565,6 +565,8 @@ void Transaction::UnlockMulti() {
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
// there.
if (sd.pq_pos != TxQueue::kEnd) {
DVLOG(1) << "unlockmulti: TxPopFront " << DebugId();
TxQueue* txq = shard->txq();
DCHECK(!txq->Empty());
Transaction* trans = absl::get<Transaction*>(txq->Front());
@ -577,7 +579,7 @@ void Transaction::UnlockMulti() {
// notify awakened transactions.
shard->ProcessAwakened(nullptr);
shard->PollExecution(nullptr);
shard->PollExecution("unlockmulti", nullptr);
this->DecreaseRunCnt();
};
@ -673,10 +675,10 @@ void Transaction::ExecuteAsync() {
// shard->PollExecution(this) does not necessarily execute this transaction.
// Therefore, everything that should be handled during the callback execution
// should go into RunInShard.
shard->PollExecution(this);
shard->PollExecution("exec_cb", this);
}
DVLOG(2) << "ptr_release " << DebugId() << " " << use_count();
DVLOG(2) << "ptr_release " << DebugId() << " " << seq;
intrusive_ptr_release(this); // against use_count_.fetch_add above.
};
@ -735,7 +737,7 @@ void Transaction::ExpireBlocking() {
// My guess - probably to trigger the run of stalled transactions in case
// this shard concurrently awoke this transaction and stalled the processing
// of TxQueue.
shard->PollExecution(nullptr);
shard->PollExecution("expirecb", nullptr);
CHECK_GE(DecreaseRunCnt(), 1u);
};
@ -803,7 +805,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DVLOG(1) << "Rescheduling into TxQueue " << DebugId();
shard->PollExecution(nullptr);
shard->PollExecution("schedule_unique", nullptr);
return false;
}