Add skeleton of replication manager and initial support of replicaof command

This commit is contained in:
Roman Gershman 2022-01-29 21:02:31 +02:00
parent cf3d208f81
commit 8a3207f23e
7 changed files with 327 additions and 11 deletions

View File

@ -65,13 +65,13 @@ API 1.0
- [X] DBSIZE
- [X] DEBUG
- [X] EXEC
- [ ] FLUSHALL
- [X] FLUSHALL
- [X] FLUSHDB
- [X] INFO
- [X] MULTI
- [X] SHUTDOWN
- [X] LASTSAVE
- [ ] SLAVEOF/REPLICAOF
- [X] SLAVEOF/REPLICAOF
- [ ] SYNC
- [ ] Set Family
- [ ] SADD

View File

@ -4,7 +4,8 @@ cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc generic_family.cc
list_family.cc main_service.cc memcache_parser.cc rdb_load.cc rdb_save.cc snapshot.cc
list_family.cc main_service.cc memcache_parser.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc
redis_parser.cc reply_builder.cc server_family.cc string_family.cc transaction.cc)
cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib

160
server/replica.cc Normal file
View File

@ -0,0 +1,160 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/replica.h"
extern "C" {
#include "redis/rdb.h"
}
#include <absl/strings/escaping.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/strip.h>
#include <boost/asio/ip/tcp.hpp>
#include "base/logging.h"
#include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_load.h"
#include "server/redis_parser.h"
#include "util/proactor_base.h"
namespace dfly {
using namespace std;
using namespace util;
using namespace boost::asio;
namespace {
// TODO: 2. Use time-out on socket-reads so that we would not deadlock on unresponsive master.
// 3. Support ipv6 at some point.
int ResolveDns(std::string_view host, char* dest) {
struct addrinfo hints, *servinfo;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_ALL;
int res = getaddrinfo(host.data(), NULL, &hints, &servinfo);
if (res != 0)
return res;
static_assert(INET_ADDRSTRLEN < INET6_ADDRSTRLEN, "");
res = EAI_FAMILY;
for (addrinfo* p = servinfo; p != NULL; p = p->ai_next) {
if (p->ai_family == AF_INET) {
struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr;
const char* inet_res = inet_ntop(p->ai_family, &ipv4->sin_addr, dest, INET6_ADDRSTRLEN);
CHECK_NOTNULL(inet_res);
res = 0;
break;
}
LOG(WARNING) << "Only IPv4 is supported";
}
freeaddrinfo(servinfo);
return res;
}
} // namespace
Replica::Replica(string host, uint16_t port, Service* se)
: service_(*se), host_(std::move(host)), port_(port) {
}
Replica::~Replica() {
if (sync_fb_.joinable())
sync_fb_.join();
if (sock_) {
auto ec = sock_->Close();
LOG_IF(ERROR, ec) << "Error closing replica socket " << ec;
}
}
static const char kConnErr[] = "could not connect to master: ";
bool Replica::Run(ConnectionContext* cntx) {
CHECK(!sock_ && !sock_thread_);
sock_thread_ = ProactorBase::me();
CHECK(sock_thread_);
error_code ec = ConnectSocket();
if (ec) {
cntx->SendError(absl::StrCat(kConnErr, ec.message()));
return false;
}
state_mask_ = R_ENABLED | R_TCP_CONNECTED;
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
cntx->SendOk();
return true;
}
std::error_code Replica::ConnectSocket() {
sock_.reset(sock_thread_->CreateSocket());
char ip_addr[INET6_ADDRSTRLEN];
int resolve_res = ResolveDns(host_, ip_addr);
if (resolve_res != 0) {
LOG(ERROR) << "Dns error " << gai_strerror(resolve_res);
return make_error_code(errc::host_unreachable);
}
auto address = ip::make_address(ip_addr);
ip::tcp::endpoint ep{address, port_};
/* These may help but require additional field testing to learn.
int yes = 1;
CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
CHECK_EQ(0, setsockopt(sock_->native_handle(), SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)));
int intv = 15;
CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPIDLE, &intv, sizeof(intv)));
intv /= 3;
CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPINTVL, &intv, sizeof(intv)));
intv = 3;
CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPCNT, &intv, sizeof(intv)));
*/
return sock_->Connect(ep);
}
void Replica::Stop() {
if (sock_thread_) {
sock_thread_->Await([this] {
state_mask_ = 0; // Specifically ~R_ENABLED.
auto ec = sock_->Shutdown(SHUT_RDWR);
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
});
}
if (sync_fb_.joinable())
sync_fb_.join();
}
// Threadsafe, fiber blocking.
auto Replica::GetInfo() const -> Info {
CHECK(sock_thread_);
return sock_thread_->AwaitBrief([this] {
Info res;
res.host = host_;
res.port = port_;
res.master_link_established = (state_mask_ & R_TCP_CONNECTED);
res.sync_in_progress = (state_mask_ & R_SYNCING);
res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time_) / 1000000000UL;
return res;
});
}
} // namespace dfly

