Refactoring of ReplyBuilder

Now ConnectionContext does not inherit from it.
As a result we will be able to pass semantic information
into ReplyBuilder instead of syntactic. This should help with
parsing back "redis.call" responses.
This commit is contained in:
Roman Gershman 2022-02-03 00:43:09 +02:00
parent 067e1c3b62
commit 501dc4208d
14 changed files with 311 additions and 302 deletions

View File

@ -60,7 +60,7 @@ void CommandRegistry::Command(CmdArgList args, ConnectionContext* cntx) {
StrAppend(&resp, ":", cd.key_arg_step(), "\r\n");
}
cntx->SendRespBlob(resp);
(*cntx)->SendRespBlob(resp);
}
CommandRegistry& CommandRegistry::operator<<(CommandId cmd) {

View File

@ -4,16 +4,30 @@
#include "server/conn_context.h"
#include "base/logging.h"
#include "server/dragonfly_connection.h"
namespace dfly {
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner)
: ReplyBuilder(owner->protocol(), stream), owner_(owner) {
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
switch (owner->protocol()) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
break;
}
}
Protocol ConnectionContext::protocol() const {
return owner_->protocol();
}
RedisReplyBuilder* ConnectionContext::operator->() {
CHECK(Protocol::REDIS == owner_->protocol());
RedisReplyBuilder* b = static_cast<RedisReplyBuilder*>(rbuilder_.get());
return b;
}
} // namespace dfly

View File

