From 7f8346cded5108e812ec44a56bac9763ac0b7da7 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Tue, 12 Apr 2022 08:34:48 +0300
Subject: [PATCH] Implement DEBUG RELOAD option. Fix C++ and OpenSSL
 deprecation warnings.

---
 src/core/dash.h             |  11 +-
 src/core/interpreter.cc     |  38 ++++---
 src/server/debugcmd.cc      |  63 ++++++++---
 src/server/debugcmd.h       |   5 +-
 src/server/main_service.h   |   4 +
 src/server/rdb_test.cc      |   5 +
 src/server/server_family.cc | 218 +++++++++++++++++++-----------------
 src/server/server_family.h  |  17 ++-
 src/server/transaction.cc   |   3 +
 9 files changed, 219 insertions(+), 145 deletions(-)

diff --git a/src/core/dash.h b/src/core/dash.h
index 3f1eb4c..e176653 100644
--- a/src/core/dash.h
+++ b/src/core/dash.h
@@ -375,10 +375,11 @@ DashTable<_Key, _Value, Policy>::~DashTable() {
   Clear();
   auto* resource = segment_.get_allocator().resource();
   std::pmr::polymorphic_allocator<SegmentType> pa(resource);
+  using alloc_traits = std::allocator_traits<decltype(pa)>;
 
   IterateUnique([&](SegmentType* seg) {
-    pa.destroy(seg);
-    pa.deallocate(seg, 1);
+    alloc_traits::destroy(pa, seg);
+    alloc_traits::deallocate(pa, seg, 1);
     return false;
   });
 }
@@ -438,6 +439,8 @@ void DashTable<_Key, _Value, Policy>::Clear() {
    **********/
   if (global_depth_ > initial_depth_) {
     std::pmr::polymorphic_allocator<SegmentType> pa(segment_.get_allocator());
+    using alloc_traits = std::allocator_traits<decltype(pa)>;
+
     size_t dest = 0, src = 0;
     size_t new_size = (1 << initial_depth_);
 
@@ -448,8 +451,8 @@ void DashTable<_Key, _Value, Policy>::Clear() {
         seg->set_local_depth(initial_depth_);
         segment_[dest++] = seg;
       } else {
-        pa.destroy(seg);
-        pa.deallocate(seg, 1);
+        alloc_traits::destroy(pa, seg);
+        alloc_traits::deallocate(pa, seg, 1);
       }
 
       src = next_src;
diff --git a/src/core/interpreter.cc b/src/core/interpreter.cc
index a53a49e..390a5e2 100644
--- a/src/core/interpreter.cc
+++ b/src/core/interpreter.cc
@@ -5,7 +5,7 @@
 #include "core/interpreter.h"
 
 #include
-#include <openssl/sha.h>
+#include <openssl/evp.h>
 
 #include
 #include
@@ -25,6 +25,16 @@ using namespace std;
 
 namespace {
 
+// EVP_Q_digest is not present in the older versions of OpenSSL.
+int EVPDigest(const void* data, size_t datalen, unsigned char* md, size_t* mdlen) {
+  unsigned int temp = 0;
+  int ret = EVP_Digest(data, datalen, md, &temp, EVP_sha1(), NULL);
+
+  if (mdlen != NULL)
+    *mdlen = temp;
+  return ret;
+}
+
 class RedisTranslator : public ObjectExplorer {
  public:
   RedisTranslator(lua_State* lua) : lua_(lua) {
@@ -155,7 +165,6 @@ void SetGlobalArrayInternal(lua_State* lua, const char* name, MutSliceSpan args)
   lua_setglobal(lua, name);
 }
 
-
 /* This function is used in order to push an error on the Lua stack in the
  * format used by redis.pcall to return errors, which is a lua table
  * with a single "err" field set to the error string. Note that this
@@ -247,6 +256,7 @@ debug = nil
   lua_setglobal(lua, "dofile");
 }
 
+// dest must have at least 41 chars.
 void ToHex(const uint8_t* src, char* dest) {
   const char cset[] = "0123456789abcdef";
   for (size_t j = 0; j < 20; j++) {
@@ -266,16 +276,13 @@ int RedisSha1Command(lua_State* lua) {
 
   size_t len;
   const char* s = lua_tolstring(lua, 1, &len);
 
-  SHA_CTX ctx;
-  uint8_t buf[20];
-  char digest[41];
+  uint8_t digest[EVP_MAX_MD_SIZE];
+  EVPDigest(s, len, digest, NULL);
 
-  SHA1_Init(&ctx);
-  SHA1_Update(&ctx, s, len);
-  SHA1_Final(buf, &ctx);
-  ToHex(buf, digest);
+  char hex[41];
+  ToHex(digest, hex);
 
-  lua_pushstring(lua, digest);
+  lua_pushstring(lua, hex);
   return 1;
 }
@@ -353,13 +360,10 @@ Interpreter::~Interpreter() {
 }
 
 void Interpreter::FuncSha1(string_view body, char* fp) {
-  SHA_CTX ctx;
-  uint8_t buf[20];
+  uint8_t digest[EVP_MAX_MD_SIZE];
+  EVPDigest(body.data(), body.size(), digest, NULL);
 
-  SHA1_Init(&ctx);
-  SHA1_Update(&ctx, body.data(), body.size());
-  SHA1_Final(buf, &ctx);
-  ToHex(buf, fp);
+  ToHex(digest, fp);
 }
 
 auto Interpreter::AddFunction(string_view body, string* result) -> AddResult {
@@ -500,7 +504,7 @@ bool Interpreter::IsTableSafe() const {
       i = 0;
       len = lua_rawlen(lua_, -1);
     } else {
-      lua_pop(lua_, 1); // pop table element
+      lua_pop(lua_, 1);  // pop table element
       ++i;
     }
   }
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index b4284a6..3d9829f 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -11,8 +11,10 @@
 #include "base/logging.h"
 #include "server/engine_shard_set.h"
 #include "server/error.h"
+#include "server/main_service.h"
 #include "server/rdb_load.h"
 #include "server/string_family.h"
+#include "server/transaction.h"
 #include "util/uring/uring_fiber_algo.h"
 #include "util/uring/uring_file.h"
 
@@ -24,6 +26,7 @@ namespace dfly {
 using namespace std;
 using namespace util;
 namespace this_fiber = ::boost::this_fiber;
+using boost::intrusive_ptr;
 using boost::fibers::fiber;
 using namespace facade;
 namespace fs = std::filesystem;
@@ -52,7 +55,7 @@ void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::Set
   }
 }
 
-DebugCmd::DebugCmd(EngineShardSet* ess, ConnectionContext* cntx) : ess_(ess), cntx_(cntx) {
+DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), cntx_(cntx) {
 }
 
 void DebugCmd::Run(CmdArgList args) {
@@ -99,6 +102,7 @@ void DebugCmd::Run(CmdArgList args) {
 
 void DebugCmd::Reload(CmdArgList args) {
   bool save = true;
+
   for (size_t i = 2; i < args.size(); ++i) {
     ToUpper(&args[i]);
     std::string_view opt = ArgS(args, i);
@@ -110,18 +114,38 @@ void DebugCmd::Reload(CmdArgList args) {
       return (*cntx_)->SendError("DEBUG RELOAD only supports the NOSAVE options.");
     }
   }
+
+  error_code ec;
+
   if (save) {
-    return (*cntx_)->SendError("NOSAVE required (TBD).");
+    string err_details;
+    const CommandId* cid = sf_.service().FindCmd("SAVE");
+    CHECK_NOTNULL(cid);
+    intrusive_ptr<Transaction> trans(new Transaction{cid, &sf_.service().shard_set()});
+    trans->InitByArgs(0, {});
+    ec = sf_.DoSave(trans.get(), &err_details);
+    if (ec) {
+      return (*cntx_)->SendError(absl::StrCat(err_details, ec.message()));
+    }
   }
 
-  if (FLAGS_dbfilename.empty()) {
-    return (*cntx_)->SendError("dbfilename is not set");
+  const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
+  intrusive_ptr<Transaction> flush_trans(new Transaction{cid, &sf_.service().shard_set()});
+  flush_trans->InitByArgs(0, {});
+  ec = sf_.DoFlush(flush_trans.get(), DbSlice::kDbAll);
+  if (ec) {
+    LOG(ERROR) << "Error flushing db " << ec.message();
   }
 
-  fs::path dir_path(FLAGS_dir);
-  string filename = FLAGS_dbfilename;
-  fs::path path = dir_path;
-  path.append(filename);
+  string last_save_file = sf_.LastSaveFile();
+  fs::path path(last_save_file);
+
+  if (last_save_file.empty()) {
+    fs::path dir_path(FLAGS_dir);
+    string filename = FLAGS_dbfilename;
+    dir_path.append(filename);
+    path = dir_path;
+  }
 
   auto res = uring::OpenRead(path.generic_string());
   if (!res) {
@@ -132,7 +156,7 @@ void DebugCmd::Reload(CmdArgList args) {
 
   io::FileSource fs(*res);
   RdbLoader loader;
-  error_code ec = loader.Load(&fs);
+  ec = loader.Load(&fs);
 
   if (ec) {
     (*cntx_)->SendError(ec.message());
@@ -162,7 +186,8 @@ void DebugCmd::Populate(CmdArgList args) {
     return (*cntx_)->SendError(kUintErr);
   }
 
-  size_t runners_count = ess_->pool()->size();
+  ProactorPool& pp = sf_.service().proactor_pool();
+  size_t runners_count = pp.size();
   vector<pair<uint64_t, uint64_t>> ranges(runners_count - 1);
   uint64_t batch_size = total_count / runners_count;
   size_t from = 0;
@@ -178,8 +203,8 @@ void DebugCmd::Populate(CmdArgList args) {
     auto range = ranges[i];
 
     // whatever we do, we should not capture i by reference.
-    fb_arr[i] = ess_->pool()->at(i)->LaunchFiber(
-        [=] { this->PopulateRangeFiber(range.first, range.second, prefix, val_size); });
+    fb_arr[i] = pp.at(i)->LaunchFiber(
+        [=, this] { this->PopulateRangeFiber(range.first, range.second, prefix, val_size); });
   }
   for (auto& fb : fb_arr)
     fb.join();
@@ -195,18 +220,19 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
   string key = absl::StrCat(prefix, ":");
   size_t prefsize = key.size();
   DbIndex db_indx = cntx_->db_index();
-  std::vector<PopulateBatch> ps(ess_->size(), PopulateBatch{db_indx});
+  EngineShardSet& ess = sf_.service().shard_set();
+  std::vector<PopulateBatch> ps(ess.size(), PopulateBatch{db_indx});
   SetCmd::SetParams params{db_indx};
 
   for (uint64_t i = from; i < from + len; ++i) {
     absl::StrAppend(&key, i);
-    ShardId sid = Shard(key, ess_->size());
+    ShardId sid = Shard(key, ess.size());
     key.resize(prefsize);
 
     auto& pops = ps[sid];
     pops.index[pops.sz++] = i;
     if (pops.sz == 32) {
-      ess_->Add(sid, [=, p = pops] {
+      ess.Add(sid, [=, p = pops] {
         DoPopulateBatch(prefix, value_len, params, p);
         if (i % 50 == 0) {
           this_fiber::yield();
@@ -218,13 +244,14 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
     }
   }
 
-  ess_->RunBriefInParallel([&](EngineShard* shard) {
+  ess.RunBriefInParallel([&](EngineShard* shard) {
     DoPopulateBatch(prefix, value_len, params, ps[shard->shard_id()]);
   });
 }
 
 void DebugCmd::Inspect(string_view key) {
-  ShardId sid = Shard(key, ess_->size());
+  EngineShardSet& ess = sf_.service().shard_set();
+  ShardId sid = Shard(key, ess.size());
   using ObjInfo = pair<unsigned, unsigned>;  // type, encoding.
   auto cb = [&]() -> facade::OpResult<ObjInfo> {
@@ -236,7 +263,7 @@ void DebugCmd::Inspect(string_view key) {
     return OpStatus::KEY_NOTFOUND;
   };
 
-  OpResult<ObjInfo> res = ess_->Await(sid, cb);
+  OpResult<ObjInfo> res = ess.Await(sid, cb);
   if (res) {
     string resp = absl::StrCat("Value encoding:", strEncoding(res->second));
     (*cntx_)->SendSimpleString(resp);
diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h
index fea595a..b145016 100644
--- a/src/server/debugcmd.h
+++ b/src/server/debugcmd.h
@@ -9,10 +9,11 @@
 namespace dfly {
 
 class EngineShardSet;
+class ServerFamily;
 
 class DebugCmd {
  public:
-  DebugCmd(EngineShardSet* ess, ConnectionContext* cntx);
+  DebugCmd(ServerFamily* owner, ConnectionContext* cntx);
 
   void Run(CmdArgList args);
 
@@ -22,7 +23,7 @@ class DebugCmd {
   void Reload(CmdArgList args);
   void Inspect(std::string_view key);
 
-  EngineShardSet* ess_;
+  ServerFamily& sf_;
   ConnectionContext* cntx_;
 };
 
diff --git a/src/server/main_service.h b/src/server/main_service.h
index f02caf8..be29d39 100644
--- a/src/server/main_service.h
+++ b/src/server/main_service.h
@@ -67,6 +67,10 @@ class Service : public facade::ServiceInterface {
 
   absl::flat_hash_map UknownCmdMap() const;
 
+  const CommandId* FindCmd(std::string_view cmd) const {
+    return registry_.Find(cmd);
+  }
+
  private:
   static void Quit(CmdArgList args, ConnectionContext* cntx);
   static void Multi(CmdArgList args, ConnectionContext* cntx);
diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc
index b14dddb..eb1ef41 100644
--- a/src/server/rdb_test.cc
+++ b/src/server/rdb_test.cc
@@ -106,6 +106,11 @@ TEST_F(RdbTest, Save) {
 }
 
 TEST_F(RdbTest, Load) {
+  Run({"set", "string_key", "val"});
+  Run({"set", "large_key", string(511, 'L')});
+  // Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"});
+
+  Run({"debug", "reload"});
 }
 
 } // namespace dfly
\ No newline at end of file
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index cf3fe8c..e8109cf 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -47,9 +47,9 @@ using namespace std;
 using namespace util;
 namespace fibers = ::boost::fibers;
 namespace fs = std::filesystem;
+using absl::StrCat;
 using facade::MCReplyBuilder;
 using strings::HumanReadableNumBytes;
-using absl::StrCat;
 
 namespace {
 
@@ -77,7 +77,7 @@ error_code CreateDirs(fs::path dir_path) {
 
 ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
   start_time_ = time(NULL);
-  last_save_.store(start_time_, memory_order_release);
+  last_save_ = start_time_;
   script_mgr_.reset(new ScriptMgr(&service->shard_set()));
 }
 
@@ -124,7 +124,7 @@ void ServerFamily::Shutdown() {
   pb_task_->CancelPeriodic(task_10ms_);
   task_10ms_ = 0;
 
-  unique_lock lk(replica_of_mu_);
+  unique_lock lk(replicaof_mu_);
   if (replica_) {
     replica_->Stop();
   }
@@ -177,6 +177,106 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
 #undef ADD_LINE
 }
 
+error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
+  static unsigned fl_index = 1;
+
+  auto [current, switched] = global_state_.Next(GlobalState::SAVING);
+  if (!switched) {
+    *err_details = StrCat(GlobalState::Name(current), " - can not save database");
+    return make_error_code(errc::operation_in_progress);
+  }
+
+  absl::Cleanup rev_state = [this] { global_state_.Clear(); };
+
+  fs::path dir_path(FLAGS_dir);
+  error_code ec;
+
+  if (!dir_path.empty()) {
+    ec = CreateDirs(dir_path);
+    if (ec) {
+      *err_details = "create-dir ";
+      return ec;
+    }
+  }
+
+  string filename = FLAGS_dbfilename.empty() ?
"dump_save.rdb" : FLAGS_dbfilename; + fs::path path = dir_path; + path.append(filename); + path.concat(StrCat("_", fl_index++)); + VLOG(1) << "Saving to " << path; + + auto res = uring::OpenWrite(path.generic_string()); + if (!res) { + return res.error(); + } + + auto& pool = service_.proactor_pool(); + pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); }); + + unique_ptr<::io::WriteFile> wf(*res); + auto start = absl::Now(); + + RdbSaver saver{&ess_, wf.get()}; + + ec = saver.SaveHeader(); + if (!ec) { + auto cb = [&saver](Transaction* t, EngineShard* shard) { + saver.StartSnapshotInShard(shard); + return OpStatus::OK; + }; + trans->ScheduleSingleHop(std::move(cb)); + + // perform snapshot serialization, block the current fiber until it completes. + RdbTypeFreqMap freq_map; + ec = saver.SaveBody(&freq_map); + + // TODO: needs protection from reads. + last_save_freq_map_.clear(); + for (const auto& k_v : freq_map) { + last_save_freq_map_.push_back(k_v); + } + } + + pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); }); + CHECK_EQ(GlobalState::SAVING, global_state_.Clear()); + + absl::Duration dur = absl::Now() - start; + double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000; + LOG(INFO) << "Saving " << path << " finished after " + << strings::HumanReadableElapsedTime(seconds); + + auto close_ec = wf->Close(); + + if (!ec) + ec = close_ec; + + if (!ec) { + lock_guard lk(save_mu_); + last_save_ = time(NULL); + last_save_file_ = path.generic_string(); + } + + return ec; +} + +error_code ServerFamily::DoFlush(Transaction* transaction, DbIndex db_ind) { + transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ? + + transaction->Execute( + [db_ind](Transaction* t, EngineShard* shard) { + shard->db_slice().FlushDb(db_ind); + return OpStatus::OK; + }, + true); + + return error_code{}; +} + +string ServerFamily::LastSaveFile() const { + lock_guard lk(save_mu_); + return last_save_file_; +} + void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { atomic_ulong num_keys{0}; @@ -192,16 +292,7 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { DCHECK(cntx->transaction); - Transaction* transaction = cntx->transaction; - transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ? - - transaction->Execute( - [](Transaction* t, EngineShard* shard) { - shard->db_slice().FlushDb(t->db_index()); - return OpStatus::OK; - }, - true); - + DoFlush(cntx->transaction, cntx->transaction->db_index()); cntx->reply_builder()->SendOk(); } @@ -212,16 +303,7 @@ void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) { } DCHECK(cntx->transaction); - Transaction* transaction = cntx->transaction; - transaction->Schedule(); - - transaction->Execute( - [](Transaction* t, EngineShard* shard) { - shard->db_slice().FlushDb(DbSlice::kDbAll); - return OpStatus::OK; - }, - true); - + DoFlush(cntx->transaction, DbSlice::kDbAll); (*cntx)->SendOk(); } @@ -262,7 +344,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendStringArr(res); } else { string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, - "'. Try CONFIG HELP."); + "'. 
Try CONFIG HELP."); return (*cntx)->SendError(err, kSyntaxErr); } } @@ -270,7 +352,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[1]); - DebugCmd dbg_cmd{&ess_, cntx}; + DebugCmd dbg_cmd{this, cntx}; return dbg_cmd.Run(args); } @@ -283,86 +365,18 @@ void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) { } string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, - "'. Try MEMORY HELP."); + "'. Try MEMORY HELP."); return (*cntx)->SendError(err, kSyntaxErr); } void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { - static unsigned fl_index = 1; + string err_detail; - auto [current, switched] = global_state_.Next(GlobalState::SAVING); - if (!switched) { - string error = StrCat(GlobalState::Name(current), " - can not save database"); - return (*cntx)->SendError(error); - } - - absl::Cleanup rev_state = [this] { global_state_.Clear(); }; - - fs::path dir_path(FLAGS_dir); - error_code ec; - - if (!dir_path.empty()) { - ec = CreateDirs(dir_path); - if (ec) - return (*cntx)->SendError(StrCat("create dir ", ec.message())); - } - - string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename; - fs::path path = dir_path; - path.append(filename); - path.concat(StrCat("_", fl_index++)); - VLOG(1) << "Saving to " << path; - - auto res = uring::OpenWrite(path.generic_string()); - if (!res) { - (*cntx)->SendError(res.error().message()); - return; - } - - auto& pool = service_.proactor_pool(); - pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); }); - - unique_ptr<::io::WriteFile> wf(*res); - auto start = absl::Now(); - - RdbSaver saver{&ess_, wf.get()}; - - ec = saver.SaveHeader(); - if (!ec) { - auto cb = [&saver](Transaction* t, EngineShard* shard) { - saver.StartSnapshotInShard(shard); - return OpStatus::OK; - }; - cntx->transaction->ScheduleSingleHop(std::move(cb)); - - // perform snapshot serialization, block the current fiber until it completes. - RdbTypeFreqMap freq_map; - ec = saver.SaveBody(&freq_map); - - // TODO: needs protection from reads. - last_save_freq_map_.clear(); - for (const auto& k_v : freq_map) { - last_save_freq_map_.push_back(k_v); - } - } - - pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); }); - CHECK_EQ(GlobalState::SAVING, global_state_.Clear()); - - absl::Duration dur = absl::Now() - start; - double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000; - LOG(INFO) << "Saving " << path << " finished after " - << strings::HumanReadableElapsedTime(seconds); - - auto close_ec = wf->Close(); - - if (!ec) - ec = close_ec; + error_code ec = DoSave(cntx->transaction, &err_detail); if (ec) { - (*cntx)->SendError(ec.message()); + (*cntx)->SendError(absl::StrCat(err_detail, ec.message())); } else { - last_save_.store(time(NULL), memory_order_release); (*cntx)->SendOk(); } } @@ -430,7 +444,7 @@ tcp_port:)"; absl::StrAppend(&info, a1, a2, "\r\n"); }; - #define ADD_HEADER(x) absl::StrAppend(&info, x "\r\n") +#define ADD_HEADER(x) absl::StrAppend(&info, x "\r\n") if (should_enter("SERVER")) { append(kInfo1, FLAGS_port); @@ -566,7 +580,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) { // use this lock as critical section to prevent concurrent replicaof commands running. 
-    unique_lock lk(replica_of_mu_);
+    unique_lock lk(replicaof_mu_);
 
     // Switch to primary mode.
     if (!ServerState::tlocal()->is_master) {
@@ -591,7 +605,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
 
   auto new_replica = make_shared<Replica>(string(host), port, &service_);
 
-  unique_lock lk(replica_of_mu_);
+  unique_lock lk(replicaof_mu_);
   if (replica_) {
     replica_->Stop();  // NOTE: consider introducing update API flow.
   }
@@ -637,8 +651,10 @@ void ServerFamily::Sync(CmdArgList args, ConnectionContext* cntx) {
 void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) {
   SyncGeneric("?", 0, cntx);  // full sync, ignore the request.
 }
+
 void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
-  (*cntx)->SendLong(last_save_.load(memory_order_relaxed));
+  lock_guard lk(save_mu_);
+  (*cntx)->SendLong(last_save_);
 }
 
 void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {
diff --git a/src/server/server_family.h b/src/server/server_family.h
index d78b308..199dd7e 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -35,13 +35,15 @@ struct Metrics {
 
 class ServerFamily {
  public:
-  ServerFamily(Service* engine);
+  ServerFamily(Service* service);
   ~ServerFamily();
 
   void Init(util::AcceptServer* acceptor);
   void Register(CommandRegistry* registry);
   void Shutdown();
 
+  Service& service() { return service_;}
+
   Metrics GetMetrics() const;
 
   GlobalState* global_state() {
@@ -54,6 +56,11 @@ class ServerFamily {
 
   void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
 
+  std::error_code DoSave(Transaction* transaction, std::string* err_details);
+  std::error_code DoFlush(Transaction* transaction, DbIndex db_ind);
+
+  std::string LastSaveFile() const;
+
  private:
   uint32_t shard_count() const {
     return ess_.size();
@@ -86,12 +93,16 @@ class ServerFamily {
 
   util::AcceptServer* acceptor_ = nullptr;
   util::ProactorBase* pb_task_ = nullptr;
-  ::boost::fibers::mutex replica_of_mu_;
+
+  mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;
   std::shared_ptr<Replica> replica_;  // protected by replica_of_mu_
   std::unique_ptr<ScriptMgr> script_mgr_;
-  std::atomic<int64_t> last_save_;  // in seconds.
+
+  int64_t last_save_;  // in seconds. protected by save_mu_.
+  std::string last_save_file_;  // protected by save_mu_.
+
   GlobalState global_state_;
   time_t start_time_ = 0;  // in seconds, epoch time.
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index bcefb01..be80ab0 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -452,6 +452,7 @@ void Transaction::RunNoop(EngineShard* shard) {
 }
 
 void Transaction::ScheduleInternal() {
+  DCHECK(!shard_data_.empty());
   DCHECK_EQ(0u, txid_);
   DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
 
@@ -886,6 +887,8 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
 
 // This function should not block since it's run via RunBriefInParallel.
 pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
+  DCHECK(!shard_data_.empty());
+
   // schedule_success, lock_granted.
   pair<bool, bool> result{false, false};
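
Note (illustrative, not part of the patch above): a minimal standalone sketch of the OpenSSL change in interpreter.cc -- the deprecated SHA_CTX / SHA1_Init / SHA1_Update / SHA1_Final sequence is replaced by a single EVP_Digest() call, mirroring the EVPDigest() and ToHex() helpers added by the patch. The Sha1Hex() name and the main() driver are assumptions for the example only; build with -lcrypto.

    #include <openssl/evp.h>

    #include <cstddef>
    #include <cstdio>
    #include <cstring>

    // One-shot SHA1 through the EVP interface, with no deprecated SHA1_* calls.
    // Returns 1 on success and writes 40 hex chars plus a terminating NUL.
    static int Sha1Hex(const void* data, size_t len, char out[41]) {
      unsigned char md[EVP_MAX_MD_SIZE];
      unsigned int mdlen = 0;
      if (EVP_Digest(data, len, md, &mdlen, EVP_sha1(), nullptr) != 1)
        return 0;

      // Same hex encoding as ToHex(): SHA1 yields 20 digest bytes.
      static const char cset[] = "0123456789abcdef";
      for (size_t j = 0; j < 20; ++j) {
        out[j * 2] = cset[(md[j] >> 4) & 0x0F];
        out[j * 2 + 1] = cset[md[j] & 0x0F];
      }
      out[40] = '\0';
      return 1;
    }

    int main() {
      const char body[] = "return 1";
      char hex[41];
      if (Sha1Hex(body, std::strlen(body), hex))
        std::printf("sha1=%s\n", hex);  // same fingerprint format Interpreter::FuncSha1 produces
      return 0;
    }

On the server side, the DEBUG RELOAD path added in debugcmd.cc follows this flow: DoSave() on an ad-hoc SAVE transaction (skipped when NOSAVE is passed), DoFlush() of all databases, then RdbLoader::Load() on the file returned by LastSaveFile() -- e.g. "DEBUG RELOAD" or "DEBUG RELOAD NOSAVE" from redis-cli.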