Implement parsing of replication stream from redis

This commit is contained in:
Roman Gershman 2022-05-28 21:24:07 +03:00
parent 46929e9c52
commit 1490eb5c3c
7 changed files with 337 additions and 65 deletions

View File

@ -21,9 +21,9 @@ using facade::kDbIndOutOfRangeErr;
#define RETURN_ON_ERR(x) \
do { \
auto ec = (x); \
if (ec) \
return ec; \
auto __ec = (x); \
if (__ec) \
return __ec; \
} while (0)
#endif // RETURN_ON_ERR

View File

@ -190,9 +190,9 @@ struct RdbLoader::ObjSettings {
ObjSettings() = default;
};
RdbLoader::RdbLoader(EngineShardSet* ess, ScriptMgr* script_mgr)
: script_mgr_(script_mgr), ess_(*ess), mem_buf_{16_KB} {
shard_buf_.reset(new ItemsBuf[ess_.size()]);
RdbLoader::RdbLoader(ScriptMgr* script_mgr)
: script_mgr_(script_mgr), mem_buf_{16_KB} {
shard_buf_.reset(new ItemsBuf[shard_set->size()]);
}
RdbLoader::~RdbLoader() {
@ -309,12 +309,12 @@ error_code RdbLoader::Load(io::Source* src) {
}
VLOG(1) << "Select DB: " << dbid;
for (unsigned i = 0; i < ess_.size(); ++i) {
for (unsigned i = 0; i < shard_set->size(); ++i) {
// we should flush pending items before switching dbid.
FlushShardAsync(i);
// Activate the database if it did not exist before.
ess_.Add(i, [dbid] { EngineShard::tlocal()->db_slice().ActivateDb(dbid); });
shard_set->Add(i, [dbid] { EngineShard::tlocal()->db_slice().ActivateDb(dbid); });
}
cur_db_index_ = dbid;
@ -354,13 +354,13 @@ error_code RdbLoader::Load(io::Source* src) {
/* Verify the checksum if RDB version is >= 5 */
RETURN_ON_ERR(VerifyChecksum());
fibers_ext::BlockingCounter bc(ess_.size());
for (unsigned i = 0; i < ess_.size(); ++i) {
fibers_ext::BlockingCounter bc(shard_set->size());
for (unsigned i = 0; i < shard_set->size(); ++i) {
// Flush the remaining items.
FlushShardAsync(i);
// Send sentinel callbacks to ensure that all previous messages have been processed.
ess_.Add(i, [bc]() mutable { bc.Dec(); });
shard_set->Add(i, [bc]() mutable { bc.Dec(); });
}
bc.Wait(); // wait for sentinels to report.
@ -378,6 +378,12 @@ error_code RdbLoader::EnsureReadInternal(size_t min_sz) {
auto out_buf = mem_buf_.AppendBuffer();
CHECK_GT(out_buf.size(), min_sz);
// If limit was applied we do not want to read more than needed
// important when reading from sockets.
if (bytes_read_ + out_buf.size() > source_limit_) {
out_buf = out_buf.subspan(0, source_limit_ - bytes_read_);
}
io::Result<size_t> res = src_->ReadAtLeast(out_buf, min_sz);
if (!res)
return res.error();
@ -386,6 +392,8 @@ error_code RdbLoader::EnsureReadInternal(size_t min_sz) {
return RdbError(errc::rdb_file_corrupted);
bytes_read_ += *res;
DCHECK_LE(bytes_read_, source_limit_);
mem_buf_.CommitWrite(*res);
return kOk;
@ -450,6 +458,12 @@ std::error_code RdbLoader::FetchBuf(size_t size, void* dest) {
next += to_copy;
if (size + bytes_read_ > source_limit_) {
LOG(ERROR) << "Out of bound read " << size + bytes_read_ << " vs " << source_limit_;
return RdbError(errc::rdb_file_corrupted);
}
if (size > 512) { // Worth reading directly into next.
io::MutableBytes mb{next, size};
@ -458,6 +472,7 @@ std::error_code RdbLoader::FetchBuf(size_t size, void* dest) {
return RdbError(errc::rdb_file_corrupted);
bytes_read_ += bytes_read;
DCHECK_LE(bytes_read_, source_limit_);
return kOk;
}
@ -467,12 +482,18 @@ std::error_code RdbLoader::FetchBuf(size_t size, void* dest) {
// Must be because mem_buf_ is empty.
DCHECK_GT(mb.size(), size);
if (bytes_read_ + mb.size() > source_limit_) {
mb = mb.subspan(0, source_limit_ - bytes_read_);
}
SET_OR_RETURN(src_->ReadAtLeast(mb, size), bytes_read);
if (bytes_read < size)
return RdbError(errc::rdb_file_corrupted);
bytes_read_ += bytes_read;
DCHECK_LE(bytes_read_, source_limit_);
mem_buf_.CommitWrite(bytes_read);
::memcpy(next, mem_buf_.InputBuffer().data(), size);
mem_buf_.ConsumeInput(size);
@ -571,7 +592,7 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
return;
auto cb = [indx = this->cur_db_index_, vec = std::move(out_buf)] { LoadItemsBuffer(indx, vec); };
ess_.Add(sid, std::move(cb));
shard_set->Add(sid, std::move(cb));
}
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
@ -1273,7 +1294,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
std::move(key_cleanup).Cancel();
std::string_view str_key(key, sdslen(key));
ShardId sid = Shard(str_key, ess_.size());
ShardId sid = Shard(str_key, shard_set->size());
uint64_t expire_at_ms = settings->expiretime;
auto& out_buf = shard_buf_[sid];

View File

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include <system_error>
@ -19,11 +19,15 @@ class ScriptMgr;
class RdbLoader {
public:
explicit RdbLoader(EngineShardSet* ess, ScriptMgr* script_mgr);
explicit RdbLoader(ScriptMgr* script_mgr);
~RdbLoader();
std::error_code Load(::io::Source* src);
void set_source_limit(size_t n) { source_limit_ = n;}
::io::Bytes Leftover() const { return mem_buf_.InputBuffer(); }
size_t bytes_read() const { return bytes_read_; }
private:
using MutableBytes = ::io::MutableBytes;
@ -84,13 +88,13 @@ class RdbLoader {
static void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);
ScriptMgr* script_mgr_;
EngineShardSet& ess_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<ItemsBuf[]> shard_buf_;
::io::Source* src_ = nullptr;
size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;
DbIndex cur_db_index_ = 0;
};

View File

@ -73,14 +73,14 @@ TEST_F(RdbTest, Crc) {
TEST_F(RdbTest, LoadEmpty) {
io::FileSource fs = GetSource("empty.rdb");
RdbLoader loader(shard_set, NULL);
RdbLoader loader(NULL);
auto ec = loader.Load(&fs);
CHECK(!ec);
}
TEST_F(RdbTest, LoadSmall6) {
io::FileSource fs = GetSource("redis6_small.rdb");
RdbLoader loader(shard_set, service_->script_mgr());
RdbLoader loader(service_->script_mgr());
auto ec = loader.Load(&fs);
ASSERT_FALSE(ec) << ec.message();

View File

@ -26,6 +26,7 @@ using namespace std;
using namespace util;
using namespace boost::asio;
using namespace facade;
using absl::StrCat;
namespace this_fiber = ::boost::this_fiber;
namespace {
@ -75,6 +76,8 @@ error_code Recv(FiberSocketBase* input, base::IoBuf* dest) {
return error_code{};
}
constexpr unsigned kRdbEofMarkSize = 40;
// TODO: to remove usages of this macro and make code crash-less.
#define CHECK_EC(x) \
do { \
@ -108,13 +111,19 @@ bool Replica::Run(ConnectionContext* cntx) {
error_code ec = ConnectSocket();
if (ec) {
(*cntx)->SendError(absl::StrCat(kConnErr, ec.message()));
(*cntx)->SendError(StrCat(kConnErr, ec.message()));
return false;
}
state_mask_ = R_ENABLED | R_TCP_CONNECTED;
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
sync_fb_ = ::boost::fibers::fiber(&Replica::ConnectFb, this);
ec = Greet();
if (ec) {
(*cntx)->SendError(StrCat("could not greet master ", ec.message()));
return false;
}
sync_fb_ = ::boost::fibers::fiber(&Replica::ReplicateFb, this);
(*cntx)->SendOk();
return true;
@ -164,9 +173,8 @@ void Replica::Stop() {
sync_fb_.join();
}
void Replica::ConnectFb() {
void Replica::ReplicateFb() {
error_code ec;
constexpr unsigned kOnline = R_SYNC_OK | R_TCP_CONNECTED;
while (state_mask_ & R_ENABLED) {
if ((state_mask_ & R_TCP_CONNECTED) == 0) {
@ -180,33 +188,42 @@ void Replica::ConnectFb() {
state_mask_ |= R_TCP_CONNECTED;
}
if ((state_mask_ & kOnline) == R_TCP_CONNECTED) { // lacks great_ok
ec = GreatAndSync();
if ((state_mask_ & R_GREETED) == 0) {
ec = Greet();
if (ec) {
LOG(INFO) << "Error greating " << ec;
state_mask_ &= ~kOnline;
LOG(INFO) << "Error greeting " << ec;
state_mask_ &= ~R_TCP_CONNECTED;
continue;
}
VLOG(1) << "Replica great ok";
}
if ((state_mask_ & kOnline) == kOnline) {
// There is a data race condition in Redis-master code, where the "ACK 0" handler may be
// triggered before Redis is ready to transition to the streaming state, and it silently
// ignores "ACK 0". We reduce the chance it happens with this delay.
this_fiber::sleep_for(50ms);
ec = ConsumeRedisStream();
LOG_IF(ERROR, !FiberSocketBase::IsConnClosed(ec)) << "Replica socket error " << ec;
state_mask_ &= ~kOnline;
if ((state_mask_ & R_SYNC_OK) == 0) { // has not synced
ec = InitiatePSync();
if (ec) {
LOG(WARNING) << "Error syncing " << ec << " " << ec.message();
state_mask_ &= R_ENABLED; // reset
continue;
}
VLOG(1) << "Replica greet ok";
}
// There is a data race condition in Redis-master code, where the "ACK 0" handler may be
// triggered before Redis is ready to transition to the streaming state, and it silently
// ignores "ACK 0". We reduce the chance it happens with this delay.
this_fiber::sleep_for(50ms);
// Start consuming the replication stream.
ec = ConsumeRedisStream();
LOG_IF(ERROR, !FiberSocketBase::IsConnClosed(ec)) << "Replica socket error " << ec;
state_mask_ &= ~R_SYNC_OK; //
}
VLOG(1) << "Replication fiber finished";
}
error_code Replica::GreatAndSync() {
error_code Replica::Greet() {
base::IoBuf io_buf{128};
ReqSerializer serializer{sock_.get()};
@ -214,6 +231,7 @@ error_code Replica::GreatAndSync() {
RedisParser parser{false}; // client mode
RespVec args;
// Corresponds to server.repl_state == REPL_STATE_CONNECTING state in redis
serializer.SendCommand("PING"); // optional.
RETURN_ON_ERR(serializer.ec());
RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
@ -221,24 +239,55 @@ error_code Replica::GreatAndSync() {
uint32_t consumed = 0;
RedisParser::Result result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
CHECK_EQ(result, RedisParser::OK);
CHECK(!args.empty() && args.front().type == RespExpr::STRING);
auto check_respok = [&] {
return result == RedisParser::OK && !args.empty() && args.front().type == RespExpr::STRING;
};
auto reply_err = [&]() -> string {
if (result != RedisParser::OK)
return StrCat("parse_err: ", result);
if (args.empty())
return "none";
return RespExpr::TypeName(args.front().type);
};
if (!check_respok()) {
LOG(WARNING) << "Bad reply from the server " << reply_err();
return make_error_code(errc::bad_message);
}
string_view pong = ToSV(args.front().GetBuf());
VLOG(1) << "Master ping reply " << pong;
// TODO: to check nauth, permission denied etc responses.
VLOG(1) << "Master ping reply " << ToSV(args.front().GetBuf());
if (pong != "PONG") {
LOG(ERROR) << "Unsupported reply " << pong;
return make_error_code(errc::operation_not_permitted);
}
io_buf.ConsumeInput(consumed);
// TODO: we may also send REPLCONF listening-port, ip-address
// See server.repl_state == REPL_STATE_SEND_PORT condition in replication.c
// Corresponds to server.repl_state == REPL_STATE_SEND_CAPA
serializer.SendCommand("REPLCONF capa eof capa psync2");
RETURN_ON_ERR(serializer.ec());
RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
CHECK_EQ(result, RedisParser::OK);
CHECK(!args.empty() && args.front().type == RespExpr::STRING);
if (!check_respok()) {
LOG(WARNING) << "Bad reply from the server " << reply_err();
return make_error_code(errc::bad_message);
}
pong = ToSV(args.front().GetBuf());
VLOG(1) << "Master REPLCONF reply " << pong;
// TODO: to check replconf reply.
VLOG(1) << "Master REPLCONF reply " << ToSV(args.front().GetBuf());
io_buf.ConsumeInput(consumed);
// Announce that we are the dragonfly client.
@ -247,27 +296,209 @@ error_code Replica::GreatAndSync() {
serializer.SendCommand("REPLCONF capa dragonfly");
RETURN_ON_ERR(serializer.ec());
RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
CHECK_EQ(result, RedisParser::OK);
CHECK(!args.empty());
CHECK_EQ(RespExpr::STRING, args[0].type);
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
if (args.size() == 1) {
CHECK_EQ("OK", ToSV(args[0].GetBuf()));
result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
if (!check_respok()) {
LOG(ERROR) << "Bad response from the server: " << reply_err();
return make_error_code(errc::bad_message);
}
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
string_view cmd = ToSV(args[0].GetBuf());
if (args.size() == 1) { // Redis
if (cmd != "OK") {
LOG(ERROR) << "Unexpected command " << cmd;
return make_error_code(errc::bad_message);
}
} else {
// TODO: dragonfly
LOG(FATAL) << "Bad response " << args;
}
io_buf.ConsumeInput(consumed);
state_mask_ |= R_GREETED;
return error_code{};
}
error_code Replica::InitiatePSync() {
base::IoBuf io_buf{128};
ReqSerializer serializer{sock_.get()};
// Start full sync
state_mask_ |= R_SYNCING;
serializer.SendCommand("PSYNC ? -1");
// Corresponds to server.repl_state == REPL_STATE_SEND_PSYNC
// Send psync command with null master id and null offset
string id("?");
int64_t offs = -1;
if (!master_repl_id_.empty()) {
id = master_repl_id_;
offs = repl_offs_;
}
serializer.SendCommand(StrCat("PSYNC ", id, " ", offs));
RETURN_ON_ERR(serializer.ec());
DCHECK_EQ(0u, io_buf.InputLen());
// Master may delay sync response with "repl_diskless_sync_delay"
PSyncResponse repl_header;
return make_error_code(errc::io_error);
RETURN_ON_ERR(ParseReplicationHeader(&io_buf, &repl_header));
string* token = absl::get_if<string>(&repl_header.fullsync);
size_t snapshot_size = SIZE_MAX;
if (!token) {
snapshot_size = absl::get<size_t>(repl_header.fullsync);
}
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
if (snapshot_size || token != nullptr) { // full sync
SocketSource ss{sock_.get()};
io::PrefixSource ps{io_buf.InputBuffer(), &ss};
RdbLoader loader(NULL);
loader.set_source_limit(snapshot_size);
// TODO: to allow registering callbacks within loader to send '\n' pings back to master.
// Also to allow updating last_io_time_.
error_code ec = loader.Load(&ps);
RETURN_ON_ERR(ec);
VLOG(1) << "full sync completed";
if (token) {
uint8_t buf[kRdbEofMarkSize];
io::PrefixSource chained(loader.Leftover(), &ps);
VLOG(1) << "Before reading from chained stream";
io::Result<size_t> eof_res = chained.Read(io::MutableBytes{buf});
CHECK(eof_res && *eof_res == kRdbEofMarkSize);
VLOG(1) << "Comparing token " << ToSV(buf);
// TODO: handle gracefully...
CHECK_EQ(0, memcmp(token->data(), buf, kRdbEofMarkSize));
CHECK(chained.unused_prefix().empty());
} else {
CHECK_EQ(0u, loader.Leftover().size());
CHECK_EQ(snapshot_size, loader.bytes_read());
}
CHECK(ps.unused_prefix().empty());
io_buf.ConsumeInput(io_buf.InputLen());
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
}
state_mask_ &= ~R_SYNCING;
state_mask_ |= R_SYNC_OK;
return error_code{};
}
// Parses the master's reply to PSYNC and fills `dest` accordingly.
// Expected reply shapes (RESP simple strings / bulk headers):
//   +FULLRESYNC <master_repl_id> <offset>\r\n   followed by either
//     $EOF:<40-byte-mark>\r\n  — diskless sync; dest->fullsync holds the EOF token, or
//     $<rdb_size>\r\n          — disk-based sync; dest->fullsync holds the payload size.
//   +CONTINUE ...              — partial sync; currently mapped to size 0 (TODO below).
// On success the consumed header lines are removed from io_buf; any malformed
// header yields errc::illegal_byte_sequence.
error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* dest) {
std::string_view str;
RETURN_ON_ERR(ReadLine(io_buf, &str));
DCHECK(!str.empty());
std::string_view header;
bool valid = false;
// ReadLine returns non-empty lines only; a valid reply is a RESP simple string.
if (str[0] != '+') {
goto bad_header;
}
header = str.substr(1);
VLOG(1) << "header: " << header;
if (absl::ConsumePrefix(&header, "FULLRESYNC ")) {
// +FULLRESYNC db7bd45bf68ae9b1acac33acb 123\r\n
// master_id repl_offset
size_t pos = header.find(' ');
if (pos != std::string_view::npos) {
if (absl::SimpleAtoi(header.substr(pos + 1), &repl_offs_)) {
master_repl_id_ = string(header.substr(0, pos));
valid = true;
VLOG(1) << "master repl_id " << master_repl_id_ << " / " << repl_offs_;
}
}
if (!valid)
goto bad_header;
// +2 accounts for the "\r\n" that ReadLine strips from the returned view.
io_buf->ConsumeInput(str.size() + 2);
RETURN_ON_ERR(ReadLine(io_buf, &str)); // Read the next line, parsed below.
// ReadLine skips leading whitespace before searching for EOL,
// so str must be non-empty here.
DCHECK(!str.empty());
// The RDB payload is announced as a bulk-string header.
if (str[0] != '$') {
goto bad_header;
}
std::string_view token = str.substr(1);
VLOG(1) << "token: " << token;
if (absl::ConsumePrefix(&token, "EOF:")) {
// Diskless sync: payload is terminated by a fixed-size EOF mark instead of a length.
CHECK_EQ(kRdbEofMarkSize, token.size()) << token;
dest->fullsync.emplace<string>(token);
VLOG(1) << "Token: " << token;
} else {
size_t rdb_size = 0;
if (!absl::SimpleAtoi(token, &rdb_size))
return std::make_error_code(std::errc::illegal_byte_sequence);
VLOG(1) << "rdb size " << rdb_size;
dest->fullsync.emplace<size_t>(rdb_size);
}
io_buf->ConsumeInput(str.size() + 2);
} else if (absl::ConsumePrefix(&header, "CONTINUE")) {
// we send psync2 so we should get master replid.
// That could change due to redis failovers.
// TODO: part sync
dest->fullsync.emplace<size_t>(0);
}
return error_code{};
bad_header:
LOG(ERROR) << "Bad replication header: " << str;
return std::make_error_code(std::errc::illegal_byte_sequence);
}
// Reads one CRLF-terminated line from the master, buffering socket data in io_buf.
// Leading whitespace is consumed (discarded from io_buf) first. On success `*line`
// is a view into io_buf's input buffer, excluding the trailing "\r\n"; the line
// bytes themselves are NOT consumed — the caller is expected to call
// io_buf->ConsumeInput(line->size() + 2) once done (see ParseReplicationHeader).
// Returns errc::illegal_byte_sequence if a '\n' arrives without a preceding '\r',
// or propagates any Recv error.
error_code Replica::ReadLine(base::IoBuf* io_buf, string_view* line) {
size_t eol_pos;
std::string_view input_str = ToSV(io_buf->InputBuffer());
// consume whitespace.
while (true) {
auto it = find_if_not(input_str.begin(), input_str.end(), absl::ascii_isspace);
size_t ws_len = it - input_str.begin();
io_buf->ConsumeInput(ws_len);
input_str = ToSV(io_buf->InputBuffer());
if (!input_str.empty())
break;
// Buffer exhausted while still in whitespace — pull more bytes from the socket.
RETURN_ON_ERR(Recv(sock_.get(), io_buf));
input_str = ToSV(io_buf->InputBuffer());
};
// find eol.
while (true) {
eol_pos = input_str.find('\n');
if (eol_pos != std::string_view::npos) {
DCHECK_GT(eol_pos, 0u); // can not be 0 because then would be consumed as a whitespace.
if (input_str[eol_pos - 1] != '\r') {
// Lone '\n' without '\r' — malformed line; fall through to the error below.
break;
}
*line = input_str.substr(0, eol_pos - 1);
return error_code{};
}
// No EOL yet — read more from the socket and retry on the grown buffer.
RETURN_ON_ERR(Recv(sock_.get(), io_buf));
input_str = ToSV(io_buf->InputBuffer());
}
LOG(ERROR) << "Bad replication header: " << input_str;
return std::make_error_code(std::errc::illegal_byte_sequence);
}
error_code Replica::ConsumeRedisStream() {

View File

@ -51,23 +51,35 @@ class Replica {
// The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK.
// SYNCING means that the initial ack succeeded. It may be optional if we can still load from
// the journal offset.
enum State {
enum State : unsigned {
R_ENABLED = 1, // Replication mode is enabled. Serves for signaling shutdown.
R_TCP_CONNECTED = 2,
R_SYNCING = 4,
R_SYNC_OK = 8,
R_GREETED = 4,
R_SYNCING = 8,
R_SYNC_OK = 0x10,
};
void ConnectFb();
void ReplicateFb();
struct PSyncResponse {
// string - end of sync token (diskless)
// size_t - size of the full sync blob (disk-based).
// if fullsync is 0, it means that master can continue with partial replication.
std::variant<std::string, size_t> fullsync;
};
using ReplHeader = std::variant<std::string, size_t>;
std::error_code ConnectSocket();
std::error_code GreatAndSync();
std::error_code Greet();
std::error_code InitiatePSync();
std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* header);
std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);
std::error_code ConsumeRedisStream();
std::error_code ParseAndExecute(base::IoBuf* io_buf);
Service& service_;
std::string host_;
std::string master_repl_id_;
uint16_t port_;
::boost::fibers::fiber sync_fb_;

View File

@ -111,9 +111,9 @@ string InferLoadFile(fs::path data_dir) {
}
class LinuxWriteWrapper : public io::WriteFile {
public:
LinuxWriteWrapper(uring::LinuxFile* lf) : WriteFile("wrapper"), lf_(lf) {}
public:
LinuxWriteWrapper(uring::LinuxFile* lf) : WriteFile("wrapper"), lf_(lf) {
}
io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
@ -248,7 +248,7 @@ error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
if (res) {
io::FileSource fs(*res);
RdbLoader loader(shard_set, script_mgr());
RdbLoader loader(script_mgr());
ec = loader.Load(&fs);
} else {
ec = res.error();
@ -852,6 +852,10 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
unique_lock lk(replicaof_mu_);
if (replica_) {
replica_->Stop(); // NOTE: consider introducing update API flow.
} else {
// TODO: to disconnect all the blocked clients (pubsub, blpop etc)
pool.AwaitFiberOnAll([&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = false; });
}
replica_.swap(new_replica);