Add memcached flush_all command.

Simplify reply code - remove the redundant class.
This commit is contained in:
Roman Gershman 2022-03-10 19:15:51 +02:00
parent 8054ed4f3a
commit 09fb05c0e1
10 changed files with 74 additions and 66 deletions

View File

@ -135,7 +135,7 @@ a distributed log format.
- [x] append - [x] append
- [x] prepend - [x] prepend
- [x] delete - [x] delete
- [ ] flush_all - [x] flush_all
- [x] incr - [x] incr
- [x] decr - [x] decr
- [x] version - [x] version

2
helio

@ -1 +1 @@
Subproject commit 775f321d8efe9b90fae3fee14b2fcf8364beae7b Subproject commit 6a755e1babf7383f90948c1d630728585ea6f10e

View File

@ -30,13 +30,13 @@ class ConnectionContext {
// A convenient proxy for redis interface. // A convenient proxy for redis interface.
RedisReplyBuilder* operator->(); RedisReplyBuilder* operator->();
ReplyBuilderInterface* reply_builder() { SinkReplyBuilder* reply_builder() {
return rbuilder_.get(); return rbuilder_.get();
} }
// Allows receiving the output data from the commands called from scripts. // Allows receiving the output data from the commands called from scripts.
ReplyBuilderInterface* Inject(ReplyBuilderInterface* new_i) { SinkReplyBuilder* Inject(SinkReplyBuilder* new_i) {
ReplyBuilderInterface* res = rbuilder_.release(); SinkReplyBuilder* res = rbuilder_.release();
rbuilder_.reset(new_i); rbuilder_.reset(new_i);
return res; return res;
} }
@ -49,7 +49,7 @@ class ConnectionContext {
bool authenticated: 1; bool authenticated: 1;
private: private:
Connection* owner_; Connection* owner_;
std::unique_ptr<ReplyBuilderInterface> rbuilder_; std::unique_ptr<SinkReplyBuilder> rbuilder_;
}; };
} // namespace facade } // namespace facade

View File

@ -316,7 +316,7 @@ auto Connection::ParseRedis() -> ParserStatus {
uint32_t consumed = 0; uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK; RedisParser::Result result = RedisParser::OK;
ReplyBuilderInterface* builder = cc_->reply_builder(); SinkReplyBuilder* builder = cc_->reply_builder();
mi_heap_t* tlh = mi_heap_get_backing(); mi_heap_t* tlh = mi_heap_get_backing();
do { do {
@ -418,7 +418,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
} }
auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> { auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
SinkReplyBuilder* builder = static_cast<SinkReplyBuilder*>(cc_->reply_builder()); SinkReplyBuilder* builder = cc_->reply_builder();
ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); ConnectionStats* stats = service_->GetThreadLocalConnectionStats();
error_code ec; error_code ec;
ParserStatus parse_status = OK; ParserStatus parse_status = OK;
@ -489,7 +489,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
this_fiber::properties<FiberProps>().set_name("DispatchFiber"); this_fiber::properties<FiberProps>().set_name("DispatchFiber");
ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); ConnectionStats* stats = service_->GetThreadLocalConnectionStats();
SinkReplyBuilder* builder = static_cast<SinkReplyBuilder*>(cc_->reply_builder()); SinkReplyBuilder* builder = cc_->reply_builder();
while (!builder->GetError()) { while (!builder->GetError()) {
evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); }); evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); });

View File

