Execute commands from the replicated log

This commit is contained in:
Roman Gershman 2022-05-28 23:20:52 +03:00
parent 1490eb5c3c
commit ccf051742d
8 changed files with 42 additions and 19 deletions

View File

@ -25,7 +25,9 @@ class ConnectionContext {
return owner_;
}
Protocol protocol() const;
Protocol protocol() const {
return protocol_;
}
// A convenient proxy for redis interface.
RedisReplyBuilder* operator->();
@ -55,6 +57,7 @@ class ConnectionContext {
private:
Connection* owner_;
Protocol protocol_ = Protocol::REDIS;
std::unique_ptr<SinkReplyBuilder> rbuilder_;
};

View File

@ -46,13 +46,6 @@ void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
}
}
void RespToArgList(const RespVec& src, CmdArgVec* dest) {
dest->resize(src.size());
for (size_t i = 0; i < src.size(); ++i) {
(*dest)[i] = ToMSS(src[i].GetBuf());
}
}
void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) {
stats->io_write_cnt += builder->io_write_cnt();
stats->io_write_bytes += builder->io_write_bytes();
@ -639,4 +632,11 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* {
return req;
}
void RespToArgList(const RespVec& src, CmdArgVec* dest) {
dest->resize(src.size());
for (size_t i = 0; i < src.size(); ++i) {
(*dest)[i] = ToMSS(src[i].GetBuf());
}
}
} // namespace facade

View File

@ -121,4 +121,6 @@ class Connection : public util::Connection {
BreakerCb breaker_cb_;
};
void RespToArgList(const RespVec& src, CmdArgVec* dest);
} // namespace facade

View File

@ -74,7 +74,6 @@ const char kScriptErrType[] = "script_error";
const char kIndexOutOfRange[] = "index out of range";
const char kOutOfMemory[] = "Out of memory";
const char* RespExpr::TypeName(Type t) {
switch (t) {
case STRING:
@ -94,7 +93,10 @@ const char* RespExpr::TypeName(Type t) {
}
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
switch (owner->protocol()) {
if (owner) {
protocol_ = owner->protocol();
}
switch (protocol_) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
@ -111,10 +113,6 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
force_dispatch = false;
}
Protocol ConnectionContext::protocol() const {
return owner_->protocol();
}
RedisReplyBuilder* ConnectionContext::operator->() {
CHECK(Protocol::REDIS == protocol());

View File

@ -85,6 +85,8 @@ class ConnectionContext : public facade::ConnectionContext {
}
void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
bool is_replicating = false;
};
} // namespace dfly

View File

@ -411,7 +411,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
bool under_multi =
dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd;
if (!etl.is_master && is_write_cmd) {
if (!etl.is_master && is_write_cmd && !dfly_cntx->is_replicating) {
(*cntx)->SendError("-READONLY You can't write against a read only replica.");
return;
}

View File

@ -14,6 +14,7 @@ extern "C" {
#include <boost/asio/ip/tcp.hpp>
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/redis_parser.h"
#include "server/error.h"
#include "server/main_service.h"
@ -521,6 +522,8 @@ error_code Replica::ConsumeRedisStream() {
time_t last_ack = time(nullptr);
string ack_cmd;
// basically reflection of dragonfly_connection IoLoop function.
while (!ec) {
io::MutableBytes buf = io_buf.AppendBuffer();
io::Result<size_t> size_res = sock_->Recv(buf);
@ -538,7 +541,7 @@ error_code Replica::ConsumeRedisStream() {
ack_cmd.clear();
absl::StrAppend(&ack_cmd, "REPLCONF ACK ", repl_offs_);
serializer.SendCommand(ack_cmd);
CHECK_EC(serializer.ec());
RETURN_ON_ERR(serializer.ec());
}
ec = ParseAndExecute(&io_buf);
@ -570,13 +573,25 @@ error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec cmd_args;
io::NullSink null_sink; // we never reply back on the commands.
ConnectionContext conn_context{&null_sink, nullptr};
conn_context.is_replicating = true;
do {
result = parser_->Parse(io_buf->InputBuffer(), &consumed, &cmd_args);
result = parser_->Parse(io_buf->InputBuffer(), &consumed, &cmd_args_);
switch (result) {
case RedisParser::OK:
if (!cmd_args_.empty()) {
VLOG(2) << "Got command " << ToSV(cmd_args_[0].GetBuf()) << ToSV(cmd_args_[1].GetBuf())
<< "\n consumed: " << consumed;
facade::RespToArgList(cmd_args_, &cmd_str_args_);
CmdArgList arg_list{cmd_str_args_.data(), cmd_str_args_.size()};
service_.DispatchCommand(arg_list, &conn_context);
}
io_buf->ConsumeInput(consumed);
break;
case RedisParser::INPUT_PENDING:
io_buf->ConsumeInput(consumed);
break;

View File

@ -7,13 +7,14 @@
#include <variant>
#include "base/io_buf.h"
#include "facade/facade_types.h"
#include "facade/redis_parser.h"
#include "server/conn_context.h"
#include "util/fiber_socket_base.h"
namespace dfly {
class Service;
class ConnectionContext;
class Replica {
public:
@ -88,6 +89,8 @@ class Replica {
// Where the sock_ is handled.
util::ProactorBase* sock_thread_ = nullptr;
std::unique_ptr<facade::RedisParser> parser_;
facade::RespVec cmd_args_;
facade::CmdArgVec cmd_str_args_;
// repl_offs - till what offset we've already read from the master.
// ack_offs_ last acknowledged offset.