diff --git a/server/replica.cc b/server/replica.cc index 5c64a8f..01e8334 100644 --- a/server/replica.cc +++ b/server/replica.cc @@ -25,6 +25,7 @@ namespace dfly { using namespace std; using namespace util; using namespace boost::asio; +namespace this_fiber = ::boost::this_fiber; namespace { @@ -62,6 +63,25 @@ int ResolveDns(std::string_view host, char* dest) { return res; } +error_code Recv(FiberSocketBase* input, base::IoBuf* dest) { + auto buf = dest->AppendBuffer(); + io::Result exp_size = input->Recv(buf); + if (!exp_size) + return exp_size.error(); + + dest->CommitWrite(*exp_size); + + return error_code{}; +} + + +// TODO: to remove usages of this macro and make code crash-less. +#define CHECK_EC(x) \ + do { \ + auto __ec$ = (x); \ + CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ + } while (false) + } // namespace Replica::Replica(string host, uint16_t port, Service* se) @@ -94,6 +114,7 @@ bool Replica::Run(ConnectionContext* cntx) { state_mask_ = R_ENABLED | R_TCP_CONNECTED; last_io_time_ = sock_thread_->GetMonotonicTimeNs(); + sync_fb_ = ::boost::fibers::fiber(&Replica::ConnectFb, this); cntx->SendOk(); return true; @@ -143,6 +164,159 @@ void Replica::Stop() { sync_fb_.join(); } +void Replica::ConnectFb() { + error_code ec; + constexpr unsigned kOnline = R_SYNC_OK | R_TCP_CONNECTED; + + while (state_mask_ & R_ENABLED) { + if ((state_mask_ & R_TCP_CONNECTED) == 0) { + this_fiber::sleep_for(500ms); + ec = ConnectSocket(); + if (ec) { + LOG(ERROR) << "Error connecting " << ec; + continue; + } + VLOG(1) << "Replica socket connected"; + state_mask_ |= R_TCP_CONNECTED; + } + + if ((state_mask_ & kOnline) == R_TCP_CONNECTED) { // lacks great_ok + ec = GreatAndSync(); + if (ec) { + LOG(INFO) << "Error greating " << ec; + state_mask_ &= ~kOnline; + continue; + } + VLOG(1) << "Replica great ok"; + } + + if ((state_mask_ & kOnline) == kOnline) { + // There is a data race condition in Redis-master code, where "ACK 0" handler may be + // triggerred + // before Redis is ready to transition to the streaming state and it silenty ignores "ACK + // 0". We reduce the chance it happens with this delay. + this_fiber::sleep_for(50ms); + ec = ConsumeRedisStream(); + + LOG_IF(ERROR, !FiberSocketBase::IsConnClosed(ec)) << "Replica socket error " << ec; + state_mask_ &= ~kOnline; + } + } + + VLOG(1) << "Replication fiber finished"; +} + +error_code Replica::GreatAndSync() { + base::IoBuf io_buf{128}; + + ReqSerializer serializer{sock_.get()}; + + RedisParser parser{false}; // client mode + RespVec args; + + serializer.SendCommand("PING"); // optional. + RETURN_ON_ERR(serializer.ec()); + RETURN_ON_ERR(Recv(sock_.get(), &io_buf)); + last_io_time_ = sock_thread_->GetMonotonicTimeNs(); + + uint32_t consumed = 0; + RedisParser::Result result = parser.Parse(io_buf.InputBuffer(), &consumed, &args); + CHECK_EQ(result, RedisParser::OK); + CHECK(!args.empty() && args.front().type == RespExpr::STRING); + + // TODO: to check nauth, permission denied etc responses. + VLOG(1) << "Master ping reply " << ToSV(args.front().GetBuf()); + + io_buf.ConsumeInput(consumed); + + // TODO: we may also send REPLCONF listening-port, ip-address + serializer.SendCommand("REPLCONF capa eof capa psync2"); + RETURN_ON_ERR(serializer.ec()); + RETURN_ON_ERR(Recv(sock_.get(), &io_buf)); + + result = parser.Parse(io_buf.InputBuffer(), &consumed, &args); + CHECK_EQ(result, RedisParser::OK); + CHECK(!args.empty() && args.front().type == RespExpr::STRING); + + VLOG(1) << "Master REPLCONF reply " << ToSV(args.front().GetBuf()); + io_buf.ConsumeInput(consumed); + + // Announce that we are the dragonfly client. + // Note that we currently do not support dragonfly->redis replication. + // + serializer.SendCommand("REPLCONF capa dragonfly"); + RETURN_ON_ERR(serializer.ec()); + RETURN_ON_ERR(Recv(sock_.get(), &io_buf)); + result = parser.Parse(io_buf.InputBuffer(), &consumed, &args); + CHECK_EQ(result, RedisParser::OK); + CHECK(!args.empty()); + CHECK_EQ(RespExpr::STRING, args[0].type); + last_io_time_ = sock_thread_->GetMonotonicTimeNs(); + + if (args.size() == 1) { + CHECK_EQ("OK", ToSV(args[0].GetBuf())); + } else { + LOG(FATAL) << "Bad response " << args; + } + io_buf.ConsumeInput(consumed); + + // Start full sync + state_mask_ |= R_SYNCING; + serializer.SendCommand("PSYNC ? -1"); + RETURN_ON_ERR(serializer.ec()); + + DCHECK_EQ(0u, io_buf.InputLen()); + + return make_error_code(errc::io_error); +} + +error_code Replica::ConsumeRedisStream() { + base::IoBuf io_buf(16_KB); + parser_.reset(new RedisParser); + + ReqSerializer serializer{sock_.get()}; + + // Master waits for this command in order to start sending replication stream. + serializer.SendCommand("REPLCONF ACK 0"); + CHECK_EC(serializer.ec()); + + VLOG(1) << "Before reading repl-log"; + + // Redis sends eiher pings every "repl_ping_slave_period" time inside replicationCron(). + // or, alternatively, write commands stream coming from propagate() function. + // Replica connection must send "REPLCONF ACK xxx" in order to make sure that master replication + // buffer gets disposed of already processed commands. + error_code ec; + time_t last_ack = time(nullptr); + string ack_cmd; + + while (!ec) { + io::MutableBytes buf = io_buf.AppendBuffer(); + io::Result size_res = sock_->Recv(buf); + if (!size_res) + return size_res.error(); + + VLOG(1) << "Read replication stream of " << *size_res << " bytes"; + last_io_time_ = sock_thread_->GetMonotonicTimeNs(); + + io_buf.CommitWrite(*size_res); + repl_offs_ += *size_res; + + // Send repl ack back to master. + if (repl_offs_ > ack_offs_ + 1024 || time(nullptr) > last_ack + 5) { + ack_cmd.clear(); + absl::StrAppend(&ack_cmd, "REPLCONF ACK ", repl_offs_); + serializer.SendCommand(ack_cmd); + CHECK_EC(serializer.ec()); + } + + ec = ParseAndExecute(&io_buf); + } + + VLOG(1) << "ConsumeRedisStream finished"; + return ec; +} + // Threadsafe, fiber blocking. auto Replica::GetInfo() const -> Info { CHECK(sock_thread_); @@ -157,4 +331,33 @@ auto Replica::GetInfo() const -> Info { }); } +error_code Replica::ParseAndExecute(base::IoBuf* io_buf) { + VLOG(1) << "ParseAndExecute: input len " << io_buf->InputLen(); + if (parser_->stash_size() > 0) { + DVLOG(1) << "Stash " << *parser_->stash()[0]; + } + + uint32_t consumed = 0; + RedisParser::Result result = RedisParser::OK; + RespVec cmd_args; + + do { + result = parser_->Parse(io_buf->InputBuffer(), &consumed, &cmd_args); + + switch (result) { + case RedisParser::OK: + case RedisParser::INPUT_PENDING: + io_buf->ConsumeInput(consumed); + break; + default: + LOG(ERROR) << "Invalid parser status " << result << " for buffer of size " + << io_buf->InputLen(); + return std::make_error_code(std::errc::bad_message); + } + } while (io_buf->InputLen() > 0 && result == RedisParser::OK); + VLOG(1) << "ParseAndExecute: " << io_buf->InputLen() << " " << ToSV(io_buf->InputBuffer()); + + return error_code{}; +} + } // namespace dfly diff --git a/server/replica.h b/server/replica.h index 120b3bc..d63e8b0 100644 --- a/server/replica.h +++ b/server/replica.h @@ -59,7 +59,13 @@ class Replica { R_SYNC_OK = 8, }; + void ConnectFb(); + + using ReplHeader = std::variant; std::error_code ConnectSocket(); + std::error_code GreatAndSync(); + std::error_code ConsumeRedisStream(); + std::error_code ParseAndExecute(base::IoBuf* io_buf); Service& service_; std::string host_; @@ -70,9 +76,13 @@ class Replica { // Where the sock_ is handled. util::ProactorBase* sock_thread_ = nullptr; + std::unique_ptr parser_; - unsigned state_mask_ = 0; + // repl_offs - till what offset we've already read from the master. + // ack_offs_ last acknowledged offset. + size_t repl_offs_ = 0, ack_offs_ = 0; uint64_t last_io_time_ = 0; // in ns, monotonic clock. + unsigned state_mask_ = 0; }; } // namespace dfly diff --git a/server/reply_builder.cc b/server/reply_builder.cc index d30c37e..09a8b63 100644 --- a/server/reply_builder.cc +++ b/server/reply_builder.cc @@ -222,4 +222,11 @@ void ReplyBuilder::SendStringArr(absl::Span arr) { as_resp()->SendDirect(res); } +void ReqSerializer::SendCommand(std::string_view str) { + VLOG(1) << "SendCommand: " << str; + + iovec v[] = {IoVec(str), IoVec(kCRLF)}; + Send(v, ABSL_ARRAYSIZE(v)); +} + } // namespace dfly diff --git a/server/reply_builder.h b/server/reply_builder.h index 03db044..6e0e846 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -138,4 +138,12 @@ class ReplyBuilder { Protocol protocol_; }; +class ReqSerializer : public RespSerializer { + public: + explicit ReqSerializer(::io::Sink* stream) : RespSerializer(stream) { + } + + void SendCommand(std::string_view str); +}; + } // namespace dfly