More work on blocking commands like BLPOP.

Fixes #1 and fixes #24.
This commit is contained in:
Roman Gershman 2022-04-27 10:36:25 +03:00
parent 344731d255
commit 72e90bb729
12 changed files with 739 additions and 437 deletions

2
helio

@ -1 +1 @@
Subproject commit 9b96ae52be8ccf35c959c366757ae30174d84a0e
Subproject commit a024151f24180d493b51909b6853cfd16ae6367d

View File

@ -1,7 +1,8 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib channel_slice.cc command_registry.cc common.cc config_flags.cc
add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc
common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc
engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc
list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
@ -22,7 +23,7 @@ cxx_test(set_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test list_family_test

View File

@ -0,0 +1,291 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/blocking_controller.h"
#include <boost/smart_ptr/intrusive_ptr.hpp>
extern "C" {
#include "redis/object.h"
}
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
namespace dfly {
using namespace std;
struct WatchItem {
Transaction* trans;
Transaction* get() {
return trans;
}
WatchItem(Transaction* t) : trans(t) {
}
};
struct BlockingController::WatchQueue {
deque<WatchItem> items;
TxId notify_txid = UINT64_MAX;
// Updated by both coordinator and shard threads but at different times.
enum State { SUSPENDED, ACTIVE } state = SUSPENDED;
void Suspend() {
state = SUSPENDED;
notify_txid = UINT64_MAX;
}
};
// Watch state per db.
struct BlockingController::DbWatchTable {
WatchQueueMap queue_map;
// awakened keys point to blocked keys that can potentially be unblocked.
// they reference key objects in queue_map.
absl::flat_hash_set<base::string_view_sso> awakened_keys;
void RemoveEntry(WatchQueueMap::iterator it);
// returns true if awake event was added.
// Requires that the key queue be in the required state.
bool AddAwakeEvent(WatchQueue::State cur_state, string_view key);
};
BlockingController::BlockingController(EngineShard* owner) : owner_(owner) {
}
BlockingController::~BlockingController() {
}
void BlockingController::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) {
DVLOG(1) << "Erasing watchqueue key " << it->first;
awakened_keys.erase(it->first);
queue_map.erase(it);
}
bool BlockingController::DbWatchTable::AddAwakeEvent(WatchQueue::State cur_state, string_view key) {
auto it = queue_map.find(key);
if (it == queue_map.end() || it->second->state != cur_state)
return false; /// nobody watches this key or state does not match.
string_view dbkey = it->first;
return awakened_keys.insert(dbkey).second;
}
// Processes potentially awakened keys and verifies that these are indeed
// awakened to eliminate false positives.
// In addition, optionally removes completed_t from the front of the watch queues.
void BlockingController::RunStep(Transaction* completed_t) {
VLOG(1) << "RunStep [" << owner_->shard_id() << "] " << completed_t;
if (completed_t) {
awakened_transactions_.erase(completed_t);
auto dbit = watched_dbs_.find(completed_t->db_index());
if (dbit != watched_dbs_.end()) {
DbWatchTable& wt = *dbit->second;
ShardId sid = owner_->shard_id();
KeyLockArgs lock_args = completed_t->GetLockArgs(sid);
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
if (wt.AddAwakeEvent(WatchQueue::ACTIVE, key)) {
awakened_indices_.emplace(completed_t->db_index());
}
}
}
}
for (DbIndex index : awakened_indices_) {
auto dbit = watched_dbs_.find(index);
if (dbit == watched_dbs_.end())
continue;
DbWatchTable& wt = *dbit->second;
for (auto key : wt.awakened_keys) {
string_view sv_key = static_cast<string_view>(key);
// Double verify we still got the item.
auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key);
if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block.
continue;
auto w_it = wt.queue_map.find(sv_key);
CHECK(w_it != wt.queue_map.end());
DVLOG(1) << "NotifyWatchQueue " << key;
WatchQueue* wq = w_it->second.get();
NotifyWatchQueue(wq);
if (wq->items.empty()) {
wt.queue_map.erase(w_it);
}
}
wt.awakened_keys.clear();
if (wt.queue_map.empty()) {
watched_dbs_.erase(dbit);
}
}
awakened_indices_.clear();
}
void BlockingController::AddWatched(Transaction* trans) {
VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId();
auto [dbit, added] = watched_dbs_.emplace(trans->db_index(), nullptr);
if (added) {
dbit->second.reset(new DbWatchTable);
}
DbWatchTable& wt = *dbit->second;
auto args = trans->ShardArgsInShard(owner_->shard_id());
for (auto key : args) {
auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
if (inserted) {
res->second.reset(new WatchQueue);
}
res->second->items.emplace_back(trans);
}
}
// Runs in O(N) complexity.
void BlockingController::RemoveWatched(Transaction* trans) {
VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId();
auto dbit = watched_dbs_.find(trans->db_index());
CHECK(dbit != watched_dbs_.end());
DbWatchTable& wt = *dbit->second;
auto args = trans->ShardArgsInShard(owner_->shard_id());
for (auto key : args) {
auto watch_it = wt.queue_map.find(key);
CHECK(watch_it != wt.queue_map.end());
WatchQueue& wq = *watch_it->second;
bool erased = false;
for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) {
if (items_it->trans == trans) {
wq.items.erase(items_it);
erased = true;
break;
}
}
CHECK(erased);
if (wq.items.empty()) {
wt.RemoveEntry(watch_it);
}
}
if (wt.queue_map.empty()) {
watched_dbs_.erase(dbit);
}
awakened_transactions_.erase(trans);
}
// Called from commands like lpush.
void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
auto it = watched_dbs_.find(db_index);
if (it == watched_dbs_.end())
return;
VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key;
DbWatchTable& wt = *it->second;
DCHECK(!wt.queue_map.empty());
if (wt.AddAwakeEvent(WatchQueue::SUSPENDED, db_key)) {
awakened_indices_.insert(db_index);
}
}
void BlockingController::RegisterAwaitForConverge(Transaction* t) {
TxId notify_id = t->notify_txid();
DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id;
// t->notify_txid might improve in parallel. it does not matter since convergence
// will happen even with stale notify_id.
waiting_convergence_.emplace(notify_id, t);
}
// Internal function called from ProcessAwakened().
// Marks the queue as active and notifies the first transaction in the queue.
void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
VLOG(1) << "Notify WQ: [" << owner_->shard_id() << "]";
wq->state = WatchQueue::ACTIVE;
auto& queue = wq->items;
ShardId sid = owner_->shard_id();
do {
WatchItem& wi = queue.front();
Transaction* head = wi.get();
queue.pop_front();
if (head->NotifySuspended(owner_->committed_txid(), sid)) {
wq->notify_txid = owner_->committed_txid();
awakened_transactions_.insert(head);
break;
}
} while (!queue.empty());
}
void BlockingController::OnTxFinish() {
VLOG(1) << "OnTxFinish [" << owner_->shard_id() << "]";
if (waiting_convergence_.empty())
return;
TxQueue* txq = owner_->txq();
if (txq->Empty()) {
for (const auto& k_v : waiting_convergence_) {
NotifyConvergence(k_v.second);
}
waiting_convergence_.clear();
return;
}
TxId txq_score = txq->HeadScore();
do {
auto tx_waiting = waiting_convergence_.begin();
Transaction* trans = tx_waiting->second;
// Instead of taking the map key, we use upto date notify_txid
// which could meanwhile improve (decrease). Not important though.
TxId notifyid = trans->notify_txid();
if (owner_->committed_txid() < notifyid && txq_score <= notifyid)
break; // we can not converge for notifyid so we can not converge for larger ts as well.
waiting_convergence_.erase(tx_waiting);
NotifyConvergence(trans);
} while (!waiting_convergence_.empty());
}
void BlockingController::NotifyConvergence(Transaction* tx) {
LOG(FATAL) << "TBD";
}
size_t BlockingController::NumWatched(DbIndex db_indx) const {
auto it = watched_dbs_.find(db_indx);
if (it == watched_dbs_.end())
return 0;
return it->second->queue_map.size();
}
} // namespace dfly

