diff --git a/README.md b/README.md index b8cfd5e..b08957d 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ a distributed log format. - [x] append - [x] prepend - [x] delete -- [ ] flush_all +- [x] flush_all - [x] incr - [x] decr - [x] version diff --git a/helio b/helio index 775f321..6a755e1 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 775f321d8efe9b90fae3fee14b2fcf8364beae7b +Subproject commit 6a755e1babf7383f90948c1d630728585ea6f10e diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index ac79800..7010fa5 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -30,13 +30,13 @@ class ConnectionContext { // A convenient proxy for redis interface. RedisReplyBuilder* operator->(); - ReplyBuilderInterface* reply_builder() { + SinkReplyBuilder* reply_builder() { return rbuilder_.get(); } // Allows receiving the output data from the commands called from scripts. - ReplyBuilderInterface* Inject(ReplyBuilderInterface* new_i) { - ReplyBuilderInterface* res = rbuilder_.release(); + SinkReplyBuilder* Inject(SinkReplyBuilder* new_i) { + SinkReplyBuilder* res = rbuilder_.release(); rbuilder_.reset(new_i); return res; } @@ -49,7 +49,7 @@ class ConnectionContext { bool authenticated: 1; private: Connection* owner_; - std::unique_ptr rbuilder_; + std::unique_ptr rbuilder_; }; } // namespace facade diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index b55b695..ccc892e 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -316,7 +316,7 @@ auto Connection::ParseRedis() -> ParserStatus { uint32_t consumed = 0; RedisParser::Result result = RedisParser::OK; - ReplyBuilderInterface* builder = cc_->reply_builder(); + SinkReplyBuilder* builder = cc_->reply_builder(); mi_heap_t* tlh = mi_heap_get_backing(); do { @@ -418,7 +418,7 @@ auto Connection::ParseMemcache() -> ParserStatus { } auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant { - SinkReplyBuilder* builder = static_cast(cc_->reply_builder()); + SinkReplyBuilder* builder = cc_->reply_builder(); ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); error_code ec; ParserStatus parse_status = OK; @@ -489,7 +489,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { this_fiber::properties().set_name("DispatchFiber"); ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); - SinkReplyBuilder* builder = static_cast(cc_->reply_builder()); + SinkReplyBuilder* builder = cc_->reply_builder(); while (!builder->GetError()) { evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); }); diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index b23ff9a..b796426 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -84,12 +84,21 @@ void SinkReplyBuilder::SendDirect(std::string_view raw) { MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) { } +void MCReplyBuilder::SendSimpleString(std::string_view str) { + iovec v[2] = {IoVec(str), IoVec(kCRLF)}; + + Send(v, ABSL_ARRAYSIZE(v)); +} + + void MCReplyBuilder::SendStored() { - SendDirect("STORED\r\n"); + SendSimpleString("STORED"); } void MCReplyBuilder::SendLong(long val) { - SendDirect(absl::StrCat(val, kCRLF)); + char buf[32]; + char* next = absl::numbers_internal::FastIntToBuffer(val, buf); + SendSimpleString(string_view(buf, next - buf)); } void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) { @@ -109,11 +118,11 @@ void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) { header.clear(); } } - SendDirect("END\r\n"); + SendSimpleString("END"); } void MCReplyBuilder::SendError(string_view str, std::string_view type) { - SendDirect("ERROR\r\n"); + SendSimpleString("ERROR"); } void MCReplyBuilder::SendClientError(string_view str) { @@ -122,11 +131,11 @@ void MCReplyBuilder::SendClientError(string_view str) { } void MCReplyBuilder::SendSetSkipped() { - SendDirect("NOT_STORED\r\n"); + SendSimpleString("NOT_STORED"); } void MCReplyBuilder::SendNotFound() { - SendDirect("NOT_FOUND\r\n"); + SendSimpleString("NOT_FOUND"); } RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) { @@ -144,6 +153,12 @@ void RedisReplyBuilder::SendError(string_view str, std::string_view type) { } } +void RedisReplyBuilder::SendSimpleString(std::string_view str) { + iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)}; + + Send(v, ABSL_ARRAYSIZE(v)); +} + void RedisReplyBuilder::SendStored() { SendSimpleString("OK"); } @@ -160,12 +175,6 @@ void RedisReplyBuilder::SendNull() { Send(v, ABSL_ARRAYSIZE(v)); } -void RedisReplyBuilder::SendSimpleString(std::string_view str) { - iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)}; - - Send(v, ABSL_ARRAYSIZE(v)); -} - void RedisReplyBuilder::SendBulkString(std::string_view str) { char tmp[absl::numbers_internal::kFastToBufferSize + 3]; tmp[0] = '$'; // Format length diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index f91a256..1bf2856 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -11,41 +11,16 @@ namespace facade { -class ReplyBuilderInterface { - public: - virtual ~ReplyBuilderInterface() { - } - - // Reply for set commands. - virtual void SendStored() = 0; - - // Common for both MC and Redis. - virtual void SendError(std::string_view str, std::string_view type = std::string_view{}) = 0; - - virtual std::error_code GetError() const = 0; - - struct ResponseValue { - std::string_view key; - std::string value; - uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested). - uint32_t mc_flag = 0; - }; - - using OptResp = std::optional; - - virtual void SendMGetResponse(const OptResp* resp, uint32_t count) = 0; - virtual void SendLong(long val) = 0; - - virtual void SendSetSkipped() = 0; -}; - -class SinkReplyBuilder : public ReplyBuilderInterface { +class SinkReplyBuilder { public: SinkReplyBuilder(const SinkReplyBuilder&) = delete; void operator=(const SinkReplyBuilder&) = delete; SinkReplyBuilder(::io::Sink* sink); + virtual ~SinkReplyBuilder() { + } + // In order to reduce interrupt rate we allow coalescing responses together using // Batch mode. It is controlled by Connection state machine because it makes sense only // when pipelined requests are arriving. @@ -56,7 +31,7 @@ class SinkReplyBuilder : public ReplyBuilderInterface { // Used for QUIT - > should move to conn_context? void CloseConnection(); - std::error_code GetError() const override { + std::error_code GetError() const { return ec_; } @@ -81,6 +56,31 @@ class SinkReplyBuilder : public ReplyBuilderInterface { //! Sends a string as is without any formatting. raw should be encoded according to the protocol. void SendDirect(std::string_view str); + // Common for both MC and Redis. + virtual void SendError(std::string_view str, std::string_view type = std::string_view{}) = 0; + + virtual void SendSimpleString(std::string_view str) = 0; + + void SendOk() { + SendSimpleString("OK"); + } + + struct ResponseValue { + std::string_view key; + std::string value; + uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested). + uint32_t mc_flag = 0; + }; + + using OptResp = std::optional; + + virtual void SendMGetResponse(const OptResp* resp, uint32_t count) = 0; + virtual void SendLong(long val) = 0; + + // Reply for set commands. + virtual void SendStored() = 0; + virtual void SendSetSkipped() = 0; + protected: void Send(const iovec* v, uint32_t len); @@ -110,25 +110,21 @@ class MCReplyBuilder : public SinkReplyBuilder { void SendClientError(std::string_view str); void SendNotFound(); + void SendSimpleString(std::string_view str) final; }; class RedisReplyBuilder : public SinkReplyBuilder { public: RedisReplyBuilder(::io::Sink* stream); - void SendOk() { - SendSimpleString("OK"); - } - void SendError(std::string_view str, std::string_view type = std::string_view{}) override; void SendMGetResponse(const OptResp* resp, uint32_t count) override; - + void SendSimpleString(std::string_view str) override; void SendStored() override; void SendLong(long val) override; void SendSetSkipped() override; void SendError(OpStatus status); - virtual void SendSimpleString(std::string_view str); virtual void SendSimpleStrArr(const std::string_view* arr, uint32_t count); virtual void SendNullArray(); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index caff801..0bfe12f 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -185,7 +185,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { if (del_cnt == 0) { mc_builder->SendNotFound(); } else { - mc_builder->SendDirect("DELETED\r\n"); + mc_builder->SendSimpleString("DELETED"); } } else { (*cntx)->SendLong(del_cnt); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index e56c6a9..98155c4 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -524,6 +524,9 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va case MemcacheParser::GET: strcpy(cmd_name, "MGET"); break; + case MemcacheParser::FLUSHALL: + strcpy(cmd_name, "FLUSHDB"); + break; case MemcacheParser::QUIT: strcpy(cmd_name, "QUIT"); break; @@ -531,7 +534,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va server_family_.StatsMC(cmd.key, cntx); return; case MemcacheParser::VERSION: - mc_builder->SendDirect(absl::StrCat("VERSION ", gflags::VersionString(), "\r\n")); + mc_builder->SendSimpleString(absl::StrCat("VERSION ", gflags::VersionString())); return; default: mc_builder->SendClientError("bad command line format"); @@ -635,7 +638,7 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendOk(); using facade::SinkReplyBuilder; - SinkReplyBuilder* builder = static_cast(cntx->reply_builder()); + SinkReplyBuilder* builder = cntx->reply_builder(); builder->CloseConnection(); } @@ -651,7 +654,7 @@ void Service::Multi(CmdArgList args, ConnectionContext* cntx) { void Service::CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx) { DCHECK(cntx->transaction); InterpreterReplier replier(reply); - facade::ReplyBuilderInterface* orig = cntx->Inject(&replier); + facade::SinkReplyBuilder* orig = cntx->Inject(&replier); DispatchCommand(std::move(args), cntx); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 74ae694..7765878 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -168,7 +168,7 @@ void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { }, true); - (*cntx)->SendOk(); + cntx->reply_builder()->SendOk(); } void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) { diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 17f3be4..56a7498 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -24,7 +24,7 @@ namespace { using namespace std; using facade::Protocol; -using facade::ReplyBuilderInterface; +using facade::SinkReplyBuilder; using CI = CommandId; DEFINE_VARZ(VarzQps, set_qps); @@ -105,7 +105,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { sparams.memcache_flags = cntx->conn_state.memcache_flag; int64_t int_arg; - ReplyBuilderInterface* builder = cntx->reply_builder(); + SinkReplyBuilder* builder = cntx->reply_builder(); for (size_t i = 3; i < args.size(); ++i) { ToUpper(&args[i]); @@ -342,7 +342,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - ReplyBuilderInterface* builder = cntx->reply_builder(); + SinkReplyBuilder* builder = cntx->reply_builder(); if (result.value_or(false)) { return builder->SendStored(); } @@ -375,7 +375,7 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) { CHECK_EQ(OpStatus::OK, result); // reorder the responses back according to the order of their corresponding keys. - vector res(args.size() - 1); + vector res(args.size() - 1); for (ShardId sid = 0; sid < shard_count; ++sid) { if (!transaction->IsActive(sid))