Add skeleton for replica/sync commands

This commit is contained in:
Roman Gershman 2022-01-29 13:21:22 +02:00
parent b56408b51a
commit cf3d208f81
3 changed files with 61 additions and 7 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved. // Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
@ -7,9 +7,9 @@
#include "base/varz_value.h" #include "base/varz_value.h"
#include "server/command_registry.h" #include "server/command_registry.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "util/http/http_handler.h"
#include "server/memcache_parser.h" #include "server/memcache_parser.h"
#include "server/server_family.h" #include "server/server_family.h"
#include "util/http/http_handler.h"
namespace util { namespace util {
class AcceptServer; class AcceptServer;
@ -57,16 +57,17 @@ class Service {
return pp_; return pp_;
} }
util::HttpListenerBase* http_listener() { return http_listener_; } util::HttpListenerBase* http_listener() {
return http_listener_;
}
private: private:
static void Quit(CmdArgList args, ConnectionContext* cntx); static void Quit(CmdArgList args, ConnectionContext* cntx);
void Exec(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx); static void Multi(CmdArgList args, ConnectionContext* cntx);
void Exec(CmdArgList args, ConnectionContext* cntx);
void RegisterCommands(); void RegisterCommands();
base::VarzValue::Map GetVarzStats(); base::VarzValue::Map GetVarzStats();
util::ProactorPool& pp_; util::ProactorPool& pp_;

View File

@ -249,6 +249,8 @@ tcp_port:)";
absl::StrAppend(&info, "used_memory_human:", absl::StrAppend(&info, "used_memory_human:",
strings::HumanReadableNumBytes(m.db.table_mem_usage + m.db.obj_memory_usage), strings::HumanReadableNumBytes(m.db.table_mem_usage + m.db.obj_memory_usage),
"\n"); "\n");
absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n");
absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n");
absl::StrAppend(&info, "\n# Stats\n"); absl::StrAppend(&info, "\n# Stats\n");
absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n"); absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n");
@ -261,6 +263,33 @@ tcp_port:)";
cntx->SendBulkString(info); cntx->SendBulkString(info);
} }
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 1);
std::string_view port_s = ArgS(args, 2);
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
LOG(FATAL) << "TBD";
return cntx->SendOk();
}
uint32_t port;
if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) {
cntx->SendError(kInvalidIntErr);
return;
}
cntx->SendOk();
}
void ServerFamily::Sync(CmdArgList args, ConnectionContext* cntx) {
SyncGeneric("", 0, cntx);
}
void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) {
SyncGeneric("?", 0, cntx); // full sync, ignore the request.
}
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
cntx->SendLong(last_save_.load(memory_order_relaxed)); cntx->SendLong(last_save_.load(memory_order_relaxed));
} }
@ -270,9 +299,22 @@ void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {
cntx->SendOk(); cntx->SendOk();
} }
void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
ConnectionContext* cntx) {
if (cntx->conn_state.mask & ConnectionState::ASYNC_DISPATCH) {
// we can not sync if there are commands following on the socket because our reply to sync is
// streaming response.
cntx->SendError("Can not sync in pipeline mode");
return;
}
// TBD.
}
#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x)) #define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
void ServerFamily::Register(CommandRegistry* registry) { void ServerFamily::Register(CommandRegistry* registry) {
constexpr auto kReplicaOpts = CO::ADMIN | CO::STALE | CO::GLOBAL_TRANS;
*registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) *registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug) << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
@ -282,7 +324,11 @@ void ServerFamily::Register(CommandRegistry* registry) {
LastSave) LastSave)
<< CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) << CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC( << CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
_Shutdown); _Shutdown)
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync)
<< CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync);
} }
} // namespace dfly } // namespace dfly

View File

@ -52,9 +52,16 @@ class ServerFamily {
void FlushAll(CmdArgList args, ConnectionContext* cntx); void FlushAll(CmdArgList args, ConnectionContext* cntx);
void Save(CmdArgList args, ConnectionContext* cntx); void Save(CmdArgList args, ConnectionContext* cntx);
void Info(CmdArgList args, ConnectionContext* cntx); void Info(CmdArgList args, ConnectionContext* cntx);
void ReplicaOf(CmdArgList args, ConnectionContext* cntx);
void Sync(CmdArgList args, ConnectionContext* cntx);
void Psync(CmdArgList args, ConnectionContext* cntx);
void LastSave(CmdArgList args, ConnectionContext* cntx); void LastSave(CmdArgList args, ConnectionContext* cntx);
void _Shutdown(CmdArgList args, ConnectionContext* cntx); void _Shutdown(CmdArgList args, ConnectionContext* cntx);
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
Service& engine_; Service& engine_;
util::ProactorPool& pp_; util::ProactorPool& pp_;
EngineShardSet& ess_; EngineShardSet& ess_;