diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
index c12806d..bd4a3ac 100644
--- a/server/CMakeLists.txt
+++ b/server/CMakeLists.txt
@@ -5,7 +5,7 @@ add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
             conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
             dragonfly_connection.cc engine_shard_set.cc main_service.cc memcache_parser.cc
-            redis_parser.cc reply_builder.cc string_family.cc)
+            redis_parser.cc reply_builder.cc string_family.cc transaction.cc)
 
 cxx_link(dragonfly_lib dfly_core uring_fiber_lib fibers_ext strings_lib http_server_lib tls_lib)
diff --git a/server/db_slice.cc b/server/db_slice.cc
index e2cc120..73b1d4d 100644
--- a/server/db_slice.cc
+++ b/server/db_slice.cc
@@ -20,14 +20,16 @@ using namespace util;
 
 DbSlice::DbSlice(uint32_t index, EngineShard* owner) : shard_id_(index), owner_(owner) {
   db_arr_.emplace_back();
-  CreateDbRedis(0);
+  CreateDb(0);
 }
 
 DbSlice::~DbSlice() {
+  // We do not need this code, but it makes it easier to debug in case we encounter
+  // memory allocation bugs during delete operations.
   for (auto& db : db_arr_) {
-    if (!db.main_table)
+    if (!db)
       continue;
-    db.main_table.reset();
+    db.reset();
   }
 }
 
@@ -35,9 +37,9 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
   ActivateDb(db_ind);
 
   auto& db = db_arr_[db_ind];
-  DCHECK(db.main_table);
+  DCHECK(db);
 
-  db.main_table->reserve(key_size);
+  db->main_table.reserve(key_size);
 }
 
 auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult<MainIterator> {
@@ -50,11 +52,10 @@ auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult<MainIterator>
 }
 
 pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_view key) const {
-  DCHECK_LT(db_ind, db_arr_.size());
-  DCHECK(db_arr_[db_ind].main_table);
+  DCHECK(IsDbValid(db_ind));
 
   auto& db = db_arr_[db_ind];
-  MainIterator it = db.main_table->find(key);
+  MainIterator it = db->main_table.find(key);
 
   if (it == MainIterator{}) {
     return make_pair(it, ExpireIterator{});
@@ -62,14 +63,14 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_view key) const
 
   ExpireIterator expire_it;
   if (it->second.HasExpire()) {  // check expiry state
-    expire_it = db.expire_table->find(it->first);
+    expire_it = db->expire_table.find(it->first);
     CHECK(expire_it != ExpireIterator{});
     if (expire_it->second <= now_ms_) {
-      db.expire_table->erase(expire_it);
+      db->expire_table.erase(expire_it);
 
-      db.stats.obj_memory_usage -= (it->first.capacity() + it->second.str.capacity());
-      db.main_table->erase(it);
+      db->stats.obj_memory_usage -= (it->first.capacity() + it->second.str.capacity());
+      db->main_table.erase(it);
       return make_pair(MainIterator{}, ExpireIterator{});
     }
   }
@@ -78,14 +79,13 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_view key) const
 }
 
 auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool> {
-  DCHECK_LT(db_index, db_arr_.size());
-  DCHECK(db_arr_[db_index].main_table);
+  DCHECK(IsDbValid(db_index));
 
   auto& db = db_arr_[db_index];
-  pair<MainIterator, bool> res = db.main_table->emplace(key, MainValue{});
+  pair<MainIterator, bool> res = db->main_table.emplace(key, MainValue{});
   if (res.second) {  // new entry
-    db.stats.obj_memory_usage += res.first->first.capacity();
+    db->stats.obj_memory_usage += res.first->first.capacity();
 
     return make_pair(res.first, true);
   }
 
@@ -96,14 +96,13 @@ auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool>
 
   if (it->second.HasExpire()) {
-    CHECK_EQ(1u, db.expire_table->erase(it->first));
+    CHECK_EQ(1u, db->expire_table.erase(it->first));
     it->second.SetExpire(false);
     return true;
   }
 
   if (!it->second.HasExpire() && at) {
-    CHECK(db.expire_table->emplace(it->first, at).second);
+    CHECK(db->expire_table.emplace(it->first, at).second);
     it->second.SetExpire(true);
 
     return true;
@@ -132,18 +131,18 @@ void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64
 }
 
 bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj,
-                           uint64_t expire_at_ms) {
+                            uint64_t expire_at_ms) {
   auto& db = db_arr_[db_ind];
 
-  auto [new_entry, success] = db.main_table->emplace(key, obj);
+  auto [new_entry, success] = db->main_table.emplace(key, obj);
   if (!success)
     return false;  // in this case obj won't be moved and will be destroyed during unwinding.
 
-  db.stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.str.capacity());
+  db->stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.str.capacity());
 
   if (expire_at_ms) {
     new_entry->second.SetExpire(true);
-    CHECK(db.expire_table->emplace(new_entry->first, expire_at_ms).second);
+    CHECK(db->expire_table.emplace(new_entry->first, expire_at_ms).second);
   }
 
   return true;
@@ -153,9 +152,83 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
   DCHECK_LT(db_ind, db_array_size());
 
   if (IsDbValid(db_ind)) {
-    return db_arr_[db_ind].main_table->size();
+    return db_arr_[db_ind]->main_table.size();
   }
   return 0;
 }
 
+bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
+  DCHECK(!lock_args.args.empty());
+
+  auto& lt = db_arr_[lock_args.db_index]->lock_table;
+  bool lock_acquired = true;
+
+  if (lock_args.args.size() == 1) {
+    lock_acquired = lt[lock_args.args.front()].Acquire(mode);
+  } else {
+    uniq_keys_.clear();
+
+    for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
+      auto s = lock_args.args[i];
+      if (uniq_keys_.insert(s).second) {
+        bool res = lt[s].Acquire(mode);
+        lock_acquired &= res;
+      }
+    }
+  }
+
+  DVLOG(2) << "Acquire " << IntentLock::ModeName(mode) << " for " << lock_args.args[0]
+           << " has_acquired: " << lock_acquired;
+
+  return lock_acquired;
+}
+
+void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
+  if (lock_args.args.size() == 1) {
+    Release(mode, lock_args.db_index, lock_args.args.front(), 1);
+  } else {
+    auto& lt = db_arr_[lock_args.db_index]->lock_table;
+    uniq_keys_.clear();
+    for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
+      auto s = lock_args.args[i];
+      if (uniq_keys_.insert(s).second) {
+        auto it = lt.find(s);
+        CHECK(it != lt.end());
+        it->second.Release(mode);
+        if (it->second.IsFree()) {
+          lt.erase(it);
+        }
+      }
+    }
+    DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
+  }
+}
+
+void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, std::string_view key,
+                      unsigned count) {
+  DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;
+
+  auto& lt = db_arr_[db_index]->lock_table;
+  auto it = lt.find(key);
+  CHECK(it != lt.end()) << key;
+  it->second.Release(mode, count);
+  if (it->second.IsFree()) {
+    lt.erase(it);
+  }
+}
+
+bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
+  DCHECK(!lock_args.args.empty());
+
+  const auto& lt = db_arr_[lock_args.db_index]->lock_table;
+  for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
+    auto s = lock_args.args[i];
+    auto it = lt.find(s);
+    if (it != lt.end() && !it->second.Check(mode)) {
+      return false;
+    }
+  }
+  return true;
+}
+
 }  // namespace dfly
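A note on the locking primitive: core/intent_lock.h is not part of this patch, so the sketch below is only a model of IntentLock, labeled as an assumption, that is consistent with how db_slice.cc uses it. In particular, Acquire records an intent even when the lock is not granted, which is why DbSlice::Acquire can return false while the caller still holds intents that must later be released.

```cpp
// Illustration only: a minimal IntentLock model inferred from its call sites
// above. The real core/intent_lock.h may differ in detail.
#include <cstdint>

class IntentLock {
 public:
  enum Mode : uint8_t { SHARED = 0, EXCLUSIVE = 1 };

  // Registers an intent; returns true if it was granted immediately:
  // shared intents coexist, an exclusive intent requires sole ownership.
  bool Acquire(Mode mode) {
    bool granted = Check(mode);
    ++cnt_[mode];  // recorded even when not granted
    return granted;
  }

  // True if an intent of `mode` could be granted right now (used by CheckLock).
  bool Check(Mode mode) const {
    if (mode == SHARED)
      return cnt_[EXCLUSIVE] == 0;
    return cnt_[SHARED] == 0 && cnt_[EXCLUSIVE] == 0;
  }

  void Release(Mode mode, unsigned count = 1) {
    cnt_[mode] -= count;
  }

  // DbSlice erases the lock_table entry once the lock is free.
  bool IsFree() const {
    return cnt_[SHARED] == 0 && cnt_[EXCLUSIVE] == 0;
  }

  static const char* ModeName(Mode m) {
    return m == SHARED ? "SHARED" : "EXCLUSIVE";
  }

 private:
  unsigned cnt_[2] = {0, 0};  // pending intents per mode
};
```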
diff --git a/server/db_slice.h b/server/db_slice.h
index c23de79..258e19e 100644
--- a/server/db_slice.h
+++ b/server/db_slice.h
@@ -4,8 +4,12 @@
 
 #pragma once
 
-#include "server/common_types.h"
+#include <absl/container/flat_hash_map.h>
+#include <absl/container/flat_hash_set.h>
+
+#include "core/intent_lock.h"
 #include "core/op_status.h"
+#include "server/common_types.h"
 #include "server/table.h"
 
 namespace util {
@@ -43,7 +47,6 @@ class DbSlice {
 
   // Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
   std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
 
-  // Return .second=true if insertion ocurred, false if we return the existing key.
   std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
 
@@ -61,22 +64,31 @@ class DbSlice {
 
   // Creates a database with index `db_ind`. If such a database exists, does nothing.
   void ActivateDb(DbIndex db_ind);
 
-  ShardId shard_id() const { return shard_id_;}
+  ShardId shard_id() const {
+    return shard_id_;
+  }
+
+  bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
+
+  void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);
+  void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count);
+
+  // Returns true if all keys can be locked under m. Does not lock them though.
+  bool CheckLock(IntentLock::Mode m, const KeyLockArgs& lock_args) const;
 
   size_t db_array_size() const {
     return db_arr_.size();
   }
 
   bool IsDbValid(DbIndex id) const {
-    return bool(db_arr_[id].main_table);
+    return id < db_arr_.size() && bool(db_arr_[id]);
   }
 
   // Returns existing keys count in the db.
   size_t DbSize(DbIndex db_ind) const;
 
  private:
-
-  void CreateDbRedis(unsigned index);
+  void CreateDb(DbIndex index);
 
   ShardId shard_id_;
 
@@ -84,14 +96,20 @@ class DbSlice {
 
   uint64_t now_ms_ = 0;  // Used for expire logic, represents a real clock.
 
-  struct DbRedis {
-    std::unique_ptr<MainTable> main_table;
-    std::unique_ptr<ExpireTable> expire_table;
+  using LockTable = absl::flat_hash_map<std::string, IntentLock>;
+
+  struct DbWrapper {
+    MainTable main_table;
+    ExpireTable expire_table;
+    LockTable lock_table;
 
     mutable InternalDbStats stats;
   };
 
-  std::vector<DbRedis> db_arr_;
+  std::vector<std::unique_ptr<DbWrapper>> db_arr_;
+
+  // Used in temporary computations in Acquire/Release.
+  absl::flat_hash_set<std::string_view> uniq_keys_;
 };
 
 }  // namespace dfly
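For readers of the new header API: KeyLockArgs lives in server/common_types.h, outside this diff. Below is a hypothetical coordinator-side usage, assuming ArgSlice is the usual span-of-string_view alias; the field names (db_index, args, key_step) match how db_slice.cc reads the struct.

```cpp
// Sketch, not part of the patch: locking the keys of "MSET k1 v1 k2 v2".
#include <string_view>

void LockMsetKeysSketch(dfly::DbSlice* slice) {
  // Shard-local args are {k1, v1, k2, v2}; key_step == 2 makes
  // Acquire/Release/CheckLock visit only the keys, skipping the values.
  std::string_view args[4] = {"k1", "v1", "k2", "v2"};

  dfly::KeyLockArgs largs;
  largs.db_index = 0;
  largs.args = dfly::ArgSlice{args, 4};  // assumes a span-like ArgSlice
  largs.key_step = 2;

  // Acquire records the intents even when the lock is not granted, so a
  // matching Release is required on every path.
  bool granted = slice->Acquire(dfly::IntentLock::EXCLUSIVE, largs);
  DVLOG(1) << "granted: " << granted;

  // ... perform the write ...

  slice->Release(dfly::IntentLock::EXCLUSIVE, largs);
}
```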
diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc
index d3deef1..966e5c0 100644
--- a/server/engine_shard_set.cc
+++ b/server/engine_shard_set.cc
@@ -5,6 +5,7 @@
 #include "server/engine_shard_set.h"
 
 #include "base/logging.h"
+#include "server/transaction.h"
 #include "util/fiber_sched_algo.h"
 #include "util/varz.h"
 
@@ -12,14 +13,14 @@ namespace dfly {
 using namespace std;
 using namespace util;
 
-namespace fibers = ::boost::fibers;
 namespace this_fiber = ::boost::this_fiber;
+namespace fibers = ::boost::fibers;
 
 thread_local EngineShard* EngineShard::shard_ = nullptr;
 constexpr size_t kQueueLen = 64;
 
 EngineShard::EngineShard(util::ProactorBase* pb)
-    : queue_(kQueueLen), db_slice_(pb->GetIndex(), this) {
+    : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), db_slice_(pb->GetIndex(), this) {
   fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
     this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
     queue_.Run();
@@ -57,6 +58,105 @@ void EngineShard::DestroyThreadLocal() {
   VLOG(1) << "Shard reset " << index;
 }
 
+void EngineShard::RunContinuationTransaction() {
+  auto sid = shard_id();
+
+  if (continuation_trans_->IsArmedInShard(sid)) {
+    bool to_keep = continuation_trans_->RunInShard(sid);
+    DVLOG(1) << "RunContTransaction " << continuation_trans_->DebugId() << " keep: " << to_keep;
+    if (!to_keep) {
+      continuation_trans_ = nullptr;
+    }
+  }
+}
+
+// Called by Transaction::ExecuteAsync in order to run transaction tasks.
+// Only runs in its own thread.
+void EngineShard::Execute(Transaction* trans) {
+  ShardId sid = shard_id();
+
+  if (continuation_trans_) {
+    if (trans == continuation_trans_)
+      trans = nullptr;
+    RunContinuationTransaction();
+
+    // Once we start executing a transaction we do not continue until it's finished.
+    // This preserves the atomicity of multi-hop transactions.
+    if (continuation_trans_)
+      return;
+  }
+
+  DCHECK(!continuation_trans_);
+
+  Transaction* head = nullptr;
+  string dbg_id;
+  while (!txq_.Empty()) {
+    auto val = txq_.Front();
+    head = absl::get<Transaction*>(val);
+
+    bool is_armed = head->IsArmedInShard(sid);
+    if (!is_armed)
+      break;
+
+    // It could be that head is processed and unblocks a multi-hop transaction.
+    // That transaction will schedule again and will arm another callback.
+    // Then we would reach an invalid state by running trans after this loop,
+    // which is not what we want.
+    // This function should not process 2 different callbacks for the same transaction.
+    // Hence we make sure to reset trans if it has been processed via the tx-queue.
+    if (head == trans)
+      trans = nullptr;
+    TxId txid = head->txid();
+
+    // Could be equal to committed_txid_ in case the same transaction had a few hops.
+    DCHECK_LE(committed_txid_, txid);
+
+    // We update committed_txid_ before calling Run() to avoid cases where a late transaction
+    // might try to push back this one.
+    committed_txid_ = txid;
+    if (VLOG_IS_ON(2)) {
+      dbg_id = head->DebugId();
+    }
+    bool keep = head->RunInShard(sid);
+    // We should not access head from this point since the RunInShard callback decrements
+    // its refcount.
+    DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
+    txq_.PopFront();
+
+    if (keep) {
+      continuation_trans_ = head;
+      break;
+    }
+  }
+
+  if (!trans)
+    return;
+
+  if (txq_.Empty())
+    return;
+
+  // trans is out of order if it locks keys that previous transactions have not locked.
+  // It may be that other transactions touch those keys, but they are necessarily ordered
+  // after trans in the queue, hence it's safe to run trans out of order.
+  if (trans->IsOutOfOrder() && trans->IsArmedInShard(sid)) {
+    DCHECK(trans != head);
+
+    dbg_id.clear();
+
+    uint32_t pos = trans->TxQueuePos(sid);
+    if (VLOG_IS_ON(1)) {
+      dbg_id = trans->DebugId();
+    }
+
+    bool keep = trans->RunInShard(sid);  // resets TxQueuePos, this is why we get it before.
+    DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
+
+    // Should be enforced via Schedule(). TODO: remove the check once the code is mature.
+    CHECK(!keep) << "multi-hop transactions cannot be OOO.";
+    txq_.Remove(pos);
+  }
+}
+
 void EngineShardSet::Init(uint32_t sz) {
   CHECK_EQ(0u, size());
diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h
index 0ea6b40..23cf8ff 100644
--- a/server/engine_shard_set.h
+++ b/server/engine_shard_set.h
@@ -4,20 +4,19 @@
 
 #pragma once
 
-
 #include <xxhash.h>
 
+#include "core/tx_queue.h"
 #include "server/db_slice.h"
-#include "util/fibers/fibers_ext.h"
 #include "util/fibers/fiberqueue_threadpool.h"
+#include "util/fibers/fibers_ext.h"
 #include "util/proactor_pool.h"
 
 namespace dfly {
 
 class EngineShard {
  public:
-
-  //EngineShard() is private down below.
+  // EngineShard() is private down below.
   ~EngineShard();
 
   static void InitThreadLocal(util::ProactorBase* pb);
@@ -43,12 +42,36 @@ class EngineShard {
     return &queue_;
   }
 
+  // Executes a transaction. This transaction is pending in the queue.
+  void Execute(Transaction* trans);
+
+  // Returns the transaction queue.
+  TxQueue* txq() {
+    return &txq_;
+  }
+
+  TxId committed_txid() const {
+    return committed_txid_;
+  }
+
+  TxQueue::Iterator InsertTxQ(Transaction* trans) {
+    return txq_.Insert(trans);
+  }
+
  private:
   EngineShard(util::ProactorBase* pb);
 
+  void RunContinuationTransaction();
+
   ::util::fibers_ext::FiberQueue queue_;
   ::boost::fibers::fiber fiber_q_;
 
+  TxQueue txq_;
+
+  // Logical ts used to order distributed transactions.
+  TxId committed_txid_ = 0;
+  Transaction* continuation_trans_ = nullptr;
+
   DbSlice db_slice_;
   uint32_t periodic_task_ = 0;
 
@@ -80,7 +103,13 @@ class EngineShardSet {
     return shard_queue_[sid]->Add(std::forward<F>(f));
   }
 
-  template <typename U> void RunBriefInParallel(U&& func);
+  // Runs a brief function on all shards.
+  template <typename U> void RunBriefInParallel(U&& func) {
+    RunBriefInParallel(std::forward<U>(func), [](auto i) { return true; });
+  }
+
+  template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred);
+
   template <typename U> void RunBlockingInParallel(U&& func);
 
  private:
@@ -88,16 +117,14 @@ class EngineShardSet {
   std::vector<util::fibers_ext::FiberQueue*> shard_queue_;
 };
 
-/**
- * @brief
- *
- * @tparam U - a function that receives EngineShard* argument and returns void.
- * @param func
- */
-template <typename U> void EngineShardSet::RunBriefInParallel(U&& func) {
-  util::fibers_ext::BlockingCounter bc{size()};
+template <typename U, typename P> void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) {
+  util::fibers_ext::BlockingCounter bc{0};
 
   for (uint32_t i = 0; i < size(); ++i) {
+    if (!pred(i))
+      continue;
+
+    bc.Add(1);
     util::ProactorBase* dest = pp_->at(i);
     dest->AsyncBrief([f = std::forward<U>(func), bc]() mutable {
       f(EngineShard::tlocal());
diff --git a/server/main_service.cc b/server/main_service.cc
index c8ce021..7043387 100644
--- a/server/main_service.cc
+++ b/server/main_service.cc
@@ -15,6 +15,7 @@
 #include "server/debugcmd.h"
 #include "server/error.h"
 #include "server/string_family.h"
+#include "server/transaction.h"
 #include "util/metrics/metrics.h"
 #include "util/uring/uring_fiber_algo.h"
 #include "util/varz.h"
@@ -27,7 +28,7 @@ namespace dfly {
 using namespace std;
 using namespace util;
 using base::VarzValue;
-
+using ::boost::intrusive_ptr;
 namespace fibers = ::boost::fibers;
 namespace this_fiber = ::boost::this_fiber;
 
@@ -96,7 +97,23 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
       (cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
     return cntx->SendError(WrongNumArgsError(cmd_str));
   }
+
+  uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec;
+
+  // Create the command transaction.
+  intrusive_ptr<Transaction> dist_trans;
+
+  if (cid->first_key_pos() > 0) {
+    dist_trans.reset(new Transaction{cid, &shard_set_});
+    cntx->transaction = dist_trans.get();
+
+    if (cid->first_key_pos() > 0) {
+      dist_trans->InitByArgs(args);
+    }
+  } else {
+    cntx->transaction = nullptr;
+  }
+
   cntx->cid = cid;
   cmd_req.Inc({cid->name()});
   cid->Invoke(args, cntx);
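The predicate overload is what lets Transaction::ScheduleInternal touch only the shards a transaction spans. A hedged usage sketch follows; it assumes an initialized EngineShardSet, and it assumes RunBriefInParallel waits on the BlockingCounter before returning (the tail of the function, with the Wait call, is outside the visible hunk).

```cpp
// Sketch, not part of the patch: counting keys on even-numbered shards only.
// The predicate runs in the calling thread; func runs on each selected
// shard thread.
#include <atomic>

uint64_t CountKeysOnEvenShards(dfly::EngineShardSet* ess) {
  std::atomic_uint64_t total{0};

  auto func = [&total](dfly::EngineShard* shard) {
    total.fetch_add(shard->db_slice().DbSize(0), std::memory_order_relaxed);
  };
  auto pred = [](uint32_t i) { return i % 2 == 0; };  // skip odd shards

  ess->RunBriefInParallel(func, pred);
  return total.load(std::memory_order_relaxed);
}
```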
diff --git a/server/string_family.cc b/server/string_family.cc
index 661664f..ffe84f3 100644
--- a/server/string_family.cc
+++ b/server/string_family.cc
@@ -11,6 +11,7 @@
 #include "server/conn_context.h"
 #include "server/engine_shard_set.h"
 #include "server/error.h"
+#include "server/transaction.h"
 #include "util/varz.h"
 
 namespace dfly {
@@ -115,13 +116,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
     }
   }
 
-  ShardId sid = Shard(key, cntx->shard_set->size());
-  OpResult<void> result = cntx->shard_set->Await(sid, [&] {
-    EngineShard* es = EngineShard::tlocal();
-    SetCmd cmd(&es->db_slice());
+  DCHECK(cntx->transaction);
 
-    return cmd.Set(sparams, key, value);
-  });
+  auto cb = [&](Transaction* t, EngineShard* shard) {
+    SetCmd sg(&shard->db_slice());
+    auto status = sg.Set(sparams, key, value).status();
+    return status;
+  };
+  OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
 
   if (result == OpStatus::OK) {
     return cntx->SendStored();
@@ -135,23 +137,23 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
   get_qps.Inc();
 
   std::string_view key = ArgS(args, 1);
 
-  ShardId sid = Shard(key, cntx->shard_set->size());
-  OpResult<string> result = cntx->shard_set->Await(sid, [&] {
-    EngineShard* es = EngineShard::tlocal();
-    OpResult<MainIterator> opres_it = es->db_slice().Find(0, key);
-    OpResult<string> res;
-    if (opres_it) {
-      res = opres_it.value()->second.str;
-    } else {
-      res = opres_it.status();
-    }
-    return res;
-  });
+  auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
+    OpResult<MainIterator> it_res = shard->db_slice().Find(0, key);
+    if (!it_res.ok())
+      return it_res.status();
+
+    string val = it_res.value()->second.str;
+
+    return val;
+  };
+
+  DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
+  Transaction* trans = cntx->transaction;
+  OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
 
   if (result) {
-    DVLOG(1) << "GET "
-             << ": " << key << " " << result.value();
+    DVLOG(1) << "GET " << trans->DebugId() << ": " << key << " " << result.value();
     cntx->SendGetReply(key, 0, result.value());
   } else {
     DVLOG(1) << "GET " << key << " nil";
@@ -169,7 +171,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
 
   ShardId sid = Shard(key, cntx->shard_set->size());
   OpResult<void> result = cntx->shard_set->Await(sid, [&] {
-    EngineShard* es = EngineShard::tlocal();
+    EngineShard* es = EngineShard::tlocal();
     SetCmd cmd(&es->db_slice());
 
     return cmd.Set(sparams, key, value);
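The Get rewrite above is the canonical single-hop read pattern. For illustration, here is a hypothetical STRLEN handler written the same way; StrLen and the SendLong-style reply helper are assumptions, not part of this patch — only the transaction plumbing mirrors the diff.

```cpp
// Hypothetical handler, sketch only.
void StrLen(CmdArgList args, ConnectionContext* cntx) {
  std::string_view key = ArgS(args, 1);

  auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<size_t> {
    OpResult<MainIterator> it_res = shard->db_slice().Find(0, key);
    if (!it_res.ok())
      return it_res.status();  // e.g. the key was not found
    return it_res.value()->second.str.size();
  };

  // Runs in the coordinator thread; the callback runs in the shard thread.
  OpResult<size_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));

  if (result) {
    cntx->SendLong(result.value());  // assumed reply helper
  } else {
    cntx->SendLong(0);               // a missing key counts as an empty string
  }
}
```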
diff --git a/server/transaction.cc b/server/transaction.cc
new file mode 100644
index 0000000..2ce0f9e
--- /dev/null
+++ b/server/transaction.cc
@@ -0,0 +1,569 @@
+// Copyright 2021, Roman Gershman.  All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#include "server/transaction.h"
+
+#include "base/logging.h"
+#include "server/command_registry.h"
+#include "server/db_slice.h"
+#include "server/engine_shard_set.h"
+
+namespace dfly {
+
+using namespace std;
+using namespace util;
+
+thread_local Transaction::TLTmpSpace Transaction::tmp_space;
+
+namespace {
+
+std::atomic_uint64_t op_seq{1};
+
+constexpr size_t kTransSize = sizeof(Transaction);
+
+}  // namespace
+
+IntentLock::Mode Transaction::Mode() const {
+  return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
+}
+
+Transaction::~Transaction() {
+  DVLOG(2) << "Transaction " << DebugId() << " destroyed";
+}
+
+/**
+ * @brief Construct a new Transaction::Transaction object
+ *
+ * @param cid
+ * @param ess
+ */
+Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) {
+  trans_options_ = cid_->opt_mask();
+
+  bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key();
+  if (single_key) {
+    dist_.shard_data.resize(1);  // Single key optimization
+  } else {
+    // Our shard_data is not sparse, so we must allocate for all threads :(
+    dist_.shard_data.resize(ess_->size());
+  }
+}
+
+/**
+ * There are 4 options that we consider here:
+ * a. T spans a single shard and it's not multi.
+ *    unique_shard_id_ is predefined before schedule() is called.
+ *    In that case only a single thread will be scheduled and it will use shard_data[0],
+ *    just because shard_data.size() == 1. The engine thread can access any data because
+ *    there is a schedule barrier between InitByArgs and the RunInShard/IsArmedInShard functions.
+ * b. T spans multiple shards and it's not multi.
+ *    In that case multiple threads will be scheduled. Similarly they have a schedule barrier,
+ *    and IsArmedInShard can read any variable from shard_data[x].
+ * c. Trans spans a single shard and it's multi. shard_data has the size of ess_.size.
+ *    IsArmedInShard will check shard_data[x].
+ * d. Trans spans multiple shards and it's multi. Similarly shard_data[x] will be checked.
+ *    unique_shard_cnt_ and unique_shard_id_ are not accessed until shard_data[x] is armed, hence
+ *    we have a barrier between the coordinator and the engine threads. Therefore there should be
+ *    no data races.
+ **/
+
+void Transaction::InitByArgs(CmdArgList args) {
+  CHECK_GT(args.size(), 1U);
+  CHECK_LT(size_t(cid_->first_key_pos()), args.size());
+  DCHECK_EQ(unique_shard_cnt_, 0u);
+
+  if (!cid_->is_multi_key()) {  // Single key optimization.
+    auto key = ArgS(args, cid_->first_key_pos());
+    args_.push_back(key);
+
+    unique_shard_cnt_ = 1;
+    unique_shard_id_ = Shard(key, ess_->size());
+    num_keys_ = 1;
+    return;
+  }
+
+  CHECK(cid_->key_arg_step() == 1 || cid_->key_arg_step() == 2);
+  CHECK(cid_->key_arg_step() == 1 || (args.size() % 2) == 1);
+
+  // Reuse thread-local temporary storage. Since this code is non-preemptive we can use it here.
+  auto& shard_index = tmp_space.shard_cache;
+  shard_index.resize(dist_.shard_data.size());
+  for (auto& v : shard_index) {
+    v.Clear();
+  }
+
+  size_t key_end = cid_->last_key_pos() > 0 ? cid_->last_key_pos() + 1
+                                            : (args.size() + 1 + cid_->last_key_pos());
+  for (size_t i = 1; i < key_end; ++i) {
+    std::string_view key = ArgS(args, i);
+    uint32_t sid = Shard(key, dist_.shard_data.size());
+    shard_index[sid].args.push_back(key);
+    shard_index[sid].original_index.push_back(i - 1);
+    ++num_keys_;
+
+    if (cid_->key_arg_step() == 2) {  // value
+      ++i;
+      auto val = ArgS(args, i);
+      shard_index[sid].args.push_back(val);
+      shard_index[sid].original_index.push_back(i - 1);
+    }
+  }
+
+  args_.resize(key_end - 1);
+  dist_.reverse_index.resize(args_.size());
+
+  auto next_arg = args_.begin();
+  auto rev_indx_it = dist_.reverse_index.begin();
+
+  // sd.arg_start/arg_count point into the args_ array, which is sorted by the shard of each key.
+  // reverse_index[i] holds the original position of args_[i] in args.
+  for (size_t i = 0; i < dist_.shard_data.size(); ++i) {
+    auto& sd = dist_.shard_data[i];
+    auto& si = shard_index[i];
+    CHECK_LT(si.args.size(), 1u << 15);
+    sd.arg_count = si.args.size();
+    sd.arg_start = next_arg - args_.begin();
+    sd.local_mask = 0;
+    if (!sd.arg_count)
+      continue;
+
+    ++unique_shard_cnt_;
+    unique_shard_id_ = i;
+    uint32_t orig_indx = 0;
+    for (size_t j = 0; j < si.args.size(); ++j) {
+      *next_arg = si.args[j];
+      *rev_indx_it = si.original_index[orig_indx];
+
+      ++next_arg;
+      ++orig_indx;
+      ++rev_indx_it;
+    }
+  }
+
+  CHECK(next_arg == args_.end());
+  DVLOG(1) << "InitByArgs " << DebugId();
+
+  if (unique_shard_cnt_ == 1) {
+    PerShardData* sd;
+
+    dist_.shard_data.resize(1);
+    sd = &dist_.shard_data.front();
+    sd->arg_count = -1;
+    sd->arg_start = -1;
+  }
+
+  // Validation.
+  for (const auto& sd : dist_.shard_data) {
+    DCHECK_EQ(sd.local_mask, 0u);
+    DCHECK_EQ(0, sd.local_mask & ARMED);
+    DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
+  }
+}
+
+string Transaction::DebugId() const {
+  return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
+}
+
+// Runs in the db-slice thread. Returns true if the transaction needs to be kept in the queue.
+bool Transaction::RunInShard(ShardId sid) {
+  CHECK(cb_);
+  DCHECK_GT(txid_, 0u);
+
+  EngineShard* shard = EngineShard::tlocal();
+
+  // Unlike with regular transactions, we do not acquire locks upon scheduling,
+  // because scheduling is done before the multi-exec batch is executed. Therefore we
+  // lock keys right before the execution of each statement.
+
+  DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << sid;
+
+  sid = TranslateSidInShard(sid);
+  auto& sd = dist_.shard_data[sid];
+  DCHECK(sd.local_mask & ARMED);
+  sd.local_mask &= ~ARMED;
+
+  bool concluding = dist_.is_concluding_cb;
+
+  DCHECK(sd.local_mask & KEYS_ACQUIRED);
+
+  // Actually running the callback.
+  OpStatus status = cb_(this, shard);
+
+  // If it's a final hop we should release the locks.
+  if (concluding) {
+    auto largs = GetLockArgs(sid);
+    shard->db_slice().Release(Mode(), largs);
+    sd.local_mask &= ~KEYS_ACQUIRED;
+  }
+
+  if (unique_shard_cnt_ == 1) {
+    cb_ = nullptr;  // We can do this because only a single thread runs the callback.
+    local_result_ = status;
+  } else {
+    CHECK_EQ(OpStatus::OK, status);
+  }
+
+  // This shard should own a reference to the transaction, as does the coordinator thread.
+  DCHECK_GT(use_count(), 1u);
+  CHECK_GE(Disarm(), 1u);
+
+  // Must be computed before the intrusive_ptr_release call.
+  if (concluding) {
+    sd.pq_pos = TxQueue::kEnd;
+    // For multi transactions we need to clear this flag to allow locking of the next set of keys
+    // during the next child transaction.
+    sd.local_mask &= ~KEYS_ACQUIRED;
+    DVLOG(2) << "ptr_release " << DebugId() << " " << this->use_count();
+
+    intrusive_ptr_release(this);  // Against ScheduleInternal.
+  }
+
+  return !concluding;  // keep
+}
+
+void Transaction::ScheduleInternal(bool single_hop) {
+  DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED);
+  DCHECK_EQ(0u, txid_);
+
+  uint32_t num_shards;
+  std::function<bool(uint32_t)> is_active;
+
+  num_shards = unique_shard_cnt_;
+  DCHECK_GT(num_shards, 0u);
+
+  is_active = [&](uint32_t i) {
+    return num_shards == 1 ? (i == unique_shard_id_) : dist_.shard_data[i].arg_count > 0;
+  };
+
+  // Equivalent to calling intrusive_ptr_add_ref num_shards times.
+  use_count_.fetch_add(num_shards, memory_order_relaxed);
+
+  while (true) {
+    txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
+
+    std::atomic_uint32_t lock_acquire_cnt{0};
+    std::atomic_uint32_t success{0};
+
+    auto cb = [&](EngineShard* shard) {
+      pair<bool, bool> res = ScheduleInShard(shard);
+      success.fetch_add(res.first, memory_order_relaxed);
+      lock_acquire_cnt.fetch_add(res.second, memory_order_relaxed);
+    };
+
+    ess_->RunBriefInParallel(std::move(cb), is_active);
+
+    if (success.load(memory_order_acquire) == num_shards) {
+      // We allow out-of-order execution only for single-hop transactions.
+      // It might be possible to do it for multi-hop transactions as well, but currently it is
+      // too complicated to reason about.
+      if (single_hop && lock_acquire_cnt.load(memory_order_relaxed) == num_shards) {
+        dist_.out_of_order.store(true, memory_order_relaxed);
+      }
+      DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << dist_.out_of_order;
+
+      state_mask_.fetch_or(SCHEDULED, memory_order_release);
+      break;
+    }
+
+    DVLOG(1) << "Cancelling " << DebugId();
+
+    auto cancel = [&](EngineShard* shard) {
+      success.fetch_sub(CancelInShard(shard), memory_order_relaxed);
+    };
+
+    ess_->RunBriefInParallel(std::move(cancel), is_active);
+    CHECK_EQ(0u, success.load(memory_order_relaxed));
+  }
+}
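The retry loop above is the heart of the scheduling algorithm: grab a globally increasing txid, try to append the transaction to every participating shard queue, and cancel-and-retry if any shard already committed a higher txid. Below is a distilled single-threaded model of just that invariant; it deliberately ignores the intent locks and the TailScore-based reordering check, so it is an illustration rather than the implementation.

```cpp
// Model only: once a transaction is admitted everywhere, every shard queue
// holds it in global txid order.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <vector>

struct ShardModel {
  uint64_t committed_txid = 0;  // mirrors EngineShard::committed_txid()
  std::vector<uint64_t> queue;  // txids in arrival (== txid) order

  bool TrySchedule(uint64_t txid) {
    if (committed_txid >= txid)  // arrived too late; caller must retry
      return false;
    queue.push_back(txid);
    return true;
  }

  void Cancel(uint64_t txid) {  // mirrors CancelInShard
    queue.erase(std::remove(queue.begin(), queue.end(), txid), queue.end());
  }
};

uint64_t ScheduleEverywhere(std::atomic<uint64_t>& op_seq,
                            std::vector<ShardModel*>& shards) {
  while (true) {
    uint64_t txid = op_seq.fetch_add(1, std::memory_order_relaxed);
    size_t ok = 0;
    for (auto* s : shards)
      ok += s->TrySchedule(txid);
    if (ok == shards.size())
      return txid;      // scheduled everywhere
    for (auto* s : shards)
      s->Cancel(txid);  // roll back, retry with a fresh (higher) txid
  }
}
```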
+
+// Optimized "schedule and execute" function for the most common use-case of single-hop
+// transactions like set/mset/mget etc. Does not apply to more complicated cases like RENAME or
+// BLPOP, where data must be read from multiple shards before performing another hop.
+OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
+  DCHECK(!cb_);
+
+  cb_ = std::move(cb);
+
+  bool run_eager = false;
+  bool schedule_fast = (unique_shard_cnt_ == 1);
+  if (schedule_fast) {  // Single shard (local) optimization.
+    // We never resize shard_data because that would affect MULTI transaction correctness.
+    DCHECK_EQ(1u, dist_.shard_data.size());
+
+    dist_.shard_data[0].local_mask |= ARMED;
+    arm_count_.fetch_add(1, memory_order_release);  // Decremented via Disarm().
+
+    auto schedule_cb = [&] { return ScheduleUniqueShard(EngineShard::tlocal()); };
+    run_eager = ess_->Await(unique_shard_id_, std::move(schedule_cb));  // serves as a barrier.
+    (void)run_eager;
+  } else {  // Transaction spans multiple shards or is global (like flushdb).
+    ScheduleInternal(true);
+    ExecuteAsync(true);
+  }
+
+  DVLOG(1) << "Before DoneWait " << DebugId() << " " << args_.front();
+  WaitArm();
+  DVLOG(1) << "After DoneWait";
+
+  cb_ = nullptr;
+  state_mask_.fetch_or(AFTERRUN, memory_order_release);
+
+  return local_result_;
+}
+
+// Runs in the coordinator thread.
+void Transaction::Execute(RunnableType cb, bool conclude) {
+  cb_ = std::move(cb);
+
+  ExecuteAsync(conclude);
+
+  DVLOG(1) << "Wait on " << DebugId();
+  WaitArm();
+  DVLOG(1) << "Wait on " << DebugId() << " completed";
+  cb_ = nullptr;
+  dist_.out_of_order.store(false, memory_order_relaxed);
+
+  uint32_t mask = conclude ? AFTERRUN : RUNNING;
+  state_mask_.fetch_or(mask, memory_order_release);
+}
+
+// Runs in the coordinator thread.
+void Transaction::ExecuteAsync(bool concluding_cb) {
+  DVLOG(1) << "ExecuteAsync " << DebugId() << " concluding " << concluding_cb;
+
+  dist_.is_concluding_cb = concluding_cb;
+
+  DCHECK_GT(unique_shard_cnt_, 0u);
+
+  // We do not necessarily execute this transaction in 'cb' below. It may well be that it will be
+  // executed by the engine shard once it has been armed, and the coordinator thread will finish
+  // the transaction before the engine shard thread stops accessing it. Therefore, we increase
+  // the reference count by the number of callbacks accessing 'this', to allow the callbacks to
+  // run shard->Execute(this) safely.
+  use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
+
+  if (unique_shard_cnt_ == 1) {
+    dist_.shard_data[TranslateSidInShard(unique_shard_id_)].local_mask |= ARMED;
+  } else {
+    for (ShardId i = 0; i < dist_.shard_data.size(); ++i) {
+      auto& sd = dist_.shard_data[i];
+      if (sd.arg_count == 0)
+        continue;
+      DCHECK_LT(sd.arg_count, 1u << 15);
+      sd.local_mask |= ARMED;
+    }
+  }
+
+  // This fence prevents reads and writes before the release fence from being reordered with
+  // writes after it; specifically, no writes below will be reordered upwards. Important, because
+  // it protects the non-threadsafe local_mask from being accessed by IsArmedInShard in other
+  // threads.
+  arm_count_.fetch_add(unique_shard_cnt_, memory_order_acq_rel);
+
+  auto cb = [this] {
+    EngineShard* shard = EngineShard::tlocal();
+    DVLOG(2) << "TriggerExec " << DebugId() << " sid:" << shard->shard_id();
+
+    // Everything that should be handled during the callback execution should go into RunInShard.
+    shard->Execute(this);
+
+    DVLOG(2) << "ptr_release " << DebugId() << " " << use_count();
+    intrusive_ptr_release(this);  // against the use_count_.fetch_add above.
+  };
+
+  // IsArmedInShard is the protector of the non-thread-safe data.
+  if (unique_shard_cnt_ == 1) {
+    ess_->Add(unique_shard_id_, std::move(cb));  // serves as a barrier.
+  } else {
+    for (ShardId i = 0; i < dist_.shard_data.size(); ++i) {
+      auto& sd = dist_.shard_data[i];
+      if (sd.arg_count == 0)
+        continue;
+      ess_->Add(i, cb);  // serves as a barrier.
+    }
+  }
+}
+
+void Transaction::RunQuickSingle() {
+  DCHECK_EQ(1u, dist_.shard_data.size());
+  DCHECK_EQ(0u, txid_);
+
+  EngineShard* shard = EngineShard::tlocal();
+  auto& sd = dist_.shard_data[0];
+  DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
+
+  DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
+  CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
+
+  local_result_ = cb_(this, shard);
+
+  sd.local_mask &= ~ARMED;
+  cb_ = nullptr;  // We can do this because only a single shard runs the callback.
+  CHECK_GE(Disarm(), 1u);
+}
+
+const char* Transaction::Name() const {
+  return cid_->name();
+}
+
+KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
+  KeyLockArgs res;
+  res.db_index = 0;  // TODO
+  res.key_step = cid_->key_arg_step();
+  res.args = ShardArgsInShard(sid);
+
+  return res;
+}
+
+// Runs within an engine shard thread.
+// Optimized path that schedules and runs transactions out of order if possible.
+// Returns true if the transaction was eagerly executed, false if it was scheduled into the queue.
+bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
+  DCHECK_EQ(0u, txid_);
+  DCHECK_EQ(1u, dist_.shard_data.size());
+
+  auto mode = Mode();
+  auto lock_args = GetLockArgs(shard->shard_id());
+
+  auto& sd = dist_.shard_data.front();
+  DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
+
+  // Fast path - for uncontended keys, just run the callback.
+  // That applies to single-key operations like set, get, lpush etc.
+  if (shard->db_slice().CheckLock(mode, lock_args)) {
+    RunQuickSingle();  // TODO: for the journal - this can become a multi-shard
+                       // transaction on the replica.
+    return true;
+  }
+
+  intrusive_ptr_add_ref(this);
+
+  // We can do this because only a single thread writes into txid_ and sd.
+  txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
+  TxQueue::Iterator it = shard->InsertTxQ(this);
+  sd.pq_pos = it;
+
+  DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
+  bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
+  sd.local_mask |= KEYS_ACQUIRED;
+  DCHECK(!lock_acquired);  // Because CheckLock above failed.
+
+  state_mask_.fetch_or(SCHEDULED, memory_order_release);
+
+  return false;
+}
+
+// This function should not block since it's run via RunBriefInParallel.
+pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
+  // schedule_success, lock_granted.
+  pair<bool, bool> result{false, false};
+
+  if (shard->committed_txid() >= txid_) {
+    return result;
+  }
+
+  TxQueue* pq = shard->txq();
+  KeyLockArgs lock_args;
+  IntentLock::Mode mode = Mode();
+
+  bool lock_granted = false;
+  ShardId sid = TranslateSidInShard(shard->shard_id());
+
+  auto& sd = dist_.shard_data[sid];
+
+  bool shard_unlocked = true;
+  lock_args = GetLockArgs(shard->shard_id());
+
+  // We need to acquire the lock regardless of shard_unlocked since we register into the tx
+  // queue. All transactions in the queue must acquire the intent lock.
+  lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked;
+  sd.local_mask |= KEYS_ACQUIRED;
+  DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId();
+
+  if (!pq->Empty()) {
+    // If the new transaction requires reordering of the pending queue (i.e. it comes before the
+    // tail) and some other transaction already locked its keys, we can not reorder 'trans'
+    // because that other transaction could have deduced that it can run OOO and eagerly
+    // executed. Hence we fail this scheduling attempt for trans.
+    // However, when we schedule span-all transactions we can still reorder them. The reason is
+    // that before we start scheduling them we lock the shards and disable OOO.
+    // We may record when they disable OOO via barrier_ts, so that if the queue contains only
+    // transactions that were scheduled afterwards, we know they are not free and we can still
+    // reorder the queue. Currently this optimization is disabled: barrier_ts < pq->HeadRank().
+    bool to_proceed = lock_granted || pq->TailScore() < txid_;
+    if (!to_proceed) {
+      if (sd.local_mask & KEYS_ACQUIRED) {  // rollback the lock.
+        shard->db_slice().Release(mode, lock_args);
+        sd.local_mask &= ~KEYS_ACQUIRED;
+      }
+
+      return result;  // false, false
+    }
+  }
+
+  result.second = lock_granted;
+  result.first = true;
+
+  TxQueue::Iterator it = pq->Insert(this);
+  DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
+  sd.pq_pos = it;
+
+  DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << pq->size();
+
+  return result;
+}
+
+bool Transaction::CancelInShard(EngineShard* shard) {
+  ShardId sid = TranslateSidInShard(shard->shard_id());
+  auto& sd = dist_.shard_data[sid];
+
+  auto pos = sd.pq_pos;
+  if (pos == TxQueue::kEnd)
+    return false;
+
+  sd.pq_pos = TxQueue::kEnd;
+
+  TxQueue* pq = shard->txq();
+  auto val = pq->At(pos);
+  Transaction* trans = absl::get<Transaction*>(val);
+  DCHECK(trans == this) << "Pos " << pos << ", pq size " << pq->size() << ", trans " << trans;
+  pq->Remove(pos);
+
+  if (sd.local_mask & KEYS_ACQUIRED) {
+    auto mode = Mode();
+    auto lock_args = GetLockArgs(shard->shard_id());
+    shard->db_slice().Release(mode, lock_args);
+    sd.local_mask &= ~KEYS_ACQUIRED;
+  }
+  return true;
+}
+
+// Runs in the engine-shard thread.
+ArgSlice Transaction::ShardArgsInShard(ShardId sid) const {
+  DCHECK(!args_.empty());
+  DCHECK_NOTNULL(EngineShard::tlocal());
+
+  // We can read unique_shard_cnt_ only because ShardArgsInShard is called after the
+  // IsArmedInShard barrier.
+  if (unique_shard_cnt_ == 1) {
+    return args_;
+  }
+
+  const auto& sd = dist_.shard_data[sid];
+  return ArgSlice{args_.data() + sd.arg_start, sd.arg_count};
+}
+
+size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
+  if (unique_shard_cnt_ == 1)
+    return arg_index;
+
+  return dist_.reverse_index[dist_.shard_data[shard_id].arg_start + arg_index];
+}
+
+}  // namespace dfly
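A worked example of the mapping that InitByArgs builds and ReverseArgIndex consumes (the shard assignment here is illustrative): for MGET a b c on two shards, where a and c hash to shard 0 and b to shard 1, we get args_ = {a, c, b} (grouped by shard), shard_data[0] = {arg_start 0, arg_count 2}, shard_data[1] = {arg_start 2, arg_count 1}, and reverse_index = {0, 2, 1}. A coordinator-side helper that restores the client's original key order might look like the sketch below (shard_vals is hypothetical; this patch itself returns a single value per hop via ScheduleSingleHopT):

```cpp
#include <string>
#include <vector>

std::vector<std::string> ReorderResults(dfly::Transaction* t, dfly::ShardId sid,
                                        const std::vector<std::string>& shard_vals,
                                        size_t num_keys) {
  std::vector<std::string> resp(num_keys);
  for (size_t i = 0; i < shard_vals.size(); ++i) {
    // args_[shard_data[sid].arg_start + i] came from position
    // ReverseArgIndex(sid, i) of the client's argument list.
    resp[t->ReverseArgIndex(sid, i)] = shard_vals[i];
  }
  return resp;
}
```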
diff --git a/server/transaction.h b/server/transaction.h
new file mode 100644
index 0000000..f87572a
--- /dev/null
+++ b/server/transaction.h
@@ -0,0 +1,285 @@
+// Copyright 2021, Roman Gershman.  All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include "core/intent_lock.h"
+#include "core/tx_queue.h"
+#include "core/op_status.h"
+#include "server/common_types.h"
+#include "server/table.h"
+#include "util/fibers/fibers_ext.h"
+
+namespace dfly {
+
+class DbSlice;
+class EngineShardSet;
+class EngineShard;
+
+class Transaction {
+  Transaction(const Transaction&);
+  void operator=(const Transaction&) = delete;
+
+  ~Transaction();
+
+  // Transactions are reference counted.
+  friend void intrusive_ptr_add_ref(Transaction* trans) noexcept {
+    trans->use_count_.fetch_add(1, std::memory_order_relaxed);
+  }
+
+  friend void intrusive_ptr_release(Transaction* trans) noexcept {
+    if (1 == trans->use_count_.fetch_sub(1, std::memory_order_release)) {
+      std::atomic_thread_fence(std::memory_order_acquire);
+      delete trans;
+    }
+  }
+
+ public:
+  using RunnableType = std::function<OpStatus(Transaction*, EngineShard*)>;
+  using time_point = ::std::chrono::steady_clock::time_point;
+
+  enum LocalState : uint8_t {
+    ARMED = 1,  // Transaction was armed with the callback.
+    KEYS_ACQUIRED = 0x20,
+  };
+
+  enum State : uint8_t {
+    SCHEDULED = 1,
+    RUNNING = 2,   // For running multi-hop execution callbacks.
+    AFTERRUN = 4,  // Once the transaction has finished running.
+  };
+
+  Transaction(const CommandId* cid, EngineShardSet* ess);
+
+  void InitByArgs(CmdArgList args);
+
+  std::string DebugId() const;
+
+  // Runs in the engine thread.
+  ArgSlice ShardArgsInShard(ShardId sid) const;
+
+  // Maps the index in the ShardArgsInShard(shard_id) slice back to the index in the original
+  // array passed to InitByArgs.
+  // Runs in the coordinator thread.
+  size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const;
+
+  //! Returns true if the transaction spans this shard_id.
+  //! Runs from the coordinator thread.
+  bool IsActive(ShardId shard_id) const {
+    return unique_shard_cnt_ == 1 ? unique_shard_id_ == shard_id
+                                  : dist_.shard_data[shard_id].arg_count > 0;
+  }
+
+  //! Returns true if the transaction is armed for execution on this sid (used to avoid
+  //! duplicate runs). Supports local transactions under multi as well.
+  bool IsArmedInShard(ShardId sid) const {
+    if (sid >= dist_.shard_data.size())
+      sid = 0;
+    // We use acquire so that no reordering will move before this load.
+    return arm_count_.load(std::memory_order_acquire) > 0 &&
+           dist_.shard_data[sid].local_mask & ARMED;
+  }
+
+  // Called from engine-shard threads.
+  uint16_t GetLocalMask(ShardId sid) const {
+    sid = TranslateSidInShard(sid);
+    return dist_.shard_data[sid].local_mask;
+  }
+
+  uint32_t GetStateMask() const {
+    return state_mask_.load(std::memory_order_relaxed);
+  }
+
+  bool IsOutOfOrder() const {
+    return dist_.out_of_order.load(std::memory_order_relaxed);
+  }
+
+  // Relevant only when unique_shard_cnt_ > 1.
+  uint32_t TxQueuePos(ShardId sid) const {
+    return dist_.shard_data[sid].pq_pos;
+  }
+
+  // If conclude is true, removes the transaction from the pending queue.
+  void Execute(RunnableType cb, bool conclude);
+
+  // For multi-key scenarios cb should return OpStatus::OK, since otherwise the return value
+  // is ill-defined.
+  OpStatus ScheduleSingleHop(RunnableType cb);
+
+  // Fits only single-key scenarios, because it writes into the shared variable res from
+  // potentially multiple threads.
+  template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)) {
+    decltype(f(this, nullptr)) res;
+
+    ScheduleSingleHop([&res, f = std::forward<F>(f)](Transaction* t, EngineShard* shard) {
+      res = f(t, shard);
+      return res.status();
+    });
+    return res;
+  }
+
+  TxId txid() const {
+    return txid_;
+  }
+
+  // TODO: for multi, trans_options_ changes with every operation.
+  // Does it mean we lock every key differently during the same transaction?
+  IntentLock::Mode Mode() const;
+
+  const char* Name() const;
+
+  uint32_t unique_shard_cnt() const {
+    return unique_shard_cnt_;
+  }
+
+  EngineShardSet* shard_set() {
+    return ess_;
+  }
+
+  // Called by EngineShard when performing Execute over the tx queue.
+  // Returns true if the transaction should be kept in the queue.
+  bool RunInShard(ShardId sid);
+
+ private:
+  ShardId TranslateSidInShard(ShardId sid) const {
+    return sid < dist_.shard_data.size() ? sid : 0;
+  }
+
+  void ScheduleInternal(bool single_hop);
+
+  void ExecuteAsync(bool concluding_cb);
+
+  // Optimized version of RunInShard for single-shard uncontended cases.
+  void RunQuickSingle();
+
+  //! Returns true if the transaction was run out-of-order during the scheduling phase.
+  bool ScheduleUniqueShard(EngineShard* shard);
+
+  /// Returns pair(schedule_success, lock_granted).
+  /// schedule_success is true if the transaction was scheduled on db_slice.
+  /// lock_granted is true if the lock was granted for all the keys on this shard.
+  /// Runs in the shard thread.
+  std::pair<bool, bool> ScheduleInShard(EngineShard* shard);
+
+  // Returns true if the operation was cancelled for this shard. Runs in the shard thread.
+  bool CancelInShard(EngineShard* shard);
+
+  //! Returns the locking arguments needed for DbSlice to Acquire/Release transactional locks.
+  //! Runs in the shard thread.
+  KeyLockArgs GetLockArgs(ShardId sid) const;
+
+  void WaitArm() {
+    arm_ec_.await([this] { return 0 == this->arm_count_.load(std::memory_order_relaxed); });
+  }
+
+  uint32_t Disarm() {
+    // We use release so that no stores will be reordered after.
+    uint32_t res = arm_count_.fetch_sub(1, std::memory_order_release);
+    arm_ec_.notify();
+    return res;
+  }
+
+  uint32_t use_count() const {
+    return use_count_.load(std::memory_order_relaxed);
+  }
+
+  struct PerShardData {
+    uint32_t arg_start = 0;  // Index into the args_ array.
+    uint16_t arg_count = 0;
+
+    // Accessed only within the engine-shard thread.
+    // Bitmask of LocalState enums.
+    uint16_t local_mask{0};
+
+    uint32_t pq_pos = TxQueue::kEnd;
+
+    PerShardData(PerShardData&&) noexcept {
+    }
+
+    PerShardData() = default;
+  };
+  enum { kPerShardSize = sizeof(PerShardData) };
+
+  struct Dist {
+    // shard_data spans all the shards in ess_.
+    // I wish we could use a dense array of size [0..uniq_shards] but since multiple threads
+    // access this array to synchronize between themselves using PerShardData.local_mask, it can
+    // be tricky. The complication comes from multi_ transactions where a scheduled transaction
+    // is accessed between operations as well.
+    absl::InlinedVector shard_data;  // length = shard_count
+
+    // Reverse argument mapping. Allows reconstructing responses according to the original order
+    // of keys.
+    std::vector<uint32_t> reverse_index;
+
+    // NOTE: to move to a bitmask if it grows.
+    // Written by the coordinator thread, read by shard threads but not concurrently.
+    // Says whether the current callback function is concluding for this operation.
+    bool is_concluding_cb{true};
+
+    // If out_of_order is true, the transaction may execute before other scheduled transactions,
+    // i.e. not necessarily according to its queue order.
+    std::atomic_bool out_of_order{false};
+  };
+
+  enum { kDistSize = sizeof(Dist) };
+
+  const CommandId* cid_;
+  EngineShardSet* ess_;
+  TxId txid_{0};
+
+  std::atomic_uint32_t use_count_{0}, arm_count_{0};
+
+  // unique_shard_cnt_ and unique_shard_id_ are accessed only by the coordinator thread.
+  uint32_t unique_shard_cnt_{0};  // number of unique shards spanned by args_
+  ShardId unique_shard_id_{kInvalidSid};
+
+  // Written by the coordinator thread but may be read by shard threads.
+  std::atomic<uint32_t> state_mask_{0};
+
+  DbIndex db_index_ = 0;
+
+  // For single-hop transactions unique_shard_cnt_ == 1, hence there is no data race.
+  OpStatus local_result_ = OpStatus::OK;
+  uint32_t trans_options_ = 0;
+  uint32_t num_keys_ = 0;
+
+  Dist dist_;
+
+  util::fibers_ext::EventCount arm_ec_;
+
+  //! Stores the arguments of the transaction (i.e. keys + values) ordered by shards.
+  absl::InlinedVector args_;
+
+  RunnableType cb_;
+
+  struct PerShardCache {
+    std::vector<std::string_view> args;
+    std::vector<uint32_t> original_index;
+
+    void Clear() {
+      args.clear();
+      original_index.clear();
+    }
+  };
+
+  struct TLTmpSpace {
+    std::vector<PerShardCache> shard_cache;
+    absl::flat_hash_set<std::string_view> uniq_keys;
+  };
+
+  static thread_local TLTmpSpace tmp_space;
+};
+
+inline uint16_t trans_id(const Transaction* ptr) {
+  return intptr_t(ptr) & 0xFFFF;
+}
+
+}  // namespace dfly
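Finally, the intrusive refcounting: the two friend functions above wire Transaction into boost::intrusive_ptr, and the release/acquire pair in intrusive_ptr_release guarantees that whichever thread drops the last reference observes all writes of the other owners before deletion. A sketch of the coordinator-side ownership pattern, mirroring DispatchCommand in main_service.cc (the no-op callback and the helper itself are assumptions for illustration):

```cpp
#include <boost/smart_ptr/intrusive_ptr.hpp>

void RunSingleHopSketch(dfly::EngineShardSet* ess, const dfly::CommandId* cid,
                        dfly::CmdArgList args) {
  // The coordinator holds one reference; shard threads take extra references
  // inside ScheduleSingleHop/ExecuteAsync and drop them when done.
  boost::intrusive_ptr<dfly::Transaction> trans{new dfly::Transaction{cid, ess}};
  trans->InitByArgs(args);

  auto cb = [](dfly::Transaction* t, dfly::EngineShard* shard) {
    return dfly::OpStatus::OK;  // a no-op hop, for illustration
  };
  trans->ScheduleSingleHop(std::move(cb));
}  // ~intrusive_ptr drops the coordinator's reference; the Transaction is
   // deleted once the shard threads have dropped theirs.
```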