From e7071b73b193003186cc2b55951b878b80ee662e Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sat, 8 Jan 2022 00:01:35 +0200 Subject: [PATCH] Refactor server commands into server_family. Introduce FlushDb. Minor cleanups and renamings. --- core/intent_lock.h | 1 - server/CMakeLists.txt | 6 +- server/db_slice.cc | 37 ++++++++- server/db_slice.h | 11 ++- server/engine_shard_set.h | 24 +++++- server/main_service.cc | 59 +++++++-------- server/main_service.h | 8 +- server/server_family.cc | 153 ++++++++++++++++++++++++++++++++++++++ server/server_family.h | 48 ++++++++++++ server/transaction.cc | 68 +++++++++-------- server/transaction.h | 14 +--- 11 files changed, 344 insertions(+), 85 deletions(-) create mode 100644 server/server_family.cc create mode 100644 server/server_family.h diff --git a/core/intent_lock.h b/core/intent_lock.h index 1c202f2..2787f90 100644 --- a/core/intent_lock.h +++ b/core/intent_lock.h @@ -9,7 +9,6 @@ namespace dfly { // SHARED - can be acquired multiple times as long as other intents are absent. // EXCLUSIVE - is acquired only if it's the only lock recorded. -// BLOCKED_READY - can not be acquired - it's recorded for intent purposes. // Transactions at the head of tx-queue are considered to be the ones that acquired the lock class IntentLock { public: diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 947ffe4..689a22e 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -5,7 +5,7 @@ add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc generic_family.cc list_family.cc main_service.cc memcache_parser.cc - redis_parser.cc reply_builder.cc string_family.cc transaction.cc) + redis_parser.cc reply_builder.cc server_family.cc string_family.cc transaction.cc) cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib fibers_ext strings_lib http_server_lib tls_lib) @@ -21,5 +21,5 @@ cxx_test(generic_family_test dfly_test_lib LABELS DFLY) cxx_test(memcache_parser_test dfly_test_lib LABELS DFLY) add_custom_target(check_dfly DEPENDS COMMAND ctest -L DFLY) -add_dependencies(check_dfly redis_parser_test list_family_test string_family_test - generic_family_test memcache_parser_test) +add_dependencies(check_dfly dragonfly_test list_family_test + generic_family_test memcache_parser_test redis_parser_test string_family_test) diff --git a/server/db_slice.cc b/server/db_slice.cc index 40f179e..aea6ff5 100644 --- a/server/db_slice.cc +++ b/server/db_slice.cc @@ -122,6 +122,37 @@ bool DbSlice::Del(DbIndex db_ind, const MainIterator& it) { return true; } +size_t DbSlice::FlushDb(DbIndex db_ind) { + auto flush_single = [this](DbIndex id) { + auto& db = db_arr_[id]; + + CHECK(db); + + size_t removed = db->main_table.size(); + db->main_table.clear(); + db->expire_table.clear(); + + db->stats.obj_memory_usage = 0; + + return removed; + }; + + if (db_ind != kDbAll) { + CHECK_LT(db_ind, db_arr_.size()); + + return flush_single(db_ind); + } + + size_t removed = 0; + for (size_t i = 0; i < db_arr_.size(); ++i) { + if (db_arr_[i]) { + removed += flush_single(i); + } + } + return removed; +} + + // Returns true if a state has changed, false otherwise. 
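
The new DbSlice::FlushDb above either clears one database's main and expire tables and resets its memory accounting, or, when the kDbAll sentinel (0xFFFF) is passed, does the same for every allocated database, returning the total number of removed entries. Below is a minimal, compilable sketch of that control flow; DbTable, DbSliceModel and the plain std::unordered_map tables are simplified stand-ins for the real structures in db_slice.h, not the actual types.

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

using DbIndex = uint16_t;
constexpr DbIndex kDbAll = 0xFFFF;

// Simplified stand-in for the per-database tables owned by DbSlice.
struct DbTable {
  std::unordered_map<std::string, std::string> main_table;
  std::unordered_map<std::string, uint64_t> expire_table;
  size_t obj_memory_usage = 0;
};

class DbSliceModel {
 public:
  explicit DbSliceModel(size_t num_dbs) : db_arr_(num_dbs) {
    db_arr_[0] = std::make_unique<DbTable>();  // database 0 always exists
  }

  // Flush one database, or every allocated one when kDbAll is passed.
  // Returns the number of removed keys, like the new DbSlice::FlushDb.
  size_t FlushDb(DbIndex db_ind) {
    auto flush_single = [this](DbIndex id) {
      auto& db = db_arr_[id];
      size_t removed = db->main_table.size();
      db->main_table.clear();
      db->expire_table.clear();
      db->obj_memory_usage = 0;
      return removed;
    };

    if (db_ind != kDbAll)
      return flush_single(db_ind);

    size_t removed = 0;
    for (size_t i = 0; i < db_arr_.size(); ++i) {
      if (db_arr_[i])
        removed += flush_single(static_cast<DbIndex>(i));
    }
    return removed;
  }

 private:
  std::vector<std::unique_ptr<DbTable>> db_arr_;
};

int main() {
  DbSliceModel slice(16);
  slice.FlushDb(0);       // FLUSHDB: single database
  slice.FlushDb(kDbAll);  // FLUSHALL: all allocated databases
}

FLUSHDB maps to the single-index branch and FLUSHALL to the kDbAll branch, which is how server_family.cc drives this function further down in the patch.
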
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) { auto& db = db_arr_[db_ind]; @@ -142,11 +173,11 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) { return false; } -void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms) { +void DbSlice::AddNew(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) { CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms)); } -bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj, +bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) { auto& db = db_arr_[db_ind]; @@ -220,7 +251,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { } } -void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, std::string_view key, +void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key, unsigned count) { DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key; diff --git a/server/db_slice.h b/server/db_slice.h index fc3dbec..e6697f8 100644 --- a/server/db_slice.h +++ b/server/db_slice.h @@ -42,7 +42,7 @@ class DbSlice { return now_ms_; } - OpResult Find(DbIndex db_index, std::string_view key, unsigned obj_type) const; + OpResult Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const; // Returns (value, expire) dict entries if key exists, null if it does not exist or has expired. std::pair FindExt(DbIndex db_ind, std::string_view key) const; @@ -66,6 +66,15 @@ class DbSlice { bool Del(DbIndex db_ind, const MainIterator& it); + constexpr static DbIndex kDbAll = 0xFFFF; + + /** + * @brief Flushes the database of index db_ind. If kDbAll is passed then flushes all the + * databases. + * + */ + size_t FlushDb(DbIndex db_ind); + ShardId shard_id() const { return shard_id_; } diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h index 05d498a..231961b 100644 --- a/server/engine_shard_set.h +++ b/server/engine_shard_set.h @@ -20,6 +20,11 @@ namespace dfly { class EngineShard { public: + struct Stats { + uint64_t ooo_runs = 0; + uint64_t quick_runs = 0; + }; + // EngineShard() is private down below. ~EngineShard(); @@ -62,9 +67,24 @@ class EngineShard { return committed_txid_; } + // Signals whether shard-wide lock is active. + // Transactions that conflict with shard locks must subscribe into pending queue. + IntentLock* shard_lock() { + return &shard_lock_; + } + // TODO: Awkward interface. I should solve it somehow. void ShutdownMulti(Transaction* multi); + void IncQuickRun() { + stats_.quick_runs++; + } + + const Stats& stats() const { + return stats_; + } + + // for everyone to use for string transformations during atomic cpu sequences. sds tmp_str; private: @@ -74,12 +94,14 @@ class EngineShard { ::boost::fibers::fiber fiber_q_; TxQueue txq_; + DbSlice db_slice_; + Stats stats_; // Logical ts used to order distributed transactions. 
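
engine_shard_set.h now exposes shard_lock(), a shard-wide IntentLock that global transactions acquire before running, alongside the new Stats counters for out-of-order and quick runs. The real lock class lives in core/intent_lock.h; going only by the semantics its header comment states (SHARED can be acquired while no EXCLUSIVE intent is recorded, EXCLUSIVE only when it is the sole recorded lock), a toy model might look like the following. The counter layout and exact signatures are assumptions made for illustration.

#include <array>
#include <cassert>
#include <cstdint>
#include <iostream>

// Illustrative intent lock: records how many SHARED and EXCLUSIVE intents are
// pending and grants a mode only when no conflicting intent exists.
class IntentLockModel {
 public:
  enum Mode : uint8_t { SHARED = 0, EXCLUSIVE = 1 };

  // Records the intent; returns true if the lock is granted right away.
  // SHARED is granted while no EXCLUSIVE intent is recorded, EXCLUSIVE only
  // when it is the single recorded intent.
  bool Acquire(Mode mode) {
    ++cnt_[mode];
    if (mode == SHARED)
      return cnt_[EXCLUSIVE] == 0;
    return cnt_[EXCLUSIVE] == 1 && cnt_[SHARED] == 0;
  }

  // True if a new acquisition in `mode` would not conflict with recorded intents.
  bool Check(Mode mode) const {
    if (mode == SHARED)
      return cnt_[EXCLUSIVE] == 0;
    return cnt_[SHARED] == 0 && cnt_[EXCLUSIVE] == 0;
  }

  void Release(Mode mode, unsigned count = 1) {
    assert(cnt_[mode] >= count);
    cnt_[mode] -= count;
  }

 private:
  std::array<uint32_t, 2> cnt_{0, 0};
};

int main() {
  IntentLockModel shard_lock;
  bool shared_ok = shard_lock.Acquire(IntentLockModel::SHARED);   // granted: no exclusive intent
  bool excl_free = shard_lock.Check(IntentLockModel::EXCLUSIVE);  // false: a shared intent is recorded
  std::cout << shared_ok << " " << excl_free << "\n";             // prints "1 0"
  shard_lock.Release(IntentLockModel::SHARED);
}

ScheduleInShard later in the patch relies on exactly this kind of Check() query: a shard whose shard_lock() is held forces a transaction into the queue even when its key locks are free.
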
TxId committed_txid_ = 0; Transaction* continuation_trans_ = nullptr; + IntentLock shard_lock_; - DbSlice db_slice_; uint32_t periodic_task_ = 0; static thread_local EngineShard* shard_; diff --git a/server/main_service.cc b/server/main_service.cc index 6eb3d84..696bb10 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -17,7 +17,6 @@ extern "C" { #include "base/logging.h" #include "server/conn_context.h" -#include "server/debugcmd.h" #include "server/error.h" #include "server/generic_family.h" #include "server/list_family.h" @@ -51,7 +50,7 @@ constexpr size_t kMaxThreadSize = 1024; } // namespace -Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp) { +Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) { CHECK(pp); // We support less than 1024 threads. @@ -89,8 +88,12 @@ void Service::Shutdown() { engine_varz.reset(); request_latency_usec.Shutdown(); ping_qps.Shutdown(); + + // to shutdown all the runtime components that depend on EngineShard. + server_family_.Shutdown(); StringFamily::Shutdown(); GenericFamily::Shutdown(); + cmd_req.Shutdown(); shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); }); } @@ -232,27 +235,6 @@ void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); } -void Service::Debug(CmdArgList args, ConnectionContext* cntx) { - ToUpper(&args[1]); - - DebugCmd dbg_cmd{&shard_set_, cntx}; - - return dbg_cmd.Run(args); -} - -void Service::DbSize(CmdArgList args, ConnectionContext* cntx) { - atomic_ulong num_keys{0}; - - shard_set_.RunBriefInParallel( - [&](EngineShard* shard) { - auto db_size = shard->db_slice().DbSize(cntx->conn_state.db_index); - num_keys.fetch_add(db_size, memory_order_relaxed); - }, - [](ShardId) { return true; }); - - return cntx->SendLong(num_keys.load(memory_order_relaxed)); -} - void Service::Quit(CmdArgList args, ConnectionContext* cntx) { cntx->SendOk(); cntx->CloseConnection(); @@ -317,12 +299,9 @@ VarzValue::Map Service::GetVarzStats() { return res; } -using ServiceFunc = void (Service::*)(CmdArgList args, ConnectionContext* cntx); -inline CommandId::Handler HandlerFunc(Service* se, ServiceFunc f) { - return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); }; -} +using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx); -#define HFUNC(x) SetHandler(HandlerFunc(this, &Service::x)) +#define HFUNC(x) SetHandler(&Service::x) void Service::RegisterCommands() { using CI = CommandId; @@ -330,16 +309,32 @@ void Service::RegisterCommands() { constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS; - registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug) - << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) - << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) + auto cb_exec = [this](CmdArgList sp, ConnectionContext* cntx) { + this->Exec(std::move(sp), cntx); + }; + + registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) << CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC( Multi) - << CI{"EXEC", kExecMask, 1, 0, 0, 0}.HFUNC(Exec); + << CI{"EXEC", kExecMask, 1, 0, 0, 0}.SetHandler(cb_exec); StringFamily::Register(®istry_); GenericFamily::Register(®istry_); ListFamily::Register(®istry_); + server_family_.Register(®istry_); + + LOG(INFO) << "Multi-key commands are: "; + + registry_.Traverse([](std::string_view key, const CI& cid) { + if (cid.is_multi_key()) { + 
string key_len; + if (cid.last_key_pos() < 0) + key_len = "unlimited"; + else + key_len = absl::StrCat(cid.last_key_pos() - cid.first_key_pos() + 1); + LOG(INFO) << " " << key << ": with " << key_len << " keys"; + } + }); } } // namespace dfly diff --git a/server/main_service.h b/server/main_service.h index 099b8ed..c7b2f41 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -9,6 +9,7 @@ #include "server/engine_shard_set.h" #include "util/http/http_handler.h" #include "server/memcache_parser.h" +#include "server/server_family.h" namespace util { class AcceptServer; @@ -56,12 +57,10 @@ class Service { } private: - void Debug(CmdArgList args, ConnectionContext* cntx); - void DbSize(CmdArgList args, ConnectionContext* cntx); - void Quit(CmdArgList args, ConnectionContext* cntx); + static void Quit(CmdArgList args, ConnectionContext* cntx); void Exec(CmdArgList args, ConnectionContext* cntx); - void Multi(CmdArgList args, ConnectionContext* cntx); + static void Multi(CmdArgList args, ConnectionContext* cntx); void RegisterCommands(); @@ -70,6 +69,7 @@ class Service { CommandRegistry registry_; EngineShardSet shard_set_; util::ProactorPool& pp_; + ServerFamily server_family_; }; } // namespace dfly diff --git a/server/server_family.cc b/server/server_family.cc new file mode 100644 index 0000000..0927f33 --- /dev/null +++ b/server/server_family.cc @@ -0,0 +1,153 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/server_family.h" + +#include +#include // for master_id_ generation. +#include + +#include + +extern "C" { +#include "redis/redis_aux.h" +} + +#include "base/logging.h" +#include "server/command_registry.h" +#include "server/conn_context.h" +#include "server/debugcmd.h" +#include "server/engine_shard_set.h" +#include "server/error.h" +#include "server/main_service.h" +#include "server/transaction.h" +#include "util/accept_server.h" + +DECLARE_uint32(port); + +namespace dfly { + +using namespace std; +using namespace util; +namespace fibers = ::boost::fibers; + +namespace fs = std::filesystem; + +namespace { + +using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx); + +inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) { + return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); }; +} + +using CI = CommandId; + +} // namespace + +ServerFamily::ServerFamily(Service* engine) + : engine_(*engine), pp_(engine->proactor_pool()), ess_(engine->shard_set()) { +} + +ServerFamily::~ServerFamily() { +} + +void ServerFamily::Init(util::AcceptServer* acceptor) { + CHECK(acceptor_ == nullptr); + acceptor_ = acceptor; +} + +void ServerFamily::Shutdown() { + VLOG(1) << "ServerFamily::Shutdown"; +} + +void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { + atomic_ulong num_keys{0}; + + ess_.RunBriefInParallel( + [&](EngineShard* shard) { + auto db_size = shard->db_slice().DbSize(cntx->conn_state.db_index); + num_keys.fetch_add(db_size, memory_order_relaxed); + }, + [](ShardId) { return true; }); + + return cntx->SendLong(num_keys.load(memory_order_relaxed)); +} + +void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { + DCHECK(cntx->transaction); + Transaction* transaction = cntx->transaction; + transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ? 
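
server_family.cc keeps the HandlerFunc adapter that main_service.cc just dropped: ServerFamily's handlers remain non-static member functions, so each one is wrapped into the plain callable that the registry stores, while Service now registers its static handlers directly through SetHandler(&Service::x). A self-contained illustration of that adapter pattern follows; Args, Context, Handler and ServerFamilyDemo are hypothetical stand-ins rather than the real CmdArgList, ConnectionContext and CommandId::Handler types.

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-ins for CmdArgList, ConnectionContext and CommandId::Handler.
using Args = std::vector<std::string>;
struct Context { int db_index = 0; };
using Handler = std::function<void(Args, Context*)>;

class ServerFamilyDemo {
 public:
  void DbSize(Args, Context* cntx) { std::cout << "DBSIZE on db " << cntx->db_index << "\n"; }
  void Info(Args, Context*) { std::cout << "INFO\n"; }

  void Register(std::unordered_map<std::string, Handler>* registry) {
    (*registry)["DBSIZE"] = Bind(&ServerFamilyDemo::DbSize);
    (*registry)["INFO"] = Bind(&ServerFamilyDemo::Info);
  }

 private:
  using MemberFunc = void (ServerFamilyDemo::*)(Args, Context*);

  // Same idea as HandlerFunc(this, &ServerFamily::X): capture the object and the
  // member pointer, hand the registry a plain callable it can store and invoke.
  Handler Bind(MemberFunc f) {
    return [this, f](Args args, Context* cntx) { (this->*f)(std::move(args), cntx); };
  }
};

int main() {
  std::unordered_map<std::string, Handler> registry;
  ServerFamilyDemo family;
  family.Register(&registry);

  Context cntx;
  registry["DBSIZE"]({}, &cntx);  // dispatch goes through the bound member function
}

The capturing cb_exec lambda in RegisterCommands serves the same purpose for Exec, the one Service handler that still needs access to `this`.
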
+ + transaction->Execute( + [](Transaction* t, EngineShard* shard) { + shard->db_slice().FlushDb(t->db_index()); + return OpStatus::OK; + }, + true); + + cntx->SendOk(); +} + +void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) { + if (args.size() > 1) { + cntx->SendError(kSyntaxErr); + return; + } + + DCHECK(cntx->transaction); + Transaction* transaction = cntx->transaction; + transaction->Schedule(); + + transaction->Execute( + [](Transaction* t, EngineShard* shard) { + shard->db_slice().FlushDb(DbSlice::kDbAll); + return OpStatus::OK; + }, + true); + + cntx->SendOk(); +} + +void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[1]); + + DebugCmd dbg_cmd{&ess_, cntx}; + + return dbg_cmd.Run(args); +} + +void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { + const char kInfo1[] = + R"(# Server +redis_version:6.2.0 +redis_mode:standalone +arch_bits:64 +multiplexing_api:iouring +atomicvar_api:atomic-builtin +tcp_port:)"; + + string info = absl::StrCat(kInfo1, FLAGS_port, "\n"); + + cntx->SendBulkString(info); +} + +void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) { + CHECK_NOTNULL(acceptor_)->Stop(); + cntx->SendOk(); +} + +#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x)) + +void ServerFamily::Register(CommandRegistry* registry) { + *registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) + << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug) + << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) + << CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll) + << CI{"INFO", CO::LOADING | CO::STALE, -1, 0, 0, 0}.HFUNC(Info) + << CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC( + _Shutdown); +} + +} // namespace dfly diff --git a/server/server_family.h b/server/server_family.h new file mode 100644 index 0000000..9a39f9d --- /dev/null +++ b/server/server_family.h @@ -0,0 +1,48 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "server/engine_shard_set.h" +#include "util/proactor_pool.h" + +namespace util { +class AcceptServer; +} // namespace util + +namespace dfly { + +class ConnectionContext; +class CommandRegistry; +class Service; + +class ServerFamily { + public: + ServerFamily(Service* engine); + ~ServerFamily(); + + void Init(util::AcceptServer* acceptor); + void Register(CommandRegistry* registry); + void Shutdown(); + + private: + uint32_t shard_count() const { + return ess_.size(); + } + + void Debug(CmdArgList args, ConnectionContext* cntx); + void DbSize(CmdArgList args, ConnectionContext* cntx); + void FlushDb(CmdArgList args, ConnectionContext* cntx); + void FlushAll(CmdArgList args, ConnectionContext* cntx); + void Info(CmdArgList args, ConnectionContext* cntx); + void _Shutdown(CmdArgList args, ConnectionContext* cntx); + + Service& engine_; + util::ProactorPool& pp_; + EngineShardSet& ess_; + + util::AcceptServer* acceptor_ = nullptr; +}; + +} // namespace dfly diff --git a/server/transaction.cc b/server/transaction.cc index 1b1600b..714af63 100644 --- a/server/transaction.cc +++ b/server/transaction.cc @@ -233,12 +233,12 @@ bool Transaction::RunInShard(EngineShard* shard) { // We make sure that we lock exactly once for each (multi-hop) transaction inside // multi-transactions. 
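
FLUSHDB and FLUSHALL above follow the same coordinator pattern: Schedule() the transaction, run one concluding Execute() hop whose callback calls DbSlice::FlushDb on every shard, then reply OK. The sketch below models only that sequencing; ToyShard and ToyTransaction, the plain loop over shards, and the void callback (the real one returns OpStatus::OK and runs on each shard's own thread) are simplifications, not the actual Transaction/EngineShard API.

#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

using DbIndex = uint16_t;
constexpr DbIndex kDbAll = 0xFFFF;

// Toy stand-ins for EngineShard/DbSlice, just enough to show the hop.
struct ToyShard {
  uint32_t shard_id;
  size_t FlushDb(DbIndex ind) {
    std::cout << "shard " << shard_id << ": flushing db " << ind << "\n";
    return 0;
  }
};

class ToyTransaction {
 public:
  ToyTransaction(std::vector<ToyShard>* shards, DbIndex db) : shards_(shards), db_index_(db) {}

  // In the real code this registers the transaction in every shard's TxQueue.
  void Schedule() { scheduled_ = true; }

  // Runs one hop: the callback is invoked for every shard; `conclude` marks it
  // as the final hop of the transaction.
  void Execute(std::function<void(ToyTransaction*, ToyShard*)> cb, bool conclude) {
    for (ToyShard& shard : *shards_)
      cb(this, &shard);
    concluded_ = conclude;
  }

  DbIndex db_index() const { return db_index_; }

 private:
  std::vector<ToyShard>* shards_;
  DbIndex db_index_;
  bool scheduled_ = false;
  bool concluded_ = false;
};

int main() {
  std::vector<ToyShard> shards{{0}, {1}, {2}};

  // FLUSHDB on database 0: one scheduling step, one concluding hop.
  ToyTransaction flush_db(&shards, 0);
  flush_db.Schedule();
  flush_db.Execute([](ToyTransaction* t, ToyShard* shard) { shard->FlushDb(t->db_index()); },
                   true /* conclude */);

  // FLUSHALL: same flow, but every database is flushed via the kDbAll sentinel.
  ToyTransaction flush_all(&shards, kDbAll);
  flush_all.Schedule();
  flush_all.Execute([](ToyTransaction*, ToyShard* shard) { shard->FlushDb(kDbAll); }, true);
}

The TODO left in FlushDb points at ScheduleSingleHop as a likely follow-up, which would fold the scheduling step and the single execution hop together.
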
- if (multi_ && ((sd.local_mask & KEYS_ACQUIRED) == 0)) { - sd.local_mask |= KEYS_ACQUIRED; + if (multi_ && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) { + sd.local_mask |= KEYLOCK_ACQUIRED; shard->db_slice().Acquire(Mode(), GetLockArgs(idx)); } - DCHECK(IsGlobal() || (sd.local_mask & KEYS_ACQUIRED)); + DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED)); /*************************************************************************/ // Actually running the callback. @@ -270,7 +270,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // touching those keys will be ordered via TxQueue. It's necessary because we preserve // the atomicity of awaked transactions by halting the TxQueue. shard->db_slice().Release(Mode(), largs); - sd.local_mask &= ~KEYS_ACQUIRED; + sd.local_mask &= ~KEYLOCK_ACQUIRED; } CHECK_GE(DecreaseRunCnt(), 1u); @@ -280,7 +280,6 @@ bool Transaction::RunInShard(EngineShard* shard) { } void Transaction::ScheduleInternal(bool single_hop) { - DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED); DCHECK_EQ(0u, txid_); bool span_all = IsGlobal(); @@ -288,10 +287,14 @@ void Transaction::ScheduleInternal(bool single_hop) { uint32_t num_shards; std::function is_active; + IntentLock::Mode mode = Mode(); if (span_all) { is_active = [](uint32_t) { return true; }; num_shards = ess_->size(); + // Lock shards + auto cb = [mode](EngineShard* shard) { shard->shard_lock()->Acquire(mode); }; + ess_->RunBriefInParallel(std::move(cb)); } else { num_shards = unique_shard_cnt_; DCHECK_GT(num_shards, 0u); @@ -327,7 +330,6 @@ void Transaction::ScheduleInternal(bool single_hop) { } DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order; - state_mask_.fetch_or(SCHEDULED, memory_order_release); break; } @@ -397,14 +399,13 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId(); cb_ = nullptr; - state_mask_.fetch_or(AFTERRUN, memory_order_release); return local_result_; } // Runs in the coordinator fiber. void Transaction::UnlockMulti() { - VLOG(1) << "Transaction::UnlockMulti"; + VLOG(1) << "UnlockMulti"; DCHECK(multi_); using KeyList = vector>; @@ -418,7 +419,10 @@ void Transaction::UnlockMulti() { sharded_keys[sid].push_back(k_v); } - auto cb = [&](EngineShard* shard) { + auto cb = [&] { + EngineShard* shard = EngineShard::tlocal(); + + shard->shard_lock()->Release(IntentLock::EXCLUSIVE); ShardId sid = shard->shard_id(); for (const auto& k_v : sharded_keys[sid]) { auto release = [&](IntentLock::Mode mode) { @@ -433,8 +437,7 @@ void Transaction::UnlockMulti() { auto& sd = shard_data_[SidToId(shard->shard_id())]; // It does not have to be that all shards in multi transaction execute this tx. - // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from - // there. + // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from there. if (sd.pq_pos != TxQueue::kEnd) { TxQueue* txq = shard->txq(); DCHECK(!txq->Empty()); @@ -445,11 +448,19 @@ void Transaction::UnlockMulti() { } shard->ShutdownMulti(this); + shard->PollExecution(nullptr); + + this->DecreaseRunCnt(); }; - ess_->RunBriefInParallel(std::move(cb)); + uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); + DCHECK_EQ(prev, 0u); - DCHECK_EQ(1u, use_count()); + for (ShardId i = 0; i < shard_data_.size(); ++i) { + ess_->Add(i, cb); + } + WaitForShardCallbacks(); + DCHECK_GE(use_count(), 1u); } // Runs in coordinator thread. 
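
UnlockMulti no longer fans out through RunBriefInParallel: it charges run_count_ with one unit per shard, dispatches a cleanup callback to every shard (release the EXCLUSIVE shard lock and the key locks, drop any tx-queue entry, ShutdownMulti, PollExecution), and then blocks in WaitForShardCallbacks() until each callback has called DecreaseRunCnt(). The skeleton below illustrates just that counting join; ShardJoin, the mutex/condvar wait and the std::thread workers are assumptions standing in for the fiber-based EngineShardSet machinery.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative fan-out/join mirroring run_count_ + WaitForShardCallbacks():
// the coordinator charges the counter with one unit per shard, dispatches the
// per-shard callback, and blocks until every callback has decremented it.
class ShardJoin {
 public:
  void Dispatch(std::vector<std::thread>& workers, std::function<void(uint32_t)> cb,
                uint32_t shard_count) {
    run_count_.fetch_add(shard_count, std::memory_order_relaxed);
    for (uint32_t sid = 0; sid < shard_count; ++sid) {
      // In the real code this is ess_->Add(sid, cb): the callback runs on the
      // shard's own thread/fiber; std::thread is only a stand-in.
      workers.emplace_back([this, cb, sid] {
        cb(sid);
        DecreaseRunCnt();
      });
    }
  }

  void WaitForShardCallbacks() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return run_count_.load(std::memory_order_acquire) == 0; });
  }

 private:
  void DecreaseRunCnt() {
    if (run_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      std::lock_guard<std::mutex> lk(mu_);
      cv_.notify_all();
    }
  }

  std::atomic<uint32_t> run_count_{0};
  std::mutex mu_;
  std::condition_variable cv_;
};

int main() {
  ShardJoin join;
  std::vector<std::thread> workers;
  join.Dispatch(workers, [](uint32_t) { /* per-shard cleanup would run here */ }, 3);
  join.WaitForShardCallbacks();
  for (auto& t : workers) t.join();
}
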
@@ -463,9 +474,6 @@ void Transaction::Execute(RunnableType cb, bool conclude) { DVLOG(1) << "Wait on Exec " << DebugId() << " completed"; cb_ = nullptr; - - uint32_t mask = conclude ? AFTERRUN : RUNNING; - state_mask_.fetch_or(mask, memory_order_relaxed); } // Runs in coordinator thread. @@ -547,14 +555,14 @@ void Transaction::ExecuteAsync(bool concluding_cb) { } } -void Transaction::RunQuickie() { +void Transaction::RunQuickie(EngineShard* shard) { DCHECK(!multi_); DCHECK_EQ(1u, shard_data_.size()); DCHECK_EQ(0u, txid_); - EngineShard* shard = EngineShard::tlocal(); + shard->IncQuickRun(); auto& sd = shard_data_[0]; - DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED); + DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER)); DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0]; CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0]; @@ -595,8 +603,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { // Fast path - for uncontended keys, just run the callback. // That applies for single key operations like set, get, lpush etc. if (shard->db_slice().CheckLock(mode, lock_args)) { - RunQuickie(); // TODO: for journal - this can become multi-shard - // transaction on replica. + RunQuickie(shard); return true; } @@ -604,12 +611,11 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); sd.pq_pos = shard->txq()->Insert(this); - DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED); + DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED); bool lock_acquired = shard->db_slice().Acquire(mode, lock_args); - sd.local_mask |= KEYS_ACQUIRED; + sd.local_mask |= KEYLOCK_ACQUIRED; DCHECK(!lock_acquired); // Because CheckLock above failed. - state_mask_.fetch_or(SCHEDULED, memory_order_release); DVLOG(1) << "Rescheduling into TxQueue " << DebugId(); shard->PollExecution(nullptr); @@ -637,11 +643,13 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { auto& sd = shard_data_[sid]; if (!spans_all) { + bool shard_unlocked = shard->shard_lock()->Check(mode); lock_args = GetLockArgs(shard->shard_id()); + // we need to acquire the lock unrelated to shard_unlocked since we register into Tx queue. // All transactions in the queue must acquire the intent lock. - lock_granted = shard->db_slice().Acquire(mode, lock_args); - sd.local_mask |= KEYS_ACQUIRED; + lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked; + sd.local_mask |= KEYLOCK_ACQUIRED; DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId(); } @@ -657,9 +665,9 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { // reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore(). bool to_proceed = lock_granted || txq->TailScore() < txid_; if (!to_proceed) { - if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock. + if (sd.local_mask & KEYLOCK_ACQUIRED) { // rollback the lock. 
shard->db_slice().Release(mode, lock_args); - sd.local_mask &= ~KEYS_ACQUIRED; + sd.local_mask &= ~KEYLOCK_ACQUIRED; } return result; // false, false @@ -694,11 +702,11 @@ bool Transaction::CancelInShard(EngineShard* shard) { DCHECK(trans == this) << "Pos " << pos << ", pq size " << pq->size() << ", trans " << trans; pq->Remove(pos); - if (sd.local_mask & KEYS_ACQUIRED) { + if (sd.local_mask & KEYLOCK_ACQUIRED) { auto mode = Mode(); auto lock_args = GetLockArgs(shard->shard_id()); shard->db_slice().Release(mode, lock_args); - sd.local_mask &= ~KEYS_ACQUIRED; + sd.local_mask &= ~KEYLOCK_ACQUIRED; } return true; } diff --git a/server/transaction.h b/server/transaction.h index 5c47056..88f0fcb 100644 --- a/server/transaction.h +++ b/server/transaction.h @@ -47,16 +47,10 @@ class Transaction { using RunnableType = std::function; using time_point = ::std::chrono::steady_clock::time_point; - enum LocalState : uint8_t { - ARMED = 1, // Transaction was armed with the callback + enum LocalMask : uint16_t { + ARMED = 1, // Transaction was armed with the callback OUT_OF_ORDER = 2, - KEYS_ACQUIRED = 4, - }; - - enum State : uint8_t { - SCHEDULED = 1, - RUNNING = 2, // For running multi-hop execution callbacks. - AFTERRUN = 4, // Once transaction finished running. + KEYLOCK_ACQUIRED = 4, }; Transaction(const CommandId* cid, EngineShardSet* ess); @@ -172,7 +166,7 @@ class Transaction { void ExecuteAsync(bool concluding_cb); // Optimized version of RunInShard for single shard uncontended cases. - void RunQuickie(); + void RunQuickie(EngineShard* shard); //! Returns true if transaction run out-of-order during the scheduling phase. bool ScheduleUniqueShard(EngineShard* shard);
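
The transaction changes finish the renaming of KEYS_ACQUIRED to KEYLOCK_ACQUIRED, fold the per-shard flags into the single LocalMask enum (dropping the separate State enum and state_mask_), and route the uncontended single-shard fast path through RunQuickie(shard), which now bumps the shard's quick_runs counter. A reduced model of that ScheduleUniqueShard decision (run the callback immediately when the key locks are free, otherwise enqueue the transaction, take the intent locks and record KEYLOCK_ACQUIRED) is sketched below; every type here is a toy stand-in for DbSlice, TxQueue and Transaction, and the real path also assigns a txid and polls the queue afterwards.

#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

enum LocalMask : uint16_t { ARMED = 1, OUT_OF_ORDER = 2, KEYLOCK_ACQUIRED = 4 };

// Toy key-lock table: a key is "free" when no intent has been recorded for it.
struct ToyDbSlice {
  std::unordered_map<std::string, unsigned> lock_counts;

  bool CheckLock(const std::string& key) const { return lock_counts.count(key) == 0; }
  void Acquire(const std::string& key) { ++lock_counts[key]; }
};

struct ToyTx {
  std::string key;
  uint16_t local_mask = 0;
  std::function<void()> cb;
};

struct ToyShard {
  ToyDbSlice db_slice;
  std::deque<ToyTx*> txq;
  uint64_t quick_runs = 0;
};

// Returns true if the transaction ran out of order, i.e. took the fast path.
bool ScheduleUniqueShard(ToyShard* shard, ToyTx* tx) {
  if (shard->db_slice.CheckLock(tx->key)) {
    ++shard->quick_runs;  // EngineShard::IncQuickRun() in the patch
    tx->cb();             // RunQuickie: run the callback without queueing
    return true;
  }

  // Contended: register in the queue and record the intent lock.
  shard->txq.push_back(tx);
  shard->db_slice.Acquire(tx->key);
  tx->local_mask |= KEYLOCK_ACQUIRED;
  return false;
}

int main() {
  ToyShard shard;

  ToyTx t1{"foo", 0, [] { std::cout << "set foo ran immediately\n"; }};
  ScheduleUniqueShard(&shard, &t1);

  shard.db_slice.Acquire("bar");  // simulate contention on "bar"
  ToyTx t2{"bar", 0, [] { std::cout << "set bar\n"; }};
  bool quick = ScheduleUniqueShard(&shard, &t2);
  std::cout << "t2 quick: " << quick << ", queued: " << shard.txq.size() << "\n";
}
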