From 5ebbfa5a64abe102ee13f71b7f6f1b8f425ca988 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 23 Nov 2021 12:39:35 +0200 Subject: [PATCH] Adding memcached protocol support for dragonfly --- server/CMakeLists.txt | 3 +- server/command_registry.cc | 2 +- server/conn_context.cc | 15 +++++++ server/conn_context.h | 7 +-- server/dfly_main.cc | 7 ++- server/dfly_protocol.h | 16 +++++++ server/dragonfly_connection.cc | 60 ++++++++++++++++++++++++- server/dragonfly_connection.h | 10 ++++- server/dragonfly_listener.cc | 4 +- server/dragonfly_listener.h | 4 +- server/main_service.cc | 49 +++++++++++++++++++-- server/main_service.h | 3 ++ server/reply_builder.cc | 80 +++++++++++++++++++++++++--------- server/reply_builder.h | 72 ++++++++++++++++++++++-------- 14 files changed, 278 insertions(+), 54 deletions(-) create mode 100644 server/conn_context.cc create mode 100644 server/dfly_protocol.h diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index edf9415..a344667 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,7 +1,8 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib command_registry.cc config_flags.cc db_slice.cc dragonfly_listener.cc +add_library(dragonfly_lib command_registry.cc config_flags.cc conn_context.cc db_slice.cc + dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc main_service.cc memcache_parser.cc redis_parser.cc resp_expr.cc reply_builder.cc) diff --git a/server/command_registry.cc b/server/command_registry.cc index 41c2104..dc4d031 100644 --- a/server/command_registry.cc +++ b/server/command_registry.cc @@ -55,7 +55,7 @@ void CommandRegistry::Command(CmdArgList args, ConnectionContext* cntx) { StrAppend(&resp, ":", cd.key_arg_step(), "\r\n"); } - cntx->SendDirect(resp); + cntx->SendRespBlob(resp); } namespace CO { diff --git a/server/conn_context.cc b/server/conn_context.cc new file mode 100644 index 0000000..e6227cc --- /dev/null +++ b/server/conn_context.cc @@ -0,0 +1,15 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/conn_context.h" + +#include "server/dragonfly_connection.h" + +namespace dfly { + +ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) + : ReplyBuilder(owner->protocol(), stream), owner_(owner) { +} + +} // namespace dfly diff --git a/server/conn_context.h b/server/conn_context.h index 56bd912..bea5e60 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -14,14 +14,15 @@ class CommandId; class ConnectionContext : public ReplyBuilder { public: - ConnectionContext(::io::Sink* stream, Connection* owner) : ReplyBuilder(stream), owner_(owner) { - } + ConnectionContext(::io::Sink* stream, Connection* owner); // TODO: to introduce proper accessors. const CommandId* cid = nullptr; EngineShardSet* shard_set = nullptr; - Connection* owner() { return owner_;} + Connection* owner() { + return owner_; + } private: Connection* owner_; diff --git a/server/dfly_main.cc b/server/dfly_main.cc index de897be..e4b612c 100644 --- a/server/dfly_main.cc +++ b/server/dfly_main.cc @@ -11,6 +11,7 @@ DEFINE_int32(http_port, 8080, "Http port."); DECLARE_uint32(port); +DECLARE_uint32(memcache_port); using namespace util; @@ -24,7 +25,11 @@ void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) service.RegisterHttp(http); } - acceptor->AddListener(FLAGS_port, new Listener{&service}); + acceptor->AddListener(FLAGS_port, new Listener{Protocol::REDIS, &service}); + if (FLAGS_memcache_port > 0) { + acceptor->AddListener(FLAGS_memcache_port, new Listener{Protocol::MEMCACHE, &service}); + } + acceptor->Run(); acceptor->Wait(); diff --git a/server/dfly_protocol.h b/server/dfly_protocol.h new file mode 100644 index 0000000..7e867ec --- /dev/null +++ b/server/dfly_protocol.h @@ -0,0 +1,16 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include + +namespace dfly { + +enum class Protocol : uint8_t { + MEMCACHE = 1, + REDIS = 2 +}; + +} // namespace dfly diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index 93cd3d8..12ea3c8 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -11,6 +11,7 @@ #include "server/command_registry.h" #include "server/conn_context.h" #include "server/main_service.h" +#include "server/memcache_parser.h" #include "server/redis_parser.h" #include "util/fiber_sched_algo.h" #include "util/tls/tls_socket.h" @@ -69,8 +70,18 @@ struct Connection::Shutdown { } }; -Connection::Connection(Service* service, SSL_CTX* ctx) : service_(service), ctx_(ctx) { +Connection::Connection(Protocol protocol, Service* service, SSL_CTX* ctx) + : service_(service), ctx_(ctx) { + protocol_ = protocol; + + switch (protocol) { + case Protocol::REDIS: redis_parser_.reset(new RedisParser); + break; + case Protocol::MEMCACHE: + memcache_parser_.reset(new MemcacheParser); + break; + } } Connection::~Connection() { @@ -143,7 +154,14 @@ void Connection::InputLoop(FiberSocketBase* peer) { } io_buf.CommitWrite(*recv_sz); + + if (redis_parser_) status = ParseRedis(&io_buf); + else { + DCHECK(memcache_parser_); + status = ParseMemcache(&io_buf); + } + if (status == NEED_MORE) { status = OK; } else if (status != OK) { @@ -206,4 +224,44 @@ auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus { return ERROR; } +auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus { + MemcacheParser::Result result = MemcacheParser::OK; + uint32_t consumed = 0; + MemcacheParser::Command cmd; + string_view value; + do { + string_view str = ToSV(io_buf->InputBuffer()); + result = memcache_parser_->Parse(str, &consumed, &cmd); + + if (result != MemcacheParser::OK) { + io_buf->ConsumeInput(consumed); + break; + } + + size_t total_len = consumed; + if (MemcacheParser::IsStoreCmd(cmd.type)) { + total_len += cmd.bytes_len + 2; + if (io_buf->InputLen() >= total_len) { + value = str.substr(consumed, cmd.bytes_len); + // TODO: dispatch. + } else { + return NEED_MORE; + } + } + + service_->DispatchMC(cmd, value, cc_.get()); + io_buf->ConsumeInput(total_len); + } while (!cc_->ec()); + + parser_error_ = result; + + if (result == MemcacheParser::OK) + return OK; + + if (result == MemcacheParser::INPUT_PENDING) + return NEED_MORE; + + return ERROR; +} + } // namespace dfly diff --git a/server/dragonfly_connection.h b/server/dragonfly_connection.h index b4b9b8d..7d580cb 100644 --- a/server/dragonfly_connection.h +++ b/server/dragonfly_connection.h @@ -7,6 +7,7 @@ #include "util/connection.h" #include "base/io_buf.h" +#include "server/dfly_protocol.h" typedef struct ssl_ctx_st SSL_CTX; @@ -15,10 +16,11 @@ namespace dfly { class ConnectionContext; class RedisParser; class Service; +class MemcacheParser; class Connection : public util::Connection { public: - Connection(Service* service, SSL_CTX* ctx); + Connection(Protocol protocol, Service* service, SSL_CTX* ctx); ~Connection(); using error_code = std::error_code; @@ -28,6 +30,8 @@ class Connection : public util::Connection { ShutdownHandle RegisterShutdownHook(ShutdownCb cb); void UnregisterShutdownHook(ShutdownHandle id); + Protocol protocol() const { return protocol_;} + protected: void OnShutdown() override; @@ -39,14 +43,16 @@ class Connection : public util::Connection { void InputLoop(util::FiberSocketBase* peer); ParserStatus ParseRedis(base::IoBuf* buf); + ParserStatus ParseMemcache(base::IoBuf* buf); std::unique_ptr redis_parser_; + std::unique_ptr memcache_parser_; Service* service_; SSL_CTX* ctx_; std::unique_ptr cc_; unsigned parser_error_ = 0; - + Protocol protocol_; struct Shutdown; std::unique_ptr shutdown_; }; diff --git a/server/dragonfly_listener.cc b/server/dragonfly_listener.cc index 01f7568..c0fb6a4 100644 --- a/server/dragonfly_listener.cc +++ b/server/dragonfly_listener.cc @@ -81,7 +81,7 @@ static SSL_CTX* CreateSslCntx() { return ctx; } -Listener::Listener(Service* e) : engine_(e) { +Listener::Listener(Protocol protocol, Service* e) : engine_(e), protocol_(protocol) { if (FLAGS_tls) { OPENSSL_init_ssl(OPENSSL_INIT_SSL_DEFAULT, NULL); ctx_ = CreateSslCntx(); @@ -93,7 +93,7 @@ Listener::~Listener() { } util::Connection* Listener::NewConnection(ProactorBase* proactor) { - return new Connection{engine_, ctx_}; + return new Connection{protocol_, engine_, ctx_}; } void Listener::PreShutdown() { diff --git a/server/dragonfly_listener.h b/server/dragonfly_listener.h index 15aa5d5..bd9fc4e 100644 --- a/server/dragonfly_listener.h +++ b/server/dragonfly_listener.h @@ -5,6 +5,7 @@ #pragma once #include "util/listener_interface.h" +#include "server/dfly_protocol.h" typedef struct ssl_ctx_st SSL_CTX; @@ -14,7 +15,7 @@ class Service; class Listener : public util::ListenerInterface { public: - Listener(Service*); + Listener(Protocol protocol, Service*); ~Listener(); private: @@ -28,6 +29,7 @@ class Listener : public util::ListenerInterface { Service* engine_; std::atomic_uint32_t next_id_{0}; + Protocol protocol_; SSL_CTX* ctx_ = nullptr; }; diff --git a/server/main_service.cc b/server/main_service.cc index a980a72..4c24c19 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -16,6 +16,7 @@ #include "util/varz.h" DEFINE_uint32(port, 6380, "Redis port"); +DEFINE_uint32(memcache_port, 0, "Memcached port"); namespace std { @@ -127,6 +128,47 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000); } +void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, + ConnectionContext* cntx) { + absl::InlinedVector args; + char cmd_name[16]; + char set_opt[4] = {0}; + + switch (cmd.type) { + case MemcacheParser::REPLACE: + strcpy(cmd_name, "SET"); + strcpy(set_opt, "XX"); + break; + case MemcacheParser::SET: + strcpy(cmd_name, "SET"); + break; + case MemcacheParser::ADD: + strcpy(cmd_name, "SET"); + strcpy(set_opt, "NX"); + break; + default: + cntx->SendMCClientError("bad command line format"); + return; + } + + args.emplace_back(cmd_name, strlen(cmd_name)); + char* key = const_cast(cmd.key.data()); + args.emplace_back(key, cmd.key.size()); + + if (MemcacheParser::IsStoreCmd(cmd.type)) { + char* v = const_cast(value.data()); + args.emplace_back(v, value.size()); + + if (set_opt[0]) { + args.emplace_back(set_opt, strlen(set_opt)); + } + } + + CmdArgList arg_list{args.data(), args.size()}; + DispatchCommand(arg_list, cntx); +} + + void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); } @@ -138,12 +180,12 @@ void Service::Ping(CmdArgList args, ConnectionContext* cntx) { ping_qps.Inc(); if (args.size() == 1) { - return cntx->SendSimpleString("PONG"); + return cntx->SendSimpleRespString("PONG"); } std::string_view arg = ArgS(args, 1); DVLOG(2) << "Ping " << arg; - return cntx->SendSimpleString(arg); + return cntx->SendSimpleRespString(arg); } void Service::Set(CmdArgList args, ConnectionContext* cntx) { @@ -159,7 +201,8 @@ void Service::Set(CmdArgList args, ConnectionContext* cntx) { auto [it, res] = es->db_slice.AddOrFind(0, key); it->second = val; }); - cntx->SendOk(); + + cntx->SendStored(); } diff --git a/server/main_service.h b/server/main_service.h index 40edc49..8d5ae0d 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -8,6 +8,7 @@ #include "server/command_registry.h" #include "server/engine_shard_set.h" #include "util/http/http_handler.h" +#include "server/memcache_parser.h" namespace util { class AcceptServer; @@ -29,6 +30,8 @@ class Service { void Shutdown(); void DispatchCommand(CmdArgList args, ConnectionContext* cntx); + void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, + ConnectionContext* cntx); uint32_t shard_count() const { return shard_set_.size(); diff --git a/server/reply_builder.cc b/server/reply_builder.cc index abeb616..11a52dd 100644 --- a/server/reply_builder.cc +++ b/server/reply_builder.cc @@ -26,23 +26,38 @@ constexpr char kSimplePref[] = "+"; } // namespace -RespSerializer::RespSerializer(io::Sink* stream) : sink_(stream) { +BaseSerializer::BaseSerializer(io::Sink* sink) : sink_(sink) { } -void RespSerializer::Send(const iovec* v, uint32_t len) { +void BaseSerializer::Send(const iovec* v, uint32_t len) { error_code ec = sink_->Write(v, len); if (ec) { ec_ = ec; } } -void RespSerializer::SendDirect(std::string_view raw) { +void BaseSerializer::SendDirect(std::string_view raw) { iovec v = {IoVec(raw)}; Send(&v, 1); } -void ReplyBuilder::SendBulkString(std::string_view str) { +void RespSerializer::SendNull() { + constexpr char kNullStr[] = "$-1\r\n"; + + iovec v[] = {IoVec(kNullStr)}; + + Send(v, ABSL_ARRAYSIZE(v)); +} + +void RespSerializer::SendSimpleString(std::string_view str) { + iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)}; + + Send(v, ABSL_ARRAYSIZE(v)); +} + + +void RespSerializer::SendBulkString(std::string_view str) { char tmp[absl::numbers_internal::kFastToBufferSize + 3]; tmp[0] = '$'; // Format length char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size()), tmp + 1); @@ -57,17 +72,54 @@ void ReplyBuilder::SendBulkString(std::string_view str) { return Send(v, ABSL_ARRAYSIZE(v)); } -void ReplyBuilder::SendError(std::string_view str) { +void MemcacheSerializer::SendStored() { + SendDirect("STORED\r\n"); +} + +void MemcacheSerializer::SendError() { + SendDirect("ERROR\r\n"); +} + +ReplyBuilder::ReplyBuilder(Protocol protocol, ::io::Sink* sink) : protocol_(protocol) { + if (protocol == Protocol::REDIS) { + serializer_.reset(new RespSerializer(sink)); + } else { + DCHECK(protocol == Protocol::MEMCACHE); + serializer_.reset(new MemcacheSerializer(sink)); + } +} + +void ReplyBuilder::SendStored() { + if (protocol_ == Protocol::REDIS) { + as_resp()->SendSimpleString("OK"); + } else { + as_mc()->SendStored(); + } +} + + +void ReplyBuilder::SendMCClientError(string_view str) { + DCHECK(protocol_ == Protocol::MEMCACHE); + + iovec v[] = {IoVec("CLIENT_ERROR"), IoVec(str), IoVec(kCRLF)}; + serializer_->Send(v, ABSL_ARRAYSIZE(v)); +} + +void ReplyBuilder::SendError(string_view str) { + DCHECK(protocol_ == Protocol::REDIS); + if (str[0] == '-') { iovec v[] = {IoVec(str), IoVec(kCRLF)}; - return Send(v, ABSL_ARRAYSIZE(v)); + serializer_->Send(v, ABSL_ARRAYSIZE(v)); } else { iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)}; - return Send(v, ABSL_ARRAYSIZE(v)); + serializer_->Send(v, ABSL_ARRAYSIZE(v)); } } void ReplyBuilder::SendError(OpStatus status) { + DCHECK(protocol_ == Protocol::REDIS); + switch (status) { case OpStatus::OK: SendOk(); @@ -82,18 +134,4 @@ void ReplyBuilder::SendError(OpStatus status) { } } -void ReplyBuilder::SendNull() { - constexpr char kNullStr[] = "$-1\r\n"; - - iovec v[] = {IoVec(kNullStr)}; - - Send(v, ABSL_ARRAYSIZE(v)); -} - -void ReplyBuilder::SendSimpleString(std::string_view str) { - iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)}; - - Send(v, ABSL_ARRAYSIZE(v)); -} - } // namespace dfly diff --git a/server/reply_builder.h b/server/reply_builder.h index 97229ef..e4c538e 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -2,17 +2,16 @@ // Author: Roman Gershman (romange@gmail.com) // #include -#include #include "io/sync_stream_interface.h" - +#include "server/dfly_protocol.h" #include "server/op_status.h" namespace dfly { -class RespSerializer { +class BaseSerializer { public: - explicit RespSerializer(::io::Sink* sink); + explicit BaseSerializer(::io::Sink* sink); std::error_code ec() const { return ec_; @@ -23,41 +22,78 @@ class RespSerializer { ec_ = std::make_error_code(std::errc::connection_aborted); } - //! Sends a string as is without any formatting. raw should be RESP-encoded. + //! Sends a string as is without any formatting. raw should be encoded according to the protocol. void SendDirect(std::string_view str); - ::io::Sink* sink() { return sink_; } + ::io::Sink* sink() { + return sink_; + } - protected: void Send(const iovec* v, uint32_t len); - ::io::Sink* sink_; - private: + ::io::Sink* sink_; std::error_code ec_; }; -class ReplyBuilder : public RespSerializer { +class RespSerializer : public BaseSerializer { public: - explicit ReplyBuilder(::io::Sink* stream) : RespSerializer(stream) { + RespSerializer(::io::Sink* sink) : BaseSerializer(sink) { } + //! See https://redis.io/topics/protocol + void SendSimpleString(std::string_view str); + void SendNull(); + /// aka "$6\r\nfoobar\r\n" void SendBulkString(std::string_view str); +}; - void SendNull(); - - void SendOk() { - return SendSimpleString("OK"); +class MemcacheSerializer : public BaseSerializer { + public: + explicit MemcacheSerializer(::io::Sink* sink) : BaseSerializer(sink) { } + void SendStored(); + void SendError(); +}; + +class ReplyBuilder { + public: + ReplyBuilder(Protocol protocol, ::io::Sink* stream); + + void SendStored(); + void SendError(std::string_view str); void SendError(OpStatus status); - //! See https://redis.io/topics/protocol - void SendSimpleString(std::string_view str); + void SendOk() { + as_resp()->SendSimpleString("OK"); + } -private: + std::error_code ec() const { + return serializer_->ec(); + } + + void SendMCClientError(std::string_view str); + + void SendSimpleRespString(std::string_view str) { + as_resp()->SendSimpleString(str); + } + + void SendRespBlob(std::string_view str) { + as_resp()->SendDirect(str); + } + private: + RespSerializer* as_resp() { + return static_cast(serializer_.get()); + } + MemcacheSerializer* as_mc() { + return static_cast(serializer_.get()); + } + + std::unique_ptr serializer_; + Protocol protocol_; }; } // namespace dfly