Replica: add a state machine that continously pulls data from the master

This commit is contained in:
Roman Gershman 2022-02-01 16:11:19 +02:00
parent d1f6f6d410
commit fc56a8e61a
4 changed files with 229 additions and 1 deletions

View File

@ -25,6 +25,7 @@ namespace dfly {
using namespace std;
using namespace util;
using namespace boost::asio;
namespace this_fiber = ::boost::this_fiber;
namespace {
@ -62,6 +63,25 @@ int ResolveDns(std::string_view host, char* dest) {
return res;
}
error_code Recv(FiberSocketBase* input, base::IoBuf* dest) {
auto buf = dest->AppendBuffer();
io::Result<size_t> exp_size = input->Recv(buf);
if (!exp_size)
return exp_size.error();
dest->CommitWrite(*exp_size);
return error_code{};
}
// TODO: to remove usages of this macro and make code crash-less.
#define CHECK_EC(x) \
do { \
auto __ec$ = (x); \
CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
} while (false)
} // namespace
Replica::Replica(string host, uint16_t port, Service* se)
@ -94,6 +114,7 @@ bool Replica::Run(ConnectionContext* cntx) {
state_mask_ = R_ENABLED | R_TCP_CONNECTED;
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
sync_fb_ = ::boost::fibers::fiber(&Replica::ConnectFb, this);
cntx->SendOk();
return true;
@ -143,6 +164,159 @@ void Replica::Stop() {
sync_fb_.join();
}
void Replica::ConnectFb() {
error_code ec;
constexpr unsigned kOnline = R_SYNC_OK | R_TCP_CONNECTED;
while (state_mask_ & R_ENABLED) {
if ((state_mask_ & R_TCP_CONNECTED) == 0) {
this_fiber::sleep_for(500ms);
ec = ConnectSocket();
if (ec) {
LOG(ERROR) << "Error connecting " << ec;
continue;
}
VLOG(1) << "Replica socket connected";
state_mask_ |= R_TCP_CONNECTED;
}
if ((state_mask_ & kOnline) == R_TCP_CONNECTED) { // lacks great_ok
ec = GreatAndSync();
if (ec) {
LOG(INFO) << "Error greating " << ec;
state_mask_ &= ~kOnline;
continue;
}
VLOG(1) << "Replica great ok";
}
if ((state_mask_ & kOnline) == kOnline) {
// There is a data race condition in Redis-master code, where "ACK 0" handler may be
// triggerred
// before Redis is ready to transition to the streaming state and it silenty ignores "ACK
// 0". We reduce the chance it happens with this delay.
this_fiber::sleep_for(50ms);
ec = ConsumeRedisStream();
LOG_IF(ERROR, !FiberSocketBase::IsConnClosed(ec)) << "Replica socket error " << ec;
state_mask_ &= ~kOnline;
}
}
VLOG(1) << "Replication fiber finished";
}
error_code Replica::GreatAndSync() {
base::IoBuf io_buf{128};
ReqSerializer serializer{sock_.get()};
RedisParser parser{false}; // client mode
RespVec args;
serializer.SendCommand("PING"); // optional.
RETURN_ON_ERR(serializer.ec());
RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
uint32_t consumed = 0;
RedisParser::Result result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
CHECK_EQ(result, RedisParser::OK);
CHECK(!args.empty() && args.front().type == RespExpr::STRING);
// TODO: to check nauth, permission denied etc responses.
VLOG(1) << "Master ping reply " << ToSV(args.front().GetBuf());
io_buf.ConsumeInput(consumed);
// TODO: we may also send REPLCONF listening-port, ip-address
serializer.SendCommand("REPLCONF capa eof capa psync2");
RETURN_ON_ERR(serializer.ec());
RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
CHECK_EQ(result, RedisParser::OK);
CHECK(!args.empty() && args.front().type == RespExpr::STRING);
VLOG(1) << "Master REPLCONF reply " << ToSV(args.front().GetBuf());
io_buf.ConsumeInput(consumed);
// Announce that we are the dragonfly client.
// Note that we currently do not support dragonfly->redis replication.
//
serializer.SendCommand("REPLCONF capa dragonfly");
RETURN_ON_ERR(serializer.ec());
RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
CHECK_EQ(result, RedisParser::OK);
CHECK(!args.empty());
CHECK_EQ(RespExpr::STRING, args[0].type);
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
if (args.size() == 1) {
CHECK_EQ("OK", ToSV(args[0].GetBuf()));
} else {
LOG(FATAL) << "Bad response " << args;
}
io_buf.ConsumeInput(consumed);
// Start full sync
state_mask_ |= R_SYNCING;
serializer.SendCommand("PSYNC ? -1");
RETURN_ON_ERR(serializer.ec());
DCHECK_EQ(0u, io_buf.InputLen());
return make_error_code(errc::io_error);
}
error_code Replica::ConsumeRedisStream() {
base::IoBuf io_buf(16_KB);
parser_.reset(new RedisParser);
ReqSerializer serializer{sock_.get()};
// Master waits for this command in order to start sending replication stream.
serializer.SendCommand("REPLCONF ACK 0");
CHECK_EC(serializer.ec());
VLOG(1) << "Before reading repl-log";
// Redis sends eiher pings every "repl_ping_slave_period" time inside replicationCron().
// or, alternatively, write commands stream coming from propagate() function.
// Replica connection must send "REPLCONF ACK xxx" in order to make sure that master replication
// buffer gets disposed of already processed commands.
error_code ec;
time_t last_ack = time(nullptr);
string ack_cmd;
while (!ec) {
io::MutableBytes buf = io_buf.AppendBuffer();
io::Result<size_t> size_res = sock_->Recv(buf);
if (!size_res)
return size_res.error();
VLOG(1) << "Read replication stream of " << *size_res << " bytes";
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
io_buf.CommitWrite(*size_res);
repl_offs_ += *size_res;
// Send repl ack back to master.
if (repl_offs_ > ack_offs_ + 1024 || time(nullptr) > last_ack + 5) {
ack_cmd.clear();
absl::StrAppend(&ack_cmd, "REPLCONF ACK ", repl_offs_);
serializer.SendCommand(ack_cmd);
CHECK_EC(serializer.ec());
}
ec = ParseAndExecute(&io_buf);
}
VLOG(1) << "ConsumeRedisStream finished";
return ec;
}
// Threadsafe, fiber blocking.
auto Replica::GetInfo() const -> Info {
CHECK(sock_thread_);
@ -157,4 +331,33 @@ auto Replica::GetInfo() const -> Info {
});
}
error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
VLOG(1) << "ParseAndExecute: input len " << io_buf->InputLen();
if (parser_->stash_size() > 0) {
DVLOG(1) << "Stash " << *parser_->stash()[0];
}
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec cmd_args;
do {
result = parser_->Parse(io_buf->InputBuffer(), &consumed, &cmd_args);
switch (result) {
case RedisParser::OK:
case RedisParser::INPUT_PENDING:
io_buf->ConsumeInput(consumed);
break;
default:
LOG(ERROR) << "Invalid parser status " << result << " for buffer of size "
<< io_buf->InputLen();
return std::make_error_code(std::errc::bad_message);
}
} while (io_buf->InputLen() > 0 && result == RedisParser::OK);
VLOG(1) << "ParseAndExecute: " << io_buf->InputLen() << " " << ToSV(io_buf->InputBuffer());
return error_code{};
}
} // namespace dfly