78
server/replica.h Normal file
View File

@ -0,0 +1,78 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <boost/fiber/fiber.hpp>
#include <variant>
#include "base/io_buf.h"
#include "core/resp_expr.h"
#include "server/conn_context.h"
#include "util/fiber_socket_base.h"
namespace dfly {
class RedisParser;
class Service;
class Replica {
public:
Replica(std::string master_host, uint16_t port, Service* se);
~Replica();
// Spawns a fiber that runs until link with master is broken or the replication is stopped.
// Returns true if initial link with master has been established or
// false if it has failed.
bool Run(ConnectionContext* cntx);
// Thread-safe
void Stop();
const std::string& master_host() const {
return host_;
}
uint16_t port() {
return port_;
}
struct Info {
std::string host;
uint16_t port;
bool master_link_established;
bool sync_in_progress; // snapshot sync.
time_t master_last_io_sec; // monotonic clock.
};
// Threadsafe, fiber blocking.
Info GetInfo() const;
private:
// The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK.
// SYNCING means that the initial ack succeeded. It may be optional if we can still load from
// the journal offset.
enum State {
R_ENABLED = 1, // Replication mode is enabled. Serves for signaling shutdown.
R_TCP_CONNECTED = 2,
R_SYNCING = 4,
R_SYNC_OK = 8,
};
std::error_code ConnectSocket();
Service& service_;
std::string host_;
uint16_t port_;
::boost::fibers::fiber sync_fb_;
std::unique_ptr<util::LinuxSocketBase> sock_;
// Where the sock_ is handled.
util::ProactorBase* sock_thread_ = nullptr;
unsigned state_mask_ = 0;
uint64_t last_io_time_ = 0; // in ns, monotonic clock.
};
} // namespace dfly

View File

