From ea399e3e5a4ec58d811199b572ce955dfe8d581b Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 12 Jan 2022 10:39:06 +0200 Subject: [PATCH] Identify multi-blpop interaction bug. Now it fails on missing functionality without deadlocking. --- .github/workflows/ci.yml | 7 +++-- server/engine_shard_set.cc | 52 +++++++++++++++++++++++--------------- server/engine_shard_set.h | 7 ++--- server/list_family_test.cc | 6 ++--- server/transaction.cc | 12 +++++---- 5 files changed, 50 insertions(+), 34 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aed1051..c8c10b2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,6 +42,9 @@ jobs: - name: Build & Test working-directory: ${{github.workspace}}/build - run: | + run: | ninja core/all server/all - CTEST_OUTPUT_ON_FAILURE=1 ninja core/test server/test + # GLOG_logtostderr=1 ctest -V -R list_family_test + GLOG_logtostderr=1 ctest -V -L DFLY + GLOG_logtostderr=1 GLOG_vmodule=transaction=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test + \ No newline at end of file diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc index 59a217f..9b72545 100644 --- a/server/engine_shard_set.cc +++ b/server/engine_shard_set.cc @@ -86,8 +86,8 @@ void EngineShard::DestroyThreadLocal() { // Is called by Transaction::ExecuteAsync in order to run transaction tasks. // Only runs in its own thread. -void EngineShard::PollExecution(Transaction* trans) { - DVLOG(1) << "PollExecution " << (trans ? trans->DebugId() : ""); +void EngineShard::PollExecution(const char* context, Transaction* trans) { + DVLOG(1) << "PollExecution " << context << " " << (trans ? trans->DebugId() : ""); ShardId sid = shard_id(); uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0; @@ -161,7 +161,9 @@ void EngineShard::PollExecution(Transaction* trans) { OnTxFinish(); } // while(!txq_.Empty()) - } // if (!has_awaked_trans) + } else { // if (continuation_trans_ == nullptr && !has_awaked_trans) + DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans; + } // For SUSPENDED_Q - if transaction has not been notified, it will still be // in the watch queue. We need to unlock an Execute by running a noop. @@ -222,18 +224,20 @@ Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) { // awakened to eliminate false positives. void EngineShard::ProcessAwakened(Transaction* completed_t) { for (DbIndex index : awakened_indices_) { - WatchTable& wt = watch_map_[index]; + DbWatchTable& wt = watched_dbs_[index]; + for (auto key : wt.awakened_keys) { string_view sv_key = static_cast(key); auto [it, exp_it] = db_slice_.FindExt(index, sv_key); // Double verify we still got the item. - if (IsValid(it)) { - auto w_it = wt.queue_map.find(sv_key); - CHECK(w_it != wt.queue_map.end()); - DVLOG(1) << "NotifyWatchQueue " << key; - Transaction* t2 = NotifyWatchQueue(w_it->second.get()); - if (t2) { - awakened_transactions_.insert(t2); - } + if (!IsValid(it)) + continue; + + auto w_it = wt.queue_map.find(sv_key); + CHECK(w_it != wt.queue_map.end()); + DVLOG(1) << "NotifyWatchQueue " << key; + Transaction* t2 = NotifyWatchQueue(w_it->second.get()); + if (t2) { + awakened_transactions_.insert(t2); } } wt.awakened_keys.clear(); @@ -243,7 +247,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { if (!completed_t) return; - auto& wt = watch_map_[completed_t->db_index()]; + auto& wt = watched_dbs_[completed_t->db_index()]; KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id()); for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { string_view key = lock_args.args[i]; @@ -251,9 +255,11 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { if (w_it == wt.queue_map.end() || w_it->second->state != WatchQueue::ACTIVE) continue; + WatchQueue& wq = *w_it->second; DCHECK_LE(wq.notify_txid, committed_txid_); + auto& queue = wq.items; DCHECK(!queue.empty()); // since it's active if (queue.front().trans == completed_t) { @@ -269,6 +275,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { } if (queue.empty()) { + DVLOG(1) << "Erasing watchqueue key " << key; wt.queue_map.erase(w_it); } } @@ -277,7 +284,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { } void EngineShard::AddWatched(string_view key, Transaction* me) { - WatchTable& wt = watch_map_[me->db_index()]; + DbWatchTable& wt = watched_dbs_[me->db_index()]; auto [res, inserted] = wt.queue_map.emplace(key, nullptr); if (inserted) { res->second.reset(new WatchQueue); @@ -288,7 +295,7 @@ void EngineShard::AddWatched(string_view key, Transaction* me) { // Runs in O(N) complexity. bool EngineShard::RemovedWatched(string_view key, Transaction* me) { - WatchTable& wt = watch_map_[me->db_index()]; + DbWatchTable& wt = watched_dbs_[me->db_index()]; auto watch_it = wt.queue_map.find(key); CHECK(watch_it != wt.queue_map.end()); @@ -297,6 +304,7 @@ bool EngineShard::RemovedWatched(string_view key, Transaction* me) { if (j->trans == me) { wq.items.erase(j); if (wq.items.empty()) { + DVLOG(1) << "Erasing watchqueue key " << key; wt.queue_map.erase(watch_it); } return true; @@ -309,10 +317,10 @@ bool EngineShard::RemovedWatched(string_view key, Transaction* me) { } void EngineShard::GCWatched(const KeyLockArgs& largs) { - auto& queue_map = watch_map_[largs.db_index].queue_map; + auto& queue_map = watched_dbs_[largs.db_index].queue_map; for (size_t i = 0; i < largs.args.size(); i += largs.key_step) { - auto key = largs.args[i]; + string_view key = largs.args[i]; auto watch_it = queue_map.find(key); CHECK(watch_it != queue_map.end()); WatchQueue& wq = *watch_it->second; @@ -326,6 +334,7 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) { } while (!wq.items.empty()); if (wq.items.empty()) { + DVLOG(1) << "Erasing watchqueue key " << key; queue_map.erase(watch_it); } } @@ -333,11 +342,11 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) { // Called from commands like lpush. void EngineShard::AwakeWatched(DbIndex db_index, const MainIterator& main_it) { - auto it = watch_map_.find(db_index); - if (it == watch_map_.end()) + auto it = watched_dbs_.find(db_index); + if (it == watched_dbs_.end()) return; - WatchTable& wt = it->second; + DbWatchTable& wt = it->second; if (wt.queue_map.empty()) { /// No blocked transactions. return; } @@ -363,10 +372,11 @@ void EngineShard::ShutdownMulti(Transaction* multi) { if (continuation_trans_ == multi) { continuation_trans_ = nullptr; } + OnTxFinish(); } void EngineShard::WaitForConvergence(TxId notifyid, Transaction* t) { - DVLOG(1) << "ConvergeNotification " << shard_id(); + DVLOG(1) << "ConvergeNotification " << t->DebugId() << " at notify " << notifyid; waiting_convergence_.emplace(notifyid, t); } diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h index 02603ce..7a57a05 100644 --- a/server/engine_shard_set.h +++ b/server/engine_shard_set.h @@ -59,7 +59,7 @@ class EngineShard { // Processes TxQueue, blocked transactions or any other execution state related to that // shard. Tries executing the passed transaction if possible (does not guarantee though). - void PollExecution(Transaction* trans); + void PollExecution(const char* context, Transaction* trans); // Returns transaction queue. TxQueue* txq() { @@ -128,7 +128,8 @@ class EngineShard { /// or null if all transactions in the queue have expired.. Transaction* NotifyWatchQueue(WatchQueue* wq); - struct WatchTable { + // Watch state per db slice. + struct DbWatchTable { absl::flat_hash_map> queue_map; // awakened keys that point to blocked entries that can potentially be unblocked. @@ -136,7 +137,7 @@ class EngineShard { absl::flat_hash_set awakened_keys; }; - absl::flat_hash_map watch_map_; + absl::flat_hash_map watched_dbs_; absl::flat_hash_set awakened_indices_; absl::flat_hash_set awakened_transactions_; diff --git a/server/list_family_test.cc b/server/list_family_test.cc index 826d73d..6fd3d3a 100644 --- a/server/list_family_test.cc +++ b/server/list_family_test.cc @@ -31,9 +31,9 @@ class ListFamilyTest : public BaseFamilyTest { } }; -const char* kKey1 = "x"; -const char* kKey2 = "b"; -const char* kKey3 = "c"; +const char kKey1[] = "x"; +const char kKey2[] = "b"; +const char kKey3[] = "c"; TEST_F(ListFamilyTest, Basic) { auto resp = Run({"lpush", kKey1, "1"}); diff --git a/server/transaction.cc b/server/transaction.cc index 751ae1b..a2683c5 100644 --- a/server/transaction.cc +++ b/server/transaction.cc @@ -565,6 +565,8 @@ void Transaction::UnlockMulti() { // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from // there. if (sd.pq_pos != TxQueue::kEnd) { + DVLOG(1) << "unlockmulti: TxPopFront " << DebugId(); + TxQueue* txq = shard->txq(); DCHECK(!txq->Empty()); Transaction* trans = absl::get(txq->Front()); @@ -577,7 +579,7 @@ void Transaction::UnlockMulti() { // notify awakened transactions. shard->ProcessAwakened(nullptr); - shard->PollExecution(nullptr); + shard->PollExecution("unlockmulti", nullptr); this->DecreaseRunCnt(); }; @@ -673,10 +675,10 @@ void Transaction::ExecuteAsync() { // shard->PollExecution(this) does not necessarily execute this transaction. // Therefore, everything that should be handled during the callback execution // should go into RunInShard. - shard->PollExecution(this); + shard->PollExecution("exec_cb", this); } - DVLOG(2) << "ptr_release " << DebugId() << " " << use_count(); + DVLOG(2) << "ptr_release " << DebugId() << " " << seq; intrusive_ptr_release(this); // against use_count_.fetch_add above. }; @@ -735,7 +737,7 @@ void Transaction::ExpireBlocking() { // My guess - probably to trigger the run of stalled transactions in case // this shard concurrently awoke this transaction and stalled the processing // of TxQueue. - shard->PollExecution(nullptr); + shard->PollExecution("expirecb", nullptr); CHECK_GE(DecreaseRunCnt(), 1u); }; @@ -803,7 +805,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DVLOG(1) << "Rescheduling into TxQueue " << DebugId(); - shard->PollExecution(nullptr); + shard->PollExecution("schedule_unique", nullptr); return false; }