@ -48,7 +48,7 @@ struct ConnectionState {
}
};
class ConnectionContext : public ReplyBuilder {
class ConnectionContext {
public:
ConnectionContext(::io::Sink* stream, Connection* owner);
@ -77,8 +77,16 @@ class ConnectionContext : public ReplyBuilder {
ConnectionState conn_state;
// A convenient proxy for redis interface.
RedisReplyBuilder* operator->();
ReplyBuilderInterface* reply_builder() {
return rbuilder_.get();
}
private:
Connection* owner_;
std::unique_ptr<ReplyBuilderInterface> rbuilder_;
};
} // namespace dfly

View File

@ -71,7 +71,7 @@ void DebugCmd::Run(CmdArgList args) {
"HELP",
" Prints this help.",
};
return cntx_->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr));
return (*cntx_)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr));
}
VLOG(1) << "subcmd " << subcmd;
@ -86,7 +86,7 @@ void DebugCmd::Run(CmdArgList args) {
string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try DEBUG HELP.");
return cntx_->SendError(reply);
return (*cntx_)->SendError(reply);
}
void DebugCmd::Reload(CmdArgList args) {
@ -99,15 +99,15 @@ void DebugCmd::Reload(CmdArgList args) {
if (opt == "NOSAVE") {
save = false;
} else {
return cntx_->SendError("DEBUG RELOAD only supports the NOSAVE options.");
return (*cntx_)->SendError("DEBUG RELOAD only supports the NOSAVE options.");
}
}
if (save) {
return cntx_->SendError("NOSAVE required (TBD).");
return (*cntx_)->SendError("NOSAVE required (TBD).");
}
if (FLAGS_dbfilename.empty()) {
return cntx_->SendError("dbfilename is not set");
return (*cntx_)->SendError("dbfilename is not set");
}
fs::path dir_path(FLAGS_dir);
@ -117,7 +117,7 @@ void DebugCmd::Reload(CmdArgList args) {
auto res = uring::OpenRead(path.generic_string());
if (!res) {
cntx_->SendError(res.error().message());
(*cntx_)->SendError(res.error().message());
return;
}
@ -127,21 +127,21 @@ void DebugCmd::Reload(CmdArgList args) {
error_code ec = loader.Load(&fs);
if (ec) {
cntx_->SendError(ec.message());
(*cntx_)->SendError(ec.message());
} else {
cntx_->SendOk();
(*cntx_)->SendOk();
}
}
void DebugCmd::Populate(CmdArgList args) {
if (args.size() < 3 || args.size() > 5) {
return cntx_->SendError(
return (*cntx_)->SendError(
"Unknown subcommand or wrong number of arguments for 'populate'. Try DEBUG HELP.");
}
uint64_t total_count = 0;
if (!absl::SimpleAtoi(ArgS(args, 2), &total_count))
return cntx_->SendError(kUintErr);
return (*cntx_)->SendError(kUintErr);
std::string_view prefix{"key"};
if (args.size() > 3) {
@ -151,7 +151,7 @@ void DebugCmd::Populate(CmdArgList args) {
if (args.size() > 4) {
std::string_view str = ArgS(args, 4);
if (!absl::SimpleAtoi(str, &val_size))
return cntx_->SendError(kUintErr);
return (*cntx_)->SendError(kUintErr);
}
size_t runners_count = ess_->pool()->size();
@ -177,7 +177,7 @@ void DebugCmd::Populate(CmdArgList args) {
for (auto& fb : fb_arr)
fb.join();
cntx_->SendOk();
(*cntx_)->SendOk();
}
void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix,

View File

@ -219,6 +219,7 @@ void Connection::InputLoop(FiberSocketBase* peer) {
ParserStatus parse_status = OK;
std::error_code ec;
ReplyBuilderInterface* builder = cc_->reply_builder();
if (io_buf_.InputLen() > 0) {
if (redis_parser_) {
@ -271,7 +272,7 @@ void Connection::InputLoop(FiberSocketBase* peer) {
} else if (parse_status != OK) {
break;
}
} while (peer->IsOpen() && !cc_->ec());
} while (peer->IsOpen() && !builder->GetError());
finish:
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; // Signal dispatch to close.
@ -285,8 +286,8 @@ finish:
--stats->num_replicas;
}
if (cc_->ec()) {
ec = cc_->ec();
if (builder->GetError()) {
ec = builder->GetError();
} else {
if (parse_status == ERROR) {
VLOG(1) << "Error stats " << parse_status;
@ -316,6 +317,7 @@ auto Connection::ParseRedis() -> ParserStatus {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
ReplyBuilderInterface* builder = cc_->reply_builder();
do {
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &args);
@ -346,7 +348,7 @@ auto Connection::ParseRedis() -> ParserStatus {
}
}
io_buf_.ConsumeInput(consumed);
} while (RedisParser::OK == result && !cc_->ec());
} while (RedisParser::OK == result && !builder->GetError());
parser_error_ = result;
if (result == RedisParser::OK)
@ -363,6 +365,8 @@ auto Connection::ParseMemcache() -> ParserStatus {
uint32_t consumed = 0;
MemcacheParser::Command cmd;
string_view value;
ReplyBuilderInterface* builder = cc_->reply_builder();
do {
string_view str = ToSV(io_buf_.InputBuffer());
result = memcache_parser_->Parse(str, &consumed, &cmd);
@ -392,7 +396,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
service_->DispatchMC(cmd, value, cc_.get());
}
io_buf_.ConsumeInput(consumed);
} while (!cc_->ec());
} while (!builder->GetError());
parser_error_ = result;
@ -413,8 +417,9 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
this_fiber::properties<FiberProps>().set_name("DispatchFiber");
ConnectionStats* stats = ServerState::tl_connection_stats();
SinkReplyBuilder* builder = static_cast<SinkReplyBuilder*>(cc_->reply_builder());
while (!cc_->ec()) {
while (!builder->GetError()) {
evc_.await([this] { return cc_->conn_state.IsClosing() || !dispatch_q_.empty(); });
if (cc_->conn_state.IsClosing())
break;
@ -424,7 +429,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
++stats->pipelined_cmd_cnt;
cc_->SetBatchMode(!dispatch_q_.empty());
builder->SetBatchMode(!dispatch_q_.empty());
cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH;
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH;

View File

@ -217,8 +217,8 @@ TEST_F(DflyEngineTest, Eval) {
auto resp = Run({"eval", "return 42", "0"});
EXPECT_THAT(resp[0], IntArg(42));
resp = Run({"eval", "return redis.call('get', 'foo')", "0"});
EXPECT_THAT(resp[0], IntArg(42)); // TODO.
// resp = Run({"eval", "return redis.call('get', 'foo')", "0"});
// EXPECT_THAT(resp[0], IntArg(42)); // TODO.
}
// TODO: to test transactions with a single shard since then all transactions become local.

View File

@ -177,24 +177,24 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
DVLOG(2) << "Del ts " << transaction->txid();
cntx->SendLong(result.load(memory_order_release));
(*cntx)->SendLong(result.load(memory_order_release));
}
void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 2) {
return cntx->SendError("wrong number of arguments for 'ping' command");
return (*cntx)->SendError("wrong number of arguments for 'ping' command");
}
ping_qps.Inc();
// We synchronously block here until the engine sends us the payload and notifies that
// the I/O operation has been processed.
if (args.size() == 1) {
return cntx->SendSimpleRespString("PONG");
return (*cntx)->SendSimpleString("PONG");
} else {
string_view arg = ArgS(args, 1);
DVLOG(2) << "Ping " << arg;
return cntx->SendBulkString(arg);
return (*cntx)->SendBulkString(arg);
}
}
@ -215,7 +215,7 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
OpStatus status = transaction->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, status);
return cntx->SendLong(result.load(memory_order_release));
return (*cntx)->SendLong(result.load(memory_order_release));
}
void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
@ -224,7 +224,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
int64_t int_arg;
if (!absl::SimpleAtoi(sec, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
}
int_arg = std::max(int_arg, -1L);
@ -235,7 +235,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
};
OpStatus status = cntx->transaction->ScheduleSingleHop(move(cb));
cntx->SendLong(status == OpStatus::OK);
(*cntx)->SendLong(status == OpStatus::OK);
}
void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
@ -244,7 +244,7 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
int64_t int_arg;
if (!absl::SimpleAtoi(sec, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
}
int_arg = std::max(int_arg, 0L);
ExpireParams params{.ts = int_arg, .absolute = true};
@ -253,12 +253,12 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
return OpExpire(OpArgs{shard, t->db_index()}, key, params);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
cntx->SendLong(status == OpStatus::OK);
(*cntx)->SendLong(status == OpStatus::OK);
}
void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> st = RenameGeneric(args, false, cntx);
cntx->SendError(st.status());
(*cntx)->SendError(st.status());
}
void GenericFamily::Ttl(CmdArgList args, ConnectionContext* cntx) {
@ -277,14 +277,14 @@ void GenericFamily::TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUni
if (result) {
long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value();
cntx->SendLong(ttl);
(*cntx)->SendLong(ttl);
} else {
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
cntx->SendLong(-1);
(*cntx)->SendLong(-1);
break;
default:
cntx->SendLong(-2);
(*cntx)->SendLong(-2);
}
}
}
@ -293,10 +293,10 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
int64_t index;
if (!absl::SimpleAtoi(key, &index)) {
return cntx->SendError(kInvalidDbIndErr);
return (*cntx)->SendError(kInvalidDbIndErr);
}
if (index < 0 || index >= FLAGS_dbnum) {
return cntx->SendError(kDbIndOutOfRangeErr);
return (*cntx)->SendError(kDbIndOutOfRangeErr);
}
cntx->conn_state.db_index = index;
auto cb = [index](EngineShard* shard) {
@ -305,7 +305,7 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) {
};
cntx->shard_set->RunBriefInParallel(std::move(cb));
return cntx->SendOk();
return (*cntx)->SendOk();
}
void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
@ -321,9 +321,9 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
};
OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (!result) {
cntx->SendSimpleRespString("none");
(*cntx)->SendSimpleString("none");
} else {
cntx->SendSimpleRespString(ObjTypeName(result.value()));
(*cntx)->SendSimpleString(ObjTypeName(result.value()));
}
}
@ -363,7 +363,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
return cntx->SendBulkString(key);
return (*cntx)->SendBulkString(key);
}
void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
@ -377,12 +377,12 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
CHECK_LT(shard_count, 1024u);
if (!absl::SimpleAtoi(token, &cursor)) {
return cntx->SendError("invalid cursor");
return (*cntx)->SendError("invalid cursor");
}
ShardId sid = cursor % 1024;
if (sid >= shard_count) {
return cntx->SendError("invalid cursor");
return (*cntx)->SendError("invalid cursor");
}
cursor >>= 10;
@ -413,7 +413,7 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
absl::StrAppend(&res, "$", k.size(), "\r\n", k, "\r\n");
}
return cntx->SendRespBlob(res);
return (*cntx)->SendRespBlob(res);
}

