This CR fixes #36.

1. Fix a bug in dash table related to snapshotting.
2. Rewrite GlobalState code and make state transitions atomic and well defined.
3. Fix Save/Flush semantics by capturing snapshotted tables together with the snapshot.
This commit is contained in:
Roman Gershman 2022-05-20 12:35:28 +03:00
parent 439070d00d
commit 599c786c16
18 changed files with 250 additions and 203 deletions

View File

@ -286,6 +286,11 @@ class DashTable<_Key, _Value, Policy>::Iterator {
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(sid) {
}
Iterator(Owner* me, uint32_t seg_id, uint8_t bid)
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(0) {
Seek2Occupied();
}
public:
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
@ -475,7 +480,7 @@ void DashTable<_Key, _Value, Policy>::CVCUponInsert(uint64_t ver_threshold, cons
unsigned num_touched = target->CVCOnInsert(ver_threshold, key_hash, bids);
if (num_touched < UINT16_MAX) {
for (unsigned i = 0; i < num_touched; ++i) {
cb(bucket_iterator{this, seg_id, bids[i], 0});
cb(bucket_iterator{this, seg_id, bids[i]});
}
return;
}
@ -486,7 +491,7 @@ void DashTable<_Key, _Value, Policy>::CVCUponInsert(uint64_t ver_threshold, cons
// and its entries can be reshuffled into different buckets.
for (uint8_t i = 0; i < kPhysicalBucketNum; ++i) {
if (target->GetVersion(i) < ver_threshold) {
cb(bucket_iterator{this, seg_id, i, 0});
cb(bucket_iterator{this, seg_id, i});
}
}
}
@ -505,7 +510,7 @@ void DashTable<_Key, _Value, Policy>::CVCUponBump(uint64_t ver_upperbound, const
target->CVCOnBump(ver_upperbound, it.bucket_id(), it.slot_id(), key_hash, bids);
for (unsigned i = 0; i < num_touched; ++i) {
cb(bucket_iterator{this, seg_id, bids[i], 0});
cb(bucket_iterator{this, seg_id, bids[i]});
}
}
@ -812,10 +817,13 @@ auto DashTable<_Key, _Value, Policy>::Traverse(cursor curs, Cb&& cb) -> cursor {
// We fix bid and go over all segments. Once we reach the end we increase bid and repeat.
do {
SegmentType& s = *segment_[sid];
SegmentType* s = segment_[sid];
assert(s);
auto dt_cb = [&](const SegmentIterator& it) { cb(iterator{this, sid, it.index, it.slot}); };
fetched = s.TraverseLogicalBucket(bid, hash_fun, std::move(dt_cb));
fetched = s->TraverseLogicalBucket(bid, hash_fun, std::move(dt_cb));
sid = NextSeg(sid);
if (sid >= segment_.size()) {
sid = 0;

View File

@ -23,6 +23,8 @@ extern "C" {
#include "redis/zmalloc.h"
}
#pragma clang diagnostic ignored "-Wunused-const-variable"
namespace dfly {
static uint64_t callbackHash(const void* key) {
@ -110,7 +112,6 @@ using Dash64 = DashTable<uint64_t, uint64_t, UInt64Policy>;
constexpr auto kSegTax = Segment::kTaxSize;
constexpr size_t kMaxSize = Segment::kMaxSize;
constexpr size_t kSegSize = sizeof(Segment);
constexpr size_t foo = Segment::kBucketSz;
class DashTest : public testing::Test {
protected:
@ -624,8 +625,9 @@ struct VersionPolicy : public BasicDashPolicy {
}
};
using VersionDT = DashTable<int, int, VersionPolicy>;
TEST_F(DashTest, Version) {
DashTable<int, int, VersionPolicy> dt;
VersionDT dt;
auto [it, inserted] = dt.Insert(1, 1);
EXPECT_EQ(0, it.GetVersion());
@ -653,6 +655,36 @@ TEST_F(DashTest, Version) {
ASSERT_EQ(kNum, items);
}
TEST_F(DashTest, CVCUponInsert) {
VersionDT dt;
auto [it, added] = dt.Insert(10, 20); // added to slot 0
ASSERT_TRUE(added);
int i = 11;
while (true) {
auto [it2, added] = dt.Insert(i, 30);
if (it2.bucket_id() == it.bucket_id() && it2.segment_id() == it.segment_id()) {
ASSERT_EQ(1, it2.slot_id());
break;
}
++i;
}
// freed slot 0 but the bucket still has i at slot 1.
dt.Erase(10);
auto cb = [](VersionDT::bucket_iterator bit) {
LOG(INFO) << "sid: " << bit.segment_id() << " " << bit.bucket_id();
while (!bit.is_done()) {
LOG(INFO) << "key: " << bit->first;
++bit;
}
};
dt.CVCUponInsert(1, i, cb);
}
struct A {
int a = 0;
unsigned moved = 0;

View File

@ -40,7 +40,7 @@ ServerState::~ServerState() {
}
void ServerState::Init() {
gstate_ = GlobalState::IDLE;
gstate_ = GlobalState::ACTIVE;
}
void ServerState::Shutdown() {
@ -56,10 +56,10 @@ Interpreter& ServerState::GetInterpreter() {
return interpreter_.value();
}
const char* GlobalState::Name(S s) {
const char* GlobalStateName(GlobalState s) {
switch (s) {
case GlobalState::IDLE:
return "IDLE";
case GlobalState::ACTIVE:
return "ACTIVE";
case GlobalState::LOADING:
return "LOADING";
case GlobalState::SAVING:

View File

@ -82,6 +82,12 @@ struct TieredStats {
TieredStats& operator+=(const TieredStats&);
};
enum class GlobalState : uint8_t {
ACTIVE,
LOADING,
SAVING,
SHUTTING_DOWN,
};
inline void ToUpper(const MutableSlice* val) {
for (auto& c : *val) {
@ -109,4 +115,7 @@ extern size_t max_memory_limit;
// set upon server start.
extern unsigned kernel_version;
const char* GlobalStateName(GlobalState gs);
} // namespace dfly

View File

@ -225,6 +225,10 @@ class DbSlice {
// and second denotes number of deleted items due to expiration. (second <= first).
std::pair<unsigned, unsigned> DeleteExpired(DbIndex db_indx);
const DbTableArray& databases() const {
return db_arr_;
}
private:
void CreateDb(DbIndex index);
@ -245,7 +249,7 @@ class DbSlice {
mutable SliceEvents events_; // we may change this even for const operations.
std::vector<boost::intrusive_ptr<DbTable>> db_arr_;
DbTableArray db_arr_;
// Used in temporary computations in Acquire/Release.
absl::flat_hash_set<std::string_view> uniq_keys_;

View File

@ -3,6 +3,7 @@
//
#include "server/debugcmd.h"
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>
#include <boost/fiber/operations.hpp>
@ -150,22 +151,20 @@ void DebugCmd::Reload(CmdArgList args) {
}
}
string last_save_file = sf_.LastSaveFile();
string last_save_file = sf_.GetLastSaveInfo()->file_name;
Load(last_save_file);
}
void DebugCmd::Load(std::string_view filename) {
auto [current, switched] = sf_.global_state()->Next(GlobalState::LOADING);
if (!switched) {
LOG(WARNING) << GlobalState::Name(current) << " in progress, ignored";
GlobalState new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
return;
}
ProactorPool& pp = sf_.service().proactor_pool();
pp.Await([&](ProactorBase*) {
CHECK(ServerState::tlocal()->gstate() == GlobalState::IDLE);
ServerState::tlocal()->set_gstate(GlobalState::LOADING);
});
absl::Cleanup rev_state = [this] {
sf_.service().SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
};
const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
intrusive_ptr<Transaction> flush_trans(new Transaction{cid});

View File

@ -1,54 +0,0 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <atomic>
#include <utility>
namespace dfly {
// Switches from IDLE state to any of the non-idle states.
// Switched from non-idle to IDLE.
// Refuses switching from non-idle to another non-idle state directly.
//
class GlobalState {
public:
enum S : uint8_t {
IDLE,
LOADING,
SAVING,
SHUTTING_DOWN,
};
GlobalState(S s = IDLE) : s_(s) {
}
const char* Name() const {
return Name(s_.load(std::memory_order_relaxed));
}
static const char* Name(S s);
std::pair<S, bool> Next(S s) {
S current{IDLE};
bool res = s_.compare_exchange_strong(current, s, std::memory_order_acq_rel);
return std::make_pair(current, res);
}
// Switches to IDLE and returns the previous state.
S Clear() {
return s_.exchange(IDLE, std::memory_order_acq_rel);
}
// Returns the current state.
S Load() const {
return s_.load(std::memory_order_acq_rel);
}
private:
std::atomic<S> s_;
};
} // namespace dfly

View File

@ -319,9 +319,7 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
const InitOpts& opts) {
InitRedisTables();
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
ServerState::tlocal()->Init();
});
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->Init(); });
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set->Init(shard_num, !opts.disable_time_update);
@ -336,17 +334,13 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
void Service::Shutdown() {
VLOG(1) << "Service::Shutdown";
auto [current, switched] = server_family_.global_state()->Next(GlobalState::SHUTTING_DOWN);
// TODO: to introduce BlockingNext that waits until the state is switched to idle.
CHECK(switched) << "TBD " << GlobalState::Name(current);
// We mark that we are shuttind down. After this incoming requests will be
// rejected
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
engine_varz.reset();
request_latency_usec.Shutdown();
// We mark that we are shuttind down.
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
// to shutdown all the runtime components that depend on EngineShard.
server_family_.Shutdown();
StringFamily::Shutdown();
@ -386,14 +380,14 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (cid == nullptr) {
(*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`"), "unknown_cmd");
lock_guard lk(stats_mu_);
lock_guard lk(mu_);
if (unknown_cmds_.size() < 1024)
unknown_cmds_[cmd_str]++;
return;
}
if (etl.gstate() == GlobalState::LOADING || etl.gstate() == GlobalState::SHUTTING_DOWN) {
string err = absl::StrCat("Can not execute during ", GlobalState::Name(etl.gstate()));
string err = absl::StrCat("Can not execute during ", GlobalStateName(etl.gstate()));
(*cntx)->SendError(err);
return;
}
@ -664,7 +658,7 @@ bool Service::IsPassProtected() const {
}
absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
lock_guard lk(stats_mu_);
lock_guard lk(mu_);
return unknown_cmds_;
}
@ -990,6 +984,16 @@ VarzValue::Map Service::GetVarzStats() {
return res;
}
GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
lock_guard lk(mu_);
if (global_state_ != from)
return global_state_;
global_state_ = to;
pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); });
return to;
}
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
#define HFUNC(x) SetHandler(&Service::x)

View File

@ -71,6 +71,16 @@ class Service : public facade::ServiceInterface {
return server_family_.script_mgr();
}
ServerFamily& server_family() {
return server_family_;
}
// Returns: the new state.
// if from equals the old state then the switch is performed "to" is returned.
// Otherwise, does not switch and returns the current state in the system.
// Upon switch, updates cached global state in threadlocal ServerState struct.
GlobalState SwitchState(GlobalState from , GlobalState to);
private:
static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx);
@ -100,9 +110,10 @@ class Service : public facade::ServiceInterface {
ServerFamily server_family_;
CommandRegistry registry_;
mutable ::boost::fibers::mutex stats_mu_;
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;
mutable ::boost::fibers::mutex mu_;
GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_;
};
} // namespace dfly

View File

@ -505,10 +505,10 @@ struct RdbSaver::Impl {
}
};
RdbSaver::RdbSaver(EngineShardSet* ess, ::io::Sink* sink) : ess_(ess), sink_(sink) {
RdbSaver::RdbSaver(::io::Sink* sink) : sink_(sink) {
CHECK_NOTNULL(sink_);
impl_.reset(new Impl(ess->size()));
impl_.reset(new Impl(shard_set->size()));
impl_->serializer.set_sink(sink_);
}
@ -571,10 +571,10 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
}
void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
auto pair = shard->db_slice().GetTables(0);
auto s = make_unique<SliceSnapshot>(pair.first, pair.second, &impl_->channel);
DbTableArray databases = shard->db_slice().databases();
auto s = make_unique<SliceSnapshot>(std::move(databases), &shard->db_slice(), &impl_->channel);
s->Start(&shard->db_slice());
s->Start();
impl_->shard_snapshots[shard->shard_id()] = move(s);
}

View File

@ -22,7 +22,7 @@ class EngineShard;
using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
class RdbSaver {
public:
RdbSaver(EngineShardSet* ess, ::io::Sink* sink);
explicit RdbSaver(::io::Sink* sink);
~RdbSaver();
std::error_code SaveHeader();
@ -44,7 +44,6 @@ class RdbSaver {
std::error_code SaveAuxFieldStrStr(std::string_view key, std::string_view val);
std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val);
EngineShardSet* ess_;
::io::Sink* sink_;
std::unique_ptr<Impl> impl_;
};

View File

@ -23,6 +23,7 @@ using namespace testing;
using namespace std;
using namespace util;
using namespace facade;
using absl::StrCat;
DECLARE_int32(list_compress_depth);
DECLARE_int32(list_max_listpack_size);
@ -146,8 +147,39 @@ TEST_F(RdbTest, ReloadTtl) {
}
TEST_F(RdbTest, SaveFlush) {
Run({"debug", "populate", "1000000"});
Run({"debug", "populate", "500000"});
auto save_fb = pp_->at(1)->LaunchFiber([&] {
RespExpr resp = Run({"save"});
ASSERT_EQ(resp, "OK");
});
usleep(1000);
Run({"flushdb"});
save_fb.join();
auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size());
auto& k_v = save_info->freq_map.front();
EXPECT_EQ("string", k_v.first);
EXPECT_EQ(500000, k_v.second);
}
TEST_F(RdbTest, SaveManyDbs) {
Run({"debug", "populate", "50000"});
pp_->at(1)->Await([&] {
Run({"select", "1"});
Run({"debug", "populate", "10000"});
});
auto metrics = service_->server_family().GetMetrics();
ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count);
Run({"save"});
auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size());
auto& k_v = save_info->freq_map.front();
EXPECT_EQ("string", k_v.first);
// EXPECT_EQ(60000, k_v.second);
}
} // namespace dfly

View File

@ -114,7 +114,8 @@ string InferLoadFile(fs::path data_dir) {
ServerFamily::ServerFamily(Service* service) : service_(*service) {
start_time_ = time(NULL);
lsinfo_.save_time = start_time_;
lsinfo_ = make_shared<LastSaveInfo>();
lsinfo_->save_time = start_time_;
script_mgr_.reset(new ScriptMgr());
}
@ -187,12 +188,14 @@ void ServerFamily::Load(const std::string& load_path) {
}
LOG(INFO) << "Loading " << load_path;
auto [current, switched] = global_state()->Next(GlobalState::LOADING);
if (!switched) {
LOG(WARNING) << GlobalState::Name(current) << " in progress, ignored";
GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
return;
}
#if 0
auto& pool = service_.proactor_pool();
// Deliberitely run on all I/O threads to update the state for non-shard threads as well.
@ -201,7 +204,9 @@ void ServerFamily::Load(const std::string& load_path) {
CHECK(ServerState::tlocal()->gstate() == GlobalState::IDLE);
ServerState::tlocal()->set_gstate(GlobalState::LOADING);
});
#endif
auto& pool = service_.proactor_pool();
// Choose thread that does not handle shards if possible.
// This will balance out the CPU during the load.
ProactorBase* proactor =
@ -226,9 +231,7 @@ error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
ec = res.error();
}
auto& pool = service_.proactor_pool();
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
global_state()->Clear();
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
return ec;
}
@ -280,14 +283,6 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
}
error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
auto [current, switched] = global_state_.Next(GlobalState::SAVING);
if (!switched) {
*err_details = StrCat(GlobalState::Name(current), " - can not save database");
return make_error_code(errc::operation_in_progress);
}
absl::Cleanup rev_state = [this] { global_state_.Clear(); };
fs::path dir_path(FLAGS_dir);
error_code ec;
@ -309,22 +304,31 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
}
path += filename;
VLOG(1) << "Saving to " << path;
GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::SAVING);
if (new_state != GlobalState::SAVING) {
*err_details = StrCat(GlobalStateName(new_state), " - can not save database");
return make_error_code(errc::operation_in_progress);
}
absl::Cleanup rev_state = [this] {
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
};
auto res = uring::OpenWrite(path.generic_string());
if (!res) {
return res.error();
}
auto& pool = service_.proactor_pool();
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); });
unique_ptr<::io::WriteFile> wf(*res);
VLOG(1) << "Saving to " << path;
auto start = absl::Now();
RdbSaver saver{shard_set, wf.get()};
RdbSaver saver{wf.get()};
RdbTypeFreqMap freq_map;
shared_ptr<LastSaveInfo> save_info;
ec = saver.SaveHeader();
if (!ec) {
auto cb = [&saver](Transaction* t, EngineShard* shard) {
saver.StartSnapshotInShard(shard);
@ -340,17 +344,12 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
for (const auto& k_v : freq_map) {
tmp_map[RdbTypeName(k_v.first)] += k_v.second;
}
lock_guard lk(save_mu_);
lsinfo_.freq_map.clear();
save_info = make_shared<LastSaveInfo>();
for (const auto& k_v : tmp_map) {
lsinfo_.freq_map.emplace_back(k_v);
save_info->freq_map.emplace_back(k_v);
}
}
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
CHECK_EQ(GlobalState::SAVING, global_state_.Clear());
absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Saving " << path << " finished after "
@ -362,9 +361,12 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
ec = close_ec;
if (!ec) {
save_info->save_time = time(NULL);
save_info->file_name = path.generic_string();
lock_guard lk(save_mu_);
lsinfo_.save_time = time(NULL);
lsinfo_.file_name = path.generic_string();
// swap - to deallocate the old version outstide of the lock.
lsinfo_.swap(save_info);
}
return ec;
@ -383,9 +385,9 @@ error_code ServerFamily::DoFlush(Transaction* transaction, DbIndex db_ind) {
return error_code{};
}
string ServerFamily::LastSaveFile() const {
shared_ptr<const LastSaveInfo> ServerFamily::GetLastSaveInfo() const {
lock_guard lk(save_mu_);
return lsinfo_.file_name;
return lsinfo_;
}
void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
@ -700,18 +702,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (should_enter("PERSISTENCE", true)) {
ADD_HEADER("# PERSISTENCE");
time_t last_save;
string last_filename;
decltype(lsinfo_.freq_map) rdb_counts;
decltype(lsinfo_) save_info;
{
lock_guard lk(save_mu_);
last_save = lsinfo_.save_time;
last_filename = lsinfo_.file_name;
rdb_counts = lsinfo_.freq_map;
save_info = lsinfo_;
}
append("last_save", last_save);
append("last_save_file", last_filename);
for (const auto& k_v : rdb_counts) {
append("last_save", save_info->save_time);
append("last_save_file", save_info->file_name);
for (const auto& k_v : save_info->freq_map) {
append(StrCat("rdb_", k_v.first), k_v.second);
}
}
@ -863,8 +861,12 @@ void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) {
}
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
lock_guard lk(save_mu_);
(*cntx)->SendLong(lsinfo_.save_time);
time_t save_time;
{
lock_guard lk(save_mu_);
save_time = lsinfo_->save_time;
}
(*cntx)->SendLong(save_time);
}
void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) {

View File

@ -7,7 +7,6 @@
#include "facade/conn_context.h"
#include "facade/redis_parser.h"
#include "server/engine_shard_set.h"
#include "server/global_state.h"
#include "util/proactor_pool.h"
namespace util {
@ -39,6 +38,12 @@ struct Metrics {
facade::ConnectionStats conn_stats;
};
struct LastSaveInfo {
time_t save_time; // epoch time in seconds.
std::string file_name; //
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
};
class ServerFamily {
public:
ServerFamily(Service* service);
@ -54,10 +59,6 @@ class ServerFamily {
Metrics GetMetrics() const;
GlobalState* global_state() {
return &global_state_;
}
ScriptMgr* script_mgr() {
return script_mgr_.get();
}
@ -67,10 +68,9 @@ class ServerFamily {
std::error_code DoSave(Transaction* transaction, std::string* err_details);
std::error_code DoFlush(Transaction* transaction, DbIndex db_ind);
std::string LastSaveFile() const;
std::shared_ptr<const LastSaveInfo> GetLastSaveInfo() const;
std::error_code LoadRdb(const std::string& rdb_file);
private:
uint32_t shard_count() const {
return shard_set->size();
@ -116,16 +116,9 @@ class ServerFamily {
std::unique_ptr<ScriptMgr> script_mgr_;
GlobalState global_state_;
time_t start_time_ = 0; // in seconds, epoch time.
struct LastSaveInfo {
time_t save_time; // epoch time in seconds.
std::string file_name; //
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
};
LastSaveInfo lsinfo_;
std::shared_ptr<LastSaveInfo> lsinfo_; // protected by save_mu_;
};
} // namespace dfly

View File

@ -9,7 +9,6 @@
#include "core/interpreter.h"
#include "server/common.h"
#include "server/global_state.h"
#include "util/sliding_counter.h"
typedef struct mi_heap_s mi_heap_t;
@ -56,10 +55,10 @@ class ServerState { // public struct - to allow initialization.
return live_transactions_;
}
GlobalState::S gstate() const {
GlobalState gstate() const {
return gstate_;
}
void set_gstate(GlobalState::S s) {
void set_gstate(GlobalState s) {
gstate_ = s;
}
@ -84,7 +83,7 @@ class ServerState { // public struct - to allow initialization.
mi_heap_t* data_heap_;
std::optional<Interpreter> interpreter_;
GlobalState::S gstate_ = GlobalState::IDLE;
GlobalState gstate_ = GlobalState::ACTIVE;
using Counter = util::SlidingCounter<7>;
Counter qps_;

View File

@ -25,30 +25,29 @@ using namespace chrono_literals;
namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber;
SliceSnapshot::SliceSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest)
: prime_table_(prime), expire_tbl_(et), dest_(dest) {
SliceSnapshot::SliceSnapshot(DbTableArray db_array, DbSlice* slice, StringChannel* dest)
: db_array_(db_array), db_slice_(slice), dest_(dest) {
}
SliceSnapshot::~SliceSnapshot() {
}
void SliceSnapshot::Start(DbSlice* slice) {
void SliceSnapshot::Start() {
DCHECK(!fb_.joinable());
db_slice_ = slice;
auto on_change = [this](DbIndex db_index, const DbSlice::ChangeReq& req) {
OnDbChange(db_index, req);
};
snapshot_version_ = slice->RegisterOnChange(move(on_change));
snapshot_version_ = db_slice_->RegisterOnChange(move(on_change));
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
sfile_.reset(new io::StringFile);
rdb_serializer_.reset(new RdbSerializer(sfile_.get()));
fb_ = fiber([slice, this] {
fb_ = fiber([this] {
FiberFunc();
slice->UnregisterOnChange(snapshot_version_);
db_slice_->UnregisterOnChange(snapshot_version_);
});
}
@ -56,11 +55,12 @@ void SliceSnapshot::Join() {
fb_.join();
}
void SliceSnapshot::SerializeSingleEntry(const PrimeKey& pk, const PrimeValue& pv) {
void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
const PrimeValue& pv) {
time_t expire_time = 0;
if (pv.HasExpire()) {
auto eit = expire_tbl_->Find(pk);
auto eit = db_array_[db_indx]->expire.Find(pk);
expire_time = db_slice_->ExpireTime(eit);
}
@ -74,32 +74,38 @@ void SliceSnapshot::FiberFunc() {
absl::StrCat("SliceSnapshot", ProactorBase::GetIndex()));
PrimeTable::cursor cursor;
VLOG(1) << "Start traversing " << prime_table_->size() << " items";
for (DbIndex db_indx = 0; db_indx < 1; ++db_indx) {
if (!db_array_[db_indx])
continue;
PrimeTable* pt = &db_array_[db_indx]->prime;
VLOG(1) << "Start traversing " << pt->size() << " items";
uint64_t last_yield = 0;
do {
// Traverse a single logical bucket but do not update its versions.
// we can not update a version because entries in the same bucket share part of the version.
// Therefore we save first, and then update version in one atomic swipe.
PrimeTable::cursor next =
prime_table_->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
uint64_t last_yield = 0;
savecb_current_db_ = db_indx;
cursor = next;
do {
// Traverse a single logical bucket but do not update its versions.
// we can not update a version because entries in the same bucket share part of the version.
// Therefore we save first, and then update version in one atomic swipe.
PrimeTable::cursor next = pt->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
// Flush if needed.
FlushSfile(false);
if (serialized_ >= last_yield + 100) {
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
this_fiber::yield();
last_yield = serialized_;
DVLOG(2) << "After sleep";
// flush in case other fibers (writes commands that pushed previous values) filled the file.
cursor = next;
// Flush if needed.
FlushSfile(false);
}
} while (cursor);
if (serialized_ >= last_yield + 100) {
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
this_fiber::yield();
last_yield = serialized_;
DVLOG(2) << "After sleep";
// flush in case other fibers (writes commands that pushed previous values) filled the file.
FlushSfile(false);
}
} while (cursor);
DVLOG(2) << "after loop " << this_fiber::properties<FiberProps>().name();
FlushSfile(true);
DVLOG(2) << "after loop " << this_fiber::properties<FiberProps>().name();
FlushSfile(true);
}
dest_->StartClosing();
VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/"
@ -159,7 +165,7 @@ bool SliceSnapshot::SaveCb(PrimeIterator it) {
return false;
}
++serialized_ += SerializePhysicalBucket(prime_table_, it);
++serialized_ += SerializePhysicalBucket(savecb_current_db_, it);
return false;
}
@ -169,18 +175,18 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
if (const PrimeTable::bucket_iterator* bit = req.update()) {
if (bit->GetVersion() < snapshot_version_) {
side_saved_ += SerializePhysicalBucket(table, *bit);
side_saved_ += SerializePhysicalBucket(db_index, *bit);
}
} else {
string_view key = get<string_view>(req.change);
table->CVCUponInsert(snapshot_version_, key, [this, table](PrimeTable::bucket_iterator it) {
table->CVCUponInsert(snapshot_version_, key, [this, db_index](PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_);
side_saved_ += SerializePhysicalBucket(table, it);
side_saved_ += SerializePhysicalBucket(db_index, it);
});
}
}
unsigned SliceSnapshot::SerializePhysicalBucket(PrimeTable* table, PrimeTable::bucket_iterator it) {
unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_);
// traverse physical bucket and write it into string file.
@ -189,7 +195,7 @@ unsigned SliceSnapshot::SerializePhysicalBucket(PrimeTable* table, PrimeTable::b
string tmp;
while (!it.is_done()) {
++result;
SerializeSingleEntry(it->first, it->second);
SerializeSingleEntry(db_index, it->first, it->second);
++it;
}

View File

@ -20,10 +20,10 @@ class SliceSnapshot {
using StringChannel =
::util::fibers_ext::SimpleChannel<std::string, base::mpmc_bounded_queue<std::string>>;
SliceSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest);
SliceSnapshot(DbTableArray db_array, DbSlice* slice, StringChannel* dest);
~SliceSnapshot();
void Start(DbSlice* slice);
void Start();
void Join();
uint64_t snapshot_version() const {
@ -34,25 +34,26 @@ class SliceSnapshot {
private:
void FiberFunc();
bool FlushSfile(bool force);
void SerializeSingleEntry(const PrimeKey& pk, const PrimeValue& pv);
void SerializeSingleEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv);
bool SaveCb(PrimeIterator it);
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
// Returns number of entries serialized.
// Updates the version of the bucket to snapshot version.
unsigned SerializePhysicalBucket(PrimeTable* table, PrimeTable::bucket_iterator it);
unsigned SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it);
::boost::fibers::fiber fb_;
DbTableArray db_array_;
std::unique_ptr<io::StringFile> sfile_;
std::unique_ptr<RdbSerializer> rdb_serializer_;
// version upper bound for entries that should be saved (not included).
uint64_t snapshot_version_ = 0;
PrimeTable* prime_table_;
ExpireTable* expire_tbl_;
DbSlice* db_slice_ = nullptr;
DbSlice* db_slice_;
DbIndex savecb_current_db_; // used by SaveCb
StringChannel* dest_;
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;

View File

@ -71,4 +71,6 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
void Release(IntentLock::Mode mode, std::string_view key, unsigned count);
};
using DbTableArray = std::vector<boost::intrusive_ptr<DbTable>>;
} // namespace dfly