Implement DEBUG RELOAD option.

Fix c++ and openssl deprecation warnings.
This commit is contained in:
Roman Gershman 2022-04-12 08:34:48 +03:00
parent d2b6907f2d
commit 7f8346cded
9 changed files with 219 additions and 145 deletions

View File

@ -375,10 +375,11 @@ DashTable<_Key, _Value, Policy>::~DashTable() {
Clear(); Clear();
auto* resource = segment_.get_allocator().resource(); auto* resource = segment_.get_allocator().resource();
std::pmr::polymorphic_allocator<SegmentType> pa(resource); std::pmr::polymorphic_allocator<SegmentType> pa(resource);
using alloc_traits = std::allocator_traits<decltype(pa)>;
IterateUnique([&](SegmentType* seg) { IterateUnique([&](SegmentType* seg) {
pa.destroy(seg); alloc_traits::destroy(pa, seg);
pa.deallocate(seg, 1); alloc_traits::deallocate(pa, seg, 1);
return false; return false;
}); });
} }
@ -438,6 +439,8 @@ void DashTable<_Key, _Value, Policy>::Clear() {
**********/ **********/
if (global_depth_ > initial_depth_) { if (global_depth_ > initial_depth_) {
std::pmr::polymorphic_allocator<SegmentType> pa(segment_.get_allocator()); std::pmr::polymorphic_allocator<SegmentType> pa(segment_.get_allocator());
using alloc_traits = std::allocator_traits<decltype(pa)>;
size_t dest = 0, src = 0; size_t dest = 0, src = 0;
size_t new_size = (1 << initial_depth_); size_t new_size = (1 << initial_depth_);
@ -448,8 +451,8 @@ void DashTable<_Key, _Value, Policy>::Clear() {
seg->set_local_depth(initial_depth_); seg->set_local_depth(initial_depth_);
segment_[dest++] = seg; segment_[dest++] = seg;
} else { } else {
pa.destroy(seg); alloc_traits::destroy(pa, seg);
pa.deallocate(seg, 1); alloc_traits::deallocate(pa, seg, 1);
} }
src = next_src; src = next_src;

View File

@ -5,7 +5,7 @@
#include "core/interpreter.h" #include "core/interpreter.h"
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <openssl/sha.h> #include <openssl/evp.h>
#include <cstring> #include <cstring>
#include <optional> #include <optional>
@ -25,6 +25,16 @@ using namespace std;
namespace { namespace {
// EVP_Q_digest is not present in the older versions of OpenSSL.
int EVPDigest(const void* data, size_t datalen, unsigned char* md, size_t* mdlen) {
unsigned int temp = 0;
int ret = EVP_Digest(data, datalen, md, &temp, EVP_sha1(), NULL);
if (mdlen != NULL)
*mdlen = temp;
return ret;
}
class RedisTranslator : public ObjectExplorer { class RedisTranslator : public ObjectExplorer {
public: public:
RedisTranslator(lua_State* lua) : lua_(lua) { RedisTranslator(lua_State* lua) : lua_(lua) {
@ -155,7 +165,6 @@ void SetGlobalArrayInternal(lua_State* lua, const char* name, MutSliceSpan args)
lua_setglobal(lua, name); lua_setglobal(lua, name);
} }
/* This function is used in order to push an error on the Lua stack in the /* This function is used in order to push an error on the Lua stack in the
* format used by redis.pcall to return errors, which is a lua table * format used by redis.pcall to return errors, which is a lua table
* with a single "err" field set to the error string. Note that this * with a single "err" field set to the error string. Note that this
@ -247,6 +256,7 @@ debug = nil
lua_setglobal(lua, "dofile"); lua_setglobal(lua, "dofile");
} }
// dest must have at least 41 chars.
void ToHex(const uint8_t* src, char* dest) { void ToHex(const uint8_t* src, char* dest) {
const char cset[] = "0123456789abcdef"; const char cset[] = "0123456789abcdef";
for (size_t j = 0; j < 20; j++) { for (size_t j = 0; j < 20; j++) {
@ -266,16 +276,13 @@ int RedisSha1Command(lua_State* lua) {
size_t len; size_t len;
const char* s = lua_tolstring(lua, 1, &len); const char* s = lua_tolstring(lua, 1, &len);
SHA_CTX ctx; uint8_t digest[EVP_MAX_MD_SIZE];
uint8_t buf[20]; EVPDigest(s, len, digest, NULL);
char digest[41];
SHA1_Init(&ctx); char hex[41];
SHA1_Update(&ctx, s, len); ToHex(digest, hex);
SHA1_Final(buf, &ctx);
ToHex(buf, digest);
lua_pushstring(lua, digest); lua_pushstring(lua, hex);
return 1; return 1;
} }
@ -353,13 +360,10 @@ Interpreter::~Interpreter() {
} }
void Interpreter::FuncSha1(string_view body, char* fp) { void Interpreter::FuncSha1(string_view body, char* fp) {
SHA_CTX ctx; uint8_t digest[EVP_MAX_MD_SIZE];
uint8_t buf[20]; EVPDigest(body.data(), body.size(), digest, NULL);
SHA1_Init(&ctx); ToHex(digest, fp);
SHA1_Update(&ctx, body.data(), body.size());
SHA1_Final(buf, &ctx);
ToHex(buf, fp);
} }
auto Interpreter::AddFunction(string_view body, string* result) -> AddResult { auto Interpreter::AddFunction(string_view body, string* result) -> AddResult {
@ -500,7 +504,7 @@ bool Interpreter::IsTableSafe() const {
i = 0; i = 0;
len = lua_rawlen(lua_, -1); len = lua_rawlen(lua_, -1);
} else { } else {
lua_pop(lua_, 1); // pop table element lua_pop(lua_, 1); // pop table element
++i; ++i;
} }
} }

View File

@ -11,8 +11,10 @@
#include "base/logging.h" #include "base/logging.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_load.h" #include "server/rdb_load.h"
#include "server/string_family.h" #include "server/string_family.h"
#include "server/transaction.h"
#include "util/uring/uring_fiber_algo.h" #include "util/uring/uring_fiber_algo.h"
#include "util/uring/uring_file.h" #include "util/uring/uring_file.h"
@ -24,6 +26,7 @@ namespace dfly {
using namespace std; using namespace std;
using namespace util; using namespace util;
namespace this_fiber = ::boost::this_fiber; namespace this_fiber = ::boost::this_fiber;
using boost::intrusive_ptr;
using boost::fibers::fiber; using boost::fibers::fiber;
using namespace facade; using namespace facade;
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -52,7 +55,7 @@ void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::Set
} }
} }
DebugCmd::DebugCmd(EngineShardSet* ess, ConnectionContext* cntx) : ess_(ess), cntx_(cntx) { DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), cntx_(cntx) {
} }
void DebugCmd::Run(CmdArgList args) { void DebugCmd::Run(CmdArgList args) {
@ -99,6 +102,7 @@ void DebugCmd::Run(CmdArgList args) {
void DebugCmd::Reload(CmdArgList args) { void DebugCmd::Reload(CmdArgList args) {
bool save = true; bool save = true;
for (size_t i = 2; i < args.size(); ++i) { for (size_t i = 2; i < args.size(); ++i) {
ToUpper(&args[i]); ToUpper(&args[i]);
std::string_view opt = ArgS(args, i); std::string_view opt = ArgS(args, i);
@ -110,18 +114,38 @@ void DebugCmd::Reload(CmdArgList args) {
return (*cntx_)->SendError("DEBUG RELOAD only supports the NOSAVE options."); return (*cntx_)->SendError("DEBUG RELOAD only supports the NOSAVE options.");
} }
} }
error_code ec;
if (save) { if (save) {
return (*cntx_)->SendError("NOSAVE required (TBD)."); string err_details;
const CommandId* cid = sf_.service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
intrusive_ptr<Transaction> trans(new Transaction{cid, &sf_.service().shard_set()});
trans->InitByArgs(0, {});
ec = sf_.DoSave(trans.get(), &err_details);
if (ec) {
return (*cntx_)->SendError(absl::StrCat(err_details, ec.message()));
}
} }
if (FLAGS_dbfilename.empty()) { const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
return (*cntx_)->SendError("dbfilename is not set"); intrusive_ptr<Transaction> flush_trans(new Transaction{cid, &sf_.service().shard_set()});
flush_trans->InitByArgs(0, {});
ec = sf_.DoFlush(flush_trans.get(), DbSlice::kDbAll);
if (ec) {
LOG(ERROR) << "Error flushing db " << ec.message();
} }
fs::path dir_path(FLAGS_dir); string last_save_file = sf_.LastSaveFile();
string filename = FLAGS_dbfilename; fs::path path(last_save_file);
fs::path path = dir_path;
path.append(filename); if (last_save_file.empty()) {
fs::path dir_path(FLAGS_dir);
string filename = FLAGS_dbfilename;
dir_path.append(filename);
path = dir_path;
}
auto res = uring::OpenRead(path.generic_string()); auto res = uring::OpenRead(path.generic_string());
if (!res) { if (!res) {
@ -132,7 +156,7 @@ void DebugCmd::Reload(CmdArgList args) {
io::FileSource fs(*res); io::FileSource fs(*res);
RdbLoader loader; RdbLoader loader;
error_code ec = loader.Load(&fs); ec = loader.Load(&fs);
if (ec) { if (ec) {
(*cntx_)->SendError(ec.message()); (*cntx_)->SendError(ec.message());
@ -162,7 +186,8 @@ void DebugCmd::Populate(CmdArgList args) {
return (*cntx_)->SendError(kUintErr); return (*cntx_)->SendError(kUintErr);
} }
size_t runners_count = ess_->pool()->size(); ProactorPool& pp = sf_.service().proactor_pool();
size_t runners_count = pp.size();
vector<pair<uint64_t, uint64_t>> ranges(runners_count - 1); vector<pair<uint64_t, uint64_t>> ranges(runners_count - 1);
uint64_t batch_size = total_count / runners_count; uint64_t batch_size = total_count / runners_count;
size_t from = 0; size_t from = 0;
@ -178,8 +203,8 @@ void DebugCmd::Populate(CmdArgList args) {
auto range = ranges[i]; auto range = ranges[i];
// whatever we do, we should not capture i by reference. // whatever we do, we should not capture i by reference.
fb_arr[i] = ess_->pool()->at(i)->LaunchFiber( fb_arr[i] = pp.at(i)->LaunchFiber(
[=] { this->PopulateRangeFiber(range.first, range.second, prefix, val_size); }); [=, this] { this->PopulateRangeFiber(range.first, range.second, prefix, val_size); });
} }
for (auto& fb : fb_arr) for (auto& fb : fb_arr)
fb.join(); fb.join();
@ -195,18 +220,19 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
string key = absl::StrCat(prefix, ":"); string key = absl::StrCat(prefix, ":");
size_t prefsize = key.size(); size_t prefsize = key.size();
DbIndex db_indx = cntx_->db_index(); DbIndex db_indx = cntx_->db_index();
std::vector<PopulateBatch> ps(ess_->size(), PopulateBatch{db_indx}); EngineShardSet& ess = sf_.service().shard_set();
std::vector<PopulateBatch> ps(ess.size(), PopulateBatch{db_indx});
SetCmd::SetParams params{db_indx}; SetCmd::SetParams params{db_indx};
for (uint64_t i = from; i < from + len; ++i) { for (uint64_t i = from; i < from + len; ++i) {
absl::StrAppend(&key, i); absl::StrAppend(&key, i);
ShardId sid = Shard(key, ess_->size()); ShardId sid = Shard(key, ess.size());
key.resize(prefsize); key.resize(prefsize);
auto& pops = ps[sid]; auto& pops = ps[sid];
pops.index[pops.sz++] = i; pops.index[pops.sz++] = i;
if (pops.sz == 32) { if (pops.sz == 32) {
ess_->Add(sid, [=, p = pops] { ess.Add(sid, [=, p = pops] {
DoPopulateBatch(prefix, value_len, params, p); DoPopulateBatch(prefix, value_len, params, p);
if (i % 50 == 0) { if (i % 50 == 0) {
this_fiber::yield(); this_fiber::yield();
@ -218,13 +244,14 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
} }
} }
ess_->RunBriefInParallel([&](EngineShard* shard) { ess.RunBriefInParallel([&](EngineShard* shard) {
DoPopulateBatch(prefix, value_len, params, ps[shard->shard_id()]); DoPopulateBatch(prefix, value_len, params, ps[shard->shard_id()]);
}); });
} }
void DebugCmd::Inspect(string_view key) { void DebugCmd::Inspect(string_view key) {
ShardId sid = Shard(key, ess_->size()); EngineShardSet& ess = sf_.service().shard_set();
ShardId sid = Shard(key, ess.size());
using ObjInfo = pair<unsigned, unsigned>; // type, encoding. using ObjInfo = pair<unsigned, unsigned>; // type, encoding.
auto cb = [&]() -> facade::OpResult<ObjInfo> { auto cb = [&]() -> facade::OpResult<ObjInfo> {
@ -236,7 +263,7 @@ void DebugCmd::Inspect(string_view key) {
return OpStatus::KEY_NOTFOUND; return OpStatus::KEY_NOTFOUND;
}; };
OpResult<ObjInfo> res = ess_->Await(sid, cb); OpResult<ObjInfo> res = ess.Await(sid, cb);
if (res) { if (res) {
string resp = absl::StrCat("Value encoding:", strEncoding(res->second)); string resp = absl::StrCat("Value encoding:", strEncoding(res->second));
(*cntx_)->SendSimpleString(resp); (*cntx_)->SendSimpleString(resp);

View File

@ -9,10 +9,11 @@
namespace dfly { namespace dfly {
class EngineShardSet; class EngineShardSet;
class ServerFamily;
class DebugCmd { class DebugCmd {
public: public:
DebugCmd(EngineShardSet* ess, ConnectionContext* cntx); DebugCmd(ServerFamily* owner, ConnectionContext* cntx);
void Run(CmdArgList args); void Run(CmdArgList args);
@ -22,7 +23,7 @@ class DebugCmd {
void Reload(CmdArgList args); void Reload(CmdArgList args);
void Inspect(std::string_view key); void Inspect(std::string_view key);
EngineShardSet* ess_; ServerFamily& sf_;
ConnectionContext* cntx_; ConnectionContext* cntx_;
}; };

View File

@ -67,6 +67,10 @@ class Service : public facade::ServiceInterface {
absl::flat_hash_map<std::string, unsigned> UknownCmdMap() const; absl::flat_hash_map<std::string, unsigned> UknownCmdMap() const;
const CommandId* FindCmd(std::string_view cmd) const {
return registry_.Find(cmd);
}
private: private:
static void Quit(CmdArgList args, ConnectionContext* cntx); static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx); static void Multi(CmdArgList args, ConnectionContext* cntx);

View File

@ -106,6 +106,11 @@ TEST_F(RdbTest, Save) {
} }
TEST_F(RdbTest, Load) { TEST_F(RdbTest, Load) {
Run({"set", "string_key", "val"});
Run({"set", "large_key", string(511, 'L')});
// Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"});
Run({"debug", "reload"});
} }
} // namespace dfly } // namespace dfly

View File

@ -47,9 +47,9 @@ using namespace std;
using namespace util; using namespace util;
namespace fibers = ::boost::fibers; namespace fibers = ::boost::fibers;
namespace fs = std::filesystem; namespace fs = std::filesystem;
using absl::StrCat;
using facade::MCReplyBuilder; using facade::MCReplyBuilder;
using strings::HumanReadableNumBytes; using strings::HumanReadableNumBytes;
using absl::StrCat;
namespace { namespace {
@ -77,7 +77,7 @@ error_code CreateDirs(fs::path dir_path) {
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) { ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
start_time_ = time(NULL); start_time_ = time(NULL);
last_save_.store(start_time_, memory_order_release); last_save_ = start_time_;
script_mgr_.reset(new ScriptMgr(&service->shard_set())); script_mgr_.reset(new ScriptMgr(&service->shard_set()));
} }
@ -124,7 +124,7 @@ void ServerFamily::Shutdown() {
pb_task_->CancelPeriodic(task_10ms_); pb_task_->CancelPeriodic(task_10ms_);
task_10ms_ = 0; task_10ms_ = 0;
unique_lock lk(replica_of_mu_); unique_lock lk(replicaof_mu_);
if (replica_) { if (replica_) {
replica_->Stop(); replica_->Stop();
} }
@ -177,6 +177,106 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
#undef ADD_LINE #undef ADD_LINE
} }
error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
static unsigned fl_index = 1;
auto [current, switched] = global_state_.Next(GlobalState::SAVING);
if (!switched) {
*err_details = StrCat(GlobalState::Name(current), " - can not save database");
return make_error_code(errc::operation_in_progress);
}
absl::Cleanup rev_state = [this] { global_state_.Clear(); };
fs::path dir_path(FLAGS_dir);
error_code ec;
if (!dir_path.empty()) {
ec = CreateDirs(dir_path);
if (ec) {
*err_details = "create-dir ";
return ec;
}
}
string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename;
fs::path path = dir_path;
path.append(filename);
path.concat(StrCat("_", fl_index++));
VLOG(1) << "Saving to " << path;
auto res = uring::OpenWrite(path.generic_string());
if (!res) {
return res.error();
}
auto& pool = service_.proactor_pool();
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); });
unique_ptr<::io::WriteFile> wf(*res);
auto start = absl::Now();
RdbSaver saver{&ess_, wf.get()};
ec = saver.SaveHeader();
if (!ec) {
auto cb = [&saver](Transaction* t, EngineShard* shard) {
saver.StartSnapshotInShard(shard);
return OpStatus::OK;
};
trans->ScheduleSingleHop(std::move(cb));
// perform snapshot serialization, block the current fiber until it completes.
RdbTypeFreqMap freq_map;
ec = saver.SaveBody(&freq_map);
// TODO: needs protection from reads.
last_save_freq_map_.clear();
for (const auto& k_v : freq_map) {
last_save_freq_map_.push_back(k_v);
}
}
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
CHECK_EQ(GlobalState::SAVING, global_state_.Clear());
absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Saving " << path << " finished after "
<< strings::HumanReadableElapsedTime(seconds);
auto close_ec = wf->Close();
if (!ec)
ec = close_ec;
if (!ec) {
lock_guard lk(save_mu_);
last_save_ = time(NULL);
last_save_file_ = path.generic_string();
}
return ec;
}
error_code ServerFamily::DoFlush(Transaction* transaction, DbIndex db_ind) {
transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ?
transaction->Execute(
[db_ind](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(db_ind);
return OpStatus::OK;
},
true);
return error_code{};
}
string ServerFamily::LastSaveFile() const {
lock_guard lk(save_mu_);
return last_save_file_;
}
void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
atomic_ulong num_keys{0}; atomic_ulong num_keys{0};
@ -192,16 +292,7 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
DCHECK(cntx->transaction); DCHECK(cntx->transaction);
Transaction* transaction = cntx->transaction; DoFlush(cntx->transaction, cntx->transaction->db_index());
transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ?
transaction->Execute(
[](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(t->db_index());
return OpStatus::OK;
},
true);
cntx->reply_builder()->SendOk(); cntx->reply_builder()->SendOk();
} }
@ -212,16 +303,7 @@ void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {
} }
DCHECK(cntx->transaction); DCHECK(cntx->transaction);
Transaction* transaction = cntx->transaction; DoFlush(cntx->transaction, DbSlice::kDbAll);
transaction->Schedule();
transaction->Execute(
[](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
},
true);
(*cntx)->SendOk(); (*cntx)->SendOk();
} }
@ -262,7 +344,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendStringArr(res); return (*cntx)->SendStringArr(res);
} else { } else {
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try CONFIG HELP."); "'. Try CONFIG HELP.");
return (*cntx)->SendError(err, kSyntaxErr); return (*cntx)->SendError(err, kSyntaxErr);
} }
} }
@ -270,7 +352,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]); ToUpper(&args[1]);
DebugCmd dbg_cmd{&ess_, cntx}; DebugCmd dbg_cmd{this, cntx};
return dbg_cmd.Run(args); return dbg_cmd.Run(args);
} }
@ -283,86 +365,18 @@ void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) {
} }
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try MEMORY HELP."); "'. Try MEMORY HELP.");
return (*cntx)->SendError(err, kSyntaxErr); return (*cntx)->SendError(err, kSyntaxErr);
} }
void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
static unsigned fl_index = 1; string err_detail;
auto [current, switched] = global_state_.Next(GlobalState::SAVING); error_code ec = DoSave(cntx->transaction, &err_detail);
if (!switched) {
string error = StrCat(GlobalState::Name(current), " - can not save database");
return (*cntx)->SendError(error);
}
absl::Cleanup rev_state = [this] { global_state_.Clear(); };
fs::path dir_path(FLAGS_dir);
error_code ec;
if (!dir_path.empty()) {
ec = CreateDirs(dir_path);
if (ec)
return (*cntx)->SendError(StrCat("create dir ", ec.message()));
}
string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename;
fs::path path = dir_path;
path.append(filename);
path.concat(StrCat("_", fl_index++));
VLOG(1) << "Saving to " << path;
auto res = uring::OpenWrite(path.generic_string());
if (!res) {
(*cntx)->SendError(res.error().message());
return;
}
auto& pool = service_.proactor_pool();
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); });
unique_ptr<::io::WriteFile> wf(*res);
auto start = absl::Now();
RdbSaver saver{&ess_, wf.get()};
ec = saver.SaveHeader();
if (!ec) {
auto cb = [&saver](Transaction* t, EngineShard* shard) {
saver.StartSnapshotInShard(shard);
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(std::move(cb));
// perform snapshot serialization, block the current fiber until it completes.
RdbTypeFreqMap freq_map;
ec = saver.SaveBody(&freq_map);
// TODO: needs protection from reads.
last_save_freq_map_.clear();
for (const auto& k_v : freq_map) {
last_save_freq_map_.push_back(k_v);
}
}
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
CHECK_EQ(GlobalState::SAVING, global_state_.Clear());
absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Saving " << path << " finished after "
<< strings::HumanReadableElapsedTime(seconds);
auto close_ec = wf->Close();
if (!ec)
ec = close_ec;
if (ec) { if (ec) {
(*cntx)->SendError(ec.message()); (*cntx)->SendError(absl::StrCat(err_detail, ec.message()));
} else { } else {
last_save_.store(time(NULL), memory_order_release);
(*cntx)->SendOk(); (*cntx)->SendOk();
} }
} }
@ -430,7 +444,7 @@ tcp_port:)";
absl::StrAppend(&info, a1, a2, "\r\n"); absl::StrAppend(&info, a1, a2, "\r\n");
}; };
#define ADD_HEADER(x) absl::StrAppend(&info, x "\r\n") #define ADD_HEADER(x) absl::StrAppend(&info, x "\r\n")
if (should_enter("SERVER")) { if (should_enter("SERVER")) {
append(kInfo1, FLAGS_port); append(kInfo1, FLAGS_port);
@ -566,7 +580,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) { if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
// use this lock as critical section to prevent concurrent replicaof commands running. // use this lock as critical section to prevent concurrent replicaof commands running.
unique_lock lk(replica_of_mu_); unique_lock lk(replicaof_mu_);
// Switch to primary mode. // Switch to primary mode.
if (!ServerState::tlocal()->is_master) { if (!ServerState::tlocal()->is_master) {
@ -591,7 +605,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
auto new_replica = make_shared<Replica>(string(host), port, &service_); auto new_replica = make_shared<Replica>(string(host), port, &service_);
unique_lock lk(replica_of_mu_); unique_lock lk(replicaof_mu_);
if (replica_) { if (replica_) {
replica_->Stop(); // NOTE: consider introducing update API flow. replica_->Stop(); // NOTE: consider introducing update API flow.
} }
@ -637,8 +651,10 @@ void ServerFamily::Sync(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) {
SyncGeneric("?", 0, cntx); // full sync, ignore the request. SyncGeneric("?", 0, cntx); // full sync, ignore the request.
} }
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(last_save_.load(memory_order_relaxed)); lock_guard lk(save_mu_);
(*cntx)->SendLong(last_save_);
} }
void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {

View File

@ -35,13 +35,15 @@ struct Metrics {
class ServerFamily { class ServerFamily {
public: public:
ServerFamily(Service* engine); ServerFamily(Service* service);
~ServerFamily(); ~ServerFamily();
void Init(util::AcceptServer* acceptor); void Init(util::AcceptServer* acceptor);
void Register(CommandRegistry* registry); void Register(CommandRegistry* registry);
void Shutdown(); void Shutdown();
Service& service() { return service_;}
Metrics GetMetrics() const; Metrics GetMetrics() const;
GlobalState* global_state() { GlobalState* global_state() {
@ -54,6 +56,11 @@ class ServerFamily {
void StatsMC(std::string_view section, facade::ConnectionContext* cntx); void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
std::error_code DoSave(Transaction* transaction, std::string* err_details);
std::error_code DoFlush(Transaction* transaction, DbIndex db_ind);
std::string LastSaveFile() const;
private: private:
uint32_t shard_count() const { uint32_t shard_count() const {
return ess_.size(); return ess_.size();
@ -86,12 +93,16 @@ class ServerFamily {
util::AcceptServer* acceptor_ = nullptr; util::AcceptServer* acceptor_ = nullptr;
util::ProactorBase* pb_task_ = nullptr; util::ProactorBase* pb_task_ = nullptr;
::boost::fibers::mutex replica_of_mu_;
mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_ std::shared_ptr<Replica> replica_; // protected by replica_of_mu_
std::unique_ptr<ScriptMgr> script_mgr_; std::unique_ptr<ScriptMgr> script_mgr_;
std::atomic<int64_t> last_save_; // in seconds.
int64_t last_save_; // in seconds. protected by save_mu_.
std::string last_save_file_; // protected by save_mu_.
GlobalState global_state_; GlobalState global_state_;
time_t start_time_ = 0; // in seconds, epoch time. time_t start_time_ = 0; // in seconds, epoch time.

View File

@ -452,6 +452,7 @@ void Transaction::RunNoop(EngineShard* shard) {
} }
void Transaction::ScheduleInternal() { void Transaction::ScheduleInternal() {
DCHECK(!shard_data_.empty());
DCHECK_EQ(0u, txid_); DCHECK_EQ(0u, txid_);
DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO)); DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
@ -886,6 +887,8 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
// This function should not block since it's run via RunBriefInParallel. // This function should not block since it's run via RunBriefInParallel.
pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) { pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
DCHECK(!shard_data_.empty());
// schedule_success, lock_granted. // schedule_success, lock_granted.
pair<bool, bool> result{false, false}; pair<bool, bool> result{false, false};