Few improvements.

1. Stabilized naming scheme for snapshot files.
   The behavior is determined by dbfilename flag.
   By default it's 'dump'. Dragonfly checks if the flag has an extension
   if not, it automatically saves timestamped files (dump*.rdb) and loads the latest file that is available.
   In case the flag has extension like 'dump.rdb' it fallbacks to redis behavior of loading and saving to the
   same file.

2. Updated the logo url.
This commit is contained in:
Roman Gershman 2022-05-19 15:50:42 +03:00
parent 9eb13b5163
commit 439070d00d
10 changed files with 165 additions and 44 deletions

View File

@ -34,7 +34,7 @@ template <unsigned NUM_SLOTS> class SlotBitmap {
// probe=true GetProbe returns index of probing entries, i.e. hosted but not owned by this bucket.
// probe=false - mask of owning entries
uint32_t GetProbe(bool probe) const {
if (SINGLE)
if constexpr (SINGLE)
return ((val_[0].d >> 4) & kAllocMask) ^ ((!probe) * kAllocMask);
return (val_[1].d & kAllocMask) ^ ((!probe) * kAllocMask);
}

View File

@ -122,7 +122,7 @@ Listener::Listener(Protocol protocol, ServiceInterface* e) : service_(e), protoc
ctx_ = CreateSslCntx();
}
http_base_.reset(new HttpListener<>);
http_base_->set_resource_prefix("https://romange.s3.eu-west-1.amazonaws.com/static");
http_base_->enable_metrics();
}

View File

@ -5,10 +5,12 @@
#include "server/common.h"
#include <absl/strings/str_cat.h>
#include <mimalloc.h>
extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
#include "redis/zmalloc.h"
}
#include "base/logging.h"
@ -27,6 +29,11 @@ unsigned kernel_version = 0;
size_t max_memory_limit = 0;
ServerState::ServerState() {
CHECK(mi_heap_get_backing() == mi_heap_get_default());
mi_heap_t* tlh = mi_heap_new();
init_zmalloc_threadlocal(tlh);
data_heap_ = tlh;
}
ServerState::~ServerState() {
@ -144,8 +151,9 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) {
return false;
}
d *= scale;
if (d > kint64max || d < 0)
if (int64_t(d) > kint64max || d < 0)
return false;
*num_bytes = static_cast<int64>(d + 0.5);
if (neg) {
*num_bytes = -*num_bytes;

View File

@ -13,10 +13,10 @@
#include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_load.h"
#include "server/server_state.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/uring/uring_file.h"
DECLARE_string(dir);
DECLARE_string(dbfilename);
@ -150,13 +150,23 @@ void DebugCmd::Reload(CmdArgList args) {
}
}
string last_save_file = sf_.LastSaveFile();
Load(last_save_file);
}
void DebugCmd::Load(std::string_view filename) {
EngineShardSet& ess = *shard_set;
auto [current, switched] = sf_.global_state()->Next(GlobalState::LOADING);
if (!switched) {
LOG(WARNING) << GlobalState::Name(current) << " in progress, ignored";
return;
}
ProactorPool& pp = sf_.service().proactor_pool();
pp.Await([&](ProactorBase*) {
CHECK(ServerState::tlocal()->gstate() == GlobalState::IDLE);
ServerState::tlocal()->set_gstate(GlobalState::LOADING);
});
const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
flush_trans->InitByArgs(0, {});
@ -174,24 +184,14 @@ void DebugCmd::Load(std::string_view filename) {
dir_path.append(filename);
path = dir_path;
}
auto res = uring::OpenRead(path.generic_string());
if (!res) {
(*cntx_)->SendError(res.error().message());
return;
}
VLOG(1) << "Performing load";
io::FileSource fs(*res);
RdbLoader loader(&ess, sf_.script_mgr());
ec = loader.Load(&fs);
// switches back to
ec = sf_.LoadRdb(path.generic_string());
if (ec) {
(*cntx_)->SendError(ec.message());
} else {
(*cntx_)->SendOk();
return (*cntx_)->SendError(ec.message());
}
(*cntx_)->SendOk();
}
void DebugCmd::Populate(CmdArgList args) {

View File

@ -11,6 +11,7 @@ extern "C" {
#include "base/logging.h"
#include "server/blocking_controller.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
@ -81,16 +82,13 @@ void EngineShard::Shutdown() {
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CHECK(shard_ == nullptr) << pb->GetIndex();
CHECK(mi_heap_get_backing() == mi_heap_get_default());
mi_heap_t* tlh = mi_heap_new();
init_zmalloc_threadlocal(tlh);
void* ptr = mi_heap_malloc_aligned(tlh, sizeof(EngineShard), alignof(EngineShard));
shard_ = new (ptr) EngineShard(pb, update_db_time, tlh);
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard));
shard_ = new (ptr) EngineShard(pb, update_db_time, data_heap);
CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(tlh);
SmallString::InitThreadLocal(data_heap);
if (!FLAGS_backing_prefix.empty()) {
string fn =

View File

@ -319,13 +319,13 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
const InitOpts& opts) {
InitRedisTables();
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set->Init(shard_num, !opts.disable_time_update);
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
ServerState::tlocal()->Init();
});
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set->Init(shard_num, !opts.disable_time_update);
request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);

