Wire SAVE and LASTSAVE commands

SAVE has a dummy implementation that will be replaced in the next commits.
This commit is contained in:
Roman Gershman 2022-01-20 21:48:02 +02:00
parent 7a839b6081
commit ba72e70de5
2 changed files with 79 additions and 0 deletions

View File

@ -25,6 +25,10 @@ extern "C" {
#include "server/transaction.h"
#include "strings/human_readable.h"
#include "util/accept_server.h"
#include "util/uring/uring_file.h"
DEFINE_string(dir, "", "working directory");
DEFINE_string(dbfilename, "", "the filename to save/load the DB");
DECLARE_uint32(port);
@ -46,10 +50,22 @@ inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) {
using CI = CommandId;
// Create a direc
error_code CreateDirs(fs::path dir_path) {
error_code ec;
fs::file_status dir_status = fs::status(dir_path, ec);
if (ec == errc::no_such_file_or_directory) {
fs::create_directories(dir_path, ec);
if (!ec)
dir_status = fs::status(dir_path, ec);
}
return ec;
}
} // namespace
ServerFamily::ServerFamily(Service* engine)
: engine_(*engine), pp_(engine->proactor_pool()), ess_(engine->shard_set()) {
last_save_.store(time(NULL), memory_order_release);
}
ServerFamily::~ServerFamily() {
@ -120,6 +136,58 @@ void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
return dbg_cmd.Run(args);
}
void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
static unsigned fl_index = 1;
fs::path dir_path(FLAGS_dir);
error_code ec;
if (!dir_path.empty()) {
ec = CreateDirs(dir_path);
if (ec)
return cntx->SendError(absl::StrCat("create dir ", ec.message()));
}
string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename;
fs::path path = dir_path;
path.append(filename);
path.concat(absl::StrCat("_", fl_index++));
LOG(INFO) << "Saving to " << path;
// TODO: use io-uring file instead.
auto res = uring::OpenWrite(path.generic_string());
if (!res) {
cntx->SendError(res.error().message());
return;
}
unique_ptr<::io::WriteFile> wf(*res);
auto start = absl::Now();
ec = wf->Write(string(1024, 'A'));
if (ec) {
cntx->SendError(res.error().message());
return;
}
absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Saving " << path << " finished after "
<< strings::HumanReadableElapsedTime(seconds);
auto close_ec = wf->Close();
if (!ec)
ec = close_ec;
if (ec) {
cntx->SendError(ec.message());
} else {
last_save_.store(time(NULL), memory_order_release);
cntx->SendOk();
}
}
Metrics ServerFamily::GetMetrics() const {
Metrics result;
@ -130,6 +198,7 @@ Metrics ServerFamily::GetMetrics() const {
lock_guard<fibers::mutex> lk(mu);
result.db += local_stats.db;
result.events += local_stats.events;
result.conn_stats += *ServerState::tl_connection_stats();
};
@ -169,6 +238,10 @@ tcp_port:)";
cntx->SendBulkString(info);
}
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
cntx->SendLong(last_save_.load(memory_order_relaxed));
}
void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {
CHECK_NOTNULL(acceptor_)->Stop();
cntx->SendOk();
@ -182,6 +255,9 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll)
<< CI{"INFO", CO::LOADING | CO::STALE, -1, 0, 0, 0}.HFUNC(Info)
<< CI{"LASTSAVE", CO::LOADING | CO::STALE | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(
LastSave)
<< CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
_Shutdown);
}

View File

@ -45,7 +45,9 @@ class ServerFamily {
void DbSize(CmdArgList args, ConnectionContext* cntx);
void FlushDb(CmdArgList args, ConnectionContext* cntx);
void FlushAll(CmdArgList args, ConnectionContext* cntx);
void Save(CmdArgList args, ConnectionContext* cntx);
void Info(CmdArgList args, ConnectionContext* cntx);
void LastSave(CmdArgList args, ConnectionContext* cntx);
void _Shutdown(CmdArgList args, ConnectionContext* cntx);
Service& engine_;
@ -53,6 +55,7 @@ class ServerFamily {
EngineShardSet& ess_;
util::AcceptServer* acceptor_ = nullptr;
std::atomic<int64_t> last_save_; // in seconds.
};
} // namespace dfly