Introduce VLL transactions

This commit is contained in:
Roman Gershman 2021-12-22 17:17:52 +02:00
parent a0dfb3171a
commit ebd404ff5d
9 changed files with 1167 additions and 76 deletions

View File

@ -5,7 +5,7 @@ add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc
main_service.cc memcache_parser.cc
redis_parser.cc reply_builder.cc string_family.cc)
redis_parser.cc reply_builder.cc string_family.cc transaction.cc)
cxx_link(dragonfly_lib dfly_core uring_fiber_lib
fibers_ext strings_lib http_server_lib tls_lib)

View File

@ -20,14 +20,16 @@ using namespace util;
DbSlice::DbSlice(uint32_t index, EngineShard* owner) : shard_id_(index), owner_(owner) {
db_arr_.emplace_back();
CreateDbRedis(0);
CreateDb(0);
}
DbSlice::~DbSlice() {
// we do not need this code but it's easier to debug in case we encounter
// memory allocation bugs during delete operations.
for (auto& db : db_arr_) {
if (!db.main_table)
if (!db)
continue;
db.main_table.reset();
db.reset();
}
}
@ -35,9 +37,9 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
ActivateDb(db_ind);
auto& db = db_arr_[db_ind];
DCHECK(db.main_table);
DCHECK(db);
db.main_table->reserve(key_size);
db->main_table.reserve(key_size);
}
auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult<MainIterator> {
@ -50,11 +52,10 @@ auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult<Mai
}
pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_view key) const {
DCHECK_LT(db_ind, db_arr_.size());
DCHECK(db_arr_[db_ind].main_table);
DCHECK(IsDbValid(db_ind));
auto& db = db_arr_[db_ind];
MainIterator it = db.main_table->find(key);
MainIterator it = db->main_table.find(key);
if (it == MainIterator{}) {
return make_pair(it, ExpireIterator{});
@ -62,14 +63,14 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_
ExpireIterator expire_it;
if (it->second.HasExpire()) { // check expiry state
expire_it = db.expire_table->find(it->first);
expire_it = db->expire_table.find(it->first);
CHECK(expire_it != ExpireIterator{});
if (expire_it->second <= now_ms_) {
db.expire_table->erase(expire_it);
db->expire_table.erase(expire_it);
db.stats.obj_memory_usage -= (it->first.capacity() + it->second.str.capacity());
db.main_table->erase(it);
db->stats.obj_memory_usage -= (it->first.capacity() + it->second.str.capacity());
db->main_table.erase(it);
return make_pair(MainIterator{}, ExpireIterator{});
}
}
@ -78,14 +79,13 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_
}
auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool> {
DCHECK_LT(db_index, db_arr_.size());
DCHECK(db_arr_[db_index].main_table);
DCHECK(IsDbValid(db_index));
auto& db = db_arr_[db_index];
pair<MainIterator, bool> res = db.main_table->emplace(key, MainValue{});
pair<MainIterator, bool> res = db->main_table.emplace(key, MainValue{});
if (res.second) { // new entry
db.stats.obj_memory_usage += res.first->first.capacity();
db->stats.obj_memory_usage += res.first->first.capacity();
return make_pair(res.first, true);
}
@ -96,14 +96,13 @@ auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIter
void DbSlice::ActivateDb(DbIndex db_ind) {
if (db_arr_.size() <= db_ind)
db_arr_.resize(db_ind + 1);
CreateDbRedis(db_ind);
CreateDb(db_ind);
}
void DbSlice::CreateDbRedis(unsigned index) {
void DbSlice::CreateDb(DbIndex index) {
auto& db = db_arr_[index];
if (!db.main_table) {
db.main_table.reset(new MainTable);
db.expire_table.reset(new ExpireTable);
if (!db) {
db.reset(new DbWrapper);
}
}
@ -111,14 +110,14 @@ void DbSlice::CreateDbRedis(unsigned index) {
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
auto& db = db_arr_[db_ind];
if (at == 0 && it->second.HasExpire()) {
CHECK_EQ(1u, db.expire_table->erase(it->first));
CHECK_EQ(1u, db->expire_table.erase(it->first));
it->second.SetExpire(false);
return true;
}
if (!it->second.HasExpire() && at) {
CHECK(db.expire_table->emplace(it->first, at).second);
CHECK(db->expire_table.emplace(it->first, at).second);
it->second.SetExpire(true);
return true;
@ -132,18 +131,18 @@ void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64
}
bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj,
uint64_t expire_at_ms) {
uint64_t expire_at_ms) {
auto& db = db_arr_[db_ind];
auto [new_entry, success] = db.main_table->emplace(key, obj);
auto [new_entry, success] = db->main_table.emplace(key, obj);
if (!success)
return false; // in this case obj won't be moved and will be destroyed during unwinding.
db.stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.str.capacity());
db->stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.str.capacity());
if (expire_at_ms) {
new_entry->second.SetExpire(true);
CHECK(db.expire_table->emplace(new_entry->first, expire_at_ms).second);
CHECK(db->expire_table.emplace(new_entry->first, expire_at_ms).second);
}
return true;
@ -153,9 +152,83 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
DCHECK_LT(db_ind, db_array_size());
if (IsDbValid(db_ind)) {
return db_arr_[db_ind].main_table->size();
return db_arr_[db_ind]->main_table.size();
}
return 0;
}
bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
DCHECK(!lock_args.args.empty());
auto& lt = db_arr_[lock_args.db_index]->lock_table;
bool lock_acquired = true;
if (lock_args.args.size() == 1) {
lock_acquired = lt[lock_args.args.front()].Acquire(mode);
} else {
uniq_keys_.clear();
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
auto s = lock_args.args[i];
if (uniq_keys_.insert(s).second) {
bool res = lt[s].Acquire(mode);
lock_acquired &= res;
}
}
}
DVLOG(2) << "Acquire " << IntentLock::ModeName(mode) << " for " << lock_args.args[0]
<< " has_acquired: " << lock_acquired;
return lock_acquired;
}
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
if (lock_args.args.size() == 1) {
Release(mode, lock_args.db_index, lock_args.args.front(), 1);
} else {
auto& lt = db_arr_[lock_args.db_index]->lock_table;
uniq_keys_.clear();
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
auto s = lock_args.args[i];
if (uniq_keys_.insert(s).second) {
auto it = lt.find(s);
CHECK(it != lt.end());
it->second.Release(mode);
if (it->second.IsFree()) {
lt.erase(it);
}
}
}
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
}
}
void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, std::string_view key,
unsigned count) {
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;
auto& lt = db_arr_[db_index]->lock_table;
auto it = lt.find(key);
CHECK(it != lt.end()) << key;
it->second.Release(mode, count);
if (it->second.IsFree()) {
lt.erase(it);
}
}
bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
DCHECK(!lock_args.args.empty());
const auto& lt = db_arr_[lock_args.db_index]->lock_table;
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
auto s = lock_args.args[i];
auto it = lt.find(s);
if (it != lt.end() && !it->second.Check(mode)) {
return false;
}
}
return true;
}
} // namespace dfly

