From 35fa69c92837812fb93544748d6ac20893092b40 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 6 Jan 2022 15:48:51 +0200 Subject: [PATCH] Add EXEC transaction support. Introduce dragonfly_test.cc --- server/CMakeLists.txt | 1 + server/command_registry.cc | 11 +- server/command_registry.h | 3 + server/conn_context.h | 14 ++- server/dragonfly_test.cc | 71 +++++++++++++ server/engine_shard_set.cc | 15 +-- server/engine_shard_set.h | 9 +- server/main_service.cc | 104 +++++++++++++++++- server/main_service.h | 7 ++ server/reply_builder.h | 4 + server/transaction.cc | 211 ++++++++++++++++++++++++++++++------- server/transaction.h | 23 +++- 12 files changed, 414 insertions(+), 59 deletions(-) create mode 100644 server/dragonfly_test.cc diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index f3a7e22..947ffe4 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -13,6 +13,7 @@ cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib add_library(dfly_test_lib test_utils.cc) cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext) +cxx_test(dragonfly_test dfly_test_lib LABELS DFLY) cxx_test(redis_parser_test dfly_test_lib LABELS DFLY) cxx_test(list_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY) diff --git a/server/command_registry.cc b/server/command_registry.cc index fabb9fa..72a8d41 100644 --- a/server/command_registry.cc +++ b/server/command_registry.cc @@ -20,6 +20,9 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first int8_t last_key, int8_t step) : name_(name), opt_mask_(mask), arity_(arity), first_key_(first_key), last_key_(last_key), step_key_(step) { + if (mask & CO::ADMIN) { + opt_mask_ |= CO::NOSCRIPT; + } } uint32_t CommandId::OptCount(uint32_t mask) { @@ -87,8 +90,14 @@ const char* OptName(CO::CommandOpt fl) { return "loading"; case RANDOM: return "random"; + case ADMIN: + return "admin"; + case NOSCRIPT: + return "noscript"; + case GLOBAL_TRANS: + return "global-trans"; } - return ""; + return "unknown"; } } // namespace CO diff --git a/server/command_registry.h b/server/command_registry.h index a378dfd..2b50bdf 100644 --- a/server/command_registry.h +++ b/server/command_registry.h @@ -26,6 +26,9 @@ enum CommandOpt : uint32_t { DENYOOM = 0x10, // use-memory in redis. STALE = 0x20, RANDOM = 0x40, + ADMIN = 0x80, // implies NOSCRIPT, + NOSCRIPT = 0x100, + GLOBAL_TRANS = 0x1000, }; const char* OptName(CommandOpt fl); diff --git a/server/conn_context.h b/server/conn_context.h index 7220e8e..0f51d04 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -11,11 +11,23 @@ namespace dfly { class Connection; class EngineShardSet; -class CommandId; + +struct StoredCmd { + const CommandId* descr; + std::vector cmd; + + StoredCmd(const CommandId* d = nullptr) : descr(d) { + } +}; struct ConnectionState { DbIndex db_index = 0; + enum ExecState { EXEC_INACTIVE, EXEC_COLLECT, EXEC_ERROR }; + + ExecState exec_state = EXEC_INACTIVE; + std::vector exec_body; + enum Mask : uint32_t { ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch. CONN_CLOSING = 2, // could be because of unrecoverable error or planned action. diff --git a/server/dragonfly_test.cc b/server/dragonfly_test.cc new file mode 100644 index 0000000..e011a4b --- /dev/null +++ b/server/dragonfly_test.cc @@ -0,0 +1,71 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include +#include +#include + +#include "base/gtest.h" +#include "base/logging.h" +#include "server/conn_context.h" +#include "server/main_service.h" +#include "server/redis_parser.h" +#include "server/test_utils.h" +#include "util/uring/uring_pool.h" + +namespace dfly { + +using namespace absl; +using namespace boost; +using namespace std; +using namespace util; +using ::io::Result; +using testing::ElementsAre; +using testing::HasSubstr; + +namespace { + +constexpr unsigned kPoolThreadCount = 4; + +} // namespace + + +// This test is responsible for server and main service +// (connection, transaction etc) families. +class DflyEngineTest : public BaseFamilyTest { + protected: + DflyEngineTest() : BaseFamilyTest() { + num_threads_ = kPoolThreadCount; + } +}; + +TEST_F(DflyEngineTest, Multi) { + RespVec resp = Run({"multi"}); + ASSERT_THAT(resp, RespEq("OK")); + + resp = Run({"get", "x"}); + ASSERT_THAT(resp, RespEq("QUEUED")); + + resp = Run({"get", "y"}); + ASSERT_THAT(resp, RespEq("QUEUED")); + + resp = Run({"exec"}); + ASSERT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL), ArgType(RespExpr::NIL))); + + atomic_bool tx_empty = true; + + ess_->RunBriefInParallel([&](EngineShard* shard) { + if (!shard->txq()->Empty()) + tx_empty.store(false); + }); + EXPECT_TRUE(tx_empty); + + resp = Run({"get", "y"}); + ASSERT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL))); + + ASSERT_FALSE(service_->IsLocked(0, "x")); + ASSERT_FALSE(service_->IsLocked(0, "y")); +} + +} // namespace dfly diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc index 98d05a2..11e8205 100644 --- a/server/engine_shard_set.cc +++ b/server/engine_shard_set.cc @@ -93,6 +93,7 @@ void EngineShard::PollExecution(Transaction* trans) { Transaction* head = nullptr; string dbg_id; + while (!txq_.Empty()) { auto val = txq_.Front(); head = absl::get(val); @@ -122,12 +123,10 @@ void EngineShard::PollExecution(Transaction* trans) { if (VLOG_IS_ON(2)) { dbg_id = head->DebugId(); } - bool keep = head->RunInShard(this); - DCHECK(head == absl::get(txq_.Front())); + bool keep = head->RunInShard(this); // We should not access head from this point since RunInShard callback decrements refcount. DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep; - txq_.PopFront(); if (keep) { continuation_trans_ = head; @@ -148,17 +147,21 @@ void EngineShard::PollExecution(Transaction* trans) { dbg_id.clear(); - uint32_t pos = trans->TxQueuePos(sid); if (VLOG_IS_ON(1)) { dbg_id = trans->DebugId(); } - bool keep = trans->RunInShard(this); // resets TxQueuePos, this is why we get it before. + bool keep = trans->RunInShard(this); DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep; // Should be enforced via Schedule(). TODO: to remove the check once the code is mature. CHECK(!keep) << "multi-hop transactions can not be OOO."; - txq_.Remove(pos); + } +} + +void EngineShard::ShutdownMulti(Transaction* multi) { + if (continuation_trans_ == multi) { + continuation_trans_ = nullptr; } } diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h index 0619b17..05d498a 100644 --- a/server/engine_shard_set.h +++ b/server/engine_shard_set.h @@ -5,7 +5,7 @@ #pragma once extern "C" { - #include "redis/sds.h" +#include "redis/sds.h" } #include @@ -62,9 +62,8 @@ class EngineShard { return committed_txid_; } - TxQueue::Iterator InsertTxQ(Transaction* trans) { - return txq_.Insert(trans); - } + // TODO: Awkward interface. I should solve it somehow. + void ShutdownMulti(Transaction* multi); sds tmp_str; @@ -158,7 +157,7 @@ template void EngineShardSet::RunBlockingInParallel(U&& func) { bc.Wait(); } -template inline ShardId Shard(const View& v, ShardId shard_num) { +inline ShardId Shard(std::string_view v, ShardId shard_num) { XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577240643ULL); return hash % shard_num; } diff --git a/server/main_service.cc b/server/main_service.cc index 1e496e1..6eb3d84 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -8,6 +8,7 @@ extern "C" { #include "redis/redis_aux.h" } +#include #include #include @@ -103,12 +104,20 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { VLOG(2) << "Got: " << args; string_view cmd_str = ArgS(args, 0); + bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI"); const CommandId* cid = registry_.Find(cmd_str); + absl::Cleanup multi_error = [cntx] { + if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { + cntx->conn_state.exec_state = ConnectionState::EXEC_ERROR; + } + }; + if (cid == nullptr) { return cntx->SendError(absl::StrCat("unknown command `", cmd_str, "`")); } + bool under_multi = cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd; if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) || (cid->arity() < 0 && args.size() < size_t(-cid->arity()))) { return cntx->SendError(WrongNumArgsError(cmd_str)); @@ -118,12 +127,31 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(WrongNumArgsError(cmd_str)); } + if (under_multi && (cid->opt_mask() & CO::ADMIN)) { + cntx->SendError("Can not run admin commands under multi-transactions"); + return; + } + + std::move(multi_error).Cancel(); + + if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) { + // TODO: protect against aggregating huge transactions. + StoredCmd stored_cmd{cid}; + stored_cmd.cmd.reserve(args.size()); + for (size_t i = 0; i < args.size(); ++i) { + stored_cmd.cmd.emplace_back(ArgS(args, i)); + } + cntx->conn_state.exec_body.push_back(std::move(stored_cmd)); + + return cntx->SendSimpleRespString("QUEUED"); + } + uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec; // Create command transaction intrusive_ptr dist_trans; - if (cid->first_key_pos() > 0) { + if (cid->first_key_pos() > 0 || (cid->opt_mask() & CO::GLOBAL_TRANS)) { dist_trans.reset(new Transaction{cid, &shard_set_}); cntx->transaction = dist_trans.get(); @@ -189,6 +217,17 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va DispatchCommand(arg_list, cntx); } +bool Service::IsLocked(DbIndex db_index, std::string_view key) const { + ShardId sid = Shard(key, shard_count()); + KeyLockArgs args; + args.db_index = db_index; + args.args = ArgSlice{&key, 1}; + args.key_step = 1; + bool is_open = pp_.at(sid)->AwaitBrief( + [args] { return EngineShard::tlocal()->db_slice().CheckLock(IntentLock::EXCLUSIVE, args); }); + return !is_open; +} + void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); } @@ -214,6 +253,60 @@ void Service::DbSize(CmdArgList args, ConnectionContext* cntx) { return cntx->SendLong(num_keys.load(memory_order_relaxed)); } +void Service::Quit(CmdArgList args, ConnectionContext* cntx) { + cntx->SendOk(); + cntx->CloseConnection(); +} + +void Service::Exec(CmdArgList args, ConnectionContext* cntx) { + if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) { + return cntx->SendError("EXEC without MULTI"); + } + + if (cntx->conn_state.exec_state == ConnectionState::EXEC_ERROR) { + cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE; + cntx->conn_state.exec_body.clear(); + return cntx->SendError("-EXECABORT Transaction discarded because of previous errors"); + } + + cntx->SendRespBlob(absl::StrCat("*", cntx->conn_state.exec_body.size(), "\r\n")); + + if (!cntx->ec() && !cntx->conn_state.exec_body.empty()) { + CmdArgVec str_list; + + for (auto& scmd : cntx->conn_state.exec_body) { + str_list.resize(scmd.cmd.size()); + for (size_t i = 0; i < scmd.cmd.size(); ++i) { + string& s = scmd.cmd[i]; + str_list[i] = MutableStrSpan{s.data(), s.size()}; + } + + cntx->transaction->SetExecCmd(scmd.descr); + CmdArgList cmd_arg_list{str_list.data(), str_list.size()}; + cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list); + scmd.descr->Invoke(cmd_arg_list, cntx); + if (cntx->ec()) + break; + } + + VLOG(1) << "Exec unlocking " << cntx->conn_state.exec_body.size() << " commands"; + cntx->transaction->UnlockMulti(); + } + + cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE; + cntx->conn_state.exec_body.clear(); + VLOG(1) << "Exec completed"; +} + +void Service::Multi(CmdArgList args, ConnectionContext* cntx) { + if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { + return cntx->SendError("MULTI calls can not be nested"); + } + cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT; + // TODO: to protect against huge exec transactions. + return cntx->SendOk(); +} + VarzValue::Map Service::GetVarzStats() { VarzValue::Map res; @@ -234,8 +327,15 @@ inline CommandId::Handler HandlerFunc(Service* se, ServiceFunc f) { void Service::RegisterCommands() { using CI = CommandId; + constexpr auto kExecMask = + CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS; + registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug) - << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize); + << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) + << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) + << CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC( + Multi) + << CI{"EXEC", kExecMask, 1, 0, 0, 0}.HFUNC(Exec); StringFamily::Register(®istry_); GenericFamily::Register(®istry_); diff --git a/server/main_service.h b/server/main_service.h index 7d09d31..099b8ed 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -44,6 +44,9 @@ class Service { return shard_set_.size(); } + // Used by tests. + bool IsLocked(DbIndex db_index, std::string_view key) const; + EngineShardSet& shard_set() { return shard_set_; } @@ -56,6 +59,10 @@ class Service { void Debug(CmdArgList args, ConnectionContext* cntx); void DbSize(CmdArgList args, ConnectionContext* cntx); + void Quit(CmdArgList args, ConnectionContext* cntx); + void Exec(CmdArgList args, ConnectionContext* cntx); + void Multi(CmdArgList args, ConnectionContext* cntx); + void RegisterCommands(); base::VarzValue::Map GetVarzStats(); diff --git a/server/reply_builder.h b/server/reply_builder.h index 0a319bd..9c1bac9 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -119,6 +119,10 @@ class ReplyBuilder { as_resp()->SendBulkString(str); } + void CloseConnection() { + serializer_->CloseConnection(); + } + private: RespSerializer* as_resp() { return static_cast(serializer_.get()); diff --git a/server/transaction.cc b/server/transaction.cc index 98ef801..1b1600b 100644 --- a/server/transaction.cc +++ b/server/transaction.cc @@ -28,10 +28,6 @@ IntentLock::Mode Transaction::Mode() const { return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE; } -Transaction::~Transaction() { - DVLOG(2) << "Transaction " << DebugId() << " destroyed"; -} - /** * @brief Construct a new Transaction:: Transaction object * @@ -40,15 +36,26 @@ Transaction::~Transaction() { * @param cs */ Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) { + if (strcmp(cid_->name(), "EXEC") == 0) { + multi_.reset(new Multi); + } trans_options_ = cid_->opt_mask(); bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key(); - if (single_key) { + if (!multi_ && single_key) { shard_data_.resize(1); // Single key optimization } else { // Our shard_data is not sparse, so we must allocate for all threads :( shard_data_.resize(ess_->size()); } + + if (IsGlobal()) { + unique_shard_cnt_ = ess->size(); + } +} + +Transaction::~Transaction() { + DVLOG(2) << "Transaction " << DebugId() << " destroyed"; } /** @@ -75,10 +82,11 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { CHECK_GT(args.size(), 1U); CHECK_LT(size_t(cid_->first_key_pos()), args.size()); DCHECK_EQ(unique_shard_cnt_, 0u); + DCHECK(!IsGlobal()) << "Global transactions do not have keys"; db_index_ = index; - if (!cid_->is_multi_key()) { // Single key optimization. + if (!multi_ && !cid_->is_multi_key()) { // Single key optimization. auto key = ArgS(args, cid_->first_key_pos()); args_.push_back(key); @@ -97,6 +105,13 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { v.Clear(); } + IntentLock::Mode mode = IntentLock::EXCLUSIVE; + if (multi_) { + mode = Mode(); + tmp_space.uniq_keys.clear(); + DCHECK_LT(int(mode), 2); + } + size_t key_end = cid_->last_key_pos() > 0 ? cid_->last_key_pos() + 1 : (args.size() + 1 + cid_->last_key_pos()); for (size_t i = 1; i < key_end; ++i) { @@ -105,6 +120,10 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { shard_index[sid].args.push_back(key); shard_index[sid].original_index.push_back(i - 1); + if (multi_ && tmp_space.uniq_keys.insert(key).second) { + multi_->locks[key].cnt[int(mode)]++; + }; + if (cid_->key_arg_step() == 2) { // value ++i; auto val = ArgS(args, i); @@ -149,9 +168,12 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { if (unique_shard_cnt_ == 1) { PerShardData* sd; - - shard_data_.resize(1); - sd = &shard_data_.front(); + if (multi_) { + sd = &shard_data_[unique_shard_id_]; + } else { + shard_data_.resize(1); + sd = &shard_data_.front(); + } sd->arg_count = -1; sd->arg_start = -1; } @@ -160,10 +182,26 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { for (const auto& sd : shard_data_) { DCHECK_EQ(sd.local_mask, 0u); DCHECK_EQ(0, sd.local_mask & ARMED); - DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); + if (!multi_) { + DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); + } } } +void Transaction::SetExecCmd(const CommandId* cid) { + DCHECK(multi_); + DCHECK(!cb_); + + if (txid_ == 0) { + Schedule(); + } + + unique_shard_cnt_ = 0; + cid_ = cid; + trans_options_ = cid->opt_mask(); + cb_ = nullptr; +} + string Transaction::DebugId() const { return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); } @@ -186,7 +224,21 @@ bool Transaction::RunInShard(EngineShard* shard) { DCHECK(sd.local_mask & ARMED); sd.local_mask &= ~ARMED; - DCHECK(sd.local_mask & KEYS_ACQUIRED); + // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. + // Therefore we differentiate between concluding, which says that this specific + // runnable concludes current operation, and should_release which tells + // whether we should unlock the keys. should_release is false for multi and + // equal to concluding otherwise. + bool should_release = is_concluding_cb_ && !multi_; + + // We make sure that we lock exactly once for each (multi-hop) transaction inside + // multi-transactions. + if (multi_ && ((sd.local_mask & KEYS_ACQUIRED) == 0)) { + sd.local_mask |= KEYS_ACQUIRED; + shard->db_slice().Acquire(Mode(), GetLockArgs(idx)); + } + + DCHECK(IsGlobal() || (sd.local_mask & KEYS_ACQUIRED)); /*************************************************************************/ // Actually running the callback. @@ -203,33 +255,51 @@ bool Transaction::RunInShard(EngineShard* shard) { // at least the coordinator thread owns the reference. DCHECK_GE(use_count(), 1u); + // we remove tx from tx-queue upon first invocation. + // if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard. + if (sd.pq_pos != TxQueue::kEnd) { + shard->txq()->Remove(sd.pq_pos); + sd.pq_pos = TxQueue::kEnd; + } + // If it's a final hop we should release the locks. - if (is_concluding_cb_) { + if (should_release) { KeyLockArgs largs = GetLockArgs(idx); + // If a transaction has been suspended, we keep the lock so that future transaction + // touching those keys will be ordered via TxQueue. It's necessary because we preserve + // the atomicity of awaked transactions by halting the TxQueue. shard->db_slice().Release(Mode(), largs); sd.local_mask &= ~KEYS_ACQUIRED; } CHECK_GE(DecreaseRunCnt(), 1u); + // From this point on we can not access 'this'. - return !is_concluding_cb_; // keep + return !should_release; // keep } void Transaction::ScheduleInternal(bool single_hop) { DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED); DCHECK_EQ(0u, txid_); + bool span_all = IsGlobal(); bool out_of_order = false; + uint32_t num_shards; std::function is_active; - num_shards = unique_shard_cnt_; - DCHECK_GT(num_shards, 0u); + if (span_all) { + is_active = [](uint32_t) { return true; }; + num_shards = ess_->size(); + } else { + num_shards = unique_shard_cnt_; + DCHECK_GT(num_shards, 0u); - is_active = [&](uint32_t i) { - return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0; - }; + is_active = [&](uint32_t i) { + return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0; + }; + } while (true) { txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); @@ -250,6 +320,9 @@ void Transaction::ScheduleInternal(bool single_hop) { // It might be possible to do it for multi-hop transactions as well but currently is // too complicated to reason about. if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { + // OOO can not happen with span-all transactions. We ensure it in ScheduleInShard when we + // refuse to acquire locks for these transactions.. + DCHECK(!span_all); out_of_order = true; } DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order; @@ -283,7 +356,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { cb_ = std::move(cb); - bool schedule_fast = (unique_shard_cnt_ == 1); + bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_; if (schedule_fast) { // Single shard (local) optimization. // We never resize shard_data because that would affect MULTI transaction correctness. DCHECK_EQ(1u, shard_data_.size()); @@ -313,7 +386,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. } else { - ScheduleInternal(true); + // Transaction spans multiple shards or it's global (like flushdb) or multi. + if (!multi_) + ScheduleInternal(true); ExecuteAsync(true); } @@ -327,6 +402,56 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { return local_result_; } +// Runs in the coordinator fiber. +void Transaction::UnlockMulti() { + VLOG(1) << "Transaction::UnlockMulti"; + + DCHECK(multi_); + using KeyList = vector>; + vector sharded_keys(ess_->size()); + + // It's LE and not EQ because there may be callbacks in progress that increase use_count_. + DCHECK_LE(1u, use_count()); + + for (const auto& k_v : multi_->locks) { + ShardId sid = Shard(k_v.first, sharded_keys.size()); + sharded_keys[sid].push_back(k_v); + } + + auto cb = [&](EngineShard* shard) { + ShardId sid = shard->shard_id(); + for (const auto& k_v : sharded_keys[sid]) { + auto release = [&](IntentLock::Mode mode) { + if (k_v.second.cnt[mode]) { + shard->db_slice().Release(mode, this->db_index_, k_v.first, k_v.second.cnt[mode]); + } + }; + release(IntentLock::SHARED); + release(IntentLock::EXCLUSIVE); + } + + auto& sd = shard_data_[SidToId(shard->shard_id())]; + + // It does not have to be that all shards in multi transaction execute this tx. + // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from + // there. + if (sd.pq_pos != TxQueue::kEnd) { + TxQueue* txq = shard->txq(); + DCHECK(!txq->Empty()); + Transaction* trans = absl::get(txq->Front()); + DCHECK(trans == this); + txq->PopFront(); + sd.pq_pos = TxQueue::kEnd; + } + + shard->ShutdownMulti(this); + }; + + ess_->RunBriefInParallel(std::move(cb)); + + DCHECK_EQ(1u, use_count()); +} + // Runs in coordinator thread. void Transaction::Execute(RunnableType cb, bool conclude) { cb_ = std::move(cb); @@ -359,12 +484,14 @@ void Transaction::ExecuteAsync(bool concluding_cb) { // safely. use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed); + bool is_global = IsGlobal(); + if (unique_shard_cnt_ == 1) { shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED; } else { for (ShardId i = 0; i < shard_data_.size(); ++i) { auto& sd = shard_data_[i]; - if (sd.arg_count == 0) + if (!is_global && sd.arg_count == 0) continue; DCHECK_LT(sd.arg_count, 1u << 15); sd.local_mask |= ARMED; @@ -408,12 +535,12 @@ void Transaction::ExecuteAsync(bool concluding_cb) { }; // IsArmedInShard is the protector of non-thread safe data. - if (unique_shard_cnt_ == 1) { + if (!is_global && unique_shard_cnt_ == 1) { ess_->Add(unique_shard_id_, std::move(cb)); // serves as a barrier. } else { for (ShardId i = 0; i < shard_data_.size(); ++i) { auto& sd = shard_data_[i]; - if (sd.arg_count == 0) + if (!is_global && sd.arg_count == 0) continue; ess_->Add(i, cb); // serves as a barrier. } @@ -421,6 +548,7 @@ void Transaction::ExecuteAsync(bool concluding_cb) { } void Transaction::RunQuickie() { + DCHECK(!multi_); DCHECK_EQ(1u, shard_data_.size()); DCHECK_EQ(0u, txid_); @@ -454,6 +582,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { // Optimized path that schedules and runs transactions out of order if possible. // Returns true if was eagerly executed, false if it was scheduled into queue. bool Transaction::ScheduleUniqueShard(EngineShard* shard) { + DCHECK(!multi_); DCHECK_EQ(0u, txid_); DCHECK_EQ(1u, shard_data_.size()); @@ -473,8 +602,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { // we can do it because only a single thread writes into txid_ and sd. txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); - TxQueue::Iterator it = shard->InsertTxQ(this); - sd.pq_pos = it; + sd.pq_pos = shard->txq()->Insert(this); DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED); bool lock_acquired = shard->db_slice().Acquire(mode, lock_args); @@ -498,25 +626,26 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { return result; } - TxQueue* pq = shard->txq(); + TxQueue* txq = shard->txq(); KeyLockArgs lock_args; IntentLock::Mode mode = Mode(); + bool spans_all = IsGlobal(); bool lock_granted = false; ShardId sid = SidToId(shard->shard_id()); auto& sd = shard_data_[sid]; - bool shard_unlocked = true; - lock_args = GetLockArgs(shard->shard_id()); + if (!spans_all) { + lock_args = GetLockArgs(shard->shard_id()); - // we need to acquire the lock unrelated to shard_unlocked since we register into Tx queue. - // All transactions in the queue must acquire the intent lock. - lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked; - sd.local_mask |= KEYS_ACQUIRED; - DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId(); + // All transactions in the queue must acquire the intent lock. + lock_granted = shard->db_slice().Acquire(mode, lock_args); + sd.local_mask |= KEYS_ACQUIRED; + DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId(); + } - if (!pq->Empty()) { + if (!txq->Empty()) { // If the new transaction requires reordering of the pending queue (i.e. it comes before tail) // and some other transaction already locked its keys we can not reorder 'trans' because // that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we @@ -526,7 +655,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { // We may record when they disable OOO via barrier_ts so if the queue contains transactions // that were only scheduled afterwards we know they are not free so we can still // reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore(). - bool to_proceed = lock_granted || pq->TailScore() < txid_; + bool to_proceed = lock_granted || txq->TailScore() < txid_; if (!to_proceed) { if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock. shard->db_slice().Release(mode, lock_args); @@ -540,18 +669,18 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { result.second = lock_granted; result.first = true; - TxQueue::Iterator it = pq->Insert(this); + TxQueue::Iterator it = txq->Insert(this); DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); sd.pq_pos = it; - DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << pq->size(); + DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size(); return result; } bool Transaction::CancelInShard(EngineShard* shard) { - ShardId sid = SidToId(shard->shard_id()); - auto& sd = shard_data_[sid]; + ShardId idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[idx]; auto pos = sd.pq_pos; if (pos == TxQueue::kEnd) @@ -609,4 +738,8 @@ inline uint32_t Transaction::DecreaseRunCnt() { return res; } +bool Transaction::IsGlobal() const { + return (trans_options_ & CO::GLOBAL_TRANS) != 0; +} + } // namespace dfly diff --git a/server/transaction.h b/server/transaction.h index ebff470..5c47056 100644 --- a/server/transaction.h +++ b/server/transaction.h @@ -63,6 +63,8 @@ class Transaction { void InitByArgs(DbIndex index, CmdArgList args); + void SetExecCmd(const CommandId* cid); + std::string DebugId() const; // Runs in engine thread @@ -84,6 +86,7 @@ class Transaction { //! duplicate runs). Supports local transactions under multi as well. //! Can be used in contexts that wait for an event to happen. bool IsArmedInShard(ShardId sid) const { + // For multi transactions shard_data_ spans all shards. if (sid >= shard_data_.size()) sid = 0; @@ -100,11 +103,6 @@ class Transaction { return state_mask_.load(std::memory_order_relaxed); } - // Relevant only when unique_shards_ > 1. - uint32_t TxQueuePos(ShardId sid) const { - return shard_data_[sid].pq_pos; - } - // Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP. // For single hop, use ScheduleSingleHop instead. void Schedule() { @@ -130,6 +128,8 @@ class Transaction { return res; } + void UnlockMulti(); + TxId txid() const { return txid_; } @@ -148,6 +148,12 @@ class Transaction { return unique_shard_cnt_; } + bool IsMulti() const { + return bool(multi_); + } + + bool IsGlobal() const; + EngineShardSet* shard_set() { return ess_; } @@ -207,6 +213,8 @@ class Transaction { // Bitmask of LocalState enums. uint16_t local_mask{0}; + // Needed to rollback invalid schedulings or remove OOO transactions from + // tx queue. uint32_t pq_pos = TxQueue::kEnd; PerShardData(PerShardData&&) noexcept { @@ -220,6 +228,10 @@ class Transaction { unsigned cnt[2] = {0, 0}; }; + struct Multi { + absl::flat_hash_map locks; + }; + util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions. util::fibers_ext::EventCount run_ec_; @@ -238,6 +250,7 @@ class Transaction { std::vector reverse_index_; RunnableType cb_; + std::unique_ptr multi_; // Initialized when the transaction is multi/exec. const CommandId* cid_; EngineShardSet* ess_;