Add EXEC transaction support. Introduce dragonfly_test.cc
This commit is contained in:
parent
9e5a5ea2f2
commit
35fa69c928
|
@ -13,6 +13,7 @@ cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib
|
||||||
add_library(dfly_test_lib test_utils.cc)
|
add_library(dfly_test_lib test_utils.cc)
|
||||||
cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext)
|
cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext)
|
||||||
|
|
||||||
|
cxx_test(dragonfly_test dfly_test_lib LABELS DFLY)
|
||||||
cxx_test(redis_parser_test dfly_test_lib LABELS DFLY)
|
cxx_test(redis_parser_test dfly_test_lib LABELS DFLY)
|
||||||
cxx_test(list_family_test dfly_test_lib LABELS DFLY)
|
cxx_test(list_family_test dfly_test_lib LABELS DFLY)
|
||||||
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
|
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
|
||||||
|
|
|
@ -20,6 +20,9 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
|
||||||
int8_t last_key, int8_t step)
|
int8_t last_key, int8_t step)
|
||||||
: name_(name), opt_mask_(mask), arity_(arity), first_key_(first_key), last_key_(last_key),
|
: name_(name), opt_mask_(mask), arity_(arity), first_key_(first_key), last_key_(last_key),
|
||||||
step_key_(step) {
|
step_key_(step) {
|
||||||
|
if (mask & CO::ADMIN) {
|
||||||
|
opt_mask_ |= CO::NOSCRIPT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t CommandId::OptCount(uint32_t mask) {
|
uint32_t CommandId::OptCount(uint32_t mask) {
|
||||||
|
@ -87,8 +90,14 @@ const char* OptName(CO::CommandOpt fl) {
|
||||||
return "loading";
|
return "loading";
|
||||||
case RANDOM:
|
case RANDOM:
|
||||||
return "random";
|
return "random";
|
||||||
|
case ADMIN:
|
||||||
|
return "admin";
|
||||||
|
case NOSCRIPT:
|
||||||
|
return "noscript";
|
||||||
|
case GLOBAL_TRANS:
|
||||||
|
return "global-trans";
|
||||||
}
|
}
|
||||||
return "";
|
return "unknown";
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace CO
|
} // namespace CO
|
||||||
|
|
|
@ -26,6 +26,9 @@ enum CommandOpt : uint32_t {
|
||||||
DENYOOM = 0x10, // use-memory in redis.
|
DENYOOM = 0x10, // use-memory in redis.
|
||||||
STALE = 0x20,
|
STALE = 0x20,
|
||||||
RANDOM = 0x40,
|
RANDOM = 0x40,
|
||||||
|
ADMIN = 0x80, // implies NOSCRIPT,
|
||||||
|
NOSCRIPT = 0x100,
|
||||||
|
GLOBAL_TRANS = 0x1000,
|
||||||
};
|
};
|
||||||
|
|
||||||
const char* OptName(CommandOpt fl);
|
const char* OptName(CommandOpt fl);
|
||||||
|
|
|
@ -11,11 +11,23 @@ namespace dfly {
|
||||||
|
|
||||||
class Connection;
|
class Connection;
|
||||||
class EngineShardSet;
|
class EngineShardSet;
|
||||||
class CommandId;
|
|
||||||
|
struct StoredCmd {
|
||||||
|
const CommandId* descr;
|
||||||
|
std::vector<std::string> cmd;
|
||||||
|
|
||||||
|
StoredCmd(const CommandId* d = nullptr) : descr(d) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
struct ConnectionState {
|
struct ConnectionState {
|
||||||
DbIndex db_index = 0;
|
DbIndex db_index = 0;
|
||||||
|
|
||||||
|
enum ExecState { EXEC_INACTIVE, EXEC_COLLECT, EXEC_ERROR };
|
||||||
|
|
||||||
|
ExecState exec_state = EXEC_INACTIVE;
|
||||||
|
std::vector<StoredCmd> exec_body;
|
||||||
|
|
||||||
enum Mask : uint32_t {
|
enum Mask : uint32_t {
|
||||||
ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch.
|
ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch.
|
||||||
CONN_CLOSING = 2, // could be because of unrecoverable error or planned action.
|
CONN_CLOSING = 2, // could be because of unrecoverable error or planned action.
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
// Copyright 2021, Roman Gershman. All rights reserved.
|
||||||
|
// See LICENSE for licensing terms.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <absl/strings/str_join.h>
|
||||||
|
#include <absl/strings/strip.h>
|
||||||
|
#include <gmock/gmock.h>
|
||||||
|
|
||||||
|
#include "base/gtest.h"
|
||||||
|
#include "base/logging.h"
|
||||||
|
#include "server/conn_context.h"
|
||||||
|
#include "server/main_service.h"
|
||||||
|
#include "server/redis_parser.h"
|
||||||
|
#include "server/test_utils.h"
|
||||||
|
#include "util/uring/uring_pool.h"
|
||||||
|
|
||||||
|
namespace dfly {
|
||||||
|
|
||||||
|
using namespace absl;
|
||||||
|
using namespace boost;
|
||||||
|
using namespace std;
|
||||||
|
using namespace util;
|
||||||
|
using ::io::Result;
|
||||||
|
using testing::ElementsAre;
|
||||||
|
using testing::HasSubstr;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
constexpr unsigned kPoolThreadCount = 4;
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
|
||||||
|
// This test is responsible for server and main service
|
||||||
|
// (connection, transaction etc) families.
|
||||||
|
class DflyEngineTest : public BaseFamilyTest {
|
||||||
|
protected:
|
||||||
|
DflyEngineTest() : BaseFamilyTest() {
|
||||||
|
num_threads_ = kPoolThreadCount;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(DflyEngineTest, Multi) {
|
||||||
|
RespVec resp = Run({"multi"});
|
||||||
|
ASSERT_THAT(resp, RespEq("OK"));
|
||||||
|
|
||||||
|
resp = Run({"get", "x"});
|
||||||
|
ASSERT_THAT(resp, RespEq("QUEUED"));
|
||||||
|
|
||||||
|
resp = Run({"get", "y"});
|
||||||
|
ASSERT_THAT(resp, RespEq("QUEUED"));
|
||||||
|
|
||||||
|
resp = Run({"exec"});
|
||||||
|
ASSERT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL), ArgType(RespExpr::NIL)));
|
||||||
|
|
||||||
|
atomic_bool tx_empty = true;
|
||||||
|
|
||||||
|
ess_->RunBriefInParallel([&](EngineShard* shard) {
|
||||||
|
if (!shard->txq()->Empty())
|
||||||
|
tx_empty.store(false);
|
||||||
|
});
|
||||||
|
EXPECT_TRUE(tx_empty);
|
||||||
|
|
||||||
|
resp = Run({"get", "y"});
|
||||||
|
ASSERT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL)));
|
||||||
|
|
||||||
|
ASSERT_FALSE(service_->IsLocked(0, "x"));
|
||||||
|
ASSERT_FALSE(service_->IsLocked(0, "y"));
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace dfly
|
|
@ -93,6 +93,7 @@ void EngineShard::PollExecution(Transaction* trans) {
|
||||||
|
|
||||||
Transaction* head = nullptr;
|
Transaction* head = nullptr;
|
||||||
string dbg_id;
|
string dbg_id;
|
||||||
|
|
||||||
while (!txq_.Empty()) {
|
while (!txq_.Empty()) {
|
||||||
auto val = txq_.Front();
|
auto val = txq_.Front();
|
||||||
head = absl::get<Transaction*>(val);
|
head = absl::get<Transaction*>(val);
|
||||||
|
@ -122,12 +123,10 @@ void EngineShard::PollExecution(Transaction* trans) {
|
||||||
if (VLOG_IS_ON(2)) {
|
if (VLOG_IS_ON(2)) {
|
||||||
dbg_id = head->DebugId();
|
dbg_id = head->DebugId();
|
||||||
}
|
}
|
||||||
bool keep = head->RunInShard(this);
|
|
||||||
DCHECK(head == absl::get<Transaction*>(txq_.Front()));
|
|
||||||
|
|
||||||
|
bool keep = head->RunInShard(this);
|
||||||
// We should not access head from this point since RunInShard callback decrements refcount.
|
// We should not access head from this point since RunInShard callback decrements refcount.
|
||||||
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
|
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
|
||||||
txq_.PopFront();
|
|
||||||
|
|
||||||
if (keep) {
|
if (keep) {
|
||||||
continuation_trans_ = head;
|
continuation_trans_ = head;
|
||||||
|
@ -148,17 +147,21 @@ void EngineShard::PollExecution(Transaction* trans) {
|
||||||
|
|
||||||
dbg_id.clear();
|
dbg_id.clear();
|
||||||
|
|
||||||
uint32_t pos = trans->TxQueuePos(sid);
|
|
||||||
if (VLOG_IS_ON(1)) {
|
if (VLOG_IS_ON(1)) {
|
||||||
dbg_id = trans->DebugId();
|
dbg_id = trans->DebugId();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool keep = trans->RunInShard(this); // resets TxQueuePos, this is why we get it before.
|
bool keep = trans->RunInShard(this);
|
||||||
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
|
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
|
||||||
|
|
||||||
// Should be enforced via Schedule(). TODO: to remove the check once the code is mature.
|
// Should be enforced via Schedule(). TODO: to remove the check once the code is mature.
|
||||||
CHECK(!keep) << "multi-hop transactions can not be OOO.";
|
CHECK(!keep) << "multi-hop transactions can not be OOO.";
|
||||||
txq_.Remove(pos);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void EngineShard::ShutdownMulti(Transaction* multi) {
|
||||||
|
if (continuation_trans_ == multi) {
|
||||||
|
continuation_trans_ = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include "redis/sds.h"
|
#include "redis/sds.h"
|
||||||
}
|
}
|
||||||
|
|
||||||
#include <xxhash.h>
|
#include <xxhash.h>
|
||||||
|
@ -62,9 +62,8 @@ class EngineShard {
|
||||||
return committed_txid_;
|
return committed_txid_;
|
||||||
}
|
}
|
||||||
|
|
||||||
TxQueue::Iterator InsertTxQ(Transaction* trans) {
|
// TODO: Awkward interface. I should solve it somehow.
|
||||||
return txq_.Insert(trans);
|
void ShutdownMulti(Transaction* multi);
|
||||||
}
|
|
||||||
|
|
||||||
sds tmp_str;
|
sds tmp_str;
|
||||||
|
|
||||||
|
@ -158,7 +157,7 @@ template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
|
||||||
bc.Wait();
|
bc.Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename View> inline ShardId Shard(const View& v, ShardId shard_num) {
|
inline ShardId Shard(std::string_view v, ShardId shard_num) {
|
||||||
XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577240643ULL);
|
XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577240643ULL);
|
||||||
return hash % shard_num;
|
return hash % shard_num;
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ extern "C" {
|
||||||
#include "redis/redis_aux.h"
|
#include "redis/redis_aux.h"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include <absl/cleanup/cleanup.h>
|
||||||
#include <absl/strings/ascii.h>
|
#include <absl/strings/ascii.h>
|
||||||
#include <xxhash.h>
|
#include <xxhash.h>
|
||||||
|
|
||||||
|
@ -103,12 +104,20 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
|
||||||
VLOG(2) << "Got: " << args;
|
VLOG(2) << "Got: " << args;
|
||||||
|
|
||||||
string_view cmd_str = ArgS(args, 0);
|
string_view cmd_str = ArgS(args, 0);
|
||||||
|
bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI");
|
||||||
const CommandId* cid = registry_.Find(cmd_str);
|
const CommandId* cid = registry_.Find(cmd_str);
|
||||||
|
|
||||||
|
absl::Cleanup multi_error = [cntx] {
|
||||||
|
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) {
|
||||||
|
cntx->conn_state.exec_state = ConnectionState::EXEC_ERROR;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if (cid == nullptr) {
|
if (cid == nullptr) {
|
||||||
return cntx->SendError(absl::StrCat("unknown command `", cmd_str, "`"));
|
return cntx->SendError(absl::StrCat("unknown command `", cmd_str, "`"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool under_multi = cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd;
|
||||||
if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
|
if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
|
||||||
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
|
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
|
||||||
return cntx->SendError(WrongNumArgsError(cmd_str));
|
return cntx->SendError(WrongNumArgsError(cmd_str));
|
||||||
|
@ -118,12 +127,31 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return cntx->SendError(WrongNumArgsError(cmd_str));
|
return cntx->SendError(WrongNumArgsError(cmd_str));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (under_multi && (cid->opt_mask() & CO::ADMIN)) {
|
||||||
|
cntx->SendError("Can not run admin commands under multi-transactions");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::move(multi_error).Cancel();
|
||||||
|
|
||||||
|
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) {
|
||||||
|
// TODO: protect against aggregating huge transactions.
|
||||||
|
StoredCmd stored_cmd{cid};
|
||||||
|
stored_cmd.cmd.reserve(args.size());
|
||||||
|
for (size_t i = 0; i < args.size(); ++i) {
|
||||||
|
stored_cmd.cmd.emplace_back(ArgS(args, i));
|
||||||
|
}
|
||||||
|
cntx->conn_state.exec_body.push_back(std::move(stored_cmd));
|
||||||
|
|
||||||
|
return cntx->SendSimpleRespString("QUEUED");
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec;
|
uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec;
|
||||||
|
|
||||||
// Create command transaction
|
// Create command transaction
|
||||||
intrusive_ptr<Transaction> dist_trans;
|
intrusive_ptr<Transaction> dist_trans;
|
||||||
|
|
||||||
if (cid->first_key_pos() > 0) {
|
if (cid->first_key_pos() > 0 || (cid->opt_mask() & CO::GLOBAL_TRANS)) {
|
||||||
dist_trans.reset(new Transaction{cid, &shard_set_});
|
dist_trans.reset(new Transaction{cid, &shard_set_});
|
||||||
cntx->transaction = dist_trans.get();
|
cntx->transaction = dist_trans.get();
|
||||||
|
|
||||||
|
@ -189,6 +217,17 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
|
||||||
DispatchCommand(arg_list, cntx);
|
DispatchCommand(arg_list, cntx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Service::IsLocked(DbIndex db_index, std::string_view key) const {
|
||||||
|
ShardId sid = Shard(key, shard_count());
|
||||||
|
KeyLockArgs args;
|
||||||
|
args.db_index = db_index;
|
||||||
|
args.args = ArgSlice{&key, 1};
|
||||||
|
args.key_step = 1;
|
||||||
|
bool is_open = pp_.at(sid)->AwaitBrief(
|
||||||
|
[args] { return EngineShard::tlocal()->db_slice().CheckLock(IntentLock::EXCLUSIVE, args); });
|
||||||
|
return !is_open;
|
||||||
|
}
|
||||||
|
|
||||||
void Service::RegisterHttp(HttpListenerBase* listener) {
|
void Service::RegisterHttp(HttpListenerBase* listener) {
|
||||||
CHECK_NOTNULL(listener);
|
CHECK_NOTNULL(listener);
|
||||||
}
|
}
|
||||||
|
@ -214,6 +253,60 @@ void Service::DbSize(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return cntx->SendLong(num_keys.load(memory_order_relaxed));
|
return cntx->SendLong(num_keys.load(memory_order_relaxed));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
cntx->SendOk();
|
||||||
|
cntx->CloseConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) {
|
||||||
|
return cntx->SendError("EXEC without MULTI");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cntx->conn_state.exec_state == ConnectionState::EXEC_ERROR) {
|
||||||
|
cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE;
|
||||||
|
cntx->conn_state.exec_body.clear();
|
||||||
|
return cntx->SendError("-EXECABORT Transaction discarded because of previous errors");
|
||||||
|
}
|
||||||
|
|
||||||
|
cntx->SendRespBlob(absl::StrCat("*", cntx->conn_state.exec_body.size(), "\r\n"));
|
||||||
|
|
||||||
|
if (!cntx->ec() && !cntx->conn_state.exec_body.empty()) {
|
||||||
|
CmdArgVec str_list;
|
||||||
|
|
||||||
|
for (auto& scmd : cntx->conn_state.exec_body) {
|
||||||
|
str_list.resize(scmd.cmd.size());
|
||||||
|
for (size_t i = 0; i < scmd.cmd.size(); ++i) {
|
||||||
|
string& s = scmd.cmd[i];
|
||||||
|
str_list[i] = MutableStrSpan{s.data(), s.size()};
|
||||||
|
}
|
||||||
|
|
||||||
|
cntx->transaction->SetExecCmd(scmd.descr);
|
||||||
|
CmdArgList cmd_arg_list{str_list.data(), str_list.size()};
|
||||||
|
cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list);
|
||||||
|
scmd.descr->Invoke(cmd_arg_list, cntx);
|
||||||
|
if (cntx->ec())
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
VLOG(1) << "Exec unlocking " << cntx->conn_state.exec_body.size() << " commands";
|
||||||
|
cntx->transaction->UnlockMulti();
|
||||||
|
}
|
||||||
|
|
||||||
|
cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE;
|
||||||
|
cntx->conn_state.exec_body.clear();
|
||||||
|
VLOG(1) << "Exec completed";
|
||||||
|
}
|
||||||
|
|
||||||
|
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) {
|
||||||
|
return cntx->SendError("MULTI calls can not be nested");
|
||||||
|
}
|
||||||
|
cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT;
|
||||||
|
// TODO: to protect against huge exec transactions.
|
||||||
|
return cntx->SendOk();
|
||||||
|
}
|
||||||
|
|
||||||
VarzValue::Map Service::GetVarzStats() {
|
VarzValue::Map Service::GetVarzStats() {
|
||||||
VarzValue::Map res;
|
VarzValue::Map res;
|
||||||
|
|
||||||
|
@ -234,8 +327,15 @@ inline CommandId::Handler HandlerFunc(Service* se, ServiceFunc f) {
|
||||||
void Service::RegisterCommands() {
|
void Service::RegisterCommands() {
|
||||||
using CI = CommandId;
|
using CI = CommandId;
|
||||||
|
|
||||||
|
constexpr auto kExecMask =
|
||||||
|
CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
|
||||||
|
|
||||||
registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
|
registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
|
||||||
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize);
|
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
|
||||||
|
<< CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
|
||||||
|
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
|
||||||
|
Multi)
|
||||||
|
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.HFUNC(Exec);
|
||||||
|
|
||||||
StringFamily::Register(®istry_);
|
StringFamily::Register(®istry_);
|
||||||
GenericFamily::Register(®istry_);
|
GenericFamily::Register(®istry_);
|
||||||
|
|
|
@ -44,6 +44,9 @@ class Service {
|
||||||
return shard_set_.size();
|
return shard_set_.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Used by tests.
|
||||||
|
bool IsLocked(DbIndex db_index, std::string_view key) const;
|
||||||
|
|
||||||
EngineShardSet& shard_set() {
|
EngineShardSet& shard_set() {
|
||||||
return shard_set_;
|
return shard_set_;
|
||||||
}
|
}
|
||||||
|
@ -56,6 +59,10 @@ class Service {
|
||||||
void Debug(CmdArgList args, ConnectionContext* cntx);
|
void Debug(CmdArgList args, ConnectionContext* cntx);
|
||||||
void DbSize(CmdArgList args, ConnectionContext* cntx);
|
void DbSize(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
|
void Quit(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
void Exec(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
void Multi(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
void RegisterCommands();
|
void RegisterCommands();
|
||||||
|
|
||||||
base::VarzValue::Map GetVarzStats();
|
base::VarzValue::Map GetVarzStats();
|
||||||
|
|
|
@ -119,6 +119,10 @@ class ReplyBuilder {
|
||||||
as_resp()->SendBulkString(str);
|
as_resp()->SendBulkString(str);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CloseConnection() {
|
||||||
|
serializer_->CloseConnection();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RespSerializer* as_resp() {
|
RespSerializer* as_resp() {
|
||||||
return static_cast<RespSerializer*>(serializer_.get());
|
return static_cast<RespSerializer*>(serializer_.get());
|
||||||
|
|
|
@ -28,10 +28,6 @@ IntentLock::Mode Transaction::Mode() const {
|
||||||
return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
|
return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
Transaction::~Transaction() {
|
|
||||||
DVLOG(2) << "Transaction " << DebugId() << " destroyed";
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Construct a new Transaction:: Transaction object
|
* @brief Construct a new Transaction:: Transaction object
|
||||||
*
|
*
|
||||||
|
@ -40,15 +36,26 @@ Transaction::~Transaction() {
|
||||||
* @param cs
|
* @param cs
|
||||||
*/
|
*/
|
||||||
Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) {
|
Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) {
|
||||||
|
if (strcmp(cid_->name(), "EXEC") == 0) {
|
||||||
|
multi_.reset(new Multi);
|
||||||
|
}
|
||||||
trans_options_ = cid_->opt_mask();
|
trans_options_ = cid_->opt_mask();
|
||||||
|
|
||||||
bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key();
|
bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key();
|
||||||
if (single_key) {
|
if (!multi_ && single_key) {
|
||||||
shard_data_.resize(1); // Single key optimization
|
shard_data_.resize(1); // Single key optimization
|
||||||
} else {
|
} else {
|
||||||
// Our shard_data is not sparse, so we must allocate for all threads :(
|
// Our shard_data is not sparse, so we must allocate for all threads :(
|
||||||
shard_data_.resize(ess_->size());
|
shard_data_.resize(ess_->size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsGlobal()) {
|
||||||
|
unique_shard_cnt_ = ess->size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Transaction::~Transaction() {
|
||||||
|
DVLOG(2) << "Transaction " << DebugId() << " destroyed";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -75,10 +82,11 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
CHECK_GT(args.size(), 1U);
|
CHECK_GT(args.size(), 1U);
|
||||||
CHECK_LT(size_t(cid_->first_key_pos()), args.size());
|
CHECK_LT(size_t(cid_->first_key_pos()), args.size());
|
||||||
DCHECK_EQ(unique_shard_cnt_, 0u);
|
DCHECK_EQ(unique_shard_cnt_, 0u);
|
||||||
|
DCHECK(!IsGlobal()) << "Global transactions do not have keys";
|
||||||
|
|
||||||
db_index_ = index;
|
db_index_ = index;
|
||||||
|
|
||||||
if (!cid_->is_multi_key()) { // Single key optimization.
|
if (!multi_ && !cid_->is_multi_key()) { // Single key optimization.
|
||||||
auto key = ArgS(args, cid_->first_key_pos());
|
auto key = ArgS(args, cid_->first_key_pos());
|
||||||
args_.push_back(key);
|
args_.push_back(key);
|
||||||
|
|
||||||
|
@ -97,6 +105,13 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
v.Clear();
|
v.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IntentLock::Mode mode = IntentLock::EXCLUSIVE;
|
||||||
|
if (multi_) {
|
||||||
|
mode = Mode();
|
||||||
|
tmp_space.uniq_keys.clear();
|
||||||
|
DCHECK_LT(int(mode), 2);
|
||||||
|
}
|
||||||
|
|
||||||
size_t key_end = cid_->last_key_pos() > 0 ? cid_->last_key_pos() + 1
|
size_t key_end = cid_->last_key_pos() > 0 ? cid_->last_key_pos() + 1
|
||||||
: (args.size() + 1 + cid_->last_key_pos());
|
: (args.size() + 1 + cid_->last_key_pos());
|
||||||
for (size_t i = 1; i < key_end; ++i) {
|
for (size_t i = 1; i < key_end; ++i) {
|
||||||
|
@ -105,6 +120,10 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
shard_index[sid].args.push_back(key);
|
shard_index[sid].args.push_back(key);
|
||||||
shard_index[sid].original_index.push_back(i - 1);
|
shard_index[sid].original_index.push_back(i - 1);
|
||||||
|
|
||||||
|
if (multi_ && tmp_space.uniq_keys.insert(key).second) {
|
||||||
|
multi_->locks[key].cnt[int(mode)]++;
|
||||||
|
};
|
||||||
|
|
||||||
if (cid_->key_arg_step() == 2) { // value
|
if (cid_->key_arg_step() == 2) { // value
|
||||||
++i;
|
++i;
|
||||||
auto val = ArgS(args, i);
|
auto val = ArgS(args, i);
|
||||||
|
@ -149,9 +168,12 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
|
|
||||||
if (unique_shard_cnt_ == 1) {
|
if (unique_shard_cnt_ == 1) {
|
||||||
PerShardData* sd;
|
PerShardData* sd;
|
||||||
|
if (multi_) {
|
||||||
shard_data_.resize(1);
|
sd = &shard_data_[unique_shard_id_];
|
||||||
sd = &shard_data_.front();
|
} else {
|
||||||
|
shard_data_.resize(1);
|
||||||
|
sd = &shard_data_.front();
|
||||||
|
}
|
||||||
sd->arg_count = -1;
|
sd->arg_count = -1;
|
||||||
sd->arg_start = -1;
|
sd->arg_start = -1;
|
||||||
}
|
}
|
||||||
|
@ -160,10 +182,26 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
for (const auto& sd : shard_data_) {
|
for (const auto& sd : shard_data_) {
|
||||||
DCHECK_EQ(sd.local_mask, 0u);
|
DCHECK_EQ(sd.local_mask, 0u);
|
||||||
DCHECK_EQ(0, sd.local_mask & ARMED);
|
DCHECK_EQ(0, sd.local_mask & ARMED);
|
||||||
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
if (!multi_) {
|
||||||
|
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Transaction::SetExecCmd(const CommandId* cid) {
|
||||||
|
DCHECK(multi_);
|
||||||
|
DCHECK(!cb_);
|
||||||
|
|
||||||
|
if (txid_ == 0) {
|
||||||
|
Schedule();
|
||||||
|
}
|
||||||
|
|
||||||
|
unique_shard_cnt_ = 0;
|
||||||
|
cid_ = cid;
|
||||||
|
trans_options_ = cid->opt_mask();
|
||||||
|
cb_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
string Transaction::DebugId() const {
|
string Transaction::DebugId() const {
|
||||||
return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
|
return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
|
||||||
}
|
}
|
||||||
|
@ -186,7 +224,21 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
||||||
DCHECK(sd.local_mask & ARMED);
|
DCHECK(sd.local_mask & ARMED);
|
||||||
sd.local_mask &= ~ARMED;
|
sd.local_mask &= ~ARMED;
|
||||||
|
|
||||||
DCHECK(sd.local_mask & KEYS_ACQUIRED);
|
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
|
||||||
|
// Therefore we differentiate between concluding, which says that this specific
|
||||||
|
// runnable concludes current operation, and should_release which tells
|
||||||
|
// whether we should unlock the keys. should_release is false for multi and
|
||||||
|
// equal to concluding otherwise.
|
||||||
|
bool should_release = is_concluding_cb_ && !multi_;
|
||||||
|
|
||||||
|
// We make sure that we lock exactly once for each (multi-hop) transaction inside
|
||||||
|
// multi-transactions.
|
||||||
|
if (multi_ && ((sd.local_mask & KEYS_ACQUIRED) == 0)) {
|
||||||
|
sd.local_mask |= KEYS_ACQUIRED;
|
||||||
|
shard->db_slice().Acquire(Mode(), GetLockArgs(idx));
|
||||||
|
}
|
||||||
|
|
||||||
|
DCHECK(IsGlobal() || (sd.local_mask & KEYS_ACQUIRED));
|
||||||
|
|
||||||
/*************************************************************************/
|
/*************************************************************************/
|
||||||
// Actually running the callback.
|
// Actually running the callback.
|
||||||
|
@ -203,33 +255,51 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
||||||
// at least the coordinator thread owns the reference.
|
// at least the coordinator thread owns the reference.
|
||||||
DCHECK_GE(use_count(), 1u);
|
DCHECK_GE(use_count(), 1u);
|
||||||
|
|
||||||
|
// we remove tx from tx-queue upon first invocation.
|
||||||
|
// if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard.
|
||||||
|
if (sd.pq_pos != TxQueue::kEnd) {
|
||||||
|
shard->txq()->Remove(sd.pq_pos);
|
||||||
|
sd.pq_pos = TxQueue::kEnd;
|
||||||
|
}
|
||||||
|
|
||||||
// If it's a final hop we should release the locks.
|
// If it's a final hop we should release the locks.
|
||||||
if (is_concluding_cb_) {
|
if (should_release) {
|
||||||
KeyLockArgs largs = GetLockArgs(idx);
|
KeyLockArgs largs = GetLockArgs(idx);
|
||||||
|
|
||||||
|
// If a transaction has been suspended, we keep the lock so that future transaction
|
||||||
|
// touching those keys will be ordered via TxQueue. It's necessary because we preserve
|
||||||
|
// the atomicity of awaked transactions by halting the TxQueue.
|
||||||
shard->db_slice().Release(Mode(), largs);
|
shard->db_slice().Release(Mode(), largs);
|
||||||
sd.local_mask &= ~KEYS_ACQUIRED;
|
sd.local_mask &= ~KEYS_ACQUIRED;
|
||||||
}
|
}
|
||||||
|
|
||||||
CHECK_GE(DecreaseRunCnt(), 1u);
|
CHECK_GE(DecreaseRunCnt(), 1u);
|
||||||
|
// From this point on we can not access 'this'.
|
||||||
|
|
||||||
return !is_concluding_cb_; // keep
|
return !should_release; // keep
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::ScheduleInternal(bool single_hop) {
|
void Transaction::ScheduleInternal(bool single_hop) {
|
||||||
DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED);
|
DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED);
|
||||||
DCHECK_EQ(0u, txid_);
|
DCHECK_EQ(0u, txid_);
|
||||||
|
|
||||||
|
bool span_all = IsGlobal();
|
||||||
bool out_of_order = false;
|
bool out_of_order = false;
|
||||||
|
|
||||||
uint32_t num_shards;
|
uint32_t num_shards;
|
||||||
std::function<bool(uint32_t)> is_active;
|
std::function<bool(uint32_t)> is_active;
|
||||||
|
|
||||||
num_shards = unique_shard_cnt_;
|
if (span_all) {
|
||||||
DCHECK_GT(num_shards, 0u);
|
is_active = [](uint32_t) { return true; };
|
||||||
|
num_shards = ess_->size();
|
||||||
|
} else {
|
||||||
|
num_shards = unique_shard_cnt_;
|
||||||
|
DCHECK_GT(num_shards, 0u);
|
||||||
|
|
||||||
is_active = [&](uint32_t i) {
|
is_active = [&](uint32_t i) {
|
||||||
return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0;
|
return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0;
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
|
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
@ -250,6 +320,9 @@ void Transaction::ScheduleInternal(bool single_hop) {
|
||||||
// It might be possible to do it for multi-hop transactions as well but currently is
|
// It might be possible to do it for multi-hop transactions as well but currently is
|
||||||
// too complicated to reason about.
|
// too complicated to reason about.
|
||||||
if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
|
if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
|
||||||
|
// OOO can not happen with span-all transactions. We ensure it in ScheduleInShard when we
|
||||||
|
// refuse to acquire locks for these transactions..
|
||||||
|
DCHECK(!span_all);
|
||||||
out_of_order = true;
|
out_of_order = true;
|
||||||
}
|
}
|
||||||
DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order;
|
DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order;
|
||||||
|
@ -283,7 +356,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
|
|
||||||
cb_ = std::move(cb);
|
cb_ = std::move(cb);
|
||||||
|
|
||||||
bool schedule_fast = (unique_shard_cnt_ == 1);
|
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
|
||||||
if (schedule_fast) { // Single shard (local) optimization.
|
if (schedule_fast) { // Single shard (local) optimization.
|
||||||
// We never resize shard_data because that would affect MULTI transaction correctness.
|
// We never resize shard_data because that would affect MULTI transaction correctness.
|
||||||
DCHECK_EQ(1u, shard_data_.size());
|
DCHECK_EQ(1u, shard_data_.size());
|
||||||
|
@ -313,7 +386,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
|
|
||||||
ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
|
ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
|
||||||
} else {
|
} else {
|
||||||
ScheduleInternal(true);
|
// Transaction spans multiple shards or it's global (like flushdb) or multi.
|
||||||
|
if (!multi_)
|
||||||
|
ScheduleInternal(true);
|
||||||
ExecuteAsync(true);
|
ExecuteAsync(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,6 +402,56 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
return local_result_;
|
return local_result_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Runs in the coordinator fiber.
|
||||||
|
void Transaction::UnlockMulti() {
|
||||||
|
VLOG(1) << "Transaction::UnlockMulti";
|
||||||
|
|
||||||
|
DCHECK(multi_);
|
||||||
|
using KeyList = vector<pair<std::string_view, LockCnt>>;
|
||||||
|
vector<KeyList> sharded_keys(ess_->size());
|
||||||
|
|
||||||
|
// It's LE and not EQ because there may be callbacks in progress that increase use_count_.
|
||||||
|
DCHECK_LE(1u, use_count());
|
||||||
|
|
||||||
|
for (const auto& k_v : multi_->locks) {
|
||||||
|
ShardId sid = Shard(k_v.first, sharded_keys.size());
|
||||||
|
sharded_keys[sid].push_back(k_v);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto cb = [&](EngineShard* shard) {
|
||||||
|
ShardId sid = shard->shard_id();
|
||||||
|
for (const auto& k_v : sharded_keys[sid]) {
|
||||||
|
auto release = [&](IntentLock::Mode mode) {
|
||||||
|
if (k_v.second.cnt[mode]) {
|
||||||
|
shard->db_slice().Release(mode, this->db_index_, k_v.first, k_v.second.cnt[mode]);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
release(IntentLock::SHARED);
|
||||||
|
release(IntentLock::EXCLUSIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& sd = shard_data_[SidToId(shard->shard_id())];
|
||||||
|
|
||||||
|
// It does not have to be that all shards in multi transaction execute this tx.
|
||||||
|
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
|
||||||
|
// there.
|
||||||
|
if (sd.pq_pos != TxQueue::kEnd) {
|
||||||
|
TxQueue* txq = shard->txq();
|
||||||
|
DCHECK(!txq->Empty());
|
||||||
|
Transaction* trans = absl::get<Transaction*>(txq->Front());
|
||||||
|
DCHECK(trans == this);
|
||||||
|
txq->PopFront();
|
||||||
|
sd.pq_pos = TxQueue::kEnd;
|
||||||
|
}
|
||||||
|
|
||||||
|
shard->ShutdownMulti(this);
|
||||||
|
};
|
||||||
|
|
||||||
|
ess_->RunBriefInParallel(std::move(cb));
|
||||||
|
|
||||||
|
DCHECK_EQ(1u, use_count());
|
||||||
|
}
|
||||||
|
|
||||||
// Runs in coordinator thread.
|
// Runs in coordinator thread.
|
||||||
void Transaction::Execute(RunnableType cb, bool conclude) {
|
void Transaction::Execute(RunnableType cb, bool conclude) {
|
||||||
cb_ = std::move(cb);
|
cb_ = std::move(cb);
|
||||||
|
@ -359,12 +484,14 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
|
||||||
// safely.
|
// safely.
|
||||||
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
|
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
|
||||||
|
|
||||||
|
bool is_global = IsGlobal();
|
||||||
|
|
||||||
if (unique_shard_cnt_ == 1) {
|
if (unique_shard_cnt_ == 1) {
|
||||||
shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
|
shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
|
||||||
} else {
|
} else {
|
||||||
for (ShardId i = 0; i < shard_data_.size(); ++i) {
|
for (ShardId i = 0; i < shard_data_.size(); ++i) {
|
||||||
auto& sd = shard_data_[i];
|
auto& sd = shard_data_[i];
|
||||||
if (sd.arg_count == 0)
|
if (!is_global && sd.arg_count == 0)
|
||||||
continue;
|
continue;
|
||||||
DCHECK_LT(sd.arg_count, 1u << 15);
|
DCHECK_LT(sd.arg_count, 1u << 15);
|
||||||
sd.local_mask |= ARMED;
|
sd.local_mask |= ARMED;
|
||||||
|
@ -408,12 +535,12 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
|
||||||
};
|
};
|
||||||
|
|
||||||
// IsArmedInShard is the protector of non-thread safe data.
|
// IsArmedInShard is the protector of non-thread safe data.
|
||||||
if (unique_shard_cnt_ == 1) {
|
if (!is_global && unique_shard_cnt_ == 1) {
|
||||||
ess_->Add(unique_shard_id_, std::move(cb)); // serves as a barrier.
|
ess_->Add(unique_shard_id_, std::move(cb)); // serves as a barrier.
|
||||||
} else {
|
} else {
|
||||||
for (ShardId i = 0; i < shard_data_.size(); ++i) {
|
for (ShardId i = 0; i < shard_data_.size(); ++i) {
|
||||||
auto& sd = shard_data_[i];
|
auto& sd = shard_data_[i];
|
||||||
if (sd.arg_count == 0)
|
if (!is_global && sd.arg_count == 0)
|
||||||
continue;
|
continue;
|
||||||
ess_->Add(i, cb); // serves as a barrier.
|
ess_->Add(i, cb); // serves as a barrier.
|
||||||
}
|
}
|
||||||
|
@ -421,6 +548,7 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::RunQuickie() {
|
void Transaction::RunQuickie() {
|
||||||
|
DCHECK(!multi_);
|
||||||
DCHECK_EQ(1u, shard_data_.size());
|
DCHECK_EQ(1u, shard_data_.size());
|
||||||
DCHECK_EQ(0u, txid_);
|
DCHECK_EQ(0u, txid_);
|
||||||
|
|
||||||
|
@ -454,6 +582,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
|
||||||
// Optimized path that schedules and runs transactions out of order if possible.
|
// Optimized path that schedules and runs transactions out of order if possible.
|
||||||
// Returns true if was eagerly executed, false if it was scheduled into queue.
|
// Returns true if was eagerly executed, false if it was scheduled into queue.
|
||||||
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
|
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
|
||||||
|
DCHECK(!multi_);
|
||||||
DCHECK_EQ(0u, txid_);
|
DCHECK_EQ(0u, txid_);
|
||||||
DCHECK_EQ(1u, shard_data_.size());
|
DCHECK_EQ(1u, shard_data_.size());
|
||||||
|
|
||||||
|
@ -473,8 +602,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
|
||||||
|
|
||||||
// we can do it because only a single thread writes into txid_ and sd.
|
// we can do it because only a single thread writes into txid_ and sd.
|
||||||
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
|
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
|
||||||
TxQueue::Iterator it = shard->InsertTxQ(this);
|
sd.pq_pos = shard->txq()->Insert(this);
|
||||||
sd.pq_pos = it;
|
|
||||||
|
|
||||||
DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
|
DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
|
||||||
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
|
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
|
||||||
|
@ -498,25 +626,26 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
TxQueue* pq = shard->txq();
|
TxQueue* txq = shard->txq();
|
||||||
KeyLockArgs lock_args;
|
KeyLockArgs lock_args;
|
||||||
IntentLock::Mode mode = Mode();
|
IntentLock::Mode mode = Mode();
|
||||||
|
|
||||||
|
bool spans_all = IsGlobal();
|
||||||
bool lock_granted = false;
|
bool lock_granted = false;
|
||||||
ShardId sid = SidToId(shard->shard_id());
|
ShardId sid = SidToId(shard->shard_id());
|
||||||
|
|
||||||
auto& sd = shard_data_[sid];
|
auto& sd = shard_data_[sid];
|
||||||
|
|
||||||
bool shard_unlocked = true;
|
if (!spans_all) {
|
||||||
lock_args = GetLockArgs(shard->shard_id());
|
lock_args = GetLockArgs(shard->shard_id());
|
||||||
|
|
||||||
// we need to acquire the lock unrelated to shard_unlocked since we register into Tx queue.
|
// All transactions in the queue must acquire the intent lock.
|
||||||
// All transactions in the queue must acquire the intent lock.
|
lock_granted = shard->db_slice().Acquire(mode, lock_args);
|
||||||
lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked;
|
sd.local_mask |= KEYS_ACQUIRED;
|
||||||
sd.local_mask |= KEYS_ACQUIRED;
|
DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId();
|
||||||
DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId();
|
}
|
||||||
|
|
||||||
if (!pq->Empty()) {
|
if (!txq->Empty()) {
|
||||||
// If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
|
// If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
|
||||||
// and some other transaction already locked its keys we can not reorder 'trans' because
|
// and some other transaction already locked its keys we can not reorder 'trans' because
|
||||||
// that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we
|
// that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we
|
||||||
|
@ -526,7 +655,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
|
||||||
// We may record when they disable OOO via barrier_ts so if the queue contains transactions
|
// We may record when they disable OOO via barrier_ts so if the queue contains transactions
|
||||||
// that were only scheduled afterwards we know they are not free so we can still
|
// that were only scheduled afterwards we know they are not free so we can still
|
||||||
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore().
|
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore().
|
||||||
bool to_proceed = lock_granted || pq->TailScore() < txid_;
|
bool to_proceed = lock_granted || txq->TailScore() < txid_;
|
||||||
if (!to_proceed) {
|
if (!to_proceed) {
|
||||||
if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock.
|
if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock.
|
||||||
shard->db_slice().Release(mode, lock_args);
|
shard->db_slice().Release(mode, lock_args);
|
||||||
|
@ -540,18 +669,18 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
|
||||||
result.second = lock_granted;
|
result.second = lock_granted;
|
||||||
result.first = true;
|
result.first = true;
|
||||||
|
|
||||||
TxQueue::Iterator it = pq->Insert(this);
|
TxQueue::Iterator it = txq->Insert(this);
|
||||||
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
||||||
sd.pq_pos = it;
|
sd.pq_pos = it;
|
||||||
|
|
||||||
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << pq->size();
|
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size();
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Transaction::CancelInShard(EngineShard* shard) {
|
bool Transaction::CancelInShard(EngineShard* shard) {
|
||||||
ShardId sid = SidToId(shard->shard_id());
|
ShardId idx = SidToId(shard->shard_id());
|
||||||
auto& sd = shard_data_[sid];
|
auto& sd = shard_data_[idx];
|
||||||
|
|
||||||
auto pos = sd.pq_pos;
|
auto pos = sd.pq_pos;
|
||||||
if (pos == TxQueue::kEnd)
|
if (pos == TxQueue::kEnd)
|
||||||
|
@ -609,4 +738,8 @@ inline uint32_t Transaction::DecreaseRunCnt() {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Transaction::IsGlobal() const {
|
||||||
|
return (trans_options_ & CO::GLOBAL_TRANS) != 0;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -63,6 +63,8 @@ class Transaction {
|
||||||
|
|
||||||
void InitByArgs(DbIndex index, CmdArgList args);
|
void InitByArgs(DbIndex index, CmdArgList args);
|
||||||
|
|
||||||
|
void SetExecCmd(const CommandId* cid);
|
||||||
|
|
||||||
std::string DebugId() const;
|
std::string DebugId() const;
|
||||||
|
|
||||||
// Runs in engine thread
|
// Runs in engine thread
|
||||||
|
@ -84,6 +86,7 @@ class Transaction {
|
||||||
//! duplicate runs). Supports local transactions under multi as well.
|
//! duplicate runs). Supports local transactions under multi as well.
|
||||||
//! Can be used in contexts that wait for an event to happen.
|
//! Can be used in contexts that wait for an event to happen.
|
||||||
bool IsArmedInShard(ShardId sid) const {
|
bool IsArmedInShard(ShardId sid) const {
|
||||||
|
// For multi transactions shard_data_ spans all shards.
|
||||||
if (sid >= shard_data_.size())
|
if (sid >= shard_data_.size())
|
||||||
sid = 0;
|
sid = 0;
|
||||||
|
|
||||||
|
@ -100,11 +103,6 @@ class Transaction {
|
||||||
return state_mask_.load(std::memory_order_relaxed);
|
return state_mask_.load(std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Relevant only when unique_shards_ > 1.
|
|
||||||
uint32_t TxQueuePos(ShardId sid) const {
|
|
||||||
return shard_data_[sid].pq_pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
|
// Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
|
||||||
// For single hop, use ScheduleSingleHop instead.
|
// For single hop, use ScheduleSingleHop instead.
|
||||||
void Schedule() {
|
void Schedule() {
|
||||||
|
@ -130,6 +128,8 @@ class Transaction {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void UnlockMulti();
|
||||||
|
|
||||||
TxId txid() const {
|
TxId txid() const {
|
||||||
return txid_;
|
return txid_;
|
||||||
}
|
}
|
||||||
|
@ -148,6 +148,12 @@ class Transaction {
|
||||||
return unique_shard_cnt_;
|
return unique_shard_cnt_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool IsMulti() const {
|
||||||
|
return bool(multi_);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool IsGlobal() const;
|
||||||
|
|
||||||
EngineShardSet* shard_set() {
|
EngineShardSet* shard_set() {
|
||||||
return ess_;
|
return ess_;
|
||||||
}
|
}
|
||||||
|
@ -207,6 +213,8 @@ class Transaction {
|
||||||
// Bitmask of LocalState enums.
|
// Bitmask of LocalState enums.
|
||||||
uint16_t local_mask{0};
|
uint16_t local_mask{0};
|
||||||
|
|
||||||
|
// Needed to rollback invalid schedulings or remove OOO transactions from
|
||||||
|
// tx queue.
|
||||||
uint32_t pq_pos = TxQueue::kEnd;
|
uint32_t pq_pos = TxQueue::kEnd;
|
||||||
|
|
||||||
PerShardData(PerShardData&&) noexcept {
|
PerShardData(PerShardData&&) noexcept {
|
||||||
|
@ -220,6 +228,10 @@ class Transaction {
|
||||||
unsigned cnt[2] = {0, 0};
|
unsigned cnt[2] = {0, 0};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct Multi {
|
||||||
|
absl::flat_hash_map<std::string_view, LockCnt> locks;
|
||||||
|
};
|
||||||
|
|
||||||
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
|
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
|
||||||
util::fibers_ext::EventCount run_ec_;
|
util::fibers_ext::EventCount run_ec_;
|
||||||
|
|
||||||
|
@ -238,6 +250,7 @@ class Transaction {
|
||||||
std::vector<uint32_t> reverse_index_;
|
std::vector<uint32_t> reverse_index_;
|
||||||
|
|
||||||
RunnableType cb_;
|
RunnableType cb_;
|
||||||
|
std::unique_ptr<Multi> multi_; // Initialized when the transaction is multi/exec.
|
||||||
|
|
||||||
const CommandId* cid_;
|
const CommandId* cid_;
|
||||||
EngineShardSet* ess_;
|
EngineShardSet* ess_;
|
||||||
|
|
Loading…
Reference in New Issue