View File

@ -4,8 +4,12 @@
#pragma once
#include "server/common_types.h"
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include "core/intent_lock.h"
#include "core/op_status.h"
#include "server/common_types.h"
#include "server/table.h"
namespace util {
@ -43,7 +47,6 @@ class DbSlice {
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
// Return .second=true if insertion ocurred, false if we return the existing key.
std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
@ -61,22 +64,31 @@ class DbSlice {
// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);
ShardId shard_id() const { return shard_id_;}
ShardId shard_id() const {
return shard_id_;
}
bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);
void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count);
// Returns true if all keys can be locked under m. Does not lock them though.
bool CheckLock(IntentLock::Mode m, const KeyLockArgs& lock_args) const;
size_t db_array_size() const {
return db_arr_.size();
}
bool IsDbValid(DbIndex id) const {
return bool(db_arr_[id].main_table);
return id < db_arr_.size() && bool(db_arr_[id]);
}
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;
private:
void CreateDbRedis(unsigned index);
void CreateDb(DbIndex index);
ShardId shard_id_;
@ -84,14 +96,20 @@ class DbSlice {
uint64_t now_ms_ = 0; // Used for expire logic, represents a real clock.
struct DbRedis {
std::unique_ptr<MainTable> main_table;
std::unique_ptr<ExpireTable> expire_table;
using LockTable = absl::flat_hash_map<std::string, IntentLock>;
struct DbWrapper {
MainTable main_table;
ExpireTable expire_table;
LockTable lock_table;
mutable InternalDbStats stats;
};
std::vector<DbRedis> db_arr_;
std::vector<std::unique_ptr<DbWrapper>> db_arr_;
// Used in temporary computations in Acquire/Release.
absl::flat_hash_set<std::string_view> uniq_keys_;
};
} // namespace dfly

View File

@ -5,6 +5,7 @@
#include "server/engine_shard_set.h"
#include "base/logging.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
#include "util/varz.h"
@ -12,14 +13,14 @@ namespace dfly {
using namespace std;
using namespace util;
namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
EngineShard::EngineShard(util::ProactorBase* pb)
: queue_(kQueueLen), db_slice_(pb->GetIndex(), this) {
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), db_slice_(pb->GetIndex(), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
queue_.Run();
@ -57,6 +58,105 @@ void EngineShard::DestroyThreadLocal() {
VLOG(1) << "Shard reset " << index;
}
void EngineShard::RunContinuationTransaction() {
auto sid = shard_id();
if (continuation_trans_->IsArmedInShard(sid)) {
bool to_keep = continuation_trans_->RunInShard(sid);
DVLOG(1) << "RunContTransaction " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
}
}
}
// Is called by Transaction::ExecuteAsync in order to run transaction tasks.
// Only runs in its own thread.
void EngineShard::Execute(Transaction* trans) {
ShardId sid = shard_id();
if (continuation_trans_) {
if (trans == continuation_trans_)
trans = nullptr;
RunContinuationTransaction();
// Once we start executing transaction we do not continue until it's finished.
// This preserves atomicity property of multi-hop transactions.
if (continuation_trans_)
return;
}
DCHECK(!continuation_trans_);
Transaction* head = nullptr;
string dbg_id;
while (!txq_.Empty()) {
auto val = txq_.Front();
head = absl::get<Transaction*>(val);
bool is_armed = head->IsArmedInShard(sid);
if (!is_armed)
break;
// It could be that head is processed and unblocks multi-hop transaction .
// The transaction will schedule again and will arm another callback.
// Then we will reach invalid state by running trans after this loop,
// which is not what we want.
// This function should not process 2 different callbacks for the same transaction.
// Hence we make sure to reset trans if it has been processed via tx-queue.
if (head == trans)
trans = nullptr;
TxId txid = head->txid();
// Could be equal to ts in case the same transaction had few hops.
DCHECK_LE(committed_txid_, txid);
// We update committed_ts_ before calling Run() to avoid cases where a late transaction might
// try to push back this one.
committed_txid_ = txid;
if (VLOG_IS_ON(2)) {
dbg_id = head->DebugId();
}
bool keep = head->RunInShard(sid);
// We should not access head from this point since RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
txq_.PopFront();
if (keep) {
continuation_trans_ = head;
break;
}
}
if (!trans)
return;
if (txq_.Empty())
return;
// If trans is out of order, i.e. locks keys that previous transactions have not locked.
// It may be that there are other transactions that touch those keys but they necessary ordered
// after trans in the queue, hence it's safe to run trans out of order.
if (trans->IsOutOfOrder() && trans->IsArmedInShard(sid)) {
DCHECK(trans != head);
dbg_id.clear();
uint32_t pos = trans->TxQueuePos(sid);
if (VLOG_IS_ON(1)) {
dbg_id = trans->DebugId();
}
bool keep = trans->RunInShard(sid); // resets TxQueuePos, this is why we get it before.
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
// Should be enforced via Schedule(). TODO: to remove the check once the code is mature.
CHECK(!keep) << "multi-hop transactions can not be OOO.";
txq_.Remove(pos);
}
}
void EngineShardSet::Init(uint32_t sz) {
CHECK_EQ(0u, size());

View File

@ -4,20 +4,19 @@
#pragma once
#include <xxhash.h>
#include "core/tx_queue.h"
#include "server/db_slice.h"
#include "util/fibers/fibers_ext.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers_ext.h"
#include "util/proactor_pool.h"
namespace dfly {
class EngineShard {
public:
//EngineShard() is private down below.
// EngineShard() is private down below.
~EngineShard();
static void InitThreadLocal(util::ProactorBase* pb);
@ -43,12 +42,36 @@ class EngineShard {
return &queue_;
}
// Executes a transaction. This transaction is pending in the queue.
void Execute(Transaction* trans);
// Returns transaction queue.
TxQueue* txq() {
return &txq_;
}
TxId committed_txid() const {
return committed_txid_;
}
TxQueue::Iterator InsertTxQ(Transaction* trans) {
return txq_.Insert(trans);
}
private:
EngineShard(util::ProactorBase* pb);
void RunContinuationTransaction();
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;
TxQueue txq_;
// Logical ts used to order distributed transactions.
TxId committed_txid_ = 0;
Transaction* continuation_trans_ = nullptr;
DbSlice db_slice_;
uint32_t periodic_task_ = 0;
@ -80,7 +103,13 @@ class EngineShardSet {
return shard_queue_[sid]->Add(std::forward<F>(f));
}
template <typename U> void RunBriefInParallel(U&& func);
// Runs a brief function on all shards.
template <typename U> void RunBriefInParallel(U&& func) {
RunBriefInParallel(std::forward<U>(func), [](auto i) { return true; });
}
template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred);
template <typename U> void RunBlockingInParallel(U&& func);
private:
@ -88,16 +117,14 @@ class EngineShardSet {
std::vector<util::fibers_ext::FiberQueue*> shard_queue_;
};
/**
* @brief
*
* @tparam U - a function that receives EngineShard* argument and returns void.
* @param func
*/
template <typename U> void EngineShardSet::RunBriefInParallel(U&& func) {
util::fibers_ext::BlockingCounter bc{size()};
template <typename U, typename P> void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) {
util::fibers_ext::BlockingCounter bc{0};
for (uint32_t i = 0; i < size(); ++i) {
if (!pred(i))
continue;
bc.Add(1);
util::ProactorBase* dest = pp_->at(i);
dest->AsyncBrief([f = std::forward<U>(func), bc]() mutable {
f(EngineShard::tlocal());

View File

@ -15,6 +15,7 @@
#include "server/debugcmd.h"
#include "server/error.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/metrics/metrics.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"
@ -27,7 +28,7 @@ namespace dfly {
using namespace std;
using namespace util;
using base::VarzValue;
using ::boost::intrusive_ptr;
namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
@ -96,7 +97,23 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
return cntx->SendError(WrongNumArgsError(cmd_str));
}
uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec;
// Create command transaction
intrusive_ptr<Transaction> dist_trans;
if (cid->first_key_pos() > 0) {
dist_trans.reset(new Transaction{cid, &shard_set_});
cntx->transaction = dist_trans.get();
if (cid->first_key_pos() > 0) {
dist_trans->InitByArgs(args);
}
} else {
cntx->transaction = nullptr;
}
cntx->cid = cid;
cmd_req.Inc({cid->name()});
cid->Invoke(args, cntx);

View File

@ -11,6 +11,7 @@
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/transaction.h"
#include "util/varz.h"
namespace dfly {
@ -115,13 +116,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
}
}
ShardId sid = Shard(key, cntx->shard_set->size());
OpResult<void> result = cntx->shard_set->Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
SetCmd cmd(&es->db_slice());
DCHECK(cntx->transaction);
return cmd.Set(sparams, key, value);
});
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(&shard->db_slice());
auto status = sg.Set(sparams, key, value).status();
return status;
};
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
if (result == OpStatus::OK) {
return cntx->SendStored();
@ -135,23 +137,23 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
get_qps.Inc();
std::string_view key = ArgS(args, 1);
ShardId sid = Shard(key, cntx->shard_set->size());
OpResult<string> result = cntx->shard_set->Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
OpResult<MainIterator> opres_it = es->db_slice().Find(0, key);
OpResult<string> res;
if (opres_it) {
res = opres_it.value()->second.str;
} else {
res = opres_it.status();
}
return res;
});
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
OpResult<MainIterator> it_res = shard->db_slice().Find(0, key);
if (!it_res.ok())
return it_res.status();
string val = it_res.value()->second.str;
return val;
};
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
Transaction* trans = cntx->transaction;
OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
if (result) {
DVLOG(1) << "GET "
<< ": " << key << " " << result.value();
DVLOG(1) << "GET " << trans->DebugId() << ": " << key << " " << result.value();
cntx->SendGetReply(key, 0, result.value());
} else {
DVLOG(1) << "GET " << key << " nil";
@ -169,7 +171,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
ShardId sid = Shard(key, cntx->shard_set->size());
OpResult<void> result = cntx->shard_set->Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
EngineShard* es = EngineShard::tlocal();
SetCmd cmd(&es->db_slice());
return cmd.Set(sparams, key, value);

569
server/transaction.cc Normal file
View File

@ -0,0 +1,569 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/transaction.h"
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
namespace dfly {
using namespace std;
using namespace util;
thread_local Transaction::TLTmpSpace Transaction::tmp_space;
namespace {
std::atomic_uint64_t op_seq{1};
constexpr size_t kTransSize = sizeof(Transaction);
} // namespace
IntentLock::Mode Transaction::Mode() const {
return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
Transaction::~Transaction() {
DVLOG(2) << "Transaction " << DebugId() << " destroyed";
}
/**
* @brief Construct a new Transaction:: Transaction object
*
* @param cid
* @param ess
* @param cs
*/
Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) {
trans_options_ = cid_->opt_mask();
bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key();
if (single_key) {
dist_.shard_data.resize(1); // Single key optimization
} else {
// Our shard_data is not sparse, so we must allocate for all threads :(
dist_.shard_data.resize(ess_->size());
}
}
/**
*
* There are 4 options that we consider here:
* a. T spans a single shard and its not multi.
* unique_shard_id_ is predefined before the schedule() is called.
* In that case only a single thread will be scheduled and it will use shard_data[0] just becase
* shard_data.size() = 1. Engine thread can access any data because there is schedule barrier
* between InitByArgs and RunInShard/IsArmedInShard functions.
* b. T spans multiple shards and its not multi
* In that case multiple threads will be scheduled. Similarly they have a schedule barrier,
* and IsArmedInShard can read any variable from shard_data[x].
* c. Trans spans a single shard and it's multi. shard_data has size of ess_.size.
* IsArmedInShard will check shard_data[x].
* d. Trans spans multiple shards and it's multi. Similarly shard_data[x] will be checked.
* unique_shard_cnt_ and unique_shard_id_ are not accessed until shard_data[x] is armed, hence
* we have a barrier between coordinator and engine-threads. Therefore there should not be
* data races.
*
**/
void Transaction::InitByArgs(CmdArgList args) {
CHECK_GT(args.size(), 1U);
CHECK_LT(size_t(cid_->first_key_pos()), args.size());
DCHECK_EQ(unique_shard_cnt_, 0u);
if (!cid_->is_multi_key()) { // Single key optimization.
auto key = ArgS(args, cid_->first_key_pos());
args_.push_back(key);
unique_shard_cnt_ = 1;
unique_shard_id_ = Shard(key, ess_->size());
num_keys_ = 1;
return;
}
CHECK(cid_->key_arg_step() == 1 || cid_->key_arg_step() == 2);
CHECK(cid_->key_arg_step() == 1 || (args.size() % 2) == 1);
// Reuse thread-local temporary storage. Since this code is non-preemptive we can use it here.
auto& shard_index = tmp_space.shard_cache;
shard_index.resize(dist_.shard_data.size());
for (auto& v : shard_index) {
v.Clear();
}
size_t key_end = cid_->last_key_pos() > 0 ? cid_->last_key_pos() + 1
: (args.size() + 1 + cid_->last_key_pos());
for (size_t i = 1; i < key_end; ++i) {
std::string_view key = ArgS(args, i);
uint32_t sid = Shard(key, dist_.shard_data.size());
shard_index[sid].args.push_back(key);
shard_index[sid].original_index.push_back(i - 1);
++num_keys_;
if (cid_->key_arg_step() == 2) { // value
++i;
auto val = ArgS(args, i);
shard_index[sid].args.push_back(val);
shard_index[sid].original_index.push_back(i - 1);
}
}
args_.resize(key_end - 1);
dist_.reverse_index.resize(args_.size());
auto next_arg = args_.begin();
auto rev_indx_it = dist_.reverse_index.begin();
// slice.arg_start/arg_count point to args_ array which is sorted according to shard of each key.
// reverse_index_[i] says what's the original position of args_[i] in args.
for (size_t i = 0; i < dist_.shard_data.size(); ++i) {
auto& sd = dist_.shard_data[i];
auto& si = shard_index[i];
CHECK_LT(si.args.size(), 1u << 15);
sd.arg_count = si.args.size();
sd.arg_start = next_arg - args_.begin();
sd.local_mask = 0;
if (!sd.arg_count)
continue;
++unique_shard_cnt_;
unique_shard_id_ = i;
uint32_t orig_indx = 0;
for (size_t j = 0; j < si.args.size(); ++j) {
*next_arg = si.args[j];
*rev_indx_it = si.original_index[orig_indx];
++next_arg;
++orig_indx;
++rev_indx_it;
}
}
CHECK(next_arg == args_.end());
DVLOG(1) << "InitByArgs " << DebugId();
if (unique_shard_cnt_ == 1) {
PerShardData* sd;
dist_.shard_data.resize(1);
sd = &dist_.shard_data.front();
sd->arg_count = -1;
sd->arg_start = -1;
}
// Validation.
for (const auto& sd : dist_.shard_data) {
DCHECK_EQ(sd.local_mask, 0u);
DCHECK_EQ(0, sd.local_mask & ARMED);
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
}
}
string Transaction::DebugId() const {
return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
}
// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
bool Transaction::RunInShard(ShardId sid) {
CHECK(cb_);
DCHECK_GT(txid_, 0u);
EngineShard* shard = EngineShard::tlocal();
// Unlike with regular transactions we do not acquire locks upon scheduling
// because Scheduling is done before multi-exec batch is executed. Therefore we
// lock keys right before the execution of each statement.
DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << sid;
sid = TranslateSidInShard(sid);
auto& sd = dist_.shard_data[sid];
DCHECK(sd.local_mask & ARMED);
sd.local_mask &= ~ARMED;
bool concluding = dist_.is_concluding_cb;
DCHECK(sd.local_mask & KEYS_ACQUIRED);
// Actually running the callback.
OpStatus status = cb_(this, shard);
// If it's a final hop we should release the locks.
if (concluding) {
auto largs = GetLockArgs(sid);
shard->db_slice().Release(Mode(), largs);
sd.local_mask &= ~KEYS_ACQUIRED;
}
if (unique_shard_cnt_ == 1) {
cb_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = status;
} else {
CHECK_EQ(OpStatus::OK, status);
}
// This shard should own a reference for transaction as well as coordinator thread.
DCHECK_GT(use_count(), 1u);
CHECK_GE(Disarm(), 1u);
// must be computed before intrusive_ptr_release call.
if (concluding) {
sd.pq_pos = TxQueue::kEnd;
// For multi-transaction we need to clear this flag to allow locking of the next set of keys
// during the next child transaction.
sd.local_mask &= ~KEYS_ACQUIRED;
DVLOG(2) << "ptr_release " << DebugId() << " " << this->use_count();
intrusive_ptr_release(this); // Against ScheduleInternal.
}
return !concluding; // keep
}
void Transaction::ScheduleInternal(bool single_hop) {
DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED);
DCHECK_EQ(0u, txid_);
uint32_t num_shards;
std::function<bool(uint32_t)> is_active;
num_shards = unique_shard_cnt_;
DCHECK_GT(num_shards, 0u);
is_active = [&](uint32_t i) {
return num_shards == 1 ? (i == unique_shard_id_) : dist_.shard_data[i].arg_count > 0;
};
// intrusive_ptr_add num_shards times.
use_count_.fetch_add(num_shards, memory_order_relaxed);
while (true) {
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
std::atomic_uint32_t lock_acquire_cnt{0};
std::atomic_uint32_t success{0};
auto cb = [&](EngineShard* shard) {
pair<bool, bool> res = ScheduleInShard(shard);
success.fetch_add(res.first, memory_order_relaxed);
lock_acquire_cnt.fetch_add(res.second, memory_order_relaxed);
};
ess_->RunBriefInParallel(std::move(cb), is_active);
if (success.load(memory_order_acquire) == num_shards) {
// We allow out of order execution only for single hop transactions.
// It might be possible to do it for multi-hop transactions as well but currently is
// too complicated to reason about.
if (single_hop && lock_acquire_cnt.load(memory_order_relaxed) == num_shards) {
dist_.out_of_order.store(true, memory_order_relaxed);
}
DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << dist_.out_of_order;
state_mask_.fetch_or(SCHEDULED, memory_order_release);
break;
}
DVLOG(1) << "Cancelling " << DebugId();
auto cancel = [&](EngineShard* shard) {
success.fetch_sub(CancelInShard(shard), memory_order_relaxed);
};
ess_->RunBriefInParallel(std::move(cancel), is_active);
CHECK_EQ(0u, success.load(memory_order_relaxed));
}
}
// Optimized "Schedule and execute" function for the most common use-case of a single hop
// transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
// BLPOP where a data must be read from multiple shards before performing another hop.
OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
DCHECK(!cb_);
cb_ = std::move(cb);
bool run_eager = false;
bool schedule_fast = (unique_shard_cnt_ == 1);
if (schedule_fast) { // Single shard (local) optimization.
// We never resize shard_data because that would affect MULTI transaction correctness.
DCHECK_EQ(1u, dist_.shard_data.size());
dist_.shard_data[0].local_mask |= ARMED;
arm_count_.fetch_add(1, memory_order_release); // Decreases in RunLocal.
auto schedule_cb = [&] { return ScheduleUniqueShard(EngineShard::tlocal()); };
run_eager = ess_->Await(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
(void)run_eager;
} else { // Transaction spans multiple shards or it's global (like flushdb)
ScheduleInternal(true);
ExecuteAsync(true);
}
DVLOG(1) << "Before DoneWait " << DebugId() << " " << args_.front();
WaitArm();
DVLOG(1) << "After DoneWait";
cb_ = nullptr;
state_mask_.fetch_or(AFTERRUN, memory_order_release);
return local_result_;
}
// Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) {
cb_ = std::move(cb);
ExecuteAsync(conclude);
DVLOG(1) << "Wait on " << DebugId();
WaitArm();
DVLOG(1) << "Wait on " << DebugId() << " completed";
cb_ = nullptr;
dist_.out_of_order.store(false, memory_order_relaxed);
uint32_t mask = conclude ? AFTERRUN : RUNNING;
state_mask_.fetch_or(mask, memory_order_release);
}
// Runs in coordinator thread.
void Transaction::ExecuteAsync(bool concluding_cb) {
DVLOG(1) << "ExecuteAsync " << DebugId() << " concluding " << concluding_cb;
dist_.is_concluding_cb = concluding_cb;
DCHECK_GT(unique_shard_cnt_, 0u);
// We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
// executed by the engine shard once it has been armed and coordinator thread will finish the
// transaction before engine shard thread stops accessing it. Therefore, we increase reference
// by number of callbacks accessesing 'this' to allow callbacks to execute shard->Execute(this);
// safely.
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
if (unique_shard_cnt_ == 1) {
dist_.shard_data[TranslateSidInShard(unique_shard_id_)].local_mask |= ARMED;
} else {
for (ShardId i = 0; i < dist_.shard_data.size(); ++i) {
auto& sd = dist_.shard_data[i];
if (sd.arg_count == 0)
continue;
DCHECK_LT(sd.arg_count, 1u << 15);
sd.local_mask |= ARMED;
}
}
// this fence prevents that a read or write operation before a release fence will be reordered
// with a write operation after a release fence. Specifically no writes below will be reordered
// upwards. Important, because it protects non-threadsafe local_mask from being accessed by
// IsArmedInShard in other threads.
arm_count_.fetch_add(unique_shard_cnt_, memory_order_acq_rel);
auto cb = [this] {
EngineShard* shard = EngineShard::tlocal();
DVLOG(2) << "TriggerExec " << DebugId() << " sid:" << shard->shard_id();
// Everything that should be handled during the callback execution should go into RunInShard.
shard->Execute(this);
DVLOG(2) << "ptr_release " << DebugId() << " " << use_count();
intrusive_ptr_release(this); // against use_count_.fetch_add above.
};
// IsArmedInShard is the protector of non-thread safe data.
if (unique_shard_cnt_ == 1) {
ess_->Add(unique_shard_id_, std::move(cb)); // serves as a barrier.
} else {
for (ShardId i = 0; i < dist_.shard_data.size(); ++i) {
auto& sd = dist_.shard_data[i];
if (sd.arg_count == 0)
continue;
ess_->Add(i, cb); // serves as a barrier.
}
}
}
void Transaction::RunQuickSingle() {
DCHECK_EQ(1u, dist_.shard_data.size());
DCHECK_EQ(0u, txid_);
EngineShard* shard = EngineShard::tlocal();
auto& sd = dist_.shard_data[0];
DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
local_result_ = cb_(this, shard);
sd.local_mask &= ~ARMED;
cb_ = nullptr; // We can do it because only a single shard runs the callback.
CHECK_GE(Disarm(), 1u);
}
const char* Transaction::Name() const {
return cid_->name();
}
KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
KeyLockArgs res;
res.db_index = 0; // TODO
res.key_step = cid_->key_arg_step();
res.args = ShardArgsInShard(sid);
return res;
}
// Runs within a engine shard thread.
// Optimized path that schedules and runs transactions out of order if possible.
// Returns true if was eagerly executed, false if it was scheduled into queue.
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK_EQ(0u, txid_);
DCHECK_EQ(1u, dist_.shard_data.size());
auto mode = Mode();
auto lock_args = GetLockArgs(shard->shard_id());
auto& sd = dist_.shard_data.front();
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
// Fast path - for uncontended keys, just run the callback.
// That applies for single key operations like set, get, lpush etc.
if (shard->db_slice().CheckLock(mode, lock_args)) {
RunQuickSingle(); // TODO: for journal - this can become multi-shard
// transaction on replica.
return true;
}
intrusive_ptr_add_ref(this);
// we can do it because only a single thread writes into txid_ and sd.
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
TxQueue::Iterator it = shard->InsertTxQ(this);
sd.pq_pos = it;
DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYS_ACQUIRED;
DCHECK(!lock_acquired); // Because CheckLock above failed.
state_mask_.fetch_or(SCHEDULED, memory_order_release);
return false;
}
// This function should not block since it's run via RunBriefInParallel.
pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
// schedule_success, lock_granted.
pair<bool, bool> result{false, false};
if (shard->committed_txid() >= txid_) {
return result;
}
TxQueue* pq = shard->txq();
KeyLockArgs lock_args;
IntentLock::Mode mode = Mode();
bool lock_granted = false;
ShardId sid = TranslateSidInShard(shard->shard_id());
auto& sd = dist_.shard_data[sid];
bool shard_unlocked = true;
lock_args = GetLockArgs(shard->shard_id());
// we need to acquire the lock unrelated to shard_unlocked since we register into Tx queue.
// All transactions in the queue must acquire the intent lock.
lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked;
sd.local_mask |= KEYS_ACQUIRED;
DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId();
if (!pq->Empty()) {
// If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
// and some other transaction already locked its keys we can not reorder 'trans' because
// that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we
// fail this scheduling attempt for trans.
// However, when we schedule span-all transactions we can still reorder them. The reason is
// before we start scheduling them we lock the shards and disable OOO.
// We may record when they disable OOO via barrier_ts so if the queue contains transactions
// that were only scheduled afterwards we know they are not free so we can still
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadRank().
bool to_proceed = lock_granted || pq->TailScore() < txid_;
if (!to_proceed) {
if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock.
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYS_ACQUIRED;
}
return result; // false, false
}
}
result.second = lock_granted;
result.first = true;
TxQueue::Iterator it = pq->Insert(this);
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
sd.pq_pos = it;
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << pq->size();
return result;
}
bool Transaction::CancelInShard(EngineShard* shard) {
ShardId sid = TranslateSidInShard(shard->shard_id());
auto& sd = dist_.shard_data[sid];
auto pos = sd.pq_pos;
if (pos == TxQueue::kEnd)
return false;
sd.pq_pos = TxQueue::kEnd;
TxQueue* pq = shard->txq();
auto val = pq->At(pos);
Transaction* trans = absl::get<Transaction*>(val);
DCHECK(trans == this) << "Pos " << pos << ", pq size " << pq->size() << ", trans " << trans;
pq->Remove(pos);
if (sd.local_mask & KEYS_ACQUIRED) {
auto mode = Mode();
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYS_ACQUIRED;
}
return true;
}
// runs in engine-shard thread.
ArgSlice Transaction::ShardArgsInShard(ShardId sid) const {
DCHECK(!args_.empty());
DCHECK_NOTNULL(EngineShard::tlocal());
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
// barrier.
if (unique_shard_cnt_ == 1) {
return args_;
}
const auto& sd = dist_.shard_data[sid];
return ArgSlice{args_.data() + sd.arg_start, sd.arg_count};
}
size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
if (unique_shard_cnt_ == 1)
return arg_index;
return dist_.reverse_index[dist_.shard_data[shard_id].arg_start + arg_index];
}
} // namespace dfly

