diff --git a/core/op_status.h b/core/op_status.h index af8e995..c44c97c 100644 --- a/core/op_status.h +++ b/core/op_status.h @@ -14,6 +14,7 @@ enum class OpStatus : uint16_t { KEY_NOTFOUND, SKIPPED, WRONG_TYPE, + TIMED_OUT, }; class OpResultBase { diff --git a/server/command_registry.cc b/server/command_registry.cc index 72a8d41..0599f24 100644 --- a/server/command_registry.cc +++ b/server/command_registry.cc @@ -94,6 +94,8 @@ const char* OptName(CO::CommandOpt fl) { return "admin"; case NOSCRIPT: return "noscript"; + case BLOCKING: + return "blocking"; case GLOBAL_TRANS: return "global-trans"; } diff --git a/server/command_registry.h b/server/command_registry.h index 2b50bdf..df26ee0 100644 --- a/server/command_registry.h +++ b/server/command_registry.h @@ -28,6 +28,7 @@ enum CommandOpt : uint32_t { RANDOM = 0x40, ADMIN = 0x80, // implies NOSCRIPT, NOSCRIPT = 0x100, + BLOCKING = 0x200, GLOBAL_TRANS = 0x1000, }; diff --git a/server/conn_context.h b/server/conn_context.h index 0f51d04..31ee31f 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -51,6 +51,7 @@ class ConnectionContext : public ReplyBuilder { struct DebugInfo { uint32_t shards_count = 0; TxClock clock = 0; + bool is_ooo = false; }; DebugInfo last_command_debug; diff --git a/server/db_slice.cc b/server/db_slice.cc index aea6ff5..8e4290a 100644 --- a/server/db_slice.cc +++ b/server/db_slice.cc @@ -4,6 +4,10 @@ #include "server/db_slice.h" +extern "C" { +#include "redis/object.h" +} + #include <boost/fiber/fiber.hpp> #include <boost/fiber/operations.hpp> @@ -42,12 +46,17 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { db->main_table.reserve(key_size); } -auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned obj_type) const -> OpResult<MainIterator> { +auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const + -> OpResult<MainIterator> { auto [it, expire_it] = FindExt(db_index, key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; + if (it->second.ObjType() != req_obj_type) { + return OpStatus::WRONG_TYPE; + } + return it; } @@ -78,19 +87,58 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_ return make_pair(it, expire_it); } +OpResult<pair<MainIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) { + DCHECK(!args.empty()); + + for (unsigned i = 0; i < args.size(); ++i) { + string_view s = args[i]; + OpResult<MainIterator> res = Find(db_index, s, OBJ_LIST); + if (res) + return make_pair(res.value(), i); + if (res.status() != OpStatus::KEY_NOTFOUND) + return res.status(); + } + + VLOG(1) << "FindFirst " << args.front() << " not found"; + return OpStatus::KEY_NOTFOUND; +} + auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool> { DCHECK(IsDbValid(db_index)); auto& db = db_arr_[db_index]; + MainIterator existing; pair<MainIterator, bool> res = db->main_table.emplace(key, MainValue{}); if (res.second) { // new entry db->stats.obj_memory_usage += res.first->first.capacity(); return make_pair(res.first, true); } + existing = res.first; - return res; + DCHECK(IsValid(existing)); + + if (existing->second.HasExpire()) { + auto expire_it = db->expire_table.find(existing->first); + CHECK(IsValid(expire_it)); + + if (expire_it->second <= now_ms_) { + db->expire_table.erase(expire_it); + + // Keep the entry but free the object. + db->stats.obj_memory_usage -= existing->second.str.capacity(); + existing->second.obj_type = OBJ_STRING; + if (existing->second.robj) { + decrRefCountVoid(existing->second.robj); + existing->second.robj = nullptr; + } + + return make_pair(existing, true); + } + } + + return make_pair(existing, false); } void DbSlice::ActivateDb(DbIndex db_ind) { @@ -152,7 +200,6 @@ size_t DbSlice::FlushDb(DbIndex db_ind) { return removed; } - // Returns true if a state has changed, false otherwise. bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) { auto& db = db_arr_[db_ind]; @@ -177,8 +224,7 @@ void DbSlice::AddNew(DbIndex db_ind, string_view key, MainValue obj, uint64_t ex CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms)); } -bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj, - uint64_t expire_at_ms) { +bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) { auto& db = db_arr_[db_ind]; auto [new_entry, success] = db->main_table.emplace(key, obj); @@ -251,8 +297,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { } } -void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key, - unsigned count) { +void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key, unsigned count) { DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key; auto& lt = db_arr_[db_index]->lock_table; diff --git a/server/db_slice.h b/server/db_slice.h index e6697f8..9218f79 100644 --- a/server/db_slice.h +++ b/server/db_slice.h @@ -47,6 +47,10 @@ class DbSlice { // Returns (value, expire) dict entries if key exists, null if it does not exist or has expired. std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const; + // Returns dictEntry, args-index if found, KEY_NOTFOUND otherwise. + // If multiple keys are found, returns the first index in the ArgSlice. + OpResult<std::pair<MainIterator, unsigned>> FindFirst(DbIndex db_index, const ArgSlice& args); + // Return .second=true if insertion ocurred, false if we return the existing key. std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key); diff --git a/server/dragonfly_test.cc b/server/dragonfly_test.cc index c849ddd..cd56200 100644 --- a/server/dragonfly_test.cc +++ b/server/dragonfly_test.cc @@ -70,15 +70,16 @@ TEST_F(DflyEngineTest, Multi) { ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey4)); + ASSERT_FALSE(service_->IsShardSetLocked()); } - TEST_F(DflyEngineTest, MultiEmpty) { RespVec resp = Run({"multi"}); ASSERT_THAT(resp, RespEq("OK")); resp = Run({"exec"}); ASSERT_THAT(resp[0], ArrLen(0)); + ASSERT_FALSE(service_->IsShardSetLocked()); } TEST_F(DflyEngineTest, MultiSeq) { @@ -95,6 +96,7 @@ TEST_F(DflyEngineTest, MultiSeq) { ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey4)); + ASSERT_FALSE(service_->IsShardSetLocked()); EXPECT_THAT(resp, ElementsAre(StrArg("OK"), StrArg("1"), ArrLen(2))); const RespExpr::Vec& arr = *get<RespVec*>(resp[2].u); @@ -139,6 +141,7 @@ TEST_F(DflyEngineTest, MultiConsistent) { fb.join(); ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey4)); + ASSERT_FALSE(service_->IsShardSetLocked()); } TEST_F(DflyEngineTest, MultiRename) { @@ -153,6 +156,65 @@ TEST_F(DflyEngineTest, MultiRename) { EXPECT_THAT(resp, ElementsAre(StrArg("OK"), StrArg("OK"))); ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey4)); + ASSERT_FALSE(service_->IsShardSetLocked()); } +TEST_F(DflyEngineTest, MultiHop) { + Run({"set", kKey1, "1"}); + + auto p1_fb = pp_->at(1)->LaunchFiber([&] { + for (int i = 0; i < 100; ++i) { + auto resp = Run({"rename", kKey1, kKey2}); + ASSERT_THAT(resp, RespEq("OK")); + EXPECT_EQ(2, GetDebugInfo("IO1").shards_count); + + resp = Run({"rename", kKey2, kKey1}); + ASSERT_THAT(resp, RespEq("OK")); + } + }); + + // mset should be executed either as ooo or via tx-queue because previous transactions + // have been unblocked and executed as well. In other words, this mset should never block + // on serializability constraints. + auto p2_fb = pp_->at(2)->LaunchFiber([&] { + for (int i = 0; i < 100; ++i) { + Run({"mset", kKey3, "1", kKey4, "2"}); + } + }); + + p1_fb.join(); + p2_fb.join(); +} + +TEST_F(DflyEngineTest, FlushDb) { + Run({"mset", kKey1, "1", kKey4, "2"}); + auto resp = Run({"flushdb"}); + ASSERT_THAT(resp, RespEq("OK")); + + auto fb0 = pp_->at(0)->LaunchFiber([&] { + for (unsigned i = 0; i < 100; ++i) { + Run({"flushdb"}); + } + }); + + pp_->at(1)->AwaitBlocking([&] { + for (unsigned i = 0; i < 100; ++i) { + Run({"mset", kKey1, "1", kKey4, "2"}); + auto resp = Run({"exists", kKey1, kKey4}); + int64_t ival = get<int64_t>(resp[0].u); + ASSERT_TRUE(ival == 0 || ival == 2) << i << " " << ival; + } + }); + + fb0.join(); + + ASSERT_FALSE(service_->IsLocked(0, kKey1)); + ASSERT_FALSE(service_->IsLocked(0, kKey4)); + ASSERT_FALSE(service_->IsShardSetLocked()); +} + +// TODO: to test transactions with a single shard since then all transactions become local. +// To consider having a parameter in dragonfly engine controlling number of shards +// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case. + } // namespace dfly diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h index 231961b..7a729e8 100644 --- a/server/engine_shard_set.h +++ b/server/engine_shard_set.h @@ -135,12 +135,12 @@ class EngineShardSet { } // Runs a brief function on all shards. Waits for it to complete. - template <typename U> void RunBriefInParallel(U&& func) { + template <typename U> void RunBriefInParallel(U&& func) const { RunBriefInParallel(std::forward<U>(func), [](auto i) { return true; }); } // Runs a brief function on selected shards. Waits for it to complete. - template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred); + template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred) const; template <typename U> void RunBlockingInParallel(U&& func); @@ -149,7 +149,8 @@ class EngineShardSet { std::vector<util::fibers_ext::FiberQueue*> shard_queue_; }; -template <typename U, typename P> void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) { +template <typename U, typename P> +void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const { util::fibers_ext::BlockingCounter bc{0}; for (uint32_t i = 0; i < size(); ++i) { diff --git a/server/generic_family.cc b/server/generic_family.cc index 737ba17..8f4ecd9 100644 --- a/server/generic_family.cc +++ b/server/generic_family.cc @@ -26,8 +26,7 @@ class Renamer { Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) { } - // TODO: to implement locking semantics. - OpResult<void> FindAndLock(ShardId shard_id, const ArgSlice& args); + OpResult<void> Find(ShardId shard_id, const ArgSlice& args); OpResult<void> status() const { return status_; @@ -36,75 +35,82 @@ class Renamer { Transaction::RunnableType Finalize(bool skip_exist_dest); private: - void SwapValues(EngineShard* shard, const ArgSlice& args); + void MoveValues(EngineShard* shard, const ArgSlice& args); DbIndex db_indx_; ShardId src_sid_; - pair<MainIterator, ExpireIterator> find_res_[2]; - uint64_t expire_; - MainValue src_val_; + struct FindResult { + string_view key; + MainValue val; + uint64_t expire_ts; + bool found = false; + }; + + FindResult src_res_, dest_res_; // index 0 for source, 1 for destination OpResult<void> status_; }; -OpResult<void> Renamer::FindAndLock(ShardId shard_id, const ArgSlice& args) { +OpResult<void> Renamer::Find(ShardId shard_id, const ArgSlice& args) { CHECK_EQ(1u, args.size()); - unsigned indx = (shard_id == src_sid_) ? 0 : 1; + FindResult* res = (shard_id == src_sid_) ? &src_res_ : &dest_res_; - find_res_[indx] = EngineShard::tlocal()->db_slice().FindExt(db_indx_, args.front()); + res->key = args.front(); + auto [it, exp_it] = EngineShard::tlocal()->db_slice().FindExt(db_indx_, res->key); + + res->found = IsValid(it); + if (IsValid(it)) { + res->val = it->second; // TODO: won't work for robj because we copy pointers. + res->expire_ts = IsValid(exp_it) ? exp_it->second : 0; + } return OpStatus::OK; }; -void Renamer::SwapValues(EngineShard* shard, const ArgSlice& args) { - auto& dest = find_res_[1]; +void Renamer::MoveValues(EngineShard* shard, const ArgSlice& args) { auto shard_id = shard->shard_id(); - // NOTE: This object juggling between shards won't work if we want to maintain heap per shard - // model. + // TODO: when we want to maintain heap per shard model this code will require additional + // work if (shard_id == src_sid_) { // Handle source key. // delete the source entry. - CHECK(shard->db_slice().Del(db_indx_, find_res_[0].first)); + auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first; + CHECK(shard->db_slice().Del(db_indx_, it)); return; } // Handle destination - MainIterator dest_it = dest.first; + string_view dest_key = dest_res_.key; + MainIterator dest_it = shard->db_slice().FindExt(db_indx_, dest_key).first; if (IsValid(dest_it)) { - dest_it->second = std::move(src_val_); // we just move the source. - shard->db_slice().Expire(db_indx_, dest_it, expire_); + // we just move the source. We won't be able to do it with heap per shard model. + dest_it->second = std::move(src_res_.val); + shard->db_slice().Expire(db_indx_, dest_it, src_res_.expire_ts); } else { // we just add the key to destination with the source object. - string_view key = args.front(); // from key - shard->db_slice().AddNew(db_indx_, key, std::move(src_val_), expire_); + shard->db_slice().AddNew(db_indx_, dest_key, src_res_.val, src_res_.expire_ts); } } Transaction::RunnableType Renamer::Finalize(bool skip_exist_dest) { - const auto& src = find_res_[0]; - const auto& dest = find_res_[1]; - auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - if (!IsValid(src.first)) { + if (!src_res_.found) { status_ = OpStatus::KEY_NOTFOUND; return cleanup; } - if (IsValid(dest.first) && skip_exist_dest) { + if (dest_res_.found && skip_exist_dest) { status_ = OpStatus::KEY_EXISTS; return cleanup; } - expire_ = IsValid(src.second) ? src.second->second : 0; - src_val_ = std::move(src.first->second); - // Src key exist and we need to override the destination. return [this](Transaction* t, EngineShard* shard) { - this->SwapValues(shard, t->ShardArgsInShard(shard->shard_id())); + this->MoveValues(shard, t->ShardArgsInShard(shard->shard_id())); return OpStatus::OK; }; @@ -295,7 +301,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des transaction->Execute( [&renamer](Transaction* t, EngineShard* shard) { auto args = t->ShardArgsInShard(shard->shard_id()); - return renamer.FindAndLock(shard->shard_id(), args).status(); + return renamer.Find(shard->shard_id(), args).status(); }, false); diff --git a/server/list_family_test.cc b/server/list_family_test.cc index ac370bb..e422a36 100644 --- a/server/list_family_test.cc +++ b/server/list_family_test.cc @@ -39,5 +39,18 @@ TEST_F(ListFamilyTest, Basic) { ASSERT_THAT(resp[0], IntArg(1)); } +TEST_F(ListFamilyTest, Expire) { + auto resp = Run({"lpush", kKey1, "1"}); + EXPECT_THAT(resp[0], IntArg(1)); + + constexpr uint64_t kNow = 232279092000; + UpdateTime(kNow); + resp = Run({"expire", kKey1, "1"}); + EXPECT_THAT(resp[0], IntArg(1)); + + UpdateTime(kNow + 1000); + resp = Run({"lpush", kKey1, "1"}); + EXPECT_THAT(resp[0], IntArg(1)); +} } // namespace dfly diff --git a/server/main_service.cc b/server/main_service.cc index 696bb10..fd5bff1 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -50,7 +50,7 @@ constexpr size_t kMaxThreadSize = 1024; } // namespace -Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) { +Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) { CHECK(pp); // We support less than 1024 threads. @@ -89,7 +89,7 @@ void Service::Shutdown() { request_latency_usec.Shutdown(); ping_qps.Shutdown(); - // to shutdown all the runtime components that depend on EngineShard. + // to shutdown all the runtime components that depend on EngineShard. server_family_.Shutdown(); StringFamily::Shutdown(); GenericFamily::Shutdown(); @@ -174,7 +174,9 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000); if (dist_trans) { cntx->last_command_debug.clock = dist_trans->txid(); + cntx->last_command_debug.is_ooo = dist_trans->IsOOO(); } + cntx->transaction = nullptr; } void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, @@ -231,6 +233,17 @@ bool Service::IsLocked(DbIndex db_index, std::string_view key) const { return !is_open; } +bool Service::IsShardSetLocked() const { + std::atomic_uint res{0}; + + shard_set_.RunBriefInParallel([&](EngineShard* shard) { + bool unlocked = shard->shard_lock()->Check(IntentLock::SHARED); + res.fetch_add(!unlocked, memory_order_relaxed); + }); + + return res.load() != 0; +} + void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); } @@ -240,6 +253,15 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) { cntx->CloseConnection(); } +void Service::Multi(CmdArgList args, ConnectionContext* cntx) { + if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { + return cntx->SendError("MULTI calls can not be nested"); + } + cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT; + // TODO: to protect against huge exec transactions. + return cntx->SendOk(); +} + void Service::Exec(CmdArgList args, ConnectionContext* cntx) { if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) { return cntx->SendError("EXEC without MULTI"); @@ -280,15 +302,6 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Exec completed"; } -void Service::Multi(CmdArgList args, ConnectionContext* cntx) { - if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { - return cntx->SendError("MULTI calls can not be nested"); - } - cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT; - // TODO: to protect against huge exec transactions. - return cntx->SendOk(); -} - VarzValue::Map Service::GetVarzStats() { VarzValue::Map res; @@ -306,8 +319,7 @@ using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx); void Service::RegisterCommands() { using CI = CommandId; - constexpr auto kExecMask = - CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS; + constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS; auto cb_exec = [this](CmdArgList sp, ConnectionContext* cntx) { this->Exec(std::move(sp), cntx); diff --git a/server/main_service.h b/server/main_service.h index c7b2f41..5cc33c4 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -47,6 +47,7 @@ class Service { // Used by tests. bool IsLocked(DbIndex db_index, std::string_view key) const; + bool IsShardSetLocked() const; EngineShardSet& shard_set() { return shard_set_; diff --git a/server/reply_builder.cc b/server/reply_builder.cc index e394485..d30c37e 100644 --- a/server/reply_builder.cc +++ b/server/reply_builder.cc @@ -208,4 +208,18 @@ void ReplyBuilder::SendSimpleStrArr(const std::string_view* arr, uint32_t count) serializer_->SendDirect(res); } +void ReplyBuilder::SendNullArray() { + as_resp()->SendDirect("*-1\r\n"); +} + +void ReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) { + string res = absl::StrCat("*", arr.size(), kCRLF); + + for (size_t i = 0; i < arr.size(); ++i) { + StrAppend(&res, "$", arr[i].size(), kCRLF); + res.append(arr[i]).append(kCRLF); + } + as_resp()->SendDirect(res); +} + } // namespace dfly diff --git a/server/reply_builder.h b/server/reply_builder.h index 9c1bac9..48f7361 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -108,6 +108,9 @@ class ReplyBuilder { // Resp specific. // This one is prefixed with + and with clrf added automatically to each item.. void SendSimpleStrArr(const std::string_view* arr, uint32_t count); + void SendNullArray(); + + void SendStringArr(absl::Span<const std::string_view> arr); void SendNull() { as_resp()->SendNull(); diff --git a/server/transaction.cc b/server/transaction.cc index 714af63..c6a455d 100644 --- a/server/transaction.cc +++ b/server/transaction.cc @@ -264,13 +264,18 @@ bool Transaction::RunInShard(EngineShard* shard) { // If it's a final hop we should release the locks. if (should_release) { - KeyLockArgs largs = GetLockArgs(idx); + if (IsGlobal()) { + shard->shard_lock()->Release(Mode()); + } else { // not global. + KeyLockArgs largs = GetLockArgs(idx); - // If a transaction has been suspended, we keep the lock so that future transaction - // touching those keys will be ordered via TxQueue. It's necessary because we preserve - // the atomicity of awaked transactions by halting the TxQueue. - shard->db_slice().Release(Mode(), largs); - sd.local_mask &= ~KEYLOCK_ACQUIRED; + // If a transaction has been suspended, we keep the lock so that future transaction + // touching those keys will be ordered via TxQueue. It's necessary because we preserve + // the atomicity of awaked transactions by halting the TxQueue. + shard->db_slice().Release(Mode(), largs); + sd.local_mask &= ~KEYLOCK_ACQUIRED; + sd.local_mask &= ~OUT_OF_ORDER; + } } CHECK_GE(DecreaseRunCnt(), 1u); @@ -405,7 +410,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // Runs in the coordinator fiber. void Transaction::UnlockMulti() { - VLOG(1) << "UnlockMulti"; + VLOG(1) << "UnlockMulti " << DebugId(); DCHECK(multi_); using KeyList = vector<pair<std::string_view, LockCnt>>; @@ -437,7 +442,8 @@ void Transaction::UnlockMulti() { auto& sd = shard_data_[SidToId(shard->shard_id())]; // It does not have to be that all shards in multi transaction execute this tx. - // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from there. + // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from + // there. if (sd.pq_pos != TxQueue::kEnd) { TxQueue* txq = shard->txq(); DCHECK(!txq->Empty()); @@ -461,6 +467,8 @@ void Transaction::UnlockMulti() { } WaitForShardCallbacks(); DCHECK_GE(use_count(), 1u); + + VLOG(1) << "UnlockMultiEnd " << DebugId(); } // Runs in coordinator thread. @@ -517,21 +525,23 @@ void Transaction::ExecuteAsync(bool concluding_cb) { // We verify seq lock has the same generation number. See below for more info. auto cb = [seq, this] { EngineShard* shard = EngineShard::tlocal(); - DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " " - << run_count_.load(memory_order_relaxed); uint16_t local_mask = GetLocalMask(shard->shard_id()); // we use fetch_add with release trick to make sure that local_mask is loaded before // we load seq_after. We could gain similar result with "atomic_thread_fence(acquire)" uint32_t seq_after = seqlock_.fetch_add(0, memory_order_release); + bool should_poll = (seq_after == seq) && (local_mask & ARMED); + + DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " " + << run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll; // We verify that this callback is still relevant. // If we still have the same sequence number and local_mask is ARMED it means // the coordinator thread has not crossed WaitForShardCallbacks barrier. // Otherwise, this callback is redundant. We may still call PollExecution but // we should not pass this to it since it can be in undefined state for this callback. - if (seq_after == seq && (local_mask & ARMED)) { + if (should_poll) { // shard->PollExecution(this) does not necessarily execute this transaction. // Therefore, everything that should be handled during the callback execution // should go into RunInShard. @@ -561,6 +571,7 @@ void Transaction::RunQuickie(EngineShard* shard) { DCHECK_EQ(0u, txid_); shard->IncQuickRun(); + auto& sd = shard_data_[0]; DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER)); @@ -734,13 +745,14 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { } inline uint32_t Transaction::DecreaseRunCnt() { + // to protect against cases where Transaction is destroyed before run_ec_.notify + // finishes running. We can not put it inside the (res == 1) block because then it's too late. + ::boost::intrusive_ptr guard(this); + // We use release so that no stores will be reordered after. uint32_t res = run_count_.fetch_sub(1, std::memory_order_release); if (res == 1) { - // to protect against cases where Transaction is destroyed before run_ec_.notify - // finishes running. - ::boost::intrusive_ptr guard(this); run_ec_.notify(); } return res; diff --git a/server/transaction.h b/server/transaction.h index 88f0fcb..5a4c00a 100644 --- a/server/transaction.h +++ b/server/transaction.h @@ -93,10 +93,6 @@ class Transaction { return shard_data_[SidToId(sid)].local_mask; } - uint32_t GetStateMask() const { - return state_mask_.load(std::memory_order_relaxed); - } - // Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP. // For single hop, use ScheduleSingleHop instead. void Schedule() { @@ -148,6 +144,10 @@ class Transaction { bool IsGlobal() const; + bool IsOOO() const { + return false; + } + EngineShardSet* shard_set() { return ess_; } @@ -257,9 +257,6 @@ class Transaction { uint32_t trans_options_ = 0; - // Written by coordination thread but may be read by Shard threads. - // A mask of State values. Mostly used for debugging and for invariant checks. - std::atomic<uint16_t> state_mask_{0}; ShardId unique_shard_id_{kInvalidSid}; DbIndex db_index_ = 0;