feat(journal): Introduce basic journal support. (#211)
1. No entries data is written into a journal yet. 2. Introduced a state machine to start and stop journal using a new auxillary command "dfly". 3. string_family currently calls an universal command Journal::RecordEntry that should save the current key/value of that entry. Please note that we won't use it for all the operations because for some it's more efficient to record the action itself than the final value. 4. no locking information is recorded yet so atomicity of multi-key operations is not preserved for now. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
05eb323a0d
commit
3bbdb8e4b1
|
@ -1,16 +1,20 @@
|
|||
add_executable(dragonfly dfly_main.cc)
|
||||
cxx_link(dragonfly base dragonfly_lib)
|
||||
|
||||
add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc
|
||||
common.cc config_flags.cc
|
||||
conn_context.cc db_slice.cc debugcmd.cc
|
||||
engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc
|
||||
add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc
|
||||
io_mgr.cc journal/journal.cc journal/journal_shard.cc table.cc
|
||||
tiered_storage.cc transaction.cc)
|
||||
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
|
||||
|
||||
add_library(dragonfly_lib channel_slice.cc command_registry.cc
|
||||
config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc
|
||||
generic_family.cc hset_family.cc
|
||||
list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
|
||||
snapshot.cc script_mgr.cc server_family.cc
|
||||
set_family.cc stream_family.cc string_family.cc table.cc tiered_storage.cc
|
||||
transaction.cc zset_family.cc version.cc)
|
||||
set_family.cc stream_family.cc string_family.cc
|
||||
zset_family.cc version.cc)
|
||||
|
||||
cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib html_lib)
|
||||
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib)
|
||||
|
||||
add_library(dfly_test_lib test_utils.cc)
|
||||
cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext)
|
||||
|
|
|
@ -21,13 +21,14 @@ constexpr int64_t kMaxExpireDeadlineSec = (1u << 27) - 1;
|
|||
|
||||
using DbIndex = uint16_t;
|
||||
using ShardId = uint16_t;
|
||||
using LSN = uint64_t;
|
||||
using TxId = uint64_t;
|
||||
using TxClock = uint64_t;
|
||||
|
||||
using facade::MutableSlice;
|
||||
using facade::ArgS;
|
||||
using facade::CmdArgList;
|
||||
using facade::CmdArgVec;
|
||||
using facade::ArgS;
|
||||
using facade::MutableSlice;
|
||||
|
||||
using ArgSlice = absl::Span<const std::string_view>;
|
||||
using StringVec = std::vector<std::string>;
|
||||
|
@ -55,8 +56,8 @@ struct KeyIndex {
|
|||
// relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key.
|
||||
unsigned bonus = 0;
|
||||
unsigned start;
|
||||
unsigned end; // does not include this index (open limit).
|
||||
unsigned step; // 1 for commands like mget. 2 for commands like mset.
|
||||
unsigned end; // does not include this index (open limit).
|
||||
unsigned step; // 1 for commands like mget. 2 for commands like mset.
|
||||
|
||||
bool HasSingleKey() const {
|
||||
return bonus == 0 && (start + step >= end);
|
||||
|
@ -69,9 +70,15 @@ struct KeyIndex {
|
|||
|
||||
struct OpArgs {
|
||||
EngineShard* shard;
|
||||
TxId txid;
|
||||
DbIndex db_ind;
|
||||
};
|
||||
|
||||
OpArgs() : shard(nullptr), txid(0), db_ind(0) {
|
||||
}
|
||||
|
||||
OpArgs(EngineShard* s, TxId i, DbIndex d) : shard(s), txid(i), db_ind(d) {
|
||||
}
|
||||
};
|
||||
|
||||
struct TieredStats {
|
||||
size_t external_reads = 0;
|
||||
|
@ -118,7 +125,6 @@ extern size_t max_memory_limit;
|
|||
// set upon server start.
|
||||
extern unsigned kernel_version;
|
||||
|
||||
|
||||
const char* GlobalStateName(GlobalState gs);
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -60,7 +60,8 @@ struct ObjInfo {
|
|||
|
||||
void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params,
|
||||
const PopulateBatch& batch) {
|
||||
SetCmd sg(&EngineShard::tlocal()->db_slice());
|
||||
OpArgs op_args(EngineShard::tlocal(), 0, params.db_index);
|
||||
SetCmd sg(op_args);
|
||||
|
||||
for (unsigned i = 0; i < batch.sz; ++i) {
|
||||
string key = absl::StrCat(prefix, ":", batch.index[i]);
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
#include "server/dflycmd.h"
|
||||
|
||||
#include <absl/strings/str_cat.h>
|
||||
#include <absl/strings/strip.h>
|
||||
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/dragonfly_connection.h"
|
||||
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/journal/journal.h"
|
||||
#include "server/server_state.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
ABSL_DECLARE_FLAG(string, dir);
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace facade;
|
||||
using namespace std;
|
||||
using util::ProactorBase;
|
||||
|
||||
DflyCmd::DflyCmd(util::ListenerInterface* listener, journal::Journal* journal) : listener_(listener), journal_(journal) {
|
||||
}
|
||||
|
||||
void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
|
||||
DCHECK_GE(args.size(), 2u);
|
||||
|
||||
ToUpper(&args[1]);
|
||||
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
|
||||
std::string_view sub_cmd = ArgS(args, 1);
|
||||
if (sub_cmd == "JOURNAL") {
|
||||
if (args.size() < 3) {
|
||||
return rb->SendError(WrongNumArgsError("DFLY JOURNAL"));
|
||||
}
|
||||
HandleJournal(args, cntx);
|
||||
return;
|
||||
}
|
||||
|
||||
if (sub_cmd == "THREAD") {
|
||||
util::ProactorPool* pool = shard_set->pool();
|
||||
|
||||
if (args.size() == 2) { // DFLY THREAD : returns connection thread index and number of threads.
|
||||
rb->StartArray(2);
|
||||
rb->SendLong(ProactorBase::GetIndex());
|
||||
rb->SendLong(long(pool->size()));
|
||||
return;
|
||||
}
|
||||
|
||||
// DFLY THREAD to_thread : migrates current connection to a different thread.
|
||||
std::string_view arg = ArgS(args, 2);
|
||||
unsigned num_thread;
|
||||
if (!absl::SimpleAtoi(arg, &num_thread)) {
|
||||
return rb->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
if (num_thread < pool->size()) {
|
||||
if (int(num_thread) != ProactorBase::GetIndex()) {
|
||||
listener_->Migrate(cntx->owner(), pool->at(num_thread));
|
||||
}
|
||||
|
||||
return rb->SendOk();
|
||||
}
|
||||
|
||||
rb->SendError(kInvalidIntErr);
|
||||
return;
|
||||
}
|
||||
|
||||
rb->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
|
||||
void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {
|
||||
DCHECK_GE(args.size(), 3u);
|
||||
ToUpper(&args[2]);
|
||||
|
||||
std::string_view sub_cmd = ArgS(args, 2);
|
||||
Transaction* trans = cntx->transaction;
|
||||
DCHECK(trans);
|
||||
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
|
||||
if (sub_cmd == "START") {
|
||||
unique_lock lk(mu_);
|
||||
journal::Journal* journal = ServerState::tlocal()->journal();
|
||||
if (!journal) {
|
||||
string dir = absl::GetFlag(FLAGS_dir);
|
||||
journal_->StartLogging(dir);
|
||||
trans->Schedule();
|
||||
auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
trans->Execute(barrier_cb, true);
|
||||
|
||||
// tx id starting from which we may reliably fetch journal records.
|
||||
journal_txid_ = trans->txid();
|
||||
}
|
||||
|
||||
return rb->SendLong(journal_txid_);
|
||||
}
|
||||
|
||||
if (sub_cmd == "STOP") {
|
||||
unique_lock lk(mu_);
|
||||
if (journal_->EnterLameDuck()) {
|
||||
auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
trans->ScheduleSingleHop(std::move(barrier_cb));
|
||||
|
||||
auto ec = journal_->Close();
|
||||
LOG_IF(ERROR, ec) << "Error closing journal " << ec;
|
||||
journal_txid_ = trans->txid();
|
||||
}
|
||||
|
||||
return rb->SendLong(journal_txid_);
|
||||
}
|
||||
|
||||
string reply = UnknownSubCmd(sub_cmd, "DFLY");
|
||||
return rb->SendError(reply, kSyntaxErrType);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
|
@ -0,0 +1,38 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <absl/container/btree_map.h>
|
||||
|
||||
#include "server/conn_context.h"
|
||||
|
||||
namespace util {
|
||||
class ListenerInterface;
|
||||
} // namespace util
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class EngineShardSet;
|
||||
|
||||
namespace journal {
|
||||
class Journal;
|
||||
} // namespace journal
|
||||
|
||||
class DflyCmd {
|
||||
public:
|
||||
DflyCmd(util::ListenerInterface* listener, journal::Journal* journal);
|
||||
|
||||
void Run(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
private:
|
||||
void HandleJournal(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
util::ListenerInterface* listener_;
|
||||
journal::Journal* journal_;
|
||||
::boost::fibers::mutex mu_;
|
||||
TxId journal_txid_ = 0;
|
||||
};
|
||||
|
||||
} // namespace dfly
|
|
@ -27,7 +27,9 @@ ABSL_FLAG(uint32_t, hz, 1000,
|
|||
"and performs other background tasks. Warning: not advised to decrease in production, "
|
||||
"because it can affect expiry precision for PSETEX etc.");
|
||||
|
||||
ABSL_DECLARE_FLAG(bool, cache_mode);
|
||||
ABSL_FLAG(bool, cache_mode, false,
|
||||
"If true, the backend behaves like a cache, "
|
||||
"by evicting entries when getting close to maxmemory limit");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -42,8 +44,9 @@ vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShard
|
|||
|
||||
} // namespace
|
||||
|
||||
thread_local EngineShard* EngineShard::shard_ = nullptr;
|
||||
constexpr size_t kQueueLen = 64;
|
||||
|
||||
thread_local EngineShard* EngineShard::shard_ = nullptr;
|
||||
EngineShardSet* shard_set = nullptr;
|
||||
|
||||
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
|
||||
|
|
|
@ -26,6 +26,10 @@ extern "C" {
|
|||
|
||||
namespace dfly {
|
||||
|
||||
namespace journal {
|
||||
class Journal;
|
||||
} // namespace journal
|
||||
|
||||
class TieredStorage;
|
||||
class BlockingController;
|
||||
|
||||
|
@ -134,6 +138,14 @@ class EngineShard {
|
|||
return counter_[unsigned(type)].SumTail();
|
||||
}
|
||||
|
||||
journal::Journal* journal() {
|
||||
return journal_;
|
||||
}
|
||||
|
||||
void set_journal(journal::Journal* j) {
|
||||
journal_ = j;
|
||||
}
|
||||
|
||||
void TEST_EnableHeartbeat();
|
||||
|
||||
private:
|
||||
|
@ -160,6 +172,7 @@ class EngineShard {
|
|||
// Logical ts used to order distributed transactions.
|
||||
TxId committed_txid_ = 0;
|
||||
Transaction* continuation_trans_ = nullptr;
|
||||
journal::Journal* journal_ = nullptr;
|
||||
IntentLock shard_lock_;
|
||||
|
||||
uint32_t periodic_task_ = 0;
|
||||
|
|
|
@ -244,7 +244,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
|
|||
|
||||
do {
|
||||
ess->Await(sid, [&] {
|
||||
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
|
||||
OpArgs op_args{EngineShard::tlocal(), 0, cntx->conn_state.db_index};
|
||||
|
||||
OpScan(op_args, scan_opts, &cursor, keys);
|
||||
});
|
||||
|
@ -281,7 +281,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto cb = [&result](const Transaction* t, EngineShard* shard) {
|
||||
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
|
||||
auto res = OpDel(OpArgs{shard, t->db_index()}, args);
|
||||
auto res = OpDel(t->GetOpArgs(shard), args);
|
||||
result.fetch_add(res.value_or(0), memory_order_relaxed);
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -332,7 +332,7 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto cb = [&result](Transaction* t, EngineShard* shard) {
|
||||
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
|
||||
auto res = OpExists(OpArgs{shard, t->db_index()}, args);
|
||||
auto res = OpExists(t->GetOpArgs(shard), args);
|
||||
result.fetch_add(res.value_or(0), memory_order_relaxed);
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -362,7 +362,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
|
|||
ExpireParams params{.ts = int_arg};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpExpire(OpArgs{shard, t->db_index()}, key, params);
|
||||
return OpExpire(t->GetOpArgs(shard), key, params);
|
||||
};
|
||||
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(move(cb));
|
||||
|
@ -381,7 +381,7 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
|
|||
ExpireParams params{.ts = int_arg, .absolute = true};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpExpire(OpArgs{shard, t->db_index()}, key, params);
|
||||
return OpExpire(t->GetOpArgs(shard), key, params);
|
||||
};
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
|
@ -425,7 +425,7 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
|
|||
ExpireParams params{.ts = int_arg, .absolute = true, .unit = MSEC};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpExpire(OpArgs{shard, t->db_index()}, key, params);
|
||||
return OpExpire(t->GetOpArgs(shard), key, params);
|
||||
};
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
|
@ -524,7 +524,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
|
|||
|
||||
if (transaction->unique_shard_cnt() == 1) {
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpRen(OpArgs{shard, t->db_index()}, key[0], key[1], skip_exist_dest);
|
||||
return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
|
||||
};
|
||||
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
args.remove_prefix(2);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpDel(OpArgs{shard, t->db_index()}, key, args);
|
||||
return OpDel(t->GetOpArgs(shard), key, args);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -114,7 +114,7 @@ void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpLen(OpArgs{shard, t->db_index()}, key);
|
||||
return OpLen(t->GetOpArgs(shard), key);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -157,7 +157,7 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
args.remove_prefix(2);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpMGet(OpArgs{shard, t->db_index()}, key, args);
|
||||
return OpMGet(t->GetOpArgs(shard), key, args);
|
||||
};
|
||||
|
||||
OpResult<vector<OptStr>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -186,7 +186,7 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view field = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGet(OpArgs{shard, t->db_index()}, key, field);
|
||||
return OpGet(t->GetOpArgs(shard), key, field);
|
||||
};
|
||||
|
||||
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -214,7 +214,7 @@ void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
|||
IncrByParam param{ival};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m);
|
||||
return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m);
|
||||
};
|
||||
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -249,7 +249,7 @@ void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) {
|
|||
IncrByParam param{dval};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m);
|
||||
return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m);
|
||||
};
|
||||
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -284,7 +284,7 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g
|
|||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGetAll(OpArgs{shard, t->db_index()}, key, getall_mask);
|
||||
return OpGetAll(t->GetOpArgs(shard), key, getall_mask);
|
||||
};
|
||||
|
||||
OpResult<vector<string>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -311,7 +311,7 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpScan(OpArgs{shard, t->db_index()}, key, &cursor);
|
||||
return OpScan(t->GetOpArgs(shard), key, &cursor);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -339,7 +339,7 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
args.remove_prefix(2);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpSet(OpArgs{shard, t->db_index()}, key, args, false);
|
||||
return OpSet(t->GetOpArgs(shard), key, args, false);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -355,7 +355,7 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
args.remove_prefix(2);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpSet(OpArgs{shard, t->db_index()}, key, args, true);
|
||||
return OpSet(t->GetOpArgs(shard), key, args, true);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -371,7 +371,7 @@ void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view field = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpStrLen(OpArgs{shard, t->db_index()}, key, field);
|
||||
return OpStrLen(t->GetOpArgs(shard), key, field);
|
||||
};
|
||||
|
||||
OpResult<size_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/journal/journal.h"
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/journal/journal_shard.h"
|
||||
#include "server/server_state.h"
|
||||
|
||||
namespace dfly {
|
||||
namespace journal {
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
namespace fibers = boost::fibers;
|
||||
|
||||
namespace {
|
||||
|
||||
thread_local JournalShard journal_shard;
|
||||
|
||||
} // namespace
|
||||
|
||||
Journal::Journal() {
|
||||
}
|
||||
|
||||
error_code Journal::StartLogging(std::string_view dir) {
|
||||
if (journal_shard.IsOpen()) {
|
||||
return error_code{};
|
||||
}
|
||||
|
||||
auto* pool = shard_set->pool();
|
||||
atomic_uint32_t created{0};
|
||||
lock_guard lk(state_mu_);
|
||||
|
||||
auto open_cb = [&](auto* pb) {
|
||||
auto ec = journal_shard.Open(dir, unsigned(ProactorBase::GetIndex()));
|
||||
if (ec) {
|
||||
LOG(FATAL) << "Could not create journal " << ec; // TODO
|
||||
} else {
|
||||
created.fetch_add(1, memory_order_relaxed);
|
||||
ServerState::tlocal()->set_journal(this);
|
||||
EngineShard* shard = EngineShard::tlocal();
|
||||
if (shard) {
|
||||
shard->set_journal(this);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pool->AwaitFiberOnAll(open_cb);
|
||||
|
||||
if (created.load(memory_order_acquire) != pool->size()) {
|
||||
LOG(FATAL) << "TBD / revert";
|
||||
}
|
||||
|
||||
return error_code{};
|
||||
}
|
||||
|
||||
error_code Journal::Close() {
|
||||
CHECK(lameduck_.load(memory_order_relaxed));
|
||||
|
||||
VLOG(1) << "Journal::Close";
|
||||
|
||||
fibers::mutex ec_mu;
|
||||
error_code res;
|
||||
|
||||
lock_guard lk(state_mu_);
|
||||
auto close_cb = [&](auto*) {
|
||||
ServerState::tlocal()->set_journal(nullptr);
|
||||
EngineShard* shard = EngineShard::tlocal();
|
||||
if (shard) {
|
||||
shard->set_journal(nullptr);
|
||||
}
|
||||
|
||||
auto ec = journal_shard.Close();
|
||||
|
||||
if (ec) {
|
||||
lock_guard lk2(ec_mu);
|
||||
res = ec;
|
||||
}
|
||||
};
|
||||
|
||||
shard_set->pool()->AwaitFiberOnAll(close_cb);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) {
|
||||
if (!journal_shard.IsOpen() || lameduck_.load(memory_order_relaxed))
|
||||
return false;
|
||||
|
||||
journal_shard.AddLogRecord(txid, unsigned(Op::SCHED));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
LSN Journal::GetLsn() const {
|
||||
return journal_shard.cur_lsn();
|
||||
}
|
||||
|
||||
bool Journal::EnterLameDuck() {
|
||||
if (!journal_shard.IsOpen()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool val = false;
|
||||
bool res = lameduck_.compare_exchange_strong(val, true, memory_order_acq_rel);
|
||||
return res;
|
||||
}
|
||||
|
||||
void Journal::OpArgs(TxId txid, Op opcode, Span keys) {
|
||||
DCHECK(journal_shard.IsOpen());
|
||||
|
||||
journal_shard.AddLogRecord(txid, unsigned(opcode));
|
||||
}
|
||||
|
||||
void Journal::RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval) {
|
||||
journal_shard.AddLogRecord(txid, unsigned(Op::VAL));
|
||||
}
|
||||
|
||||
} // namespace journal
|
||||
} // namespace dfly
|
|
@ -0,0 +1,73 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "server/common.h"
|
||||
#include "server/table.h"
|
||||
#include "util/proactor_pool.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class Transaction;
|
||||
|
||||
namespace journal {
|
||||
|
||||
enum class Op : uint8_t {
|
||||
NOOP = 0,
|
||||
LOCK = 1,
|
||||
UNLOCK = 2,
|
||||
LOCK_SHARD = 3,
|
||||
UNLOCK_SHARD = 4,
|
||||
SCHED = 5,
|
||||
VAL = 10,
|
||||
DEL,
|
||||
MSET,
|
||||
};
|
||||
|
||||
class Journal {
|
||||
public:
|
||||
using Span = absl::Span<const std::string_view>;
|
||||
|
||||
Journal();
|
||||
|
||||
std::error_code StartLogging(std::string_view dir);
|
||||
|
||||
// Returns true if journal has been active and changed its state to lameduck mode
|
||||
// and false otherwise.
|
||||
bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones.
|
||||
|
||||
// Requires: journal is in lameduck mode.
|
||||
std::error_code Close();
|
||||
|
||||
// Returns true if transaction was scheduled, false if journal is inactive
|
||||
// or in lameduck mode and does not log new transactions.
|
||||
bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards);
|
||||
|
||||
void AddCmd(TxId txid, Op opcode, Span args) {
|
||||
OpArgs(txid, opcode, args);
|
||||
}
|
||||
|
||||
void Lock(TxId txid, Span keys) {
|
||||
OpArgs(txid, Op::LOCK, keys);
|
||||
}
|
||||
|
||||
void Unlock(TxId txid, Span keys) {
|
||||
OpArgs(txid, Op::UNLOCK, keys);
|
||||
}
|
||||
|
||||
LSN GetLsn() const;
|
||||
|
||||
void RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval);
|
||||
|
||||
private:
|
||||
void OpArgs(TxId id, Op opcode, Span keys);
|
||||
|
||||
mutable boost::fibers::mutex state_mu_;
|
||||
|
||||
std::atomic_bool lameduck_{false};
|
||||
};
|
||||
|
||||
} // namespace journal
|
||||
} // namespace dfly
|
|
@ -0,0 +1,115 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/journal/journal_shard.h"
|
||||
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <absl/container/inlined_vector.h>
|
||||
#include <absl/strings/str_cat.h>
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "util/fibers/fibers_ext.h"
|
||||
|
||||
namespace dfly {
|
||||
namespace journal {
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
namespace fibers = boost::fibers;
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace {
|
||||
|
||||
string ShardName(std::string_view base, unsigned index) {
|
||||
return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
#define CHECK_EC(x) \
|
||||
do { \
|
||||
auto __ec$ = (x); \
|
||||
CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
|
||||
} while (false)
|
||||
|
||||
|
||||
|
||||
JournalShard::JournalShard() {
|
||||
}
|
||||
|
||||
JournalShard::~JournalShard() {
|
||||
CHECK(!shard_file_);
|
||||
}
|
||||
|
||||
std::error_code JournalShard::Open(const std::string_view dir, unsigned index) {
|
||||
CHECK(!shard_file_);
|
||||
|
||||
fs::path dir_path;
|
||||
|
||||
if (dir.empty()) {
|
||||
} else {
|
||||
dir_path = dir;
|
||||
error_code ec;
|
||||
|
||||
fs::file_status dir_status = fs::status(dir_path, ec);
|
||||
if (ec) {
|
||||
if (ec == errc::no_such_file_or_directory) {
|
||||
fs::create_directory(dir_path, ec);
|
||||
dir_status = fs::status(dir_path, ec);
|
||||
}
|
||||
if (ec)
|
||||
return ec;
|
||||
}
|
||||
// LOG(INFO) << int(dir_status.type());
|
||||
}
|
||||
dir_path.append(ShardName("journal", index));
|
||||
shard_path_ = dir_path;
|
||||
|
||||
// For file integrity guidelines see:
|
||||
// https://lwn.net/Articles/457667/
|
||||
// https://www.evanjones.ca/durability-filesystem.html
|
||||
// NOTE: O_DSYNC is omited.
|
||||
constexpr auto kJournalFlags = O_CLOEXEC | O_CREAT | O_TRUNC | O_RDWR;
|
||||
io::Result<std::unique_ptr<uring::LinuxFile>> res =
|
||||
uring::OpenLinux(shard_path_, kJournalFlags, 0666);
|
||||
if (!res) {
|
||||
return res.error();
|
||||
}
|
||||
DVLOG(1) << "Opened journal " << shard_path_;
|
||||
|
||||
shard_file_ = std::move(res).value();
|
||||
shard_index_ = index;
|
||||
file_offset_ = 0;
|
||||
status_ec_.clear();
|
||||
|
||||
return error_code{};
|
||||
}
|
||||
|
||||
error_code JournalShard::Close() {
|
||||
VLOG(1) << "JournalShard::Close";
|
||||
|
||||
CHECK(shard_file_);
|
||||
lameduck_ = true;
|
||||
|
||||
auto ec = shard_file_->Close();
|
||||
|
||||
DVLOG(1) << "Closing " << shard_path_;
|
||||
LOG_IF(ERROR, ec) << "Error closing journal file " << ec;
|
||||
shard_file_.reset();
|
||||
|
||||
return ec;
|
||||
}
|
||||
|
||||
void JournalShard::AddLogRecord(TxId txid, unsigned opcode) {
|
||||
string line = absl::StrCat(lsn_, " ", txid, " ", opcode, "\n");
|
||||
error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
|
||||
CHECK_EC(ec);
|
||||
file_offset_ += line.size();
|
||||
++lsn_;
|
||||
}
|
||||
|
||||
} // namespace journal
|
||||
} // namespace dfly
|
|
@ -0,0 +1,56 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/fiber/condition_variable.hpp>
|
||||
#include <boost/fiber/fiber.hpp>
|
||||
#include <optional>
|
||||
#include <string_view>
|
||||
|
||||
#include "server/common.h"
|
||||
#include "util/uring/uring_file.h"
|
||||
|
||||
namespace dfly {
|
||||
namespace journal {
|
||||
|
||||
class JournalShard {
|
||||
public:
|
||||
JournalShard();
|
||||
~JournalShard();
|
||||
|
||||
std::error_code Open(const std::string_view dir, unsigned index);
|
||||
|
||||
std::error_code Close();
|
||||
|
||||
LSN cur_lsn() const {
|
||||
return lsn_;
|
||||
}
|
||||
|
||||
std::error_code status() const {
|
||||
return status_ec_;
|
||||
}
|
||||
|
||||
bool IsOpen() const {
|
||||
return bool(shard_file_);
|
||||
}
|
||||
|
||||
void AddLogRecord(TxId txid, unsigned opcode);
|
||||
|
||||
private:
|
||||
std::string shard_path_;
|
||||
std::unique_ptr<util::uring::LinuxFile> shard_file_;
|
||||
|
||||
size_t file_offset_ = 0;
|
||||
LSN lsn_ = 1;
|
||||
|
||||
unsigned shard_index_ = -1;
|
||||
|
||||
std::error_code status_ec_;
|
||||
|
||||
bool lameduck_ = false;
|
||||
};
|
||||
|
||||
} // namespace journal
|
||||
} // namespace dfly
|
|
@ -495,7 +495,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
if (cntx->transaction->unique_shard_cnt() == 1) {
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpRPopLPushSingleShard(OpArgs{shard, t->db_index()}, src, dest);
|
||||
return OpRPopLPushSingleShard(t->GetOpArgs(shard), src, dest);
|
||||
};
|
||||
|
||||
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -515,7 +515,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto args = t->ShardArgsInShard(shard->shard_id());
|
||||
DCHECK_EQ(1u, args.size());
|
||||
bool is_dest = args.front() == dest;
|
||||
find_res[is_dest] = RPeek(OpArgs{shard, t->db_index()}, args.front(), !is_dest);
|
||||
find_res[is_dest] = RPeek(t->GetOpArgs(shard), args.front(), !is_dest);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
|
@ -530,7 +530,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
auto args = t->ShardArgsInShard(shard->shard_id());
|
||||
bool is_dest = args.front() == dest;
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
OpArgs op_args = t->GetOpArgs(shard);
|
||||
|
||||
if (is_dest) {
|
||||
string_view val{find_res[0].value()};
|
||||
|
@ -564,7 +564,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
|
|||
void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) {
|
||||
auto key = ArgS(args, 1);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpLen(OpArgs{shard, t->db_index()}, key);
|
||||
return OpLen(t->GetOpArgs(shard), key);
|
||||
};
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result) {
|
||||
|
@ -586,7 +586,7 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIndex(OpArgs{shard, t->db_index()}, key, index);
|
||||
return OpIndex(t->GetOpArgs(shard), key, index);
|
||||
};
|
||||
|
||||
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -617,7 +617,7 @@ void ListFamily::LInsert(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpInsert(OpArgs{shard, t->db_index()}, key, pivot, elem, where);
|
||||
return OpInsert(t->GetOpArgs(shard), key, pivot, elem, where);
|
||||
};
|
||||
|
||||
OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -640,7 +640,7 @@ void ListFamily::LTrim(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpTrim(OpArgs{shard, t->db_index()}, key, start, end);
|
||||
return OpTrim(t->GetOpArgs(shard), key, start, end);
|
||||
};
|
||||
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
(*cntx)->SendOk();
|
||||
|
@ -658,7 +658,7 @@ void ListFamily::LRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpRange(OpArgs{shard, t->db_index()}, key, start, end);
|
||||
return OpRange(t->GetOpArgs(shard), key, start, end);
|
||||
};
|
||||
|
||||
auto res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -682,7 +682,7 @@ void ListFamily::LRem(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpRem(OpArgs{shard, t->db_index()}, key, elem, count);
|
||||
return OpRem(t->GetOpArgs(shard), key, elem, count);
|
||||
};
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result) {
|
||||
|
@ -704,7 +704,7 @@ void ListFamily::LSet(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpSet(OpArgs{shard, t->db_index()}, key, elem, count);
|
||||
return OpSet(t->GetOpArgs(shard), key, elem, count);
|
||||
};
|
||||
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
if (result) {
|
||||
|
@ -769,7 +769,7 @@ void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args,
|
|||
}
|
||||
absl::Span<std::string_view> span{vals.data(), vals.size()};
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpPush(OpArgs{shard, t->db_index()}, key, dir, skip_notexists, span);
|
||||
return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, span);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -803,7 +803,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpPop(OpArgs{shard, t->db_index()}, key, dir, count, true);
|
||||
return OpPop(t->GetOpArgs(shard), key, dir, count, true);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
|
|
@ -44,9 +44,6 @@ ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port");
|
|||
ABSL_FLAG(uint64_t, maxmemory, 0,
|
||||
"Limit on maximum-memory that is used by the database."
|
||||
"0 - means the program will automatically determine its maximum memory usage");
|
||||
ABSL_FLAG(bool, cache_mode, false,
|
||||
"If true, the backend behaves like a cache, "
|
||||
"by evicting entries when getting close to maxmemory limit");
|
||||
|
||||
ABSL_DECLARE_FLAG(string, requirepass);
|
||||
|
||||
|
|
|
@ -79,13 +79,6 @@ error_code Recv(FiberSocketBase* input, base::IoBuf* dest) {
|
|||
|
||||
constexpr unsigned kRdbEofMarkSize = 40;
|
||||
|
||||
// TODO: to remove usages of this macro and make code crash-less.
|
||||
#define CHECK_EC(x) \
|
||||
do { \
|
||||
auto __ec$ = (x); \
|
||||
CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
|
||||
} while (false)
|
||||
|
||||
} // namespace
|
||||
|
||||
Replica::Replica(string host, uint16_t port, Service* se)
|
||||
|
@ -510,7 +503,7 @@ error_code Replica::ConsumeRedisStream() {
|
|||
|
||||
// Master waits for this command in order to start sending replication stream.
|
||||
serializer.SendCommand("REPLCONF ACK 0");
|
||||
CHECK_EC(serializer.ec());
|
||||
RETURN_ON_ERR(serializer.ec());
|
||||
|
||||
VLOG(1) << "Before reading repl-log";
|
||||
|
||||
|
|
|
@ -25,8 +25,10 @@ extern "C" {
|
|||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/debugcmd.h"
|
||||
#include "server/dflycmd.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/journal/journal.h"
|
||||
#include "server/main_service.h"
|
||||
#include "server/rdb_load.h"
|
||||
#include "server/rdb_save.h"
|
||||
|
@ -150,6 +152,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
|
|||
lsinfo_ = make_shared<LastSaveInfo>();
|
||||
lsinfo_->save_time = start_time_;
|
||||
script_mgr_.reset(new ScriptMgr());
|
||||
journal_.reset(new journal::Journal);
|
||||
}
|
||||
|
||||
ServerFamily::~ServerFamily() {
|
||||
|
@ -159,6 +162,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
|
|||
CHECK(acceptor_ == nullptr);
|
||||
acceptor_ = acceptor;
|
||||
main_listener_ = main_listener;
|
||||
dfly_cmd_.reset(new DflyCmd(main_listener, journal_.get()));
|
||||
|
||||
pb_task_ = shard_set->pool()->GetNextProactor();
|
||||
auto cache_cb = [] {
|
||||
|
@ -207,6 +211,11 @@ void ServerFamily::Shutdown() {
|
|||
pb_task_->CancelPeriodic(stats_caching_task_);
|
||||
stats_caching_task_ = 0;
|
||||
|
||||
if (journal_->EnterLameDuck()) {
|
||||
auto ec = journal_->Close();
|
||||
LOG_IF(ERROR, ec) << "Error closing journal " << ec;
|
||||
}
|
||||
|
||||
unique_lock lk(replicaof_mu_);
|
||||
if (replica_) {
|
||||
replica_->Stop();
|
||||
|
@ -1122,6 +1131,10 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
|
|||
// TBD.
|
||||
}
|
||||
|
||||
void ServerFamily::Dfly(CmdArgList args, ConnectionContext* cntx) {
|
||||
dfly_cmd_->Run(args, cntx);
|
||||
}
|
||||
|
||||
#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
|
||||
|
||||
void ServerFamily::Register(CommandRegistry* registry) {
|
||||
|
@ -1148,7 +1161,8 @@ void ServerFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)
|
||||
<< CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync)
|
||||
<< CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync)
|
||||
<< CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script);
|
||||
<< CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script)
|
||||
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -17,8 +17,13 @@ class HttpListenerBase;
|
|||
|
||||
namespace dfly {
|
||||
|
||||
namespace journal {
|
||||
class Journal;
|
||||
} // namespace journal
|
||||
|
||||
class ConnectionContext;
|
||||
class CommandRegistry;
|
||||
class DflyCmd;
|
||||
class Service;
|
||||
class Replica;
|
||||
class ScriptMgr;
|
||||
|
@ -41,9 +46,9 @@ struct Metrics {
|
|||
};
|
||||
|
||||
struct LastSaveInfo {
|
||||
time_t save_time; // epoch time in seconds.
|
||||
std::string file_name; //
|
||||
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
|
||||
time_t save_time; // epoch time in seconds.
|
||||
std::string file_name; //
|
||||
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
|
||||
};
|
||||
|
||||
class ServerFamily {
|
||||
|
@ -91,6 +96,7 @@ class ServerFamily {
|
|||
void Config(CmdArgList args, ConnectionContext* cntx);
|
||||
void DbSize(CmdArgList args, ConnectionContext* cntx);
|
||||
void Debug(CmdArgList args, ConnectionContext* cntx);
|
||||
void Dfly(CmdArgList args, ConnectionContext* cntx);
|
||||
void Memory(CmdArgList args, ConnectionContext* cntx);
|
||||
void FlushDb(CmdArgList args, ConnectionContext* cntx);
|
||||
void FlushAll(CmdArgList args, ConnectionContext* cntx);
|
||||
|
@ -111,7 +117,6 @@ class ServerFamily {
|
|||
|
||||
void Load(const std::string& file_name);
|
||||
|
||||
|
||||
boost::fibers::fiber load_fiber_;
|
||||
|
||||
uint32_t stats_caching_task_ = 0;
|
||||
|
@ -125,6 +130,8 @@ class ServerFamily {
|
|||
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_
|
||||
|
||||
std::unique_ptr<ScriptMgr> script_mgr_;
|
||||
std::unique_ptr<journal::Journal> journal_;
|
||||
std::unique_ptr<DflyCmd> dfly_cmd_;
|
||||
|
||||
time_t start_time_ = 0; // in seconds, epoch time.
|
||||
|
||||
|
|
|
@ -15,6 +15,10 @@ typedef struct mi_heap_s mi_heap_t;
|
|||
|
||||
namespace dfly {
|
||||
|
||||
namespace journal {
|
||||
class Journal;
|
||||
} // namespace journal
|
||||
|
||||
// Present in every server thread. This class differs from EngineShard. The latter manages
|
||||
// state around engine shards while the former represents coordinator/connection state.
|
||||
// There may be threads that handle engine shards but not IO, there may be threads that handle IO
|
||||
|
@ -58,6 +62,7 @@ class ServerState { // public struct - to allow initialization.
|
|||
GlobalState gstate() const {
|
||||
return gstate_;
|
||||
}
|
||||
|
||||
void set_gstate(GlobalState s) {
|
||||
gstate_ = s;
|
||||
}
|
||||
|
@ -66,7 +71,9 @@ class ServerState { // public struct - to allow initialization.
|
|||
|
||||
// Returns sum of all requests in the last 6 seconds
|
||||
// (not including the current one).
|
||||
uint32_t MovingSum6() const { return qps_.SumTail(); }
|
||||
uint32_t MovingSum6() const {
|
||||
return qps_.SumTail();
|
||||
}
|
||||
|
||||
void RecordCmd() {
|
||||
++connection_stats.command_cnt;
|
||||
|
@ -78,9 +85,18 @@ class ServerState { // public struct - to allow initialization.
|
|||
return data_heap_;
|
||||
}
|
||||
|
||||
journal::Journal* journal() {
|
||||
return journal_;
|
||||
}
|
||||
|
||||
void set_journal(journal::Journal* j) {
|
||||
journal_ = j;
|
||||
}
|
||||
|
||||
private:
|
||||
int64_t live_transactions_ = 0;
|
||||
mi_heap_t* data_heap_;
|
||||
journal::Journal* journal_ = nullptr;
|
||||
|
||||
std::optional<Interpreter> interpreter_;
|
||||
GlobalState gstate_ = GlobalState::ACTIVE;
|
||||
|
|
|
@ -452,7 +452,7 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) {
|
|||
ArgSlice largs = t->ShardArgsInShard(es->shard_id());
|
||||
DCHECK_LE(largs.size(), 2u);
|
||||
|
||||
OpArgs op_args{es, t->db_index()};
|
||||
OpArgs op_args = t->GetOpArgs(es);
|
||||
for (auto k : largs) {
|
||||
if (k == src_) {
|
||||
CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed.
|
||||
|
@ -677,8 +677,7 @@ void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
ArgSlice arg_slice{vals.data(), vals.size()};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpAdd(op_args, key, arg_slice, false);
|
||||
return OpAdd(t->GetOpArgs(shard), key, arg_slice, false);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -741,7 +740,7 @@ void SetFamily::SRem(CmdArgList args, ConnectionContext* cntx) {
|
|||
ArgSlice span{vals.data(), vals.size()};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpRem(OpArgs{shard, t->db_index()}, key, span);
|
||||
return OpRem(t->GetOpArgs(shard), key, span);
|
||||
};
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
||||
|
@ -791,7 +790,7 @@ void SetFamily::SPop(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpPop(OpArgs{shard, t->db_index()}, key, count);
|
||||
return OpPop(t->GetOpArgs(shard), key, count);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -821,9 +820,9 @@ void SetFamily::SDiff(CmdArgList args, ConnectionContext* cntx) {
|
|||
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
|
||||
if (shard->shard_id() == src_shard) {
|
||||
CHECK_EQ(src_key, largs.front());
|
||||
result_set[shard->shard_id()] = OpDiff(OpArgs{shard, t->db_index()}, largs);
|
||||
result_set[shard->shard_id()] = OpDiff(t->GetOpArgs(shard), largs);
|
||||
} else {
|
||||
result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs);
|
||||
result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -864,7 +863,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
OpArgs op_args = t->GetOpArgs(shard);
|
||||
if (shard->shard_id() == src_shard) {
|
||||
CHECK_EQ(src_key, largs.front());
|
||||
result_set[shard->shard_id()] = OpDiff(op_args, largs); // Diff
|
||||
|
@ -887,7 +886,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
SvArray result = ToSvArray(rsv.value());
|
||||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
OpAdd(OpArgs{shard, t->db_index()}, dest_key, result, true);
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result, true);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -966,7 +965,7 @@ void SetFamily::SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
OpAdd(OpArgs{shard, t->db_index()}, dest_key, result.value(), true);
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -981,7 +980,7 @@ void SetFamily::SUnion(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
|
||||
result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs);
|
||||
result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
|
@ -1012,7 +1011,7 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (largs.empty())
|
||||
return OpStatus::OK;
|
||||
}
|
||||
result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs);
|
||||
result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
|
@ -1030,7 +1029,7 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
OpAdd(OpArgs{shard, t->db_index()}, dest_key, result, true);
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result, true);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -1055,7 +1054,7 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpScan(OpArgs{shard, t->db_index()}, key, &cursor);
|
||||
return OpScan(t->GetOpArgs(shard), key, &cursor);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
|
|
@ -494,8 +494,7 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
|
|||
opts.id = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpCreate(op_args, key, opts);
|
||||
return OpCreate(t->GetOpArgs(shard), key, opts);
|
||||
};
|
||||
|
||||
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -509,8 +508,7 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
|
|||
|
||||
void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpDestroyGroup(op_args, key, gname);
|
||||
return OpDestroyGroup(t->GetOpArgs(shard), key, gname);
|
||||
};
|
||||
|
||||
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -529,8 +527,7 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
|
|||
void DelConsumer(string_view key, string_view gname, string_view consumer,
|
||||
ConnectionContext* cntx) {
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpDelConsumer(op_args, key, gname, consumer);
|
||||
return OpDelConsumer(t->GetOpArgs(shard), key, gname, consumer);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -551,8 +548,7 @@ void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContex
|
|||
string_view id = ArgS(args, 0);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpSetId(op_args, key, gname, id);
|
||||
return OpSetId(t->GetOpArgs(shard), key, gname, id);
|
||||
};
|
||||
|
||||
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -607,8 +603,7 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
args.remove_prefix(1);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpAdd(op_args, key, add_opts, args);
|
||||
return OpAdd(t->GetOpArgs(shard), key, add_opts, args);
|
||||
};
|
||||
|
||||
OpResult<streamID> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -641,8 +636,7 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpDel(op_args, key, absl::Span{ids.data(), ids.size()});
|
||||
return OpDel(t->GetOpArgs(shard), key, absl::Span{ids.data(), ids.size()});
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -751,8 +745,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
|
|||
void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpLen(op_args, key);
|
||||
return OpLen(t->GetOpArgs(shard), key);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -781,8 +774,7 @@ void StreamFamily::XSetId(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpSetId2(op_args, key, parsed_id.val);
|
||||
return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val);
|
||||
};
|
||||
|
||||
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -835,8 +827,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext
|
|||
range_opts.is_rev = is_rev;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpRange(op_args, key, range_opts);
|
||||
return OpRange(t->GetOpArgs(shard), key, range_opts);
|
||||
};
|
||||
|
||||
OpResult<RecordVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -876,4 +867,4 @@ void StreamFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
} // namespace dfly
|
||||
|
|
|
@ -17,6 +17,7 @@ extern "C" {
|
|||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/io_mgr.h"
|
||||
#include "server/journal/journal.h"
|
||||
#include "server/tiered_storage.h"
|
||||
#include "server/transaction.h"
|
||||
#include "util/varz.h"
|
||||
|
@ -60,6 +61,12 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) {
|
|||
return pv.GetSlice(tmp);
|
||||
}
|
||||
|
||||
inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) {
|
||||
if (op_args.shard->journal()) {
|
||||
op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue);
|
||||
}
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t start,
|
||||
string_view value) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
|
@ -90,9 +97,11 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
|
|||
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
}
|
||||
|
||||
memcpy(s.data() + start, value.data(), value.size());
|
||||
it->second.SetString(s);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
return it->second.Size();
|
||||
}
|
||||
|
@ -129,110 +138,300 @@ OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t star
|
|||
return string(slice.substr(start, end - start + 1));
|
||||
};
|
||||
|
||||
} // namespace
|
||||
// Returns the length of the extended string. if prepend is false - appends the val.
|
||||
OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, std::string_view key, std::string_view val,
|
||||
bool prepend) {
|
||||
auto* shard = op_args.shard;
|
||||
auto& db_slice = shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
if (inserted) {
|
||||
it->second.SetString(val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
SetCmd::SetCmd(DbSlice* db_slice) : db_slice_(*db_slice) {
|
||||
return val.size();
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
string tmp, new_val;
|
||||
string_view slice = GetSlice(op_args.shard, it->second, &tmp);
|
||||
if (prepend)
|
||||
new_val = absl::StrCat(val, slice);
|
||||
else
|
||||
new_val = absl::StrCat(slice, val);
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
it->second.SetString(new_val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
return new_val.size();
|
||||
}
|
||||
|
||||
SetCmd::~SetCmd() {
|
||||
OpResult<bool> ExtendOrSkip(const OpArgs& op_args, std::string_view key, std::string_view val,
|
||||
bool prepend) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (!it_res) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CompactObj& cobj = (*it_res)->second;
|
||||
|
||||
string tmp, new_val;
|
||||
string_view slice = GetSlice(op_args.shard, cobj, &tmp);
|
||||
if (prepend)
|
||||
new_val = absl::StrCat(val, slice);
|
||||
else
|
||||
new_val = absl::StrCat(slice, val);
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, *it_res);
|
||||
cobj.SetString(new_val);
|
||||
db_slice.PostUpdate(op_args.db_ind, *it_res);
|
||||
|
||||
return new_val.size();
|
||||
}
|
||||
|
||||
OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::string_view value) {
|
||||
DCHECK_LT(params.db_index, db_slice_.db_array_size());
|
||||
DCHECK(db_slice_.IsDbValid(params.db_index));
|
||||
OpResult<string> OpGet(const OpArgs& op_args, string_view key) {
|
||||
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
VLOG(2) << "Set " << key << "(" << db_slice_.shard_id() << ") ";
|
||||
const PrimeValue& pv = it_res.value()->second;
|
||||
|
||||
if (params.how == SET_IF_EXISTS) {
|
||||
auto [it, expire_it] = db_slice_.FindExt(params.db_index, key);
|
||||
return GetString(op_args.shard, pv);
|
||||
}
|
||||
|
||||
if (IsValid(it)) { // existing
|
||||
return SetExisting(params, it, expire_it, value);
|
||||
OpResult<double> OpIncrFloat(const OpArgs& op_args, std::string_view key, double val) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
char buf[128];
|
||||
|
||||
if (inserted) {
|
||||
char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf));
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
if (it->second.Size() == 0)
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
|
||||
string tmp;
|
||||
string_view slice = GetSlice(op_args.shard, it->second, &tmp);
|
||||
|
||||
StringToDoubleConverter stod(StringToDoubleConverter::NO_FLAGS, 0, 0, NULL, NULL);
|
||||
int processed_digits = 0;
|
||||
double base = stod.StringToDouble(slice.data(), slice.size(), &processed_digits);
|
||||
if (unsigned(processed_digits) != slice.size()) {
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
}
|
||||
|
||||
base += val;
|
||||
|
||||
if (isnan(base) || isinf(base)) {
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
}
|
||||
|
||||
char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf));
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
return base;
|
||||
}
|
||||
|
||||
// if skip_on_missing - returns KEY_NOTFOUND.
|
||||
OpResult<int64_t> OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr,
|
||||
bool skip_on_missing) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
|
||||
// we avoid using AddOrFind because of skip_on_missing option for memcache.
|
||||
auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key);
|
||||
|
||||
if (!IsValid(it)) {
|
||||
if (skip_on_missing)
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
|
||||
CompactObj cobj;
|
||||
cobj.SetInt(incr);
|
||||
|
||||
// AddNew calls PostUpdate inside.
|
||||
try {
|
||||
it = db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0);
|
||||
} catch (bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
return OpStatus::SKIPPED;
|
||||
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
return incr;
|
||||
}
|
||||
|
||||
// New entry
|
||||
tuple<PrimeIterator, ExpireIterator, bool> add_res;
|
||||
try {
|
||||
add_res = db_slice_.AddOrFind2(params.db_index, key);
|
||||
} catch (bad_alloc& e) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
if (it->second.ObjType() != OBJ_STRING) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
|
||||
PrimeIterator it = get<0>(add_res);
|
||||
if (!get<2>(add_res)) {
|
||||
return SetExisting(params, it, get<1>(add_res), value);
|
||||
auto opt_prev = it->second.TryGetInt();
|
||||
if (!opt_prev) {
|
||||
return OpStatus::INVALID_VALUE;
|
||||
}
|
||||
|
||||
// adding new value.
|
||||
PrimeValue tvalue{value};
|
||||
tvalue.SetFlag(params.memcache_flags != 0);
|
||||
it->second = std::move(tvalue);
|
||||
db_slice_.PostUpdate(params.db_index, it);
|
||||
|
||||
if (params.expire_after_ms) {
|
||||
db_slice_.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice_.Now());
|
||||
long long prev = *opt_prev;
|
||||
if ((incr < 0 && prev < 0 && incr < (LLONG_MIN - prev)) ||
|
||||
(incr > 0 && prev > 0 && incr > (LLONG_MAX - prev))) {
|
||||
return OpStatus::OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
if (params.memcache_flags)
|
||||
db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
int64_t new_val = prev + incr;
|
||||
DCHECK(!it->second.IsExternal());
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
it->second.SetInt(new_val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
RecordJournal(op_args, it->first, it->second);
|
||||
|
||||
EngineShard* shard = db_slice_.shard_owner();
|
||||
return new_val;
|
||||
}
|
||||
|
||||
if (shard->tiered_storage()) { // external storage enabled.
|
||||
if (value.size() >= kMinTieredLen) {
|
||||
shard->tiered_storage()->UnloadItem(params.db_index, it);
|
||||
// Returns true if keys were set, false otherwise.
|
||||
OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) {
|
||||
DCHECK(!args.empty() && args.size() % 2 == 0);
|
||||
|
||||
SetCmd::SetParams params{op_args.db_ind};
|
||||
SetCmd sg(op_args);
|
||||
|
||||
for (size_t i = 0; i < args.size(); i += 2) {
|
||||
DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1];
|
||||
OpStatus res = sg.Set(params, args[i], args[i + 1]);
|
||||
if (res != OpStatus::OK) { // OOM for example.
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) {
|
||||
EngineShard* shard = op_args_.shard;
|
||||
auto& db_slice = shard->db_slice();
|
||||
|
||||
DCHECK_LT(params.db_index, db_slice.db_array_size());
|
||||
DCHECK(db_slice.IsDbValid(params.db_index));
|
||||
|
||||
VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") ";
|
||||
|
||||
if (params.how == SET_IF_EXISTS) {
|
||||
auto [it, expire_it] = db_slice.FindExt(params.db_index, key);
|
||||
|
||||
if (IsValid(it)) { // existing
|
||||
return SetExisting(params, it, expire_it, value);
|
||||
}
|
||||
|
||||
return OpStatus::SKIPPED;
|
||||
}
|
||||
|
||||
// Trying to add a new entry.
|
||||
tuple<PrimeIterator, ExpireIterator, bool> add_res;
|
||||
try {
|
||||
add_res = db_slice.AddOrFind2(params.db_index, key);
|
||||
} catch (bad_alloc& e) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
PrimeIterator it = get<0>(add_res);
|
||||
if (!get<2>(add_res)) { // Existing.
|
||||
return SetExisting(params, it, get<1>(add_res), value);
|
||||
}
|
||||
|
||||
//
|
||||
// Adding new value.
|
||||
PrimeValue tvalue{value};
|
||||
tvalue.SetFlag(params.memcache_flags != 0);
|
||||
it->second = std::move(tvalue);
|
||||
db_slice.PostUpdate(params.db_index, it);
|
||||
|
||||
if (params.expire_after_ms) {
|
||||
db_slice.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice.Now());
|
||||
}
|
||||
|
||||
if (params.memcache_flags)
|
||||
db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
|
||||
if (shard->tiered_storage()) { // external storage enabled.
|
||||
// TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
|
||||
// afterwards.
|
||||
if (value.size() >= kMinTieredLen) {
|
||||
shard->tiered_storage()->UnloadItem(params.db_index, it);
|
||||
}
|
||||
}
|
||||
|
||||
RecordJournal(op_args_, it->first, it->second);
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it,
|
||||
std::string_view value) {
|
||||
string_view value) {
|
||||
if (params.how == SET_IF_NOTEXIST)
|
||||
return OpStatus::SKIPPED;
|
||||
|
||||
PrimeValue& prime_value = it->second;
|
||||
EngineShard* shard = op_args_.shard;
|
||||
|
||||
if (params.prev_val) {
|
||||
if (prime_value.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
string val = GetString(db_slice_.shard_owner(), prime_value);
|
||||
string val = GetString(shard, prime_value);
|
||||
params.prev_val->emplace(move(val));
|
||||
}
|
||||
|
||||
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_.Now() : 0;
|
||||
DbSlice& db_slice = shard->db_slice();
|
||||
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice.Now() : 0;
|
||||
if (IsValid(e_it) && at_ms) {
|
||||
e_it->second = db_slice_.FromAbsoluteTime(at_ms);
|
||||
e_it->second = db_slice.FromAbsoluteTime(at_ms);
|
||||
} else {
|
||||
bool changed = db_slice_.UpdateExpire(params.db_index, it, at_ms);
|
||||
// We need to update expiry, or maybe erase the object if it was expired.
|
||||
bool changed = db_slice.UpdateExpire(params.db_index, it, at_ms);
|
||||
if (changed && at_ms == 0) // erased.
|
||||
return OpStatus::OK;
|
||||
return OpStatus::OK; // TODO: to update journal with deletion.
|
||||
}
|
||||
|
||||
db_slice_.PreUpdate(params.db_index, it);
|
||||
db_slice.PreUpdate(params.db_index, it);
|
||||
|
||||
// Check whether we need to update flags table.
|
||||
bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag();
|
||||
if (req_flag_update) {
|
||||
prime_value.SetFlag(params.memcache_flags != 0);
|
||||
db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
}
|
||||
|
||||
// overwrite existing entry.
|
||||
prime_value.SetString(value);
|
||||
|
||||
if (value.size() >= kMinTieredLen) { // external storage enabled.
|
||||
EngineShard* shard = db_slice_.shard_owner();
|
||||
|
||||
// TODO: if UnloadItem can block the calling fiber, then we have the bug because then "it"
|
||||
// can be invalid after the function returns and the functions that follow may access invalid
|
||||
// entry.
|
||||
if (shard->tiered_storage()) {
|
||||
shard->tiered_storage()->UnloadItem(params.db_index, it);
|
||||
}
|
||||
}
|
||||
|
||||
db_slice_.PostUpdate(params.db_index, it);
|
||||
db_slice.PostUpdate(params.db_index, it);
|
||||
RecordJournal(op_args_, it->first, it->second);
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
@ -261,7 +460,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
builder->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
std::string_view ex = ArgS(args, i);
|
||||
string_view ex = ArgS(args, i);
|
||||
if (!absl::SimpleAtoi(ex, &int_arg)) {
|
||||
return builder->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
@ -291,9 +490,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
DCHECK(cntx->transaction);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
SetCmd sg(&shard->db_slice());
|
||||
auto status = sg.Set(sparams, key, value).status();
|
||||
return status;
|
||||
SetCmd sg(t->GetOpArgs(shard));
|
||||
return sg.Set(sparams, key, value);
|
||||
};
|
||||
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
|
@ -319,9 +517,7 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
std::string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGet(OpArgs{shard, t->db_index()}, key);
|
||||
};
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); };
|
||||
|
||||
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
|
||||
Transaction* trans = cntx->transaction;
|
||||
|
@ -350,16 +546,15 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
|
|||
SetCmd::SetParams sparams{cntx->db_index()};
|
||||
sparams.prev_val = &prev_val;
|
||||
|
||||
ShardId sid = Shard(key, shard_set->size());
|
||||
OpResult<void> result = shard_set->Await(sid, [&] {
|
||||
EngineShard* es = EngineShard::tlocal();
|
||||
SetCmd cmd(&es->db_slice());
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
SetCmd cmd(t->GetOpArgs(shard));
|
||||
|
||||
return cmd.Set(sparams, key, value);
|
||||
});
|
||||
};
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
if (!result) {
|
||||
(*cntx)->SendError(result.status());
|
||||
if (status != OpStatus::OK) {
|
||||
(*cntx)->SendError(status);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -367,6 +562,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendBulkString(*prev_val);
|
||||
return;
|
||||
}
|
||||
|
||||
return (*cntx)->SendNull();
|
||||
}
|
||||
|
||||
|
@ -398,7 +594,7 @@ void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIncrFloat(OpArgs{shard, t->db_index()}, key, val);
|
||||
return OpIncrFloat(t->GetOpArgs(shard), key, val);
|
||||
};
|
||||
|
||||
OpResult<double> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -444,7 +640,7 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo
|
|||
bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpResult<int64_t> res = OpIncrBy(OpArgs{shard, t->db_index()}, key, val, skip_on_missing);
|
||||
OpResult<int64_t> res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing);
|
||||
return res;
|
||||
};
|
||||
|
||||
|
@ -477,7 +673,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
|
|||
|
||||
if (cntx->protocol() == Protocol::REDIS) {
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return ExtendOrSet(OpArgs{shard, t->db_index()}, key, sval, prepend);
|
||||
return ExtendOrSet(t->GetOpArgs(shard), key, sval, prepend);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -489,7 +685,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
|
|||
DCHECK(cntx->protocol() == Protocol::MEMCACHE);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return ExtendOrSkip(OpArgs{shard, t->db_index()}, key, sval, prepend);
|
||||
return ExtendOrSkip(t->GetOpArgs(shard), key, sval, prepend);
|
||||
};
|
||||
|
||||
OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -524,9 +720,8 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
|
|||
sparams.expire_after_ms = unit_vals;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
SetCmd sg(&shard->db_slice());
|
||||
auto status = sg.Set(sparams, key, value).status();
|
||||
return status;
|
||||
SetCmd sg(t->GetOpArgs(shard));
|
||||
return sg.Set(sparams, key, value);
|
||||
};
|
||||
|
||||
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -600,9 +795,9 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
|
|||
LOG(INFO) << "MSET/" << transaction->unique_shard_cnt() << str;
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* es) {
|
||||
auto args = t->ShardArgsInShard(es->shard_id());
|
||||
return OpMSet(OpArgs{es, t->db_index()}, args);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
auto args = t->ShardArgsInShard(shard->shard_id());
|
||||
return OpMSet(t->GetOpArgs(shard), args);
|
||||
};
|
||||
|
||||
OpStatus status = transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
@ -635,12 +830,13 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
transaction->Execute(std::move(cb), false);
|
||||
bool to_skip = exists.load(memory_order_relaxed) == true;
|
||||
auto epilog_cb = [&](Transaction* t, EngineShard* es) {
|
||||
|
||||
auto epilog_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (to_skip)
|
||||
return OpStatus::OK;
|
||||
|
||||
auto args = t->ShardArgsInShard(es->shard_id());
|
||||
return OpMSet(OpArgs{es, t->db_index()}, std::move(args));
|
||||
auto args = t->ShardArgsInShard(shard->shard_id());
|
||||
return OpMSet(t->GetOpArgs(shard), std::move(args));
|
||||
};
|
||||
|
||||
transaction->Execute(std::move(epilog_cb), true);
|
||||
|
@ -680,7 +876,7 @@ void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGetRange(OpArgs{shard, t->db_index()}, key, start, end);
|
||||
return OpGetRange(t->GetOpArgs(shard), key, start, end);
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
|
@ -713,7 +909,7 @@ void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
return OpSetRange(OpArgs{shard, t->db_index()}, key, start, value);
|
||||
return OpSetRange(t->GetOpArgs(shard), key, start, value);
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
|
@ -758,176 +954,6 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction
|
|||
return response;
|
||||
}
|
||||
|
||||
OpStatus StringFamily::OpMSet(const OpArgs& op_args, ArgSlice args) {
|
||||
DCHECK(!args.empty() && args.size() % 2 == 0);
|
||||
|
||||
SetCmd::SetParams params{op_args.db_ind};
|
||||
SetCmd sg(&op_args.shard->db_slice());
|
||||
|
||||
for (size_t i = 0; i < args.size(); i += 2) {
|
||||
DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1];
|
||||
auto res = sg.Set(params, args[i], args[i + 1]);
|
||||
if (!res) { // OOM for example.
|
||||
return res.status();
|
||||
}
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpResult<int64_t> StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr,
|
||||
bool skip_on_missing) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
|
||||
// we avoid using AddOrFind because of skip_on_missing option for memcache.
|
||||
auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key);
|
||||
|
||||
if (!IsValid(it)) {
|
||||
if (skip_on_missing)
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
|
||||
CompactObj cobj;
|
||||
cobj.SetInt(incr);
|
||||
|
||||
try {
|
||||
db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0);
|
||||
} catch (bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
return incr;
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
|
||||
auto opt_prev = it->second.TryGetInt();
|
||||
if (!opt_prev) {
|
||||
return OpStatus::INVALID_VALUE;
|
||||
}
|
||||
|
||||
long long prev = *opt_prev;
|
||||
if ((incr < 0 && prev < 0 && incr < (LLONG_MIN - prev)) ||
|
||||
(incr > 0 && prev > 0 && incr > (LLONG_MAX - prev))) {
|
||||
return OpStatus::OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
int64_t new_val = prev + incr;
|
||||
DCHECK(!it->second.IsExternal());
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
it->second.SetInt(new_val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
return new_val;
|
||||
}
|
||||
|
||||
OpResult<double> StringFamily::OpIncrFloat(const OpArgs& op_args, std::string_view key,
|
||||
double val) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
char buf[128];
|
||||
|
||||
if (inserted) {
|
||||
char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf));
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
if (it->second.Size() == 0)
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
|
||||
string tmp;
|
||||
string_view slice = GetSlice(op_args.shard, it->second, &tmp);
|
||||
|
||||
StringToDoubleConverter stod(StringToDoubleConverter::NO_FLAGS, 0, 0, NULL, NULL);
|
||||
int processed_digits = 0;
|
||||
double base = stod.StringToDouble(slice.data(), slice.size(), &processed_digits);
|
||||
if (unsigned(processed_digits) != slice.size()) {
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
}
|
||||
|
||||
base += val;
|
||||
|
||||
if (isnan(base) || isinf(base)) {
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
}
|
||||
|
||||
char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf));
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return base;
|
||||
}
|
||||
|
||||
OpResult<uint32_t> StringFamily::ExtendOrSet(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view val, bool prepend) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
if (inserted) {
|
||||
it->second.SetString(val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return val.size();
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
string tmp, new_val;
|
||||
string_view slice = GetSlice(op_args.shard, it->second, &tmp);
|
||||
if (prepend)
|
||||
new_val = absl::StrCat(val, slice);
|
||||
else
|
||||
new_val = absl::StrCat(slice, val);
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
it->second.SetString(new_val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return new_val.size();
|
||||
}
|
||||
|
||||
OpResult<bool> StringFamily::ExtendOrSkip(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view val, bool prepend) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (!it_res) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CompactObj& cobj = (*it_res)->second;
|
||||
|
||||
string tmp, new_val;
|
||||
string_view slice = GetSlice(op_args.shard, cobj, &tmp);
|
||||
if (prepend)
|
||||
new_val = absl::StrCat(val, slice);
|
||||
else
|
||||
new_val = absl::StrCat(slice, val);
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, *it_res);
|
||||
cobj.SetString(new_val);
|
||||
db_slice.PostUpdate(op_args.db_ind, *it_res);
|
||||
|
||||
return new_val.size();
|
||||
}
|
||||
|
||||
OpResult<string> StringFamily::OpGet(const OpArgs& op_args, string_view key) {
|
||||
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
const PrimeValue& pv = it_res.value()->second;
|
||||
|
||||
return GetString(op_args.shard, pv);
|
||||
}
|
||||
|
||||
void StringFamily::Init(util::ProactorPool* pp) {
|
||||
set_qps.Init(pp);
|
||||
get_qps.Init(pp);
|
||||
|
|
|
@ -16,11 +16,10 @@ using facade::OpResult;
|
|||
using facade::OpStatus;
|
||||
|
||||
class SetCmd {
|
||||
DbSlice& db_slice_;
|
||||
const OpArgs op_args_;
|
||||
|
||||
public:
|
||||
explicit SetCmd(DbSlice* db_slice);
|
||||
~SetCmd();
|
||||
explicit SetCmd(const OpArgs& op_args) : op_args_(op_args) {}
|
||||
|
||||
enum SetHow { SET_ALWAYS, SET_IF_NOTEXIST, SET_IF_EXISTS };
|
||||
|
||||
|
@ -38,7 +37,7 @@ class SetCmd {
|
|||
}
|
||||
};
|
||||
|
||||
OpResult<void> Set(const SetParams& params, std::string_view key, std::string_view value);
|
||||
OpStatus Set(const SetParams& params, std::string_view key, std::string_view value);
|
||||
|
||||
private:
|
||||
OpStatus SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it,
|
||||
|
@ -86,24 +85,6 @@ class StringFamily {
|
|||
using MGetResponse = std::vector<std::optional<GetResp>>;
|
||||
static MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t,
|
||||
EngineShard* shard);
|
||||
|
||||
// Returns true if keys were set, false otherwise.
|
||||
static OpStatus OpMSet(const OpArgs& op_args, ArgSlice args);
|
||||
|
||||
// if skip_on_missing - returns KEY_NOTFOUND.
|
||||
static OpResult<int64_t> OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t val,
|
||||
bool skip_on_missing);
|
||||
static OpResult<double> OpIncrFloat(const OpArgs& op_args, std::string_view key, double val);
|
||||
|
||||
// Returns the length of the extended string. if prepend is false - appends the val.
|
||||
static OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view val, bool prepend);
|
||||
|
||||
// Returns true if was extended, false if the key was not found.
|
||||
static OpResult<bool> ExtendOrSkip(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view val, bool prepend);
|
||||
|
||||
static OpResult<std::string> OpGet(const OpArgs& op_args, std::string_view key);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
#include "server/command_registry.h"
|
||||
#include "server/db_slice.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/journal/journal.h"
|
||||
#include "server/server_state.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -334,9 +336,14 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
|
||||
/*************************************************************************/
|
||||
// Actually running the callback.
|
||||
// If you change the logic here, also please change the logic
|
||||
try {
|
||||
// if transaction is suspended (blocked in watched queue), then it's a noop.
|
||||
OpStatus status = was_suspended ? OpStatus::OK : cb_(this, shard);
|
||||
OpStatus status = OpStatus::OK;
|
||||
|
||||
if (!was_suspended) {
|
||||
status = cb_(this, shard);
|
||||
}
|
||||
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
cb_ = nullptr; // We can do it because only a single thread runs the callback.
|
||||
|
@ -467,6 +474,13 @@ void Transaction::ScheduleInternal() {
|
|||
VLOG(2) << "Scheduled " << DebugId()
|
||||
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
|
||||
<< " num_shards: " << num_shards;
|
||||
|
||||
if (mode == IntentLock::EXCLUSIVE) {
|
||||
journal::Journal* j = ServerState::tlocal()->journal();
|
||||
// TODO: we may want to pass custom command name into journal.
|
||||
if (j && j->SchedStartTx(txid_, 0, num_shards)) {
|
||||
}
|
||||
}
|
||||
coordinator_state_ |= COORD_SCHED;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -179,6 +179,10 @@ class Transaction {
|
|||
//! Runs in the shard thread.
|
||||
KeyLockArgs GetLockArgs(ShardId sid) const;
|
||||
|
||||
OpArgs GetOpArgs(EngineShard* shard) const {
|
||||
return OpArgs{shard, txid_, db_index_};
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
struct LockCnt {
|
||||
|
|
|
@ -970,8 +970,7 @@ void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
absl::Span memb_sp{members.data(), members.size()};
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpAdd(op_args, zparams, key, memb_sp);
|
||||
return OpAdd(t->GetOpArgs(shard), zparams, key, memb_sp);
|
||||
};
|
||||
|
||||
OpResult<AddResult> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1031,8 +1030,7 @@ void ZSetFamily::ZCount(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpCount(op_args, key, si);
|
||||
return OpCount(t->GetOpArgs(shard), key, si);
|
||||
};
|
||||
|
||||
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1063,8 +1061,7 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
|||
zparams.flags = ZADD_IN_INCR;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpAdd(op_args, zparams, key, ScoredMemberSpan{&scored_member, 1});
|
||||
return OpAdd(t->GetOpArgs(shard), zparams, key, ScoredMemberSpan{&scored_member, 1});
|
||||
};
|
||||
|
||||
OpResult<AddResult> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1139,7 +1136,7 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
ZParams zparams;
|
||||
zparams.override = true;
|
||||
add_result =
|
||||
OpAdd(OpArgs{shard, t->db_index()}, zparams, dest_key, ScoredMemberSpan{smvec}).value();
|
||||
OpAdd(t->GetOpArgs(shard), zparams, dest_key, ScoredMemberSpan{smvec}).value();
|
||||
}
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
@ -1161,8 +1158,7 @@ void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpLexCount(op_args, key, li);
|
||||
return OpLexCount(t->GetOpArgs(shard), key, li);
|
||||
};
|
||||
|
||||
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1237,8 +1233,7 @@ void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) {
|
|||
range_spec.interval = li;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpRange(range_spec, op_args, key);
|
||||
return OpRange(range_spec, t->GetOpArgs(shard), key);
|
||||
};
|
||||
|
||||
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1318,8 +1313,7 @@ void ZSetFamily::ZRem(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpRem(op_args, key, members);
|
||||
return OpRem(t->GetOpArgs(shard), key, members);
|
||||
};
|
||||
|
||||
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1335,8 +1329,7 @@ void ZSetFamily::ZScore(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view member = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpScore(op_args, key, member);
|
||||
return OpScore(t->GetOpArgs(shard), key, member);
|
||||
};
|
||||
|
||||
OpResult<double> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1364,7 +1357,7 @@ void ZSetFamily::ZScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpScan(OpArgs{shard, t->db_index()}, key, &cursor);
|
||||
return OpScan(t->GetOpArgs(shard), key, &cursor);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1427,7 +1420,7 @@ void ZSetFamily::ZUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
ZParams zparams;
|
||||
zparams.override = true;
|
||||
add_result =
|
||||
OpAdd(OpArgs{shard, t->db_index()}, zparams, dest_key, ScoredMemberSpan{smvec}).value();
|
||||
OpAdd(t->GetOpArgs(shard), zparams, dest_key, ScoredMemberSpan{smvec}).value();
|
||||
}
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
@ -1449,8 +1442,7 @@ void ZSetFamily::ZRangeByScoreInternal(string_view key, string_view min_s, strin
|
|||
range_spec.interval = si;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpRange(range_spec, op_args, key);
|
||||
return OpRange(range_spec, t->GetOpArgs(shard), key);
|
||||
};
|
||||
|
||||
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1480,8 +1472,7 @@ void ZSetFamily::OutputScoredArrayResult(const OpResult<ScoredArray>& result,
|
|||
void ZSetFamily::ZRemRangeGeneric(string_view key, const ZRangeSpec& range_spec,
|
||||
ConnectionContext* cntx) {
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpRemRange(op_args, key, range_spec);
|
||||
return OpRemRange(t->GetOpArgs(shard), key, range_spec);
|
||||
};
|
||||
|
||||
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1531,8 +1522,7 @@ void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext*
|
|||
range_spec.interval = ii;
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpRange(range_spec, op_args, key);
|
||||
return OpRange(range_spec, t->GetOpArgs(shard), key);
|
||||
};
|
||||
|
||||
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1544,9 +1534,8 @@ void ZSetFamily::ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext*
|
|||
string_view member = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
|
||||
return OpRank(op_args, key, member, reverse);
|
||||
return OpRank(t->GetOpArgs(shard), key, member, reverse);
|
||||
};
|
||||
|
||||
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
|
Loading…
Reference in New Issue