285
server/transaction.h Normal file
View File

@ -0,0 +1,285 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/inlined_vector.h>
#include <string_view>
#include <variant>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <vector>
#include "core/intent_lock.h"
#include "core/tx_queue.h"
#include "core/op_status.h"
#include "server/common_types.h"
#include "server/table.h"
#include "util/fibers/fibers_ext.h"
namespace dfly {
class DbSlice;
class EngineShardSet;
class EngineShard;
class Transaction {
Transaction(const Transaction&);
void operator=(const Transaction&) = delete;
~Transaction();
// Transactions are reference counted.
friend void intrusive_ptr_add_ref(Transaction* trans) noexcept {
trans->use_count_.fetch_add(1, std::memory_order_relaxed);
}
friend void intrusive_ptr_release(Transaction* trans) noexcept {
if (1 == trans->use_count_.fetch_sub(1, std::memory_order_release)) {
std::atomic_thread_fence(std::memory_order_acquire);
delete trans;
}
}
public:
using RunnableType = std::function<OpStatus(Transaction* t, EngineShard*)>;
using time_point = ::std::chrono::steady_clock::time_point;
enum LocalState : uint8_t {
ARMED = 1, // Transaction was armed with the callback
KEYS_ACQUIRED = 0x20,
};
enum State : uint8_t {
SCHEDULED = 1,
RUNNING = 2, // For running multi-hop execution callbacks.
AFTERRUN = 4, // Once transaction finished running.
};
Transaction(const CommandId* cid, EngineShardSet* ess);
void InitByArgs(CmdArgList args);
std::string DebugId() const;
// Runs in engine thread
ArgSlice ShardArgsInShard(ShardId sid) const;
// Maps the index in ShardKeys(shard_id) slice back to the index in the original array passed to
// InitByArgs.
// Runs in the coordinator thread.
size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const;
//! Returns true if the transaction spans this shard_id.
//! Runs from the coordinator thread.
bool IsActive(ShardId shard_id) const {
return unique_shard_cnt_ == 1 ? unique_shard_id_ == shard_id
: dist_.shard_data[shard_id].arg_count > 0;
}
//! Returns true if the transaction is armed for execution on this sid (used to avoid
//! duplicate runs). Supports local transactions under multi as well.
bool IsArmedInShard(ShardId sid) const {
if (sid >= dist_.shard_data.size())
sid = 0;
// We use acquire so that no reordering will move before this load.
return arm_count_.load(std::memory_order_acquire) > 0 &&
dist_.shard_data[sid].local_mask & ARMED;
}
// Called from engine set shard threads.
uint16_t GetLocalMask(ShardId sid) const {
sid = TranslateSidInShard(sid);
return dist_.shard_data[sid].local_mask;
}
uint32_t GetStateMask() const {
return state_mask_.load(std::memory_order_relaxed);
}
bool IsOutOfOrder() const {
return dist_.out_of_order.load(std::memory_order_relaxed);
}
// Relevant only when unique_shards_ > 1.
uint32_t TxQueuePos(ShardId sid) const {
return dist_.shard_data[sid].pq_pos;
}
// if conclude is true, removes the transaction from the pending queue.
void Execute(RunnableType cb, bool conclude);
// for multi-key scenarios cb should return Status::Ok since otherwise the return value
// will be ill-defined.
OpStatus ScheduleSingleHop(RunnableType cb);
// Fits only for single key scenarios because it writes into shared variable res from
// potentially multiple threads.
template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)) {
decltype(f(this, nullptr)) res;
ScheduleSingleHop([&res, f = std::forward<F>(f)](Transaction* t, EngineShard* shard) {
res = f(t, shard);
return res.status();
});
return res;
}
TxId txid() const {
return txid_;
}
// TODO: for multi trans_options_ changes with every operation.
// Does it mean we lock every key differently during the same transaction?
IntentLock::Mode Mode() const;
const char* Name() const;
uint32_t unique_shard_cnt() const {
return unique_shard_cnt_;
}
EngineShardSet* shard_set() { return ess_; }
// Called by EngineShard when performing Execute over the tx queue.
// Returns true if transaction should be kept in the queue.
bool RunInShard(ShardId sid);
private:
ShardId TranslateSidInShard(ShardId sid) const {
return sid < dist_.shard_data.size() ? sid : 0;
}
void ScheduleInternal(bool single_hop);
void ExecuteAsync(bool concluding_cb);
// Optimized version of RunInShard for single shard uncontended cases.
void RunQuickSingle();
//! Returns true if transaction run out-of-order during the scheduling phase.
bool ScheduleUniqueShard(EngineShard* shard);
/// Returns pair(schedule_success, lock_granted)
/// schedule_success is true if transaction was scheduled on db_slice.
/// lock_granted is true if lock was granted for all the keys on this shard.
/// Runs in the shard thread.
std::pair<bool, bool> ScheduleInShard(EngineShard* shard);
// Returns true if operation was cancelled for this shard. Runs in the shard thread.
bool CancelInShard(EngineShard* shard);
//! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
//! Runs in the shard thread.
KeyLockArgs GetLockArgs(ShardId sid) const;
void WaitArm() {
arm_ec_.await([this] { return 0 == this->arm_count_.load(std::memory_order_relaxed); });
}
uint32_t Disarm() {
// We use release so that no stores will be reordered after.
uint32_t res = arm_count_.fetch_sub(1, std::memory_order_release);
arm_ec_.notify();
return res;
}
uint32_t use_count() const { return use_count_.load(std::memory_order_relaxed); }
struct PerShardData {
uint32_t arg_start = 0; // Indices into args_ array.
uint16_t arg_count = 0;
// Accessed only within the engine-shard thread.
// Bitmask of LocalState enums.
uint16_t local_mask{0};
uint32_t pq_pos = TxQueue::kEnd;
PerShardData(PerShardData&&) noexcept {
}
PerShardData() = default;
};
enum { kPerShardSize = sizeof(PerShardData) };
struct Dist {
// shard_data spans all the shards in ess_.
// I wish we could use a dense array of size [0..uniq_shards] but since
// multiple threads access this array to synchronize between themselves using
// PerShardData.state, it can be tricky. The complication comes from multi_ transactions where
// scheduled transaction is accessed between operations as well.
absl::InlinedVector<PerShardData, 4> shard_data; // length = shard_count
// Reverse argument mapping. Allows to reconstruct responses according to the original order of
// keys.
std::vector<uint32_t> reverse_index;
// NOTE: to move to bitmask if it grows.
// Written by coordinator thread, read by shard threads but not concurrently.
// Says whether the current callback function is concluding for this operation.
bool is_concluding_cb{true};
// out_of_order true - transaction can execute before other scheduled transactions,
// not necessary according to its queue order.
std::atomic_bool out_of_order{false};
};
enum { kDistSize = sizeof(Dist) };
const CommandId* cid_;
EngineShardSet* ess_;
TxId txid_{0};
std::atomic_uint32_t use_count_{0}, arm_count_{0};
// unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread.
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
ShardId unique_shard_id_{kInvalidSid};
// Written by coordination thread but may be read by Shard threads.
std::atomic<uint16_t> state_mask_{0};
DbIndex db_index_ = 0;
// For single-hop transactions with unique_shards_ == 1, hence no data-race.
OpStatus local_result_ = OpStatus::OK;
uint32_t trans_options_ = 0;
uint32_t num_keys_ = 0;
Dist dist_;
util::fibers_ext::EventCount arm_ec_;
//! Stores arguments of the transaction (i.e. keys + values) ordered by shards.
absl::InlinedVector<std::string_view, 4> args_;
RunnableType cb_;
struct PerShardCache {
std::vector<std::string_view> args;
std::vector<uint32_t> original_index;
void Clear() {
args.clear();
original_index.clear();
}
};
struct TLTmpSpace {
std::vector<PerShardCache> shard_cache;
absl::flat_hash_set<std::string_view> uniq_keys;
};
static thread_local TLTmpSpace tmp_space;
};
inline uint16_t trans_id(const Transaction* ptr) {
return intptr_t(ptr) & 0xFFFF;
}
} // namespace dfly