View File

@ -59,7 +59,13 @@ class Replica {
R_SYNC_OK = 8,
};
void ConnectFb();
using ReplHeader = std::variant<std::string, size_t>;
std::error_code ConnectSocket();
std::error_code GreatAndSync();
std::error_code ConsumeRedisStream();
std::error_code ParseAndExecute(base::IoBuf* io_buf);
Service& service_;
std::string host_;
@ -70,9 +76,13 @@ class Replica {
// Where the sock_ is handled.
util::ProactorBase* sock_thread_ = nullptr;
std::unique_ptr<RedisParser> parser_;
unsigned state_mask_ = 0;
// repl_offs - till what offset we've already read from the master.
// ack_offs_ last acknowledged offset.
size_t repl_offs_ = 0, ack_offs_ = 0;
uint64_t last_io_time_ = 0; // in ns, monotonic clock.
unsigned state_mask_ = 0;
};
} // namespace dfly

View File

@ -222,4 +222,11 @@ void ReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) {
as_resp()->SendDirect(res);
}
void ReqSerializer::SendCommand(std::string_view str) {
VLOG(1) << "SendCommand: " << str;
iovec v[] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
} // namespace dfly

View File

@ -138,4 +138,12 @@ class ReplyBuilder {
Protocol protocol_;
};
class ReqSerializer : public RespSerializer {
public:
explicit ReqSerializer(::io::Sink* stream) : RespSerializer(stream) {
}
void SendCommand(std::string_view str);
};
} // namespace dfly