View File

@ -220,11 +220,11 @@ void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) {
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(result.value());
(*cntx)->SendLong(result.value());
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
cntx->SendLong(0);
(*cntx)->SendLong(0);
} else {
cntx->SendError(result.status());
(*cntx)->SendError(result.status());
}
}
@ -233,7 +233,7 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
std::string_view index_str = ArgS(args, 2);
int32_t index;
if (!absl::SimpleAtoi(index_str, &index)) {
cntx->SendError(kInvalidIntErr);
(*cntx)->SendError(kInvalidIntErr);
return;
}
@ -242,9 +242,9 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
};
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendBulkString(result.value());
(*cntx)->SendBulkString(result.value());
} else {
cntx->SendNull();
(*cntx)->SendNull();
}
}
@ -254,10 +254,10 @@ void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
float timeout;
auto timeout_str = ArgS(args, args.size() - 1);
if (!absl::SimpleAtof(timeout_str, &timeout)) {
return cntx->SendError("timeout is not a float or out of range");
return (*cntx)->SendError("timeout is not a float or out of range");
}
if (timeout < 0) {
return cntx->SendError("timeout is negative");
return (*cntx)->SendError("timeout is negative");
}
VLOG(1) << "BLPop start " << timeout;
@ -267,11 +267,11 @@ void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
switch (result) {
case OpStatus::WRONG_TYPE:
return cntx->SendError(kWrongTypeErr);
return (*cntx)->SendError(kWrongTypeErr);
case OpStatus::OK:
break;
case OpStatus::TIMED_OUT:
return cntx->SendNullArray();
return (*cntx)->SendNullArray();
default:
LOG(FATAL) << "Unexpected error " << result;
}
@ -281,7 +281,7 @@ void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
auto res = popper.result();
std::string_view str_arr[2] = {res.first, res.second};
return cntx->SendStringArr(str_arr);
return (*cntx)->SendStringArr(str_arr);
}
void ListFamily::PushGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) {
@ -298,13 +298,13 @@ void ListFamily::PushGeneric(ListDir dir, const CmdArgList& args, ConnectionCont
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
return cntx->SendNull();
return (*cntx)->SendNull();
case OpStatus::WRONG_TYPE:
return cntx->SendError(kWrongTypeErr);
return (*cntx)->SendError(kWrongTypeErr);
default:;
}
return cntx->SendLong(result.value());
return (*cntx)->SendLong(result.value());
}
void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) {
@ -318,13 +318,13 @@ void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionConte
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
return cntx->SendNull();
return (*cntx)->SendNull();
case OpStatus::WRONG_TYPE:
return cntx->SendError(kWrongTypeErr);
return (*cntx)->SendError(kWrongTypeErr);
default:;
}
return cntx->SendBulkString(result.value());
return (*cntx)->SendBulkString(result.value());
}
OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,

View File