View File

@ -19,6 +19,7 @@ extern "C" {
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
@ -26,6 +27,7 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
#include "server/replica.h"
#include "server/script_mgr.h"
@ -37,7 +39,7 @@ extern "C" {
#include "util/uring/uring_file.h"
DEFINE_string(dir, "", "working directory");
DEFINE_string(dbfilename, "", "the filename to save/load the DB");
DEFINE_string(dbfilename, "dump", "the filename to save/load the DB");
DEFINE_string(requirepass, "", "password for AUTH authentication");
DECLARE_uint32(port);
@ -77,11 +79,37 @@ error_code CreateDirs(fs::path dir_path) {
return ec;
}
string UnknowSubCmd(string_view subcmd, string cmd) {
string UnknownSubCmd(string_view subcmd, string cmd) {
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, "'. Try ",
cmd, " HELP.");
}
string InferLoadFile(fs::path data_dir) {
if (FLAGS_dbfilename.empty())
return string{};
fs::path fl_path = data_dir.append(FLAGS_dbfilename);
if (fs::exists(fl_path))
return fl_path.generic_string();
if (!fl_path.has_extension()) {
string glob = fl_path.generic_string();
glob.append("*.rdb");
io::Result<io::StatShortVec> short_vec = io::StatFiles(glob);
if (short_vec) {
if (!short_vec->empty()) {
return short_vec->back().name;
}
} else {
LOG(WARNING) << "Could not stat " << glob << ", error " << short_vec.error().message();
}
LOG(INFO) << "Checking " << fl_path;
}
return string{};
}
} // namespace
ServerFamily::ServerFamily(Service* service) : service_(*service) {
@ -114,22 +142,29 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
task_10ms_ = pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(10, cache_cb); });
fs::path data_path = fs::current_path();
fs::path data_folder = fs::current_path();
if (!FLAGS_dir.empty()) {
data_path = FLAGS_dir;
data_folder = FLAGS_dir;
error_code ec;
data_path = fs::canonical(data_path, ec);
data_folder = fs::canonical(data_folder, ec);
}
LOG(INFO) << "Data directory is " << data_path;
LOG(INFO) << "Data directory is " << data_folder;
string load_path = InferLoadFile(data_folder);
if (!load_path.empty()) {
Load(load_path);
}
}
void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown";
if (load_fiber_.joinable())
load_fiber_.join();
pb_task_->Await([this] {
pb_task_->CancelPeriodic(task_10ms_);
task_10ms_ = 0;
@ -141,6 +176,63 @@ void ServerFamily::Shutdown() {
});
}
void ServerFamily::Load(const std::string& load_path) {
CHECK(!load_fiber_.get_id());
error_code ec;
auto path = fs::canonical(load_path, ec);
if (ec) {
LOG(ERROR) << "Error loading " << load_path << " " << ec.message();
return;
}
LOG(INFO) << "Loading " << load_path;
auto [current, switched] = global_state()->Next(GlobalState::LOADING);
if (!switched) {
LOG(WARNING) << GlobalState::Name(current) << " in progress, ignored";
return;
}
auto& pool = service_.proactor_pool();
// Deliberitely run on all I/O threads to update the state for non-shard threads as well.
pool.Await([&](ProactorBase*) {
// TODO: There can be a bug where status is different.
CHECK(ServerState::tlocal()->gstate() == GlobalState::IDLE);
ServerState::tlocal()->set_gstate(GlobalState::LOADING);
});
// Choose thread that does not handle shards if possible.
// This will balance out the CPU during the load.
ProactorBase* proactor =
shard_count() < pool.size() ? pool.at(shard_count()) : pool.GetNextProactor();
load_fiber_ = proactor->LaunchFiber([load_path, this] {
auto ec = LoadRdb(load_path);
LOG_IF(ERROR, ec) << "Error loading file " << ec.message();
});
}
error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec;
if (res) {
io::FileSource fs(*res);
RdbLoader loader(shard_set, script_mgr());
ec = loader.Load(&fs);
} else {
ec = res.error();
}
auto& pool = service_.proactor_pool();
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
global_state()->Clear();
return ec;
}
void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) {
if (!section.empty()) {
return cntx->reply_builder()->SendError("");
@ -188,8 +280,6 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
}
error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
static unsigned fl_index = 1;
auto [current, switched] = global_state_.Next(GlobalState::SAVING);
if (!switched) {
*err_details = StrCat(GlobalState::Name(current), " - can not save database");
@ -209,10 +299,16 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
}
}
string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename;
fs::path filename = FLAGS_dbfilename.empty() ? "dump" : FLAGS_dbfilename;
fs::path path = dir_path;
path.append(filename);
path.concat(StrCat("_", fl_index++));
if (!filename.has_extension()) {
absl::Time now = absl::Now();
string ft_time = absl::FormatTime("-%Y-%m-%dT%H:%M:%S", now, absl::UTCTimeZone());
filename += StrCat(ft_time, ".rdb");
}
path += filename;
VLOG(1) << "Saving to " << path;
auto res = uring::OpenWrite(path.generic_string());
@ -370,7 +466,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
@ -394,7 +490,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
});
return (*cntx)->SendOk();
} else {
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
}
}