View File

@ -0,0 +1,80 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/btree_map.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include "base/string_view_sso.h"
#include "server/common.h"
namespace dfly {
class Transaction;
class BlockingController {
public:
explicit BlockingController(EngineShard* owner);
~BlockingController();
bool HasAwakedTransaction() const {
return !awakened_transactions_.empty();
}
// Iterates over awakened key candidates in each db and moves verified ones into
// global verified_awakened_ array.
// Returns true if there are active awakened keys, false otherwise.
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if t is awaked and finished running - to remove it from the head
// of the queue and notify the next one.
// If t is null then second part is omitted.
void RunStep(Transaction* t);
// Blocking API
// TODO: consider moving all watched functions to
// EngineShard with separate per db map.
//! AddWatched adds a transaction to the blocking queue.
void AddWatched(Transaction* me);
void RemoveWatched(Transaction* me);
// Called from operations that create keys like lpush, rename etc.
void AwakeWatched(DbIndex db_index, std::string_view db_key);
void OnTxFinish();
void RegisterAwaitForConverge(Transaction* t);
size_t NumWatched(DbIndex db_indx) const;
private:
struct WatchQueue;
struct DbWatchTable;
using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>;
/// Returns the notified transaction,
/// or null if all transactions in the queue have expired..
void NotifyWatchQueue(WatchQueue* wq);
void NotifyConvergence(Transaction* tx);
EngineShard* owner_;
absl::flat_hash_map<DbIndex, std::unique_ptr<DbWatchTable>> watched_dbs_;
// serves as a temporary queue that aggregates all the possible awakened dbs.
// flushed by RunStep().
absl::flat_hash_set<DbIndex> awakened_indices_;
// tracks currently notified and awaked transactions.
// There can be multiple transactions like this because a transaction
// could awaken arbitrary number of keys.
absl::flat_hash_set<Transaction*> awakened_transactions_;
absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
};
} // namespace dfly

View File

@ -0,0 +1,99 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/blocking_controller.h"
#include <gmock/gmock.h>
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"
namespace dfly {
using namespace util;
using namespace std;
using namespace std::chrono;
using namespace testing;
class BlockingControllerTest : public Test {
protected:
BlockingControllerTest() : cid_("blpop", 0, -3, 1, -2, 1) {
}
void SetUp() override;
void TearDown() override;
std::unique_ptr<ProactorPool> pp_;
std::unique_ptr<EngineShardSet> ess_;
boost::intrusive_ptr<Transaction> trans_;
CommandId cid_;
StringVec str_vec_;
CmdArgVec arg_vec_;
};
constexpr size_t kNumThreads = 3;
void BlockingControllerTest::SetUp() {
pp_.reset(new uring::UringPool(16, kNumThreads));
pp_->Run();
ess_.reset(new EngineShardSet(pp_.get()));
ess_->Init(kNumThreads);
auto cb = [&](uint32_t index, ProactorBase* pb) { ess_->InitThreadLocal(pb, false); };
pp_->AwaitFiberOnAll(cb);
trans_.reset(new Transaction{&cid_, ess_.get()});
str_vec_.assign({"blpop", "x", "z", "0"});
for (auto& s : str_vec_) {
arg_vec_.emplace_back(s);
}
trans_->InitByArgs(0, {arg_vec_.data(), arg_vec_.size()});
CHECK_EQ(0u, Shard("x", ess_->size()));
CHECK_EQ(2u, Shard("z", ess_->size()));
const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info();
LOG(INFO) << "Starting " << test_info->name();
}
void BlockingControllerTest::TearDown() {
ess_->RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); });
ess_.reset();
pp_->Stop();
pp_.reset();
}
TEST_F(BlockingControllerTest, Basic) {
ess_->Await(0, [&] {
BlockingController bc(EngineShard::tlocal());
bc.AddWatched(trans_.get());
EXPECT_EQ(1, bc.NumWatched(0));
bc.RemoveWatched(trans_.get());
EXPECT_EQ(0, bc.NumWatched(0));
});
}
TEST_F(BlockingControllerTest, Timeout) {
time_point tp = steady_clock::now() + chrono::milliseconds(10);
trans_->Schedule();
bool res = trans_->WaitOnWatch(tp);
EXPECT_FALSE(res);
unsigned num_watched =
ess_->Await(0, [&] { return EngineShard::tlocal()->blocking_controller()->NumWatched(0); });
EXPECT_EQ(0, num_watched);
trans_.reset();
}
} // namespace dfly