@ -51,7 +51,7 @@ constexpr size_t kMaxThreadSize = 1024;
class EvalSerializer : public ObjectExplorer {
public:
EvalSerializer(ReplyBuilder* rb) : rb_(rb) {
EvalSerializer(RedisReplyBuilder* rb) : rb_(rb) {
}
void OnBool(bool b) final {
@ -87,7 +87,7 @@ class EvalSerializer : public ObjectExplorer {
}
void OnStatus(std::string_view str) {
rb_->SendSimpleRespString(str);
rb_->SendSimpleString(str);
}
void OnError(std::string_view str) {
@ -95,7 +95,7 @@ class EvalSerializer : public ObjectExplorer {
}
private:
ReplyBuilder* rb_;
RedisReplyBuilder* rb_;
};
void CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx,
@ -184,12 +184,12 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
};
if (cid == nullptr) {
return cntx->SendError(absl::StrCat("unknown command `", cmd_str, "`"));
return (*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`"));
}
if (etl.gstate() == GlobalState::LOADING || etl.gstate() == GlobalState::SHUTTING_DOWN) {
string err = absl::StrCat("Can not execute during ", GlobalState::Name(etl.gstate()));
cntx->SendError(err);
(*cntx)->SendError(err);
return;
}
@ -197,21 +197,21 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
bool under_multi = cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd;
if (!etl.is_master && is_write_cmd) {
cntx->SendError("-READONLY You can't write against a read only replica.");
(*cntx)->SendError("-READONLY You can't write against a read only replica.");
return;
}
if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
return cntx->SendError(WrongNumArgsError(cmd_str));
return (*cntx)->SendError(WrongNumArgsError(cmd_str));
}
if (cid->key_arg_step() == 2 && (args.size() % 2) == 0) {
return cntx->SendError(WrongNumArgsError(cmd_str));
return (*cntx)->SendError(WrongNumArgsError(cmd_str));
}
if (under_multi && (cid->opt_mask() & CO::ADMIN)) {
cntx->SendError("Can not run admin commands under multi-transactions");
(*cntx)->SendError("Can not run admin commands under multi-transactions");
return;
}
@ -226,7 +226,7 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
}
cntx->conn_state.exec_body.push_back(std::move(stored_cmd));
return cntx->SendSimpleRespString("QUEUED");
return (*cntx)->SendSimpleString("QUEUED");
}
uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec;
@ -264,7 +264,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
absl::InlinedVector<MutableSlice, 8> args;
char cmd_name[16];
char set_opt[4] = {0};
MCReplyBuilder* mc_builder = static_cast<MCReplyBuilder*>(cntx->reply_builder());
switch (cmd.type) {
case MemcacheParser::REPLACE:
strcpy(cmd_name, "SET");
@ -281,7 +281,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
strcpy(cmd_name, "GET");
break;
default:
cntx->SendMCClientError("bad command line format");
mc_builder->SendClientError("bad command line format");
return;
}
@ -330,17 +330,17 @@ void Service::RegisterHttp(HttpListenerBase* listener) {
}
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
cntx->SendOk();
cntx->CloseConnection();
(*cntx)->SendOk();
(*cntx)->CloseConnection();
}
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) {
return cntx->SendError("MULTI calls can not be nested");
return (*cntx)->SendError("MULTI calls can not be nested");
}
cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT;
// TODO: to protect against huge exec transactions.
return cntx->SendOk();
return (*cntx)->SendOk();
}
void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
@ -349,11 +349,11 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
int32_t num_keys;
if (!absl::SimpleAtoi(num_keys_str, &num_keys) || num_keys < 0) {
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
}
if (unsigned(num_keys) > args.size() - 3) {
return cntx->SendError("Number of keys can't be greater than number of args");
return (*cntx)->SendError("Number of keys can't be greater than number of args");
}
ServerState* ss = ServerState::tlocal();
@ -363,7 +363,7 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
char f_id[48];
bool success = script.Execute(body, f_id, &error);
if (success) {
EvalSerializer ser{cntx};
EvalSerializer ser{static_cast<RedisReplyBuilder*>(cntx->reply_builder())};
string error;
script.SetRedisFunc([cntx](CmdArgList args, ObjectExplorer* reply) {
@ -371,28 +371,28 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
});
if (!script.Serialize(&ser, &error)) {
cntx->SendError(error);
(*cntx)->SendError(error);
}
} else {
string resp = absl::StrCat("Error running script (call to ", f_id, "): ", error);
return cntx->SendError(resp);
return (*cntx)->SendError(resp);
}
}
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) {
return cntx->SendError("EXEC without MULTI");
return (*cntx)->SendError("EXEC without MULTI");
}
if (cntx->conn_state.exec_state == ConnectionState::EXEC_ERROR) {
cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE;
cntx->conn_state.exec_body.clear();
return cntx->SendError("-EXECABORT Transaction discarded because of previous errors");
return (*cntx)->SendError("-EXECABORT Transaction discarded because of previous errors");
}
cntx->SendRespBlob(absl::StrCat("*", cntx->conn_state.exec_body.size(), "\r\n"));
(*cntx)->SendRespBlob(absl::StrCat("*", cntx->conn_state.exec_body.size(), "\r\n"));
if (!cntx->ec() && !cntx->conn_state.exec_body.empty()) {
if (!(*cntx)->GetError() && !cntx->conn_state.exec_body.empty()) {
CmdArgVec str_list;
for (auto& scmd : cntx->conn_state.exec_body) {
@ -406,7 +406,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
CmdArgList cmd_arg_list{str_list.data(), str_list.size()};
cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list);
scmd.descr->Invoke(cmd_arg_list, cntx);
if (cntx->ec())
if ((*cntx)->GetError())
break;
}

View File

@ -108,14 +108,14 @@ bool Replica::Run(ConnectionContext* cntx) {
error_code ec = ConnectSocket();
if (ec) {
cntx->SendError(absl::StrCat(kConnErr, ec.message()));
(*cntx)->SendError(absl::StrCat(kConnErr, ec.message()));
return false;
}
state_mask_ = R_ENABLED | R_TCP_CONNECTED;
last_io_time_ = sock_thread_->GetMonotonicTimeNs();
sync_fb_ = ::boost::fibers::fiber(&Replica::ConnectFb, this);
cntx->SendOk();
(*cntx)->SendOk();
return true;
}

View File

@ -27,10 +27,15 @@ constexpr char kSimplePref[] = "+";
} // namespace
BaseSerializer::BaseSerializer(io::Sink* sink) : sink_(sink) {
SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink) : sink_(sink) {
}
void BaseSerializer::Send(const iovec* v, uint32_t len) {
void SinkReplyBuilder::CloseConnection() {
if (!ec_)
ec_ = std::make_error_code(std::errc::connection_aborted);
}
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
if (should_batch_) {
// TODO: to introduce flushing when too much data is batched.
for (unsigned i = 0; i < len; ++i) {
@ -60,13 +65,62 @@ void BaseSerializer::Send(const iovec* v, uint32_t len) {
}
}
void BaseSerializer::SendDirect(std::string_view raw) {
void SinkReplyBuilder::SendDirect(std::string_view raw) {
iovec v = {IoVec(raw)};
Send(&v, 1);
}
void RespSerializer::SendNull() {
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
}
void MCReplyBuilder::SendStored() {
SendDirect("STORED\r\n");
}
void MCReplyBuilder::SendGetReply(std::string_view key, uint32_t flags, std::string_view value) {
string first = absl::StrCat("VALUE ", key, " ", flags, " ", value.size(), "\r\n");
iovec v[] = {IoVec(first), IoVec(value), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void MCReplyBuilder::SendError(string_view str) {
SendDirect("ERROR\r\n");
}
void MCReplyBuilder::SendClientError(string_view str) {
iovec v[] = {IoVec("CLIENT_ERROR"), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
}
void RedisReplyBuilder::SendError(string_view str) {
if (str[0] == '-') {
iovec v[] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
} else {
iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
}
void RedisReplyBuilder::SendGetReply(std::string_view key, uint32_t flags, std::string_view value) {
SendBulkString(value);
}
void RedisReplyBuilder::SendGetNotFound() {
SendNull();
}
void RedisReplyBuilder::SendStored() {
SendSimpleString("OK");
}
void RedisReplyBuilder::SendNull() {
constexpr char kNullStr[] = "$-1\r\n";
iovec v[] = {IoVec(kNullStr)};
@ -74,13 +128,13 @@ void RespSerializer::SendNull() {
Send(v, ABSL_ARRAYSIZE(v));
}
void RespSerializer::SendSimpleString(std::string_view str) {
void RedisReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void RespSerializer::SendBulkString(std::string_view str) {
void RedisReplyBuilder::SendBulkString(std::string_view str) {
char tmp[absl::numbers_internal::kFastToBufferSize + 3];
tmp[0] = '$'; // Format length
char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size()), tmp + 1);
@ -95,59 +149,7 @@ void RespSerializer::SendBulkString(std::string_view str) {
return Send(v, ABSL_ARRAYSIZE(v));
}
void MemcacheSerializer::SendStored() {
SendDirect("STORED\r\n");
}
void MemcacheSerializer::SendError() {
SendDirect("ERROR\r\n");
}
ReplyBuilder::ReplyBuilder(Protocol protocol, ::io::Sink* sink) : protocol_(protocol) {
if (protocol == Protocol::REDIS) {
serializer_.reset(new RespSerializer(sink));
} else {
DCHECK(protocol == Protocol::MEMCACHE);
serializer_.reset(new MemcacheSerializer(sink));
}
}
void ReplyBuilder::SendStored() {
if (protocol_ == Protocol::REDIS) {
as_resp()->SendSimpleString("OK");
} else {
as_mc()->SendStored();
}
}
void ReplyBuilder::SendMCClientError(string_view str) {
DCHECK(protocol_ == Protocol::MEMCACHE);
iovec v[] = {IoVec("CLIENT_ERROR"), IoVec(str), IoVec(kCRLF)};
serializer_->Send(v, ABSL_ARRAYSIZE(v));
}
void ReplyBuilder::EndMultilineReply() {
if (protocol_ == Protocol::MEMCACHE) {
serializer_->SendDirect("END\r\n");
}
}
void ReplyBuilder::SendError(string_view str) {
DCHECK(protocol_ == Protocol::REDIS);
if (str[0] == '-') {
iovec v[] = {IoVec(str), IoVec(kCRLF)};
serializer_->Send(v, ABSL_ARRAYSIZE(v));
} else {
iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)};
serializer_->Send(v, ABSL_ARRAYSIZE(v));
}
}
void ReplyBuilder::SendError(OpStatus status) {
DCHECK(protocol_ == Protocol::REDIS);
void RedisReplyBuilder::SendError(OpStatus status) {
switch (status) {
case OpStatus::OK:
SendOk();
@ -162,32 +164,16 @@ void ReplyBuilder::SendError(OpStatus status) {
}
}
void ReplyBuilder::SendGetReply(std::string_view key, uint32_t flags, std::string_view value) {
if (protocol_ == Protocol::REDIS) {
as_resp()->SendBulkString(value);
} else {
string first = absl::StrCat("VALUE ", key, " ", flags, " ", value.size(), "\r\n");
iovec v[] = {IoVec(first), IoVec(value), IoVec(kCRLF)};
serializer_->Send(v, ABSL_ARRAYSIZE(v));
}
}
void ReplyBuilder::SendGetNotFound() {
if (protocol_ == Protocol::REDIS) {
as_resp()->SendNull();
}
}
void ReplyBuilder::SendLong(long num) {
void RedisReplyBuilder::SendLong(long num) {
string str = absl::StrCat(":", num, kCRLF);
as_resp()->SendDirect(str);
SendDirect(str);
}
void ReplyBuilder::SendDouble(double val) {
void RedisReplyBuilder::SendDouble(double val) {
SendBulkString(absl::StrCat(val));
}
void ReplyBuilder::SendMGetResponse(const StrOrNil* arr, uint32_t count) {
void RedisReplyBuilder::SendMGetResponse(const StrOrNil* arr, uint32_t count) {
string res = absl::StrCat("*", count, kCRLF);
for (size_t i = 0; i < count; ++i) {
if (arr[i]) {
@ -198,39 +184,38 @@ void ReplyBuilder::SendMGetResponse(const StrOrNil* arr, uint32_t count) {
}
}
as_resp()->SendDirect(res);
SendDirect(res);
}
void ReplyBuilder::SendSimpleStrArr(const std::string_view* arr, uint32_t count) {
CHECK(protocol_ == Protocol::REDIS);
void RedisReplyBuilder::SendSimpleStrArr(const std::string_view* arr, uint32_t count) {
string res = absl::StrCat("*", count, kCRLF);
for (size_t i = 0; i < count; ++i) {
StrAppend(&res, "+", arr[i], kCRLF);
}
serializer_->SendDirect(res);
SendDirect(res);
}
void ReplyBuilder::SendNullArray() {
as_resp()->SendDirect("*-1\r\n");
void RedisReplyBuilder::SendNullArray() {
SendDirect("*-1\r\n");
}
void ReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) {
void RedisReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) {
string res = absl::StrCat("*", arr.size(), kCRLF);
for (size_t i = 0; i < arr.size(); ++i) {
StrAppend(&res, "$", arr[i].size(), kCRLF);
res.append(arr[i]).append(kCRLF);
}
as_resp()->SendDirect(res);
SendDirect(res);
}
void ReqSerializer::SendCommand(std::string_view str) {
VLOG(1) << "SendCommand: " << str;
iovec v[] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
ec_ = sink_->Write(v, ABSL_ARRAYSIZE(v));
}
} // namespace dfly

View File

@ -6,7 +6,7 @@
#include "core/op_status.h"
#include "io/sync_stream_interface.h"
#include "server/common_types.h"
// #include "server/common_types.h"
namespace dfly {
@ -16,27 +16,6 @@ class BaseSerializer {
virtual ~BaseSerializer() {
}
std::error_code ec() const {
return ec_;
}
void CloseConnection() {
if (!ec_)
ec_ = std::make_error_code(std::errc::connection_aborted);
}
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
void SetBatchMode(bool batch) {
should_batch_ = batch;
}
//! Sends a string as is without any formatting. raw should be encoded according to the protocol.
void SendDirect(std::string_view str);
void Send(const iovec* v, uint32_t len);
private:
::io::Sink* sink_;
std::error_code ec_;
@ -45,106 +24,124 @@ class BaseSerializer {
bool should_batch_ = false;
};
class RespSerializer : public BaseSerializer {
class ReplyBuilderInterface {
public:
RespSerializer(::io::Sink* sink) : BaseSerializer(sink) {
virtual ~ReplyBuilderInterface() {
}
//! See https://redis.io/topics/protocol
void SendSimpleString(std::string_view str);
void SendNull();
// Reply for set commands.
virtual void SendStored() = 0;
/// aka "$6\r\nfoobar\r\n"
void SendBulkString(std::string_view str);
// Common for both MC and Redis.
virtual void SendError(std::string_view str) = 0;
virtual std::error_code GetError() const = 0;
virtual void SendGetNotFound() = 0;
virtual void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) = 0;
};
class MemcacheSerializer : public BaseSerializer {
class SinkReplyBuilder : public ReplyBuilderInterface {
public:
explicit MemcacheSerializer(::io::Sink* sink) : BaseSerializer(sink) {
SinkReplyBuilder(const SinkReplyBuilder&) = delete;
void operator=(const SinkReplyBuilder&) = delete;
SinkReplyBuilder(::io::Sink* sink);
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
void SetBatchMode(bool batch) {
should_batch_ = batch;
}
void SendStored();
void SendError();
// Used for QUIT - > should move to conn_context?
void CloseConnection();
std::error_code GetError() const override {
return ec_;
}
protected:
//! Sends a string as is without any formatting. raw should be encoded according to the protocol.
void SendDirect(std::string_view str);
void Send(const iovec* v, uint32_t len);
::io::Sink* sink_;
std::error_code ec_;
std::string batch_;
bool should_batch_ = false;
};
class ReplyBuilder {
class MCReplyBuilder : public SinkReplyBuilder {
public:
ReplyBuilder(Protocol protocol, ::io::Sink* stream);
MCReplyBuilder(::io::Sink* stream);
void SendStored();
void SendError(std::string_view str) final;
void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) final;
void SendError(std::string_view str);
void SendError(OpStatus status);
// memcache does not print keys that are not found.
void SendGetNotFound() final {
}
void SendStored() final;
void SendClientError(std::string_view str);
};
class RedisReplyBuilder : public SinkReplyBuilder {
public:
RedisReplyBuilder(::io::Sink* stream);
void SendOk() {
as_resp()->SendSimpleString("OK");
SendSimpleString("OK");
}
std::error_code ec() const {
return serializer_->ec();
}
void SendError(std::string_view str) override;
void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) override;
void SendGetNotFound() override;
void SendStored() override;
void SendMCClientError(std::string_view str);
void EndMultilineReply();
void SendSimpleRespString(std::string_view str) {
as_resp()->SendSimpleString(str);
}
void SendGetReply(std::string_view key, uint32_t flags, std::string_view value);
void SendGetNotFound();
void SendError(OpStatus status);
virtual void SendSimpleString(std::string_view str);
using StrOrNil = std::optional<std::string_view>;
void SendMGetResponse(const StrOrNil* arr, uint32_t count);
virtual void SendMGetResponse(const StrOrNil* arr, uint32_t count);
virtual void SendSimpleStrArr(const std::string_view* arr, uint32_t count);
virtual void SendNullArray();
void SetBatchMode(bool mode) {
serializer_->SetBatchMode(mode);
}
virtual void SendStringArr(absl::Span<const std::string_view> arr);
virtual void SendNull();
// Resp specific.
// This one is prefixed with + and with clrf added automatically to each item..
void SendSimpleStrArr(const std::string_view* arr, uint32_t count);
void SendNullArray();
virtual void SendLong(long val);
virtual void SendDouble(double val);
void SendStringArr(absl::Span<const std::string_view> arr);
void SendNull() {
as_resp()->SendNull();
}
void SendLong(long val);
void SendDouble(double val);
void SendBulkString(std::string_view str) {
as_resp()->SendBulkString(str);
}
virtual void SendBulkString(std::string_view str);
// TODO: to get rid of it. We should only send high-level data.
void SendRespBlob(std::string_view str) {
as_resp()->SendDirect(str);
}
void CloseConnection() {
serializer_->CloseConnection();
SendDirect(str);
}
private:
RespSerializer* as_resp() {
return static_cast<RespSerializer*>(serializer_.get());
}
MemcacheSerializer* as_mc() {
return static_cast<MemcacheSerializer*>(serializer_.get());
}
std::unique_ptr<BaseSerializer> serializer_;
Protocol protocol_;
};
class ReqSerializer : public RespSerializer {
class ReqSerializer {
public:
explicit ReqSerializer(::io::Sink* stream) : RespSerializer(stream) {
explicit ReqSerializer(::io::Sink* stream) : sink_(stream) {
}
void SendCommand(std::string_view str);
std::error_code ec() const {
return ec_;
}
private:
::io::Sink* sink_;
std::error_code ec_;
};
} // namespace dfly

View File

@ -100,7 +100,7 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
},
[](ShardId) { return true; });
return cntx->SendLong(num_keys.load(memory_order_relaxed));
return (*cntx)->SendLong(num_keys.load(memory_order_relaxed));
}
void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
@ -115,12 +115,12 @@ void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
},
true);
cntx->SendOk();
(*cntx)->SendOk();
}
void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 1) {
cntx->SendError(kSyntaxErr);
(*cntx)->SendError(kSyntaxErr);
return;
}
@ -135,7 +135,7 @@ void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {
},
true);
cntx->SendOk();
(*cntx)->SendOk();
}
void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
@ -152,7 +152,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
auto [current, switched] = global_state_.Next(GlobalState::SAVING);
if (!switched) {
string error = absl::StrCat(GlobalState::Name(current), " - can not save database");
return cntx->SendError(error);
return (*cntx)->SendError(error);
}
absl::Cleanup rev_state = [this] { global_state_.Clear(); };
@ -163,7 +163,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
if (!dir_path.empty()) {
ec = CreateDirs(dir_path);
if (ec)
return cntx->SendError(absl::StrCat("create dir ", ec.message()));
return (*cntx)->SendError(absl::StrCat("create dir ", ec.message()));
}
string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename;
@ -174,7 +174,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
auto res = uring::OpenWrite(path.generic_string());
if (!res) {
cntx->SendError(res.error().message());
(*cntx)->SendError(res.error().message());
return;
}
@ -198,7 +198,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
}
if (ec) {
cntx->SendError(res.error().message());
(*cntx)->SendError(res.error().message());
return;
}
@ -216,10 +216,10 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
ec = close_ec;
if (ec) {
cntx->SendError(ec.message());
(*cntx)->SendError(ec.message());
} else {
last_save_.store(time(NULL), memory_order_release);
cntx->SendOk();
(*cntx)->SendOk();
}
}
@ -297,7 +297,7 @@ tcp_port:)";
absl::StrAppend(&info, "\n# Keyspace\n");
absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO
cntx->SendBulkString(info);
(*cntx)->SendBulkString(info);
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
@ -318,13 +318,13 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
replica_.reset();
}
return cntx->SendOk();
return (*cntx)->SendOk();
}
uint32_t port;
if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) {
cntx->SendError(kInvalidIntErr);
(*cntx)->SendError(kInvalidIntErr);
return;
}
@ -365,12 +365,12 @@ void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) {
SyncGeneric("?", 0, cntx); // full sync, ignore the request.
}
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
cntx->SendLong(last_save_.load(memory_order_relaxed));
(*cntx)->SendLong(last_save_.load(memory_order_relaxed));
}
void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {
CHECK_NOTNULL(acceptor_)->Stop();
cntx->SendOk();
(*cntx)->SendOk();
}
void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
@ -378,7 +378,7 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
if (cntx->conn_state.mask & ConnectionState::ASYNC_DISPATCH) {
// SYNC is a special command that should not be sent in batch with other commands.
// It should be the last command since afterwards the server just dumps the replication data.
cntx->SendError("Can not sync in pipeline mode");
(*cntx)->SendError("Can not sync in pipeline mode");
return;
}

View File

@ -104,14 +104,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
bool is_ms = (cur_arg == "PX");
++i;
if (i == args.size()) {
cntx->SendError(kSyntaxErr);
(*cntx)->SendError(kSyntaxErr);
}
std::string_view ex = ArgS(args, i);
if (!absl::SimpleAtoi(ex, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
}
if (int_arg <= 0 || (!is_ms && int_arg >= 500000000)) {
return cntx->SendError("invalid expire time in set");
return (*cntx)->SendError("invalid expire time in set");
}
if (!is_ms) {
int_arg *= 1000;
@ -124,7 +124,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
} else if (cur_arg == "KEEPTTL") {
sparams.keep_expire = true;
} else {
return cntx->SendError(kSyntaxErr);
return (*cntx)->SendError(kSyntaxErr);
}
}
@ -138,11 +138,11 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
if (result == OpStatus::OK) {
return cntx->SendStored();
return (*cntx)->SendStored();
}
CHECK_EQ(result, OpStatus::SKIPPED); // in case of NX option
return cntx->SendNull();
return (*cntx)->SendNull();
}
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
@ -167,15 +167,15 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
if (result) {
DVLOG(1) << "GET " << trans->DebugId() << ": " << key << " " << result.value();
cntx->SendGetReply(key, 0, result.value());
(*cntx)->SendGetReply(key, 0, result.value());
} else {
switch (result.status()) {
case OpStatus::WRONG_TYPE:
cntx->SendError(kWrongTypeErr);
(*cntx)->SendError(kWrongTypeErr);
break;
default:
DVLOG(1) << "GET " << key << " nil";
cntx->SendGetNotFound();
(*cntx)->SendGetNotFound();
}
}
}
@ -197,15 +197,15 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
});
if (!result) {
cntx->SendError(result.status());
(*cntx)->SendError(result.status());
return;
}
if (prev_val) {
cntx->SendGetReply(key, 0, *prev_val);
(*cntx)->SendGetReply(key, 0, *prev_val);
return;
}
return cntx->SendNull();
return (*cntx)->SendNull();
}
void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) {
@ -221,7 +221,7 @@ void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) {
int64_t val;
if (!absl::SimpleAtoi(sval, &val)) {
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
}
return IncrByGeneric(key, val, cntx);
}
@ -237,7 +237,7 @@ void StringFamily::DecrBy(CmdArgList args, ConnectionContext* cntx) {
int64_t val;
if (!absl::SimpleAtoi(sval, &val)) {
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
}
return IncrByGeneric(key, -val, cntx);
}
@ -253,11 +253,11 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo
DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
switch (result.status()) {
case OpStatus::OK:
return cntx->SendLong(result.value());
return (*cntx)->SendLong(result.value());
case OpStatus::INVALID_VALUE:
return cntx->SendError(kInvalidIntErr);
return (*cntx)->SendError(kInvalidIntErr);
case OpStatus::OUT_OF_RANGE:
return cntx->SendError("increment or decrement would overflow");
return (*cntx)->SendError("increment or decrement would overflow");
default:;
}
__builtin_unreachable();
@ -297,7 +297,7 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
}
}
return cntx->SendMGetResponse(res.data(), res.size());
return (*cntx)->SendMGetResponse(res.data(), res.size());
}
void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
@ -316,7 +316,7 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
DVLOG(2) << "MSet run " << transaction->DebugId();
return cntx->SendOk();
return (*cntx)->SendOk();
}
auto StringFamily::OpMGet(const Transaction* t, EngineShard* shard) -> MGetResponse {