View File

@ -69,6 +69,8 @@ class ServerFamily {
std::string LastSaveFile() const;
std::error_code LoadRdb(const std::string& rdb_file);
private:
uint32_t shard_count() const {
return shard_set->size();
@ -97,6 +99,11 @@ class ServerFamily {
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
void Load(const std::string& file_name);
boost::fibers::fiber load_fiber_;
uint32_t task_10ms_ = 0;
Service& service_;

View File

@ -12,6 +12,8 @@
#include "server/global_state.h"
#include "util/sliding_counter.h"
typedef struct mi_heap_s mi_heap_t;
namespace dfly {
// Present in every server thread. This class differs from EngineShard. The latter manages
@ -72,8 +74,15 @@ class ServerState { // public struct - to allow initialization.
qps_.Inc();
}
// data heap used by zmalloc and shards.
mi_heap_t* data_heap() {
return data_heap_;
}
private:
int64_t live_transactions_ = 0;
mi_heap_t* data_heap_;
std::optional<Interpreter> interpreter_;
GlobalState::S gstate_ = GlobalState::IDLE;

View File

@ -17,6 +17,8 @@ extern "C" {
#include "facade/dragonfly_connection.h"
#include "util/uring/uring_pool.h"
DECLARE_string(dbfilename);
namespace dfly {
using MP = MemcacheParser;
using namespace std;
@ -50,6 +52,7 @@ BaseFamilyTest::~BaseFamilyTest() {
}
void BaseFamilyTest::SetUpTestSuite() {
FLAGS_dbfilename = "";
init_zmalloc_threadlocal(mi_heap_get_backing());
}