@ -84,12 +84,21 @@ void SinkReplyBuilder::SendDirect(std::string_view raw) {
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) { MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
} }
void MCReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[2] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void MCReplyBuilder::SendStored() { void MCReplyBuilder::SendStored() {
SendDirect("STORED\r\n"); SendSimpleString("STORED");
} }
void MCReplyBuilder::SendLong(long val) { void MCReplyBuilder::SendLong(long val) {
SendDirect(absl::StrCat(val, kCRLF)); char buf[32];
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
SendSimpleString(string_view(buf, next - buf));
} }
void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) { void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) {
@ -109,11 +118,11 @@ void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) {
header.clear(); header.clear();
} }
} }
SendDirect("END\r\n"); SendSimpleString("END");
} }
void MCReplyBuilder::SendError(string_view str, std::string_view type) { void MCReplyBuilder::SendError(string_view str, std::string_view type) {
SendDirect("ERROR\r\n"); SendSimpleString("ERROR");
} }
void MCReplyBuilder::SendClientError(string_view str) { void MCReplyBuilder::SendClientError(string_view str) {
@ -122,11 +131,11 @@ void MCReplyBuilder::SendClientError(string_view str) {
} }
void MCReplyBuilder::SendSetSkipped() { void MCReplyBuilder::SendSetSkipped() {
SendDirect("NOT_STORED\r\n"); SendSimpleString("NOT_STORED");
} }
void MCReplyBuilder::SendNotFound() { void MCReplyBuilder::SendNotFound() {
SendDirect("NOT_FOUND\r\n"); SendSimpleString("NOT_FOUND");
} }
RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) { RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
@ -144,6 +153,12 @@ void RedisReplyBuilder::SendError(string_view str, std::string_view type) {
} }
} }
void RedisReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendStored() { void RedisReplyBuilder::SendStored() {
SendSimpleString("OK"); SendSimpleString("OK");
} }
@ -160,12 +175,6 @@ void RedisReplyBuilder::SendNull() {
Send(v, ABSL_ARRAYSIZE(v)); Send(v, ABSL_ARRAYSIZE(v));
} }
void RedisReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendBulkString(std::string_view str) { void RedisReplyBuilder::SendBulkString(std::string_view str) {
char tmp[absl::numbers_internal::kFastToBufferSize + 3]; char tmp[absl::numbers_internal::kFastToBufferSize + 3];
tmp[0] = '$'; // Format length tmp[0] = '$'; // Format length

View File

@ -11,41 +11,16 @@
namespace facade { namespace facade {
class ReplyBuilderInterface { class SinkReplyBuilder {
public:
virtual ~ReplyBuilderInterface() {
}
// Reply for set commands.
virtual void SendStored() = 0;
// Common for both MC and Redis.
virtual void SendError(std::string_view str, std::string_view type = std::string_view{}) = 0;
virtual std::error_code GetError() const = 0;
struct ResponseValue {
std::string_view key;
std::string value;
uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested).
uint32_t mc_flag = 0;
};
using OptResp = std::optional<ResponseValue>;
virtual void SendMGetResponse(const OptResp* resp, uint32_t count) = 0;
virtual void SendLong(long val) = 0;
virtual void SendSetSkipped() = 0;
};
class SinkReplyBuilder : public ReplyBuilderInterface {
public: public:
SinkReplyBuilder(const SinkReplyBuilder&) = delete; SinkReplyBuilder(const SinkReplyBuilder&) = delete;
void operator=(const SinkReplyBuilder&) = delete; void operator=(const SinkReplyBuilder&) = delete;
SinkReplyBuilder(::io::Sink* sink); SinkReplyBuilder(::io::Sink* sink);
virtual ~SinkReplyBuilder() {
}
// In order to reduce interrupt rate we allow coalescing responses together using // In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only // Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving. // when pipelined requests are arriving.
@ -56,7 +31,7 @@ class SinkReplyBuilder : public ReplyBuilderInterface {
// Used for QUIT - > should move to conn_context? // Used for QUIT - > should move to conn_context?
void CloseConnection(); void CloseConnection();
std::error_code GetError() const override { std::error_code GetError() const {
return ec_; return ec_;
} }
@ -81,6 +56,31 @@ class SinkReplyBuilder : public ReplyBuilderInterface {
//! Sends a string as is without any formatting. raw should be encoded according to the protocol. //! Sends a string as is without any formatting. raw should be encoded according to the protocol.
void SendDirect(std::string_view str); void SendDirect(std::string_view str);
// Common for both MC and Redis.
virtual void SendError(std::string_view str, std::string_view type = std::string_view{}) = 0;
virtual void SendSimpleString(std::string_view str) = 0;
void SendOk() {
SendSimpleString("OK");
}
struct ResponseValue {
std::string_view key;
std::string value;
uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested).
uint32_t mc_flag = 0;
};
using OptResp = std::optional<ResponseValue>;
virtual void SendMGetResponse(const OptResp* resp, uint32_t count) = 0;
virtual void SendLong(long val) = 0;
// Reply for set commands.
virtual void SendStored() = 0;
virtual void SendSetSkipped() = 0;
protected: protected:
void Send(const iovec* v, uint32_t len); void Send(const iovec* v, uint32_t len);
@ -110,25 +110,21 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendClientError(std::string_view str); void SendClientError(std::string_view str);
void SendNotFound(); void SendNotFound();
void SendSimpleString(std::string_view str) final;
}; };
class RedisReplyBuilder : public SinkReplyBuilder { class RedisReplyBuilder : public SinkReplyBuilder {
public: public:
RedisReplyBuilder(::io::Sink* stream); RedisReplyBuilder(::io::Sink* stream);
void SendOk() {
SendSimpleString("OK");
}
void SendError(std::string_view str, std::string_view type = std::string_view{}) override; void SendError(std::string_view str, std::string_view type = std::string_view{}) override;
void SendMGetResponse(const OptResp* resp, uint32_t count) override; void SendMGetResponse(const OptResp* resp, uint32_t count) override;
void SendSimpleString(std::string_view str) override;
void SendStored() override; void SendStored() override;
void SendLong(long val) override; void SendLong(long val) override;
void SendSetSkipped() override; void SendSetSkipped() override;
void SendError(OpStatus status); void SendError(OpStatus status);
virtual void SendSimpleString(std::string_view str);
virtual void SendSimpleStrArr(const std::string_view* arr, uint32_t count); virtual void SendSimpleStrArr(const std::string_view* arr, uint32_t count);
virtual void SendNullArray(); virtual void SendNullArray();

View File

@ -185,7 +185,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
if (del_cnt == 0) { if (del_cnt == 0) {
mc_builder->SendNotFound(); mc_builder->SendNotFound();
} else { } else {
mc_builder->SendDirect("DELETED\r\n"); mc_builder->SendSimpleString("DELETED");
} }
} else { } else {
(*cntx)->SendLong(del_cnt); (*cntx)->SendLong(del_cnt);

View File

@ -524,6 +524,9 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
case MemcacheParser::GET: case MemcacheParser::GET:
strcpy(cmd_name, "MGET"); strcpy(cmd_name, "MGET");
break; break;
case MemcacheParser::FLUSHALL:
strcpy(cmd_name, "FLUSHDB");
break;
case MemcacheParser::QUIT: case MemcacheParser::QUIT:
strcpy(cmd_name, "QUIT"); strcpy(cmd_name, "QUIT");
break; break;
@ -531,7 +534,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
server_family_.StatsMC(cmd.key, cntx); server_family_.StatsMC(cmd.key, cntx);
return; return;
case MemcacheParser::VERSION: case MemcacheParser::VERSION:
mc_builder->SendDirect(absl::StrCat("VERSION ", gflags::VersionString(), "\r\n")); mc_builder->SendSimpleString(absl::StrCat("VERSION ", gflags::VersionString()));
return; return;
default: default:
mc_builder->SendClientError("bad command line format"); mc_builder->SendClientError("bad command line format");
@ -635,7 +638,7 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendOk(); (*cntx)->SendOk();
using facade::SinkReplyBuilder; using facade::SinkReplyBuilder;
SinkReplyBuilder* builder = static_cast<SinkReplyBuilder*>(cntx->reply_builder()); SinkReplyBuilder* builder = cntx->reply_builder();
builder->CloseConnection(); builder->CloseConnection();
} }
@ -651,7 +654,7 @@ void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
void Service::CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx) { void Service::CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx) {
DCHECK(cntx->transaction); DCHECK(cntx->transaction);
InterpreterReplier replier(reply); InterpreterReplier replier(reply);
facade::ReplyBuilderInterface* orig = cntx->Inject(&replier); facade::SinkReplyBuilder* orig = cntx->Inject(&replier);
DispatchCommand(std::move(args), cntx); DispatchCommand(std::move(args), cntx);

View File

@ -168,7 +168,7 @@ void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
}, },
true); true);
(*cntx)->SendOk(); cntx->reply_builder()->SendOk();
} }
void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {

View File

@ -24,7 +24,7 @@ namespace {
using namespace std; using namespace std;
using facade::Protocol; using facade::Protocol;
using facade::ReplyBuilderInterface; using facade::SinkReplyBuilder;
using CI = CommandId; using CI = CommandId;
DEFINE_VARZ(VarzQps, set_qps); DEFINE_VARZ(VarzQps, set_qps);
@ -105,7 +105,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
sparams.memcache_flags = cntx->conn_state.memcache_flag; sparams.memcache_flags = cntx->conn_state.memcache_flag;
int64_t int_arg; int64_t int_arg;
ReplyBuilderInterface* builder = cntx->reply_builder(); SinkReplyBuilder* builder = cntx->reply_builder();
for (size_t i = 3; i < args.size(); ++i) { for (size_t i = 3; i < args.size(); ++i) {
ToUpper(&args[i]); ToUpper(&args[i]);
@ -342,7 +342,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
}; };
OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
ReplyBuilderInterface* builder = cntx->reply_builder(); SinkReplyBuilder* builder = cntx->reply_builder();
if (result.value_or(false)) { if (result.value_or(false)) {
return builder->SendStored(); return builder->SendStored();
} }
@ -375,7 +375,7 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
CHECK_EQ(OpStatus::OK, result); CHECK_EQ(OpStatus::OK, result);
// reorder the responses back according to the order of their corresponding keys. // reorder the responses back according to the order of their corresponding keys.
vector<ReplyBuilderInterface::OptResp> res(args.size() - 1); vector<SinkReplyBuilder::OptResp> res(args.size() - 1);
for (ShardId sid = 0; sid < shard_count; ++sid) { for (ShardId sid = 0; sid < shard_count; ++sid) {
if (!transaction->IsActive(sid)) if (!transaction->IsActive(sid))