View File

@ -10,6 +10,7 @@ extern "C" {
}
#include "base/logging.h"
#include "server/blocking_controller.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
@ -33,35 +34,6 @@ vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShard
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
struct WatchItem {
::boost::intrusive_ptr<Transaction> trans;
WatchItem(Transaction* t) : trans(t) {
}
};
struct EngineShard::WatchQueue {
deque<WatchItem> items;
TxId notify_txid = UINT64_MAX;
// Updated by both coordinator and shard threads but at different times.
enum State { SUSPENDED, ACTIVE } state = SUSPENDED;
void Suspend() {
state = SUSPENDED;
notify_txid = UINT64_MAX;
}
};
bool EngineShard::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) {
DVLOG(1) << "Erasing watchqueue key " << it->first;
awakened_keys.erase(it->first);
queue_map.erase(it);
return queue_map.empty();
}
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), this) {
@ -125,7 +97,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
shard_->tiered_storage_.reset(new TieredStorage(&shard_->db_slice_));
error_code ec = shard_->tiered_storage_->Open(fn);
CHECK(!ec) << ec.message(); // TODO
CHECK(!ec) << ec.message(); // TODO
}
}
@ -171,12 +143,13 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
OnTxFinish();
if (blocking_controller_)
blocking_controller_->OnTxFinish();
}
}
}
bool has_awaked_trans = HasAwakedTransaction();
bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction();
Transaction* head = nullptr;
string dbg_id;
@ -188,6 +161,8 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// The fact that Tx is in the queue, already means that coordinator fiber will not progress,
// hence here it's enough to test for run_count and check local_mask.
bool is_armed = head->IsArmedInShard(sid);
DVLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed;
if (!is_armed)
break;
@ -213,6 +188,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
bool keep = head->RunInShard(this);
// We should not access head from this point since RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
@ -221,7 +197,8 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
break;
}
OnTxFinish();
if (blocking_controller_)
blocking_controller_->OnTxFinish();
} // while(!txq_.Empty())
} else { // if (continuation_trans_ == nullptr && !has_awaked_trans)
DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
@ -230,8 +207,10 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// For SUSPENDED_Q - if transaction has not been notified, it will still be
// in the watch queue. We need to unlock an Execute by running a noop.
if (trans_mask & Transaction::SUSPENDED_Q) {
TxId notify_txid = trans->notify_txid();
DCHECK(HasResultConverged(notify_txid));
// This case happens when some other shard notified the transaction and now it
// runs FindFirst on all shards.
// TxId notify_txid = trans->notify_txid();
// DCHECK(HasResultConverged(notify_txid));
trans->RunNoop(this);
return;
}
@ -239,7 +218,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// If trans is out of order, i.e. locks keys that previous transactions have not locked.
// It may be that there are other transactions that touch those keys but they necessary ordered
// after trans in the queue, hence it's safe to run trans out of order.
if (trans && trans_mask & Transaction::OUT_OF_ORDER) {
if (trans && (trans_mask & Transaction::OUT_OF_ORDER)) {
DCHECK(trans != head);
DCHECK(!trans->IsMulti()); // multi, global transactions can not be OOO.
DCHECK(trans_mask & Transaction::ARMED);
@ -259,239 +238,15 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
}
// Internal function called from ProcessAwakened().
// Marks the queue as active and notifies the first transaction in the queue.
Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) {
wq->state = WatchQueue::ACTIVE;
auto& q = wq->items;
ShardId sid = shard_id();
do {
const WatchItem& wi = q.front();
Transaction* head = wi.trans.get();
if (head->NotifySuspended(committed_txid_, sid)) {
wq->notify_txid = committed_txid_;
return head;
}
q.pop_front();
} while (!q.empty());
return nullptr;
}
// Processes potentially awakened keys and verifies that these are indeed
// awakened to eliminate false positives.
// In addition, optionally removes completed_t from the watch queues.
void EngineShard::ProcessAwakened(Transaction* completed_t) {
for (DbIndex index : awakened_indices_) {
DbWatchTable& wt = watched_dbs_[index];
for (auto key : wt.awakened_keys) {
string_view sv_key = static_cast<string_view>(key);
auto [it, exp_it] = db_slice_.FindExt(index, sv_key); // Double verify we still got the item.
if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block.
continue;
auto w_it = wt.queue_map.find(sv_key);
CHECK(w_it != wt.queue_map.end());
DVLOG(1) << "NotifyWatchQueue " << key;
Transaction* t2 = NotifyWatchQueue(w_it->second.get());
if (t2) {
awakened_transactions_.insert(t2);
}
}
wt.awakened_keys.clear();
}
awakened_indices_.clear();
if (!completed_t)
return;
auto dbit = watched_dbs_.find(completed_t->db_index());
if (dbit == watched_dbs_.end())
return;
DbWatchTable& wt = dbit->second;
KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id());
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
auto w_it = wt.queue_map.find(key);
if (w_it == wt.queue_map.end() || w_it->second->state != WatchQueue::ACTIVE)
continue;
WatchQueue& wq = *w_it->second;
DCHECK_LE(wq.notify_txid, committed_txid_);
auto& queue = wq.items;
DCHECK(!queue.empty()); // since it's active
if (queue.front().trans != completed_t)
continue;
DVLOG(1) << "Wakening next transaction for key " << key;
do {
const WatchItem& bi = queue.front();
Transaction* head = bi.trans.get();
if (head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
} while (!queue.empty());
if (queue.empty()) {
wt.RemoveEntry(w_it);
}
}
if (wt.queue_map.empty()) {
watched_dbs_.erase(dbit);
}
awakened_transactions_.erase(completed_t);
}
void EngineShard::AddWatched(string_view key, Transaction* me) {
DbWatchTable& wt = watched_dbs_[me->db_index()];
auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
if (inserted) {
res->second.reset(new WatchQueue);
}
res->second->items.emplace_back(me);
}
// Runs in O(N) complexity.
bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
auto dbit = watched_dbs_.find(me->db_index());
CHECK(dbit != watched_dbs_.end());
DbWatchTable& wt = dbit->second;
auto watch_it = wt.queue_map.find(key);
CHECK(watch_it != wt.queue_map.end());
WatchQueue& wq = *watch_it->second;
for (auto j = wq.items.begin(); j != wq.items.end(); ++j) {
if (j->trans == me) {
wq.items.erase(j);
if (wq.items.empty()) {
if (wt.RemoveEntry(watch_it)) {
watched_dbs_.erase(dbit);
}
}
return true;
}
}
LOG(FATAL) << "should not happen";
return false;
}
void EngineShard::GCWatched(const KeyLockArgs& largs) {
auto dbit = watched_dbs_.find(largs.db_index);
CHECK(dbit != watched_dbs_.end());
DbWatchTable& wt = dbit->second;
for (size_t i = 0; i < largs.args.size(); i += largs.key_step) {
string_view key = largs.args[i];
auto watch_it = wt.queue_map.find(key);
CHECK(watch_it != wt.queue_map.end());
WatchQueue& wq = *watch_it->second;
DCHECK(!wq.items.empty());
do {
auto local_mask = wq.items.front().trans->GetLocalMask(shard_id());
if ((local_mask & Transaction::EXPIRED_Q) == 0) {
break;
}
wq.items.pop_front();
} while (!wq.items.empty());
if (wq.items.empty()) {
if (wt.RemoveEntry(watch_it)) {
watched_dbs_.erase(dbit);
return;
}
}
}
}
// Called from commands like lpush.
void EngineShard::AwakeWatched(DbIndex db_index, string_view db_key) {
auto it = watched_dbs_.find(db_index);
if (it == watched_dbs_.end())
return;
DbWatchTable& wt = it->second;
DCHECK(!wt.queue_map.empty());
auto wit = wt.queue_map.find(db_key);
if (wit == wt.queue_map.end())
return; /// Similarly, nobody watches this key.
string_view key = wit->first;
// Already awakened this key.
if (wt.awakened_keys.find(key) != wt.awakened_keys.end())
return;
wt.awakened_keys.insert(wit->first);
awakened_indices_.insert(db_index);
}
void EngineShard::ShutdownMulti(Transaction* multi) {
if (continuation_trans_ == multi) {
continuation_trans_ = nullptr;
}
OnTxFinish();
}
void EngineShard::WaitForConvergence(TxId notifyid, Transaction* t) {
DVLOG(1) << "ConvergeNotification " << t->DebugId() << " at notify " << notifyid;
waiting_convergence_.emplace(notifyid, t);
}
void EngineShard::OnTxFinish() {
DCHECK(continuation_trans_ == nullptr); // By definition of OnTxFinish.
if (waiting_convergence_.empty())
return;
if (txq_.Empty()) {
for (const auto& k_v : waiting_convergence_) {
NotifyConvergence(k_v.second);
}
waiting_convergence_.clear();
return;
}
TxId txq_score = txq_.HeadScore();
do {
auto tx_waiting = waiting_convergence_.begin();
// Instead of taking the map key, we use upto date notify_txid
// That could meanwhile improve. Not important though.
TxId notifyid = tx_waiting->second->notify_txid();
if (notifyid > committed_txid_ && txq_score <= tx_waiting->first)
break;
auto nh = waiting_convergence_.extract(tx_waiting);
NotifyConvergence(nh.mapped());
} while (!waiting_convergence_.empty());
}
void EngineShard::NotifyConvergence(Transaction* tx) {
LOG(FATAL) << "TBD";
if (blocking_controller_)
blocking_controller_->OnTxFinish();
}
#if 0
// There are several cases that contain proof of convergence for this shard:
// 1. txq_ empty - it means that anything that is goonna be scheduled will already be scheduled
// with txid > notifyid.
@ -499,15 +254,35 @@ void EngineShard::NotifyConvergence(Transaction* tx) {
// notifyid.
// 3. committed_txid_ == notifyid, then if a transaction in progress (continuation_trans_ != NULL)
// the this transaction can still affect the result, hence we require continuation_trans_ is null
// which will point to converged result @notifyid.
// 4. Finally with committed_txid_ < notifyid and continuation_trans_ == nullptr,
// which will point to converged result @notifyid. However, we never awake a transaction
// when there is a multi-hop transaction in progress to avoid false positives.
// Therefore, continuation_trans_ must always be null when calling this function.
// 4. Finally with committed_txid_ < notifyid.
// we can check if the next in line (HeadScore) is after notifyid in that case we can also
// conclude regarding the result convergence for this shard.
//
bool EngineShard::HasResultConverged(TxId notifyid) const {
return txq_.Empty() || committed_txid_ > notifyid ||
(continuation_trans_ == nullptr &&
(committed_txid_ == notifyid || txq_.HeadScore() > notifyid));
CHECK(continuation_trans_ == nullptr);
if (committed_txid_ >= notifyid)
return true;
// This could happen if a single lpush (not in transaction) woke multi-shard blpop.
DVLOG(1) << "HasResultConverged: cmtxid - " << committed_txid_ << " vs " << notifyid;
// We must check for txq head - it's not an optimization - we need it for correctness.
// If a multi-transaction has been scheduled and it does not have any presence in
// this shard (no actual keys) and we won't check for it HasResultConverged will
// return false. The blocked transaction will wait for this shard to progress and
// will also block other shards from progressing (where it has been notified).
// If this multi-transaction has presence in those shards, it won't progress there as well.
// Therefore, we will get a deadlock. By checking txid of the head we will avoid this situation:
// if the head.txid is after notifyid then this shard obviously converged.
// if the head.txid <= notifyid that transaction will be able to progress in other shards.
// and we must wait for it to finish.
return txq_.Empty() || txq_.HeadScore() > notifyid;
}
#endif
void EngineShard::CacheStats() {
#if 0
@ -542,6 +317,13 @@ size_t EngineShard::UsedMemory() const {
return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal();
}
void EngineShard::AddBlocked(Transaction* trans) {
if (!blocking_controller_) {
blocking_controller_.reset(new BlockingController(this));
}
blocking_controller_->AddWatched(trans);
}
/**

View File

@ -25,6 +25,7 @@ extern "C" {
namespace dfly {
class TieredStorage;
class BlockingController;
class EngineShard {
public:
@ -89,34 +90,9 @@ class EngineShard {
return &shard_lock_;
}
// Iterates over awakened key candidates in each db and moves verified ones into
// global verified_awakened_ array.
// Returns true if there are active awakened keys, false otherwise.
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if t is awaked and finished running - to remove it from the head
// of the queue and notify the next one.
// If t is null then second part is omitted.
void ProcessAwakened(Transaction* t);
// Blocking API
// TODO: consider moving all watched functions to
// EngineShard with separate per db map.
//! AddWatched adds a transaction to the blocking queue.
void AddWatched(std::string_view key, Transaction* me);
bool RemovedWatched(std::string_view key, Transaction* me);
void GCWatched(const KeyLockArgs& lock_args);
void AwakeWatched(DbIndex db_index, std::string_view db_key);
bool HasAwakedTransaction() const {
return !awakened_transactions_.empty();
}
// TODO: Awkward interface. I should solve it somehow.
void ShutdownMulti(Transaction* multi);
void WaitForConvergence(TxId notifyid, Transaction* t);
bool HasResultConverged(TxId notifyid) const;
void IncQuickRun() {
stats_.quick_runs++;
@ -131,44 +107,41 @@ class EngineShard {
TieredStorage* tiered_storage() { return tiered_storage_.get(); }
// Adds blocked transaction to the watch-list.
void AddBlocked(Transaction* trans);
BlockingController* blocking_controller() {
return blocking_controller_.get();
}
// for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1, tmp_str2;
#if 0
size_t TEST_WatchedDbsLen() const {
return watched_dbs_.size();
}
size_t TEST_AwakenIndicesLen() const {
return awakened_indices_.size();
}
size_t TEST_AwakenTransLen() const {
return awakened_transactions_.size();
}
#endif
bool HasResultConverged(TxId notifyid) const;
private:
EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);
// blocks the calling fiber.
void Shutdown(); // called before destructing EngineShard.
struct WatchQueue;
void OnTxFinish();
void NotifyConvergence(Transaction* tx);
void CacheStats();
/// Returns the notified transaction,
/// or null if all transactions in the queue have expired..
Transaction* NotifyWatchQueue(WatchQueue* wq);
using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>;
// Watch state per db.
struct DbWatchTable {
WatchQueueMap queue_map;
// awakened keys point to blocked keys that can potentially be unblocked.
// they reference key objects in queue_map.
absl::flat_hash_set<base::string_view_sso> awakened_keys;
// Returns true if queue_map is empty and DbWatchTable can be removed as well.
bool RemoveEntry(WatchQueueMap::iterator it);
};
absl::flat_hash_map<DbIndex, DbWatchTable> watched_dbs_;
absl::flat_hash_set<DbIndex> awakened_indices_;
absl::flat_hash_set<Transaction*> awakened_transactions_;
absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;
@ -188,6 +161,7 @@ class EngineShard {
uint32_t periodic_task_ = 0;
uint64_t task_iters_ = 0;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;
static thread_local EngineShard* shard_;
};

View File

@ -13,6 +13,7 @@ extern "C" {
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/blocking_controller.h"
#include "server/error.h"
#include "server/transaction.h"
#include "util/varz.h"
@ -155,8 +156,8 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts);
}
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST) {
es->AwakeWatched(db_indx_, dest_key);
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(db_indx_, dest_key);
}
}
@ -580,7 +581,8 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys)
OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key,
bool skip_exists) {
auto& db_slice = op_args.shard->db_slice();
auto* es = op_args.shard;
auto& db_slice = es->db_slice();
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;
@ -619,8 +621,8 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts);
}
if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST) {
op_args.shard->AwakeWatched(op_args.db_ind, to_key);
if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key);
}
return OpStatus::OK;
}

View File

@ -11,6 +11,8 @@ extern "C" {
#include <absl/strings/numbers.h>
#include "base/logging.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
@ -146,7 +148,6 @@ BPopper::BPopper(ListDir dir) : dir_(dir) {
}
OpStatus BPopper::Run(Transaction* t, unsigned msec) {
OpResult<Transaction::FindFirstResult> result;
using time_point = Transaction::time_point;
time_point tp =
@ -158,19 +159,11 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
auto* stats = ServerState::tl_connection_stats();
while (true) {
result = t->FindFirst();
if (result)
break;
if (result.status() != OpStatus::KEY_NOTFOUND) { // Some error occurred.
// We could be registered in the queue due to previous iterations.
t->UnregisterWatch();
break;
}
OpResult<Transaction::FindFirstResult> result = t->FindFirst();
if (result.status() == OpStatus::KEY_NOTFOUND) {
if (is_multi) {
// close transaction and return.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
@ -183,10 +176,14 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
if (!wait_succeeded)
return OpStatus::TIMED_OUT;
result = t->FindFirst(); // retry - must find something.
}
if (!result)
if (!result) {
t->UnregisterWatch();
return result.status();
}
VLOG(1) << "Popping an element";
find_sid_ = result->sid;
@ -546,11 +543,10 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
}
if (new_key) {
// TODO: to use PrimeKey for watched table.
if (new_key && es->blocking_controller()) {
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->AwakeWatched(op_args.db_ind, key);
es->blocking_controller()->AwakeWatched(op_args.db_ind, key);
} else {
es->db_slice().PostUpdate(op_args.db_ind, it);
}

View File

@ -137,7 +137,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3"));
ASSERT_FALSE(IsLocked(0, kKey1));
ASSERT_FALSE(IsLocked(0, kKey2));
ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); });
// ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); });
}
TEST_F(ListFamilyTest, BLPopTimeout) {
@ -159,8 +159,10 @@ TEST_F(ListFamilyTest, BLPopTimeout) {
TEST_F(ListFamilyTest, BLPopTimeout2) {
Run({"BLPOP", "blist1", "blist2", "0.1"});
Run({"RPUSH", "blist2", "d"});
Run({"RPUSH", "blist2", "hello"});
auto resp = Run({"BLPOP", "blist1", "blist2", "1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("blist2", "d"));
@ -168,48 +170,48 @@ TEST_F(ListFamilyTest, BLPopTimeout2) {
Run({"RPUSH", "blist1", "a"});
Run({"DEL", "blist2"});
Run({"RPUSH", "blist2", "d"});
// Run({"BLPOP", "blist1", "blist2", "1"});
Run({"BLPOP", "blist1", "blist2", "1"});
}
TEST_F(ListFamilyTest, LRem) {
auto resp = Run({"rpush", kKey1, "a", "b", "a", "c"});
ASSERT_THAT(resp, IntArg(4));
resp = Run({"lrem", kKey1, "2", "a"});
ASSERT_THAT(resp, IntArg(2));
TEST_F(ListFamilyTest, BLPopMultiPush) {
Run({"exists", kKey1, kKey2, kKey3});
ASSERT_EQ(3, GetDebugInfo().shards_count);
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
resp = Run({"lrange", kKey1, "0", "1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("b", "c"));
}
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));
TEST_F(ListFamilyTest, LTrim) {
Run({"rpush", kKey1, "a", "b", "c", "d"});
ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK");
auto resp = Run({"lrange", kKey1, "0", "1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("c", "d"));
ASSERT_EQ(Run({"ltrim", kKey1, "0", "0"}), "OK");
ASSERT_EQ(Run({"lrange", kKey1, "0", "1"}), "c");
}
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
for (unsigned i = 0; i < 100; ++i) {
// a filler command to create scheduling queue.
Run({"exists", kKey1, kKey2, kKey3});
}
});
TEST_F(ListFamilyTest, LRange) {
auto resp = Run({"lrange", kKey1, "0", "5"});
ASSERT_THAT(resp, ArrLen(0));
Run({"rpush", kKey1, "0", "1", "2"});
resp = Run({"lrange", kKey1, "-2", "-1"});
auto p2_fb = pp_->at(2)->LaunchFiber([&] {
Run({"multi"});
Run({"lpush", kKey3, "C"});
Run({"exists", kKey2});
Run({"lpush", kKey2, "B"});
Run({"exists", kKey1});
Run({"lpush", kKey1, "A"});
Run({"exists", kKey1, kKey2, kKey3});
auto resp = Run({"exec"});
ASSERT_THAT(resp, ArrLen(6));
});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("1", "2"));
}
p1_fb.join();
p2_fb.join();
TEST_F(ListFamilyTest, Lset) {
Run({"rpush", kKey1, "0", "1", "2"});
ASSERT_EQ(Run({"lset", kKey1, "0", "bar"}), "OK");
ASSERT_EQ(Run({"lpop", kKey1}), "bar");
ASSERT_EQ(Run({"lset", kKey1, "-1", "foo"}), "OK");
ASSERT_EQ(Run({"rpop", kKey1}), "foo");
Run({"rpush", kKey2, "a"});
ASSERT_THAT(Run({"lset", kKey2, "1", "foo"}), ErrArg("index out of range"));
pop_fb.join();
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A"));
}
TEST_F(ListFamilyTest, BLPopSerialize) {
@ -228,16 +230,22 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
TxClock cl1, cl2;
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
ASSERT_EQ(resp, "OK");
// auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
// ASSERT_EQ(resp, "OK");
Run({"lpush", kKey1, "A"});
/*for (unsigned i = 0; i < 10; ++i) {
// dummy command to prolong this transaction and make convergence more complicated.
Run({"exists", kKey1, kKey2, kKey3});
}
resp = Run({"exec"});
// Either this lpush has run first or the one below.
// In any case it must be that between 2 invocations of lpush (wrapped in multi)
// blpop will be triggerred and it will empty the list again. Hence, in any case
// lpush kKey1 here and below should return 1.
ASSERT_THAT(resp, IntArg(1));
ASSERT_THAT(resp, ArrLen(11));*/
cl1 = GetDebugInfo("IO1").clock;
LOG(INFO) << "push1 ts: " << cl1;
});
@ -245,12 +253,20 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
auto p2_fb = pp_->at(2)->LaunchFiber([&] {
auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
ASSERT_EQ(resp, "OK");
for (unsigned i = 0; i < 10; ++i) {
// dummy command to prolong this transaction and make convergence more complicated.
Run({"exists", kKey1, kKey2, kKey3});
}
Run({"lpush", kKey1, "B"});
Run({"lpush", kKey2, "C"});
resp = Run({"exec"});
ASSERT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1)));
ASSERT_THAT(resp, ArrLen(12));
/*auto sub_arr = resp.GetVec();
EXPECT_THAT(sub_arr[0], IntArg(1));
EXPECT_THAT(sub_arr[1], IntArg(1));*/
cl2 = GetDebugInfo("IO2").clock;
LOG(INFO) << "push2 ts: " << cl2;
});
@ -355,4 +371,45 @@ TEST_F(ListFamilyTest, BPopRename) {
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
}
TEST_F(ListFamilyTest, LRem) {
auto resp = Run({"rpush", kKey1, "a", "b", "a", "c"});
ASSERT_THAT(resp, IntArg(4));
resp = Run({"lrem", kKey1, "2", "a"});
ASSERT_THAT(resp, IntArg(2));
resp = Run({"lrange", kKey1, "0", "1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("b", "c"));
}
TEST_F(ListFamilyTest, LTrim) {
Run({"rpush", kKey1, "a", "b", "c", "d"});
ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK");
auto resp = Run({"lrange", kKey1, "0", "1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("c", "d"));
ASSERT_EQ(Run({"ltrim", kKey1, "0", "0"}), "OK");
ASSERT_EQ(Run({"lrange", kKey1, "0", "1"}), "c");
}
TEST_F(ListFamilyTest, LRange) {
auto resp = Run({"lrange", kKey1, "0", "5"});
ASSERT_THAT(resp, ArrLen(0));
Run({"rpush", kKey1, "0", "1", "2"});
resp = Run({"lrange", kKey1, "-2", "-1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("1", "2"));
}
TEST_F(ListFamilyTest, Lset) {
Run({"rpush", kKey1, "0", "1", "2"});
ASSERT_EQ(Run({"lset", kKey1, "0", "bar"}), "OK");
ASSERT_EQ(Run({"lpop", kKey1}), "bar");
ASSERT_EQ(Run({"lset", kKey1, "-1", "foo"}), "OK");
ASSERT_EQ(Run({"rpop", kKey1}), "foo");
Run({"rpush", kKey2, "a"});
ASSERT_THAT(Run({"lset", kKey2, "1", "foo"}), ErrArg("index out of range"));
}
} // namespace dfly

View File

@ -7,6 +7,7 @@
#include <absl/strings/match.h>
#include "base/logging.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
@ -410,7 +411,8 @@ bool Transaction::RunInShard(EngineShard* shard) {
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
shard->ProcessAwakened(awaked_prerun ? this : nullptr);
if (shard->blocking_controller())
shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr);
}
}
@ -444,7 +446,7 @@ void Transaction::RunNoop(EngineShard* shard) {
if (sd.local_mask & SUSPENDED_Q) {
sd.local_mask |= EXPIRED_Q;
shard->GCWatched(largs);
shard->blocking_controller()->RemoveWatched(this);
}
}
// Decrease run count after we update all the data in the transaction object.
@ -517,7 +519,7 @@ void Transaction::ScheduleInternal() {
DVLOG(1) << "Cancelling " << DebugId();
auto cancel = [&](EngineShard* shard) {
success.fetch_sub(CancelInShard(shard), memory_order_relaxed);
success.fetch_sub(CancelShardCb(shard), memory_order_relaxed);
};
ess_->RunBriefInParallel(std::move(cancel), is_active);
@ -645,7 +647,8 @@ void Transaction::UnlockMulti() {
shard->ShutdownMulti(this);
// notify awakened transactions.
shard->ProcessAwakened(nullptr);
if (shard->blocking_controller())
shard->blocking_controller()->RunStep(nullptr);
shard->PollExecution("unlockmulti", nullptr);
this->DecreaseRunCnt();
@ -665,6 +668,8 @@ void Transaction::UnlockMulti() {
// Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) {
DCHECK(coordinator_state_ & COORD_SCHED);
cb_ = std::move(cb);
coordinator_state_ |= COORD_EXEC;
@ -730,7 +735,7 @@ void Transaction::ExecuteAsync() {
uint32_t seq_after = seqlock_.fetch_add(0, memory_order_release);
bool should_poll = (seq_after == seq) && (local_mask & ARMED);
DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " "
DVLOG(2) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
<< run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll;
// We verify that this callback is still relevant.
@ -790,42 +795,25 @@ void Transaction::RunQuickie(EngineShard* shard) {
}
// runs in coordinator thread.
// Marks the transaction as expired but does not remove it from the waiting queue.
// Marks the transaction as expired and removes it from the waiting queue.
void Transaction::ExpireBlocking() {
DVLOG(1) << "ExpireBlocking " << DebugId();
DCHECK(!IsGlobal());
run_count_.store(unique_shard_cnt_, memory_order_release);
auto expire_cb = [this] {
EngineShard* shard = EngineShard::tlocal();
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(Mode(), lock_args);
unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;
// Need to see why I decided to call this.
// My guess - probably to trigger the run of stalled transactions in case
// this shard concurrently awoke this transaction and stalled the processing
// of TxQueue.
shard->PollExecution("expirecb", nullptr);
CHECK_GE(DecreaseRunCnt(), 1u);
};
auto expire_cb = [this] { ExpireShardCb(EngineShard::tlocal()); };
if (unique_shard_cnt_ == 1) {
DCHECK_LT(unique_shard_id_, ess_->size());
ess_->Add(unique_shard_id_, std::move(expire_cb));
ess_->Add(unique_shard_id_, move(expire_cb));
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
DCHECK_EQ(0, sd.local_mask & ARMED);
if (sd.arg_count == 0)
continue;
ess_->Add(i, expire_cb);
}
}
@ -950,7 +938,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
return result;
}
bool Transaction::CancelInShard(EngineShard* shard) {
bool Transaction::CancelShardCb(EngineShard* shard) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
@ -1001,17 +989,16 @@ bool Transaction::WaitOnWatch(const time_point& tp) {
VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
using namespace chrono;
// wake_txid_.store(kuint64max, std::memory_order_relaxed);
Execute([](auto* t, auto* shard) { return t->AddToWatchedShardCb(shard); }, true);
Execute([](Transaction* t, EngineShard* shard) { return t->AddToWatchedShardCb(shard); }, true);
coordinator_state_ |= COORD_BLOCKED;
bool res = true; // returns false if timeout occurs.
auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
notify_txid_.load(memory_order_relaxed) != kuint64max;
};
cv_status status = cv_status::no_timeout;
cv_status status = cv_status::no_timeout;
if (tp == time_point::max()) {
DVLOG(1) << "WaitOnWatch foreva " << DebugId();
blocking_ec_.await(move(wake_cb));
@ -1031,20 +1018,14 @@ bool Transaction::WaitOnWatch(const time_point& tp) {
return false;
}
#if 0
// We were notified by a shard, so lets make sure that our notifications converged to a stable
// form.
if (unique_shard_cnt_ > 1) {
run_count_.store(unique_shard_cnt_, memory_order_release);
auto converge_cb = [this] {
EngineShard* shard = EngineShard::tlocal();
auto& sd = shard_data_[shard->shard_id()];
TxId notify = notify_txid();
if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) {
CHECK_GE(DecreaseRunCnt(), 1u);
return;
}
shard->WaitForConvergence(notify, this);
auto converge_cb = [this] {
this->CheckForConvergence(EngineShard::tlocal());
};
for (ShardId i = 0; i < shard_data_.size(); ++i) {
@ -1059,11 +1040,12 @@ bool Transaction::WaitOnWatch(const time_point& tp) {
WaitForShardCallbacks();
DVLOG(1) << "Convergence finished " << DebugId();
}
#endif
// Lift blocking mask.
coordinator_state_ &= ~COORD_BLOCKED;
return res;
return true;
}
void Transaction::UnregisterWatch() {
@ -1082,10 +1064,7 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
DCHECK_EQ(0, sd.local_mask & ARMED);
auto args = ShardArgsInShard(shard->shard_id());
for (auto s : args) {
shard->AddWatched(s, this);
}
shard->AddBlocked(this);
sd.local_mask |= SUSPENDED_Q;
DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask;
@ -1106,14 +1085,53 @@ bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) {
sd.local_mask &= ~kQueueMask;
// TODO: what if args have keys and values?
auto args = ShardArgsInShard(shard->shard_id());
for (auto s : args) {
shard->RemovedWatched(s, this);
}
shard->blocking_controller()->RemoveWatched(this);
return true;
}
void Transaction::ExpireShardCb(EngineShard* shard) {
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(Mode(), lock_args);
unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;
shard->blocking_controller()->RemoveWatched(this);
// Need to see why I decided to call this.
// My guess - probably to trigger the run of stalled transactions in case
// this shard concurrently awoke this transaction and stalled the processing
// of TxQueue.
shard->PollExecution("expirecb", nullptr);
CHECK_GE(DecreaseRunCnt(), 1u);
}
#if 0
// HasResultConverged has detailed documentation on how convergence is determined.
void Transaction::CheckForConvergence(EngineShard* shard) {
unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
TxId notify = notify_txid();
if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) {
CHECK_GE(DecreaseRunCnt(), 1u);
return;
}
LOG(DFATAL) << "TBD";
BlockingController* bc = shard->blocking_controller();
CHECK(bc); // must be present because we have watched this shard before.
bc->RegisterAwaitForConverge(this);
}
#endif
inline uint32_t Transaction::DecreaseRunCnt() {
// to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running. We can not put it inside the (res == 1) block because then it's too late.
@ -1132,7 +1150,7 @@ bool Transaction::IsGlobal() const {
}
// Runs only in the shard thread.
// Returns true if the transcton has changed its state from suspended to awakened,
// Returns true if the transacton has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
unsigned idx = SidToId(sid);

View File

@ -220,11 +220,13 @@ class Transaction {
std::pair<bool, bool> ScheduleInShard(EngineShard* shard);
// Returns true if operation was cancelled for this shard. Runs in the shard thread.
bool CancelInShard(EngineShard* shard);
bool CancelShardCb(EngineShard* shard);
// Shard callbacks used within Execute calls
OpStatus AddToWatchedShardCb(EngineShard* shard);
bool RemoveFromWatchedShardCb(EngineShard* shard);
void ExpireShardCb(EngineShard* shard);
void CheckForConvergence(EngineShard* shard);
void WaitForShardCallbacks() {
run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
@ -251,7 +253,7 @@ class Transaction {
// Bitmask of LocalState enums.
uint16_t local_mask{0};
// Needed to rollback invalid schedulings or remove OOO transactions from
// Needed to rollback inconsistent schedulings or remove OOO transactions from
// tx queue.
uint32_t pq_pos = TxQueue::kEnd;