@ -22,6 +22,7 @@ extern "C" {
#include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_save.h"
#include "server/replica.h"
#include "server/server_state.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
@ -79,6 +80,12 @@ void ServerFamily::Init(util::AcceptServer* acceptor) {
void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown";
pp_.GetNextProactor()->Await([this] {
unique_lock lk(replica_of_mu_);
if (replica_) {
replica_->Stop();
}
});
}
void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
@ -146,9 +153,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(error);
}
absl::Cleanup rev_state = [this] {
global_state_.Clear();
};
absl::Cleanup rev_state = [this] { global_state_.Clear(); };
fs::path dir_path(FLAGS_dir);
error_code ec;
@ -171,6 +176,8 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
return;
}
pp_.Await([](auto*) { ServerState::tlocal()->state = GlobalState::SAVING; });
unique_ptr<::io::WriteFile> wf(*res);
auto start = absl::Now();
@ -193,6 +200,9 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
return;
}
pp_.Await([](auto*) { ServerState::tlocal()->state = GlobalState::IDLE; });
CHECK_EQ(GlobalState::SAVING, global_state_.Clear());
absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Saving " << path << " finished after "
@ -260,6 +270,28 @@ tcp_port:)";
absl::StrAppend(&info, "\n# Clients\n");
absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n");
absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n");
absl::StrAppend(&info, "\n# Replication\n");
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
absl::StrAppend(&info, "role:master\n");
absl::StrAppend(&info, "connected_slaves:", m.conn_stats.num_replicas, "\n");
} else {
absl::StrAppend(&info, "role:slave\n");
// it's safe to access replica_ because replica_ is created before etl.is_master set to false
// and cleared after etl.is_master is set to true. And since the code here that checks for
// is_master and copies shared_ptr is atomic, it1 should be correct.
auto replica_ptr = replica_;
Replica::Info rinfo = replica_ptr->GetInfo();
absl::StrAppend(&info, "master_host:", rinfo.host, "\n");
absl::StrAppend(&info, "master_port:", rinfo.port, "\n");
const char* link = rinfo.master_link_established ? "up" : "down";
absl::StrAppend(&info, "master_link_status:", link, "\n");
absl::StrAppend(&info, "master_last_io_seconds_ago:", rinfo.master_last_io_sec, "\n");
absl::StrAppend(&info, "master_sync_in_progress:", rinfo.sync_in_progress, "\n");
}
cntx->SendBulkString(info);
}
@ -268,7 +300,19 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view port_s = ArgS(args, 2);
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
LOG(FATAL) << "TBD";
// use this lock as critical section to prevent concurrent replicaof commands running.
unique_lock lk(replica_of_mu_);
// Switch to primary mode.
if (!ServerState::tlocal()->is_master) {
auto repl_ptr = replica_;
CHECK(repl_ptr);
pp_.AwaitFiberOnAll([&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; });
replica_->Stop();
replica_.reset();
}
return cntx->SendOk();
}
@ -280,7 +324,33 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}
cntx->SendOk();
auto new_replica = make_shared<Replica>(string(host), port, &engine_);
unique_lock lk(replica_of_mu_);
if (replica_) {
replica_->Stop(); // NOTE: consider introducing update API flow.
}
replica_.swap(new_replica);
// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();
auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
if (!replica_->Run(cntx)) {
replica_.reset();
}
bool is_master = !replica_;
pp_.AwaitFiberOnAll(
[&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
}
void ServerFamily::Sync(CmdArgList args, ConnectionContext* cntx) {
@ -302,8 +372,8 @@ void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
ConnectionContext* cntx) {
if (cntx->conn_state.mask & ConnectionState::ASYNC_DISPATCH) {
// we can not sync if there are commands following on the socket because our reply to sync is
// streaming response.
// SYNC is a special command that should not be sent in batch with other commands.
// It should be the last command since afterwards the server just dumps the replication data.
cntx->SendError("Can not sync in pipeline mode");
return;
}

View File

@ -4,7 +4,6 @@
#pragma once
#include "server/common_types.h"
#include "server/engine_shard_set.h"
#include "server/global_state.h"
#include "util/proactor_pool.h"
@ -18,6 +17,7 @@ namespace dfly {
class ConnectionContext;
class CommandRegistry;
class Service;
class Replica;
struct Metrics {
DbStats db;
@ -67,6 +67,9 @@ class ServerFamily {
EngineShardSet& ess_;
util::AcceptServer* acceptor_ = nullptr;
::boost::fibers::mutex replica_of_mu_;
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_
std::atomic<int64_t> last_save_; // in seconds.
GlobalState global_state_;
};

View File

@ -7,6 +7,7 @@
#include <vector>
#include "server/common_types.h"
#include "server/global_state.h"
namespace dfly {
@ -31,6 +32,9 @@ class ServerState { // public struct - to allow initialization.
ServerState();
~ServerState();
GlobalState::S state = GlobalState::IDLE;
bool is_master = true;
ConnectionStats connection_stats;
void TxCountInc() {