From 3f0fcbf99ff318401a0d0659740daf2fc715d8fa Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 3 Mar 2022 01:59:29 +0200 Subject: [PATCH] Factor out client connections module into a separate library called facade --- src/CMakeLists.txt | 1 + src/core/dragonfly_core.cc | 67 +------- src/facade/CMakeLists.txt | 13 ++ src/facade/README.md | 8 + src/facade/conn_context.h | 51 ++++++ .../dragonfly_connection.cc | 84 +++++----- src/{server => facade}/dragonfly_connection.h | 22 ++- src/{server => facade}/dragonfly_listener.cc | 26 +-- src/{server => facade}/dragonfly_listener.h | 15 +- src/facade/error.h | 25 +++ src/facade/facade.cc | 157 ++++++++++++++++++ src/facade/facade_test.cc | 98 +++++++++++ src/facade/facade_test.h | 83 +++++++++ src/facade/facade_types.h | 58 +++++++ src/{server => facade}/memcache_parser.cc | 4 +- src/{server => facade}/memcache_parser.h | 2 +- .../memcache_parser_test.cc | 12 +- src/facade/ok_main.cc | 70 ++++++++ src/{core => facade}/op_status.h | 10 +- src/{server => facade}/redis_parser.cc | 4 +- src/{server => facade}/redis_parser.h | 23 ++- src/{server => facade}/redis_parser_test.cc | 39 +---- src/{server => facade}/reply_builder.cc | 6 +- src/{server => facade}/reply_builder.h | 4 +- src/{core => facade}/resp_expr.h | 12 +- src/facade/service_interface.h | 31 ++++ src/server/CMakeLists.txt | 15 +- src/server/common.cc | 52 ------ src/server/common_types.h | 51 +----- src/server/conn_context.cc | 33 ---- src/server/conn_context.h | 60 +------ src/server/db_slice.cc | 3 +- src/server/db_slice.h | 4 +- src/server/debugcmd.cc | 1 + src/server/dfly_main.cc | 13 +- src/server/dragonfly_test.cc | 40 ++++- src/server/error.h | 18 +- src/server/generic_family.cc | 4 +- src/server/generic_family.h | 5 +- src/server/generic_family_test.cc | 1 + src/server/hset_family.h | 3 +- src/server/hset_family_test.cc | 2 + src/server/list_family.h | 4 +- src/server/list_family_test.cc | 1 + src/server/main_service.cc | 110 +++++++----- src/server/main_service.h | 23 ++- src/server/rdb_load.cc | 1 + src/server/rdb_save.cc | 1 + src/server/replica.cc | 4 +- src/server/replica.h | 7 +- src/server/server_family.cc | 13 +- src/server/server_family.h | 6 +- src/server/server_state.h | 4 +- src/server/set_family.cc | 1 + src/server/set_family.h | 4 +- src/server/set_family_test.cc | 1 + src/server/string_family.cc | 14 +- src/server/string_family.h | 2 + src/server/string_family_test.cc | 1 + src/server/test_utils.cc | 98 +---------- src/server/test_utils.h | 77 +-------- src/server/transaction.cc | 1 + src/server/transaction.h | 7 +- src/server/zset_family.h | 2 +- 64 files changed, 946 insertions(+), 666 deletions(-) create mode 100644 src/facade/CMakeLists.txt create mode 100644 src/facade/README.md create mode 100644 src/facade/conn_context.h rename src/{server => facade}/dragonfly_connection.cc (87%) rename src/{server => facade}/dragonfly_connection.h (77%) rename src/{server => facade}/dragonfly_listener.cc (85%) rename src/{server => facade}/dragonfly_listener.h (67%) create mode 100644 src/facade/error.h create mode 100644 src/facade/facade.cc create mode 100644 src/facade/facade_test.cc create mode 100644 src/facade/facade_test.h create mode 100644 src/facade/facade_types.h rename src/{server => facade}/memcache_parser.cc (98%) rename src/{server => facade}/memcache_parser.h (97%) rename src/{server => facade}/memcache_parser_test.cc (93%) create mode 100644 src/facade/ok_main.cc rename src/{core => facade}/op_status.h (86%) rename src/{server => facade}/redis_parser.cc (99%) rename src/{server => facade}/redis_parser.h (87%) rename src/{server => facade}/redis_parser_test.cc (86%) rename src/{server => facade}/reply_builder.cc (98%) rename src/{server => facade}/reply_builder.h (98%) rename src/{core => facade}/resp_expr.h (80%) create mode 100644 src/facade/service_interface.h delete mode 100644 src/server/conn_context.cc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6b81a9f..f62214c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,3 +1,4 @@ add_subdirectory(redis) add_subdirectory(core) +add_subdirectory(facade) add_subdirectory(server) diff --git a/src/core/dragonfly_core.cc b/src/core/dragonfly_core.cc index 9631197..d4fd1e8 100644 --- a/src/core/dragonfly_core.cc +++ b/src/core/dragonfly_core.cc @@ -4,28 +4,11 @@ #include "base/logging.h" #include "core/intent_lock.h" -#include "core/resp_expr.h" + +#include namespace dfly { -const char* RespExpr::TypeName(Type t) { - switch (t) { - case STRING: - return "string"; - case INT64: - return "int"; - case ARRAY: - return "array"; - case NIL_ARRAY: - return "nil-array"; - case NIL: - return "nil"; - case ERROR: - return "error"; - } - ABSL_INTERNAL_UNREACHABLE; -} - const char* IntentLock::ModeName(Mode m) { switch (m) { case IntentLock::SHARED: @@ -42,48 +25,4 @@ void IntentLock::VerifyDebug() { DCHECK_EQ(0u, cnt_[1] & kMsb); } -} // namespace dfly - -namespace std { -ostream& operator<<(ostream& os, const dfly::RespExpr& e) { - using dfly::RespExpr; - using dfly::ToSV; - - switch (e.type) { - case RespExpr::INT64: - os << "i" << get(e.u); - break; - case RespExpr::STRING: - os << "'" << ToSV(get(e.u)) << "'"; - break; - case RespExpr::NIL: - os << "nil"; - break; - case RespExpr::NIL_ARRAY: - os << "[]"; - break; - case RespExpr::ARRAY: - os << dfly::RespSpan{*get(e.u)}; - break; - case RespExpr::ERROR: - os << "e(" << ToSV(get(e.u)) << ")"; - break; - } - - return os; -} - -ostream& operator<<(ostream& os, dfly::RespSpan ras) { - os << "["; - if (!ras.empty()) { - for (size_t i = 0; i < ras.size() - 1; ++i) { - os << ras[i] << ","; - } - os << ras.back(); - } - os << "]"; - - return os; -} - -} // namespace std +} // namespace dfly \ No newline at end of file diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt new file mode 100644 index 0000000..48a93b6 --- /dev/null +++ b/src/facade/CMakeLists.txt @@ -0,0 +1,13 @@ +add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc + memcache_parser.cc redis_parser.cc reply_builder.cc) +cxx_link(dfly_facade base uring_fiber_lib fibers_ext strings_lib http_server_lib + tls_lib TRDP::mimalloc) + +add_library(facade_test facade_test.cc) +cxx_link(facade_test dfly_facade gtest_main_ext) + +cxx_test(memcache_parser_test dfly_facade LABELS DFLY) +cxx_test(redis_parser_test facade_test LABELS DFLY) + +add_executable(ok_backend ok_main.cc) +cxx_link(ok_backend dfly_facade) diff --git a/src/facade/README.md b/src/facade/README.md new file mode 100644 index 0000000..401ceca --- /dev/null +++ b/src/facade/README.md @@ -0,0 +1,8 @@ +## A facade library + +The library is responsible for opening dragonfly-like TCP client connections. +I call it facade because "client" term is often abused. + +It should be separated from the rest of dragonfly server logic and should be self-contained, i.e +no redis-lib or server dependencies are allowed. + diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h new file mode 100644 index 0000000..f2343fa --- /dev/null +++ b/src/facade/conn_context.h @@ -0,0 +1,51 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include "facade/facade_types.h" +#include "facade/reply_builder.h" + +namespace facade { + +class Connection; + +class ConnectionContext { + public: + ConnectionContext(::io::Sink* stream, Connection* owner); + + Connection* owner() { + return owner_; + } + + Protocol protocol() const; + + // A convenient proxy for redis interface. + RedisReplyBuilder* operator->(); + + ReplyBuilderInterface* reply_builder() { + return rbuilder_.get(); + } + + // Allows receiving the output data from the commands called from scripts. + ReplyBuilderInterface* Inject(ReplyBuilderInterface* new_i) { + ReplyBuilderInterface* res = rbuilder_.release(); + rbuilder_.reset(new_i); + return res; + } + + // connection state / properties. + bool async_dispatch: 1; + bool conn_closing: 1; + bool req_auth: 1; + bool replica_conn: 1; + bool authenticated: 1; + private: + Connection* owner_; + std::unique_ptr rbuilder_; +}; + +} // namespace facade diff --git a/src/server/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc similarity index 87% rename from src/server/dragonfly_connection.cc rename to src/facade/dragonfly_connection.cc index 7ec4e0a..197c9b3 100644 --- a/src/server/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -2,7 +2,7 @@ // See LICENSE for licensing terms. // -#include "server/dragonfly_connection.h" +#include "facade/dragonfly_connection.h" #include #include @@ -11,13 +11,11 @@ #include #include "base/logging.h" -#include "server/command_registry.h" -#include "server/conn_context.h" -#include "server/main_service.h" -#include "server/memcache_parser.h" -#include "server/redis_parser.h" -#include "server/server_state.h" -#include "server/transaction.h" + +#include "facade/conn_context.h" +#include "facade/memcache_parser.h" +#include "facade/redis_parser.h" +#include "facade/service_interface.h" #include "util/fiber_sched_algo.h" #include "util/tls/tls_socket.h" #include "util/uring/uring_socket.h" @@ -30,7 +28,7 @@ using nonstd::make_unexpected; namespace this_fiber = boost::this_fiber; namespace fibers = boost::fibers; -namespace dfly { +namespace facade { namespace { void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) { @@ -94,8 +92,9 @@ struct Connection::Request { Request(const Request&) = delete; }; -Connection::Connection(Protocol protocol, Service* service, SSL_CTX* ctx) - : io_buf_{kMinReadSize}, service_(service), ctx_(ctx) { +Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, + ServiceInterface* service) + : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) { protocol_ = protocol; switch (protocol) { @@ -167,7 +166,7 @@ void Connection::HandleRequests() { if (http_res) { if (*http_res) { VLOG(1) << "HTTP1.1 identified"; - HttpConnection http_conn{service_->http_listener()}; + HttpConnection http_conn{http_listener_}; http_conn.SetSocket(peer); auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer()); io_buf_.ConsumeInput(io_buf_.InputLen()); @@ -176,30 +175,27 @@ void Connection::HandleRequests() { } http_conn.ReleaseSocket(); } else { - cc_.reset(new ConnectionContext(peer, this)); - cc_->shard_set = &service_->shard_set(); - - if (service_->IsPassProtected()) - cc_->conn_state.mask |= ConnectionState::REQ_AUTH; + cc_.reset(service_->CreateContext(peer, this)); + bool should_disarm_poller = false; // TODO: to move this interface to LinuxSocketBase so we won't need to cast. uring::UringSocket* us = static_cast(socket_.get()); + uint32_t poll_id = 0; + if (breaker_cb_) { + should_disarm_poller = true; - bool poll_armed = true; - uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) { - VLOG(1) << "Got event " << mask; - cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; - if (cc_->transaction) { - cc_->transaction->BreakOnClose(); - } - - evc_.notify(); // Notify dispatch fiber. - poll_armed = false; - }); + poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) { + VLOG(1) << "Got event " << mask; + cc_->conn_closing = true; + breaker_cb_(mask); + evc_.notify(); // Notify dispatch fiber. + should_disarm_poller = false; + }); + } ConnectionFlow(peer); - if (poll_armed) { + if (should_disarm_poller) { us->CancelPoll(poll_id); } } @@ -208,6 +204,10 @@ void Connection::HandleRequests() { VLOG(1) << "Closed connection for peer " << remote_ep; } +void Connection::RegisterOnBreak(BreakerCb breaker_cb) { + breaker_cb_ = breaker_cb; +} + io::Result Connection::CheckForHttpProto(util::FiberSocketBase* peer) { size_t last_len = 0; do { @@ -235,7 +235,7 @@ io::Result Connection::CheckForHttpProto(util::FiberSocketBase* peer) { void Connection::ConnectionFlow(FiberSocketBase* peer) { auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); }); - ConnectionStats* stats = ServerState::tl_connection_stats(); + ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); stats->num_conns++; stats->read_buf_capacity += io_buf_.Capacity(); @@ -265,14 +265,14 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { } } - cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; // Signal dispatch to close. + cc_->conn_closing = true; // Signal dispatch to close. evc_.notify(); dispatch_fb.join(); stats->read_buf_capacity -= io_buf_.Capacity(); // Update num_replicas if this was a replica connection. - if (cc_->conn_state.mask & ConnectionState::REPL_CONNECTION) { + if (cc_->replica_conn) { --stats->num_replicas; } @@ -321,7 +321,7 @@ auto Connection::ParseRedis() -> ParserStatus { // We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the // dispatch fiber pulls the last record but is still processing the command and then this // fiber enters the condition below and executes out of order. - bool is_sync_dispatch = !cc_->conn_state.IsRunViaDispatch(); + bool is_sync_dispatch = !cc_->async_dispatch; if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) { RespToArgList(args, &arg_vec); service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get()); @@ -382,7 +382,7 @@ auto Connection::ParseMemcache() -> ParserStatus { // We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the // dispatch fiber pulls the last record but is still processing the command and then this // fiber enters the condition below and executes out of order. - bool is_sync_dispatch = (cc_->conn_state.mask & ConnectionState::ASYNC_DISPATCH) == 0; + bool is_sync_dispatch = !cc_->async_dispatch; if (dispatch_q_.empty() && is_sync_dispatch) { service_->DispatchMC(cmd, value, cc_.get()); } @@ -408,7 +408,7 @@ auto Connection::ParseMemcache() -> ParserStatus { auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant { SinkReplyBuilder* builder = static_cast(cc_->reply_builder()); - ConnectionStats* stats = ServerState::tl_connection_stats(); + ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); error_code ec; ParserStatus parse_status = OK; @@ -484,12 +484,12 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant().set_name("DispatchFiber"); - ConnectionStats* stats = ServerState::tl_connection_stats(); + ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); SinkReplyBuilder* builder = static_cast(cc_->reply_builder()); while (!builder->GetError()) { - evc_.await([this] { return cc_->conn_state.IsClosing() || !dispatch_q_.empty(); }); - if (cc_->conn_state.IsClosing()) + evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); }); + if (cc_->conn_closing) break; // TODO: We have a memory leak with pending requests in the queue. Request* req = dispatch_q_.front(); @@ -498,14 +498,14 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { ++stats->pipelined_cmd_cnt; builder->SetBatchMode(!dispatch_q_.empty()); - cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH; + cc_->async_dispatch = true; service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get()); - cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH; + cc_->async_dispatch = false; req->~Request(); mi_free(req); } - cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; + cc_->conn_closing = true; } auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* { @@ -536,4 +536,4 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* { return req; } -} // namespace dfly +} // namespace facade diff --git a/src/server/dragonfly_connection.h b/src/facade/dragonfly_connection.h similarity index 77% rename from src/server/dragonfly_connection.h rename to src/facade/dragonfly_connection.h index 9689013..e059b03 100644 --- a/src/server/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -10,24 +10,26 @@ #include #include "base/io_buf.h" -#include "core/resp_expr.h" -#include "server/common_types.h" +#include "facade/facade_types.h" +#include "facade/resp_expr.h" #include "util/connection.h" #include "util/fibers/event_count.h" +#include "util/http/http_handler.h" typedef struct ssl_ctx_st SSL_CTX; typedef struct mi_heap_s mi_heap_t; -namespace dfly { +namespace facade { class ConnectionContext; class RedisParser; -class Service; +class ServiceInterface; class MemcacheParser; class Connection : public util::Connection { public: - Connection(Protocol protocol, Service* service, SSL_CTX* ctx); + Connection(Protocol protocol, util::HttpListenerBase* http_listener, + SSL_CTX* ctx, ServiceInterface* service); ~Connection(); using error_code = std::error_code; @@ -41,6 +43,9 @@ class Connection : public util::Connection { return protocol_; } + using BreakerCb = std::function; + void RegisterOnBreak(BreakerCb breaker_cb); + protected: void OnShutdown() override; @@ -63,8 +68,10 @@ class Connection : public util::Connection { base::IoBuf io_buf_; std::unique_ptr redis_parser_; std::unique_ptr memcache_parser_; - Service* service_; + util::HttpListenerBase* http_listener_; SSL_CTX* ctx_; + ServiceInterface* service_; + std::unique_ptr cc_; struct Request; @@ -77,6 +84,7 @@ class Connection : public util::Connection { Protocol protocol_; struct Shutdown; std::unique_ptr shutdown_; + BreakerCb breaker_cb_; }; -} // namespace dfly +} // namespace facade diff --git a/src/server/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc similarity index 85% rename from src/server/dragonfly_listener.cc rename to src/facade/dragonfly_listener.cc index dd1e80b..887a362 100644 --- a/src/server/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -1,14 +1,13 @@ -// Copyright 2021, Roman Gershman. All rights reserved. +// Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // -#include "server/dragonfly_listener.h" +#include "facade/dragonfly_listener.h" #include #include "base/logging.h" -#include "server/config_flags.h" -#include "server/dragonfly_connection.h" +#include "facade/dragonfly_connection.h" #include "util/proactor_pool.h" DEFINE_uint32(conn_threads, 0, "Number of threads used for handing server connections"); @@ -17,16 +16,17 @@ DEFINE_bool(conn_use_incoming_cpu, false, "If true uses incoming cpu of a socket in order to distribute" " incoming connections"); -CONFIG_string(tls_client_cert_file, "", "", TrueValidator); -CONFIG_string(tls_client_key_file, "", "", TrueValidator); +DEFINE_string(tls_client_cert_file, "", "cert file for tls connections"); +DEFINE_string(tls_client_key_file, "", "key file for tls connections"); +#if 0 enum TlsClientAuth { CL_AUTH_NO = 0, CL_AUTH_YES = 1, CL_AUTH_OPTIONAL = 2, }; -dfly::ConfigEnum tls_auth_clients_enum[] = { +facade::ConfigEnum tls_auth_clients_enum[] = { {"no", CL_AUTH_NO}, {"yes", CL_AUTH_YES}, {"optional", CL_AUTH_OPTIONAL}, @@ -35,8 +35,9 @@ dfly::ConfigEnum tls_auth_clients_enum[] = { static int tls_auth_clients_opt = CL_AUTH_YES; CONFIG_enum(tls_auth_clients, "yes", "", tls_auth_clients_enum, tls_auth_clients_opt); +#endif -namespace dfly { +namespace facade { using namespace util; using namespace std; @@ -85,11 +86,14 @@ static SSL_CTX* CreateSslCntx() { return ctx; } -Listener::Listener(Protocol protocol, Service* e) : engine_(e), protocol_(protocol) { +Listener::Listener(Protocol protocol, ServiceInterface* e) : service_(e), protocol_(protocol) { if (FLAGS_tls) { OPENSSL_init_ssl(OPENSSL_INIT_SSL_DEFAULT, NULL); ctx_ = CreateSslCntx(); } + http_base_.reset(new HttpListener<>); + + http_base_->enable_metrics(); } Listener::~Listener() { @@ -97,7 +101,7 @@ Listener::~Listener() { } util::Connection* Listener::NewConnection(ProactorBase* proactor) { - return new Connection{protocol_, engine_, ctx_}; + return new Connection{protocol_, http_base_.get(), ctx_, service_}; } void Listener::PreShutdown() { @@ -139,4 +143,4 @@ ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) { return pp->at(id % total); } -} // namespace dfly +} // namespace facade diff --git a/src/server/dragonfly_listener.h b/src/facade/dragonfly_listener.h similarity index 67% rename from src/server/dragonfly_listener.h rename to src/facade/dragonfly_listener.h index 10a1ed1..0622549 100644 --- a/src/server/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -4,18 +4,19 @@ #pragma once +#include "facade/facade_types.h" +#include "util/http/http_handler.h" #include "util/listener_interface.h" -#include "server/common_types.h" typedef struct ssl_ctx_st SSL_CTX; -namespace dfly { +namespace facade { -class Service; +class ServiceInterface; class Listener : public util::ListenerInterface { public: - Listener(Protocol protocol, Service*); + Listener(Protocol protocol, ServiceInterface*); ~Listener(); private: @@ -26,11 +27,13 @@ class Listener : public util::ListenerInterface { void PostShutdown(); - Service* engine_; + std::unique_ptr http_base_; + + ServiceInterface* service_; std::atomic_uint32_t next_id_{0}; Protocol protocol_; SSL_CTX* ctx_ = nullptr; }; -} // namespace dfly +} // namespace facade diff --git a/src/facade/error.h b/src/facade/error.h new file mode 100644 index 0000000..4e85ad7 --- /dev/null +++ b/src/facade/error.h @@ -0,0 +1,25 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include + +namespace facade { + +std::string WrongNumArgsError(std::string_view cmd); + +extern const char kSyntaxErr[]; +extern const char kWrongTypeErr[]; +extern const char kKeyNotFoundErr[]; +extern const char kInvalidIntErr[]; +extern const char kUintErr[]; +extern const char kDbIndOutOfRangeErr[]; +extern const char kInvalidDbIndErr[]; +extern const char kScriptNotFound[]; +extern const char kAuthRejected[]; + + +} // namespace dfly diff --git a/src/facade/facade.cc b/src/facade/facade.cc new file mode 100644 index 0000000..6db9ce1 --- /dev/null +++ b/src/facade/facade.cc @@ -0,0 +1,157 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include + +#include "base/logging.h" + +#include "facade/conn_context.h" +#include "facade/dragonfly_connection.h" +#include "facade/error.h" +#include "facade/facade_types.h" + +namespace facade { + +using namespace std; + +#define ADD(x) (x) += o.x + +ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { + // To break this code deliberately if we add/remove a field to this struct. + static_assert(sizeof(ConnectionStats) == 64); + + ADD(num_conns); + ADD(num_replicas); + ADD(read_buf_capacity); + ADD(io_read_cnt); + ADD(io_read_bytes); + ADD(io_write_cnt); + ADD(io_write_bytes); + ADD(pipelined_cmd_cnt); + ADD(command_cnt); + + return *this; +} + +#undef ADD + +string WrongNumArgsError(std::string_view cmd) { + return absl::StrCat("wrong number of arguments for '", cmd, "' command"); +} + +const char kSyntaxErr[] = "syntax error"; +const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value"; +const char kKeyNotFoundErr[] = "no such key"; +const char kInvalidIntErr[] = "value is not an integer or out of range"; +const char kUintErr[] = "value is out of range, must be positive"; +const char kDbIndOutOfRangeErr[] = "DB index is out of range"; +const char kInvalidDbIndErr[] = "invalid DB index"; +const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL."; +const char kAuthRejected[] = "-WRONGPASS invalid username-password pair or user is disabled."; + +const char* RespExpr::TypeName(Type t) { + switch (t) { + case STRING: + return "string"; + case INT64: + return "int"; + case ARRAY: + return "array"; + case NIL_ARRAY: + return "nil-array"; + case NIL: + return "nil"; + case ERROR: + return "error"; + } + ABSL_INTERNAL_UNREACHABLE; +} + +ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) { + switch (owner->protocol()) { + case Protocol::REDIS: + rbuilder_.reset(new RedisReplyBuilder(stream)); + break; + case Protocol::MEMCACHE: + rbuilder_.reset(new MCReplyBuilder(stream)); + break; + } + + async_dispatch = false; + conn_closing = false; + req_auth = false; + replica_conn = false; + authenticated = false; +} + +Protocol ConnectionContext::protocol() const { + return owner_->protocol(); +} + +RedisReplyBuilder* ConnectionContext::operator->() { + CHECK(Protocol::REDIS == protocol()); + + return static_cast(rbuilder_.get()); +} + +} // namespace facade + + +namespace std { + +ostream& operator<<(ostream& os, facade::CmdArgList ras) { + os << "["; + if (!ras.empty()) { + for (size_t i = 0; i < ras.size() - 1; ++i) { + os << facade::ArgS(ras, i) << ","; + } + os << facade::ArgS(ras, ras.size() - 1); + } + os << "]"; + + return os; +} + +ostream& operator<<(ostream& os, const facade::RespExpr& e) { + using facade::RespExpr; + using facade::ToSV; + + switch (e.type) { + case RespExpr::INT64: + os << "i" << get(e.u); + break; + case RespExpr::STRING: + os << "'" << ToSV(get(e.u)) << "'"; + break; + case RespExpr::NIL: + os << "nil"; + break; + case RespExpr::NIL_ARRAY: + os << "[]"; + break; + case RespExpr::ARRAY: + os << facade::RespSpan{*get(e.u)}; + break; + case RespExpr::ERROR: + os << "e(" << ToSV(get(e.u)) << ")"; + break; + } + + return os; +} + +ostream& operator<<(ostream& os, facade::RespSpan ras) { + os << "["; + if (!ras.empty()) { + for (size_t i = 0; i < ras.size() - 1; ++i) { + os << ras[i] << ","; + } + os << ras.back(); + } + os << "]"; + + return os; +} + +} // namespace std diff --git a/src/facade/facade_test.cc b/src/facade/facade_test.cc new file mode 100644 index 0000000..f710d5f --- /dev/null +++ b/src/facade/facade_test.cc @@ -0,0 +1,98 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "facade/facade_test.h" + +#include + +namespace facade { + +using namespace testing; +using namespace std; + +bool RespMatcher::MatchAndExplain(const RespExpr& e, MatchResultListener* listener) const { + if (e.type != type_) { + *listener << "\nWrong type: " << RespExpr::TypeName(e.type); + return false; + } + + if (type_ == RespExpr::STRING || type_ == RespExpr::ERROR) { + RespExpr::Buffer ebuf = e.GetBuf(); + std::string_view actual{reinterpret_cast(ebuf.data()), ebuf.size()}; + + if (type_ == RespExpr::ERROR && !absl::StrContains(actual, exp_str_)) { + *listener << "Actual does not contain '" << exp_str_ << "'"; + return false; + } + if (type_ == RespExpr::STRING && exp_str_ != actual) { + *listener << "\nActual string: " << actual; + return false; + } + } else if (type_ == RespExpr::INT64) { + auto actual = get(e.u); + if (exp_int_ != actual) { + *listener << "\nActual : " << actual << " expected: " << exp_int_; + return false; + } + } else if (type_ == RespExpr::ARRAY) { + size_t len = get(e.u)->size(); + if (len != size_t(exp_int_)) { + *listener << "Actual length " << len << ", expected: " << exp_int_; + return false; + } + } + + return true; +} + +void RespMatcher::DescribeTo(std::ostream* os) const { + *os << "is "; + switch (type_) { + case RespExpr::STRING: + case RespExpr::ERROR: + *os << exp_str_; + break; + + case RespExpr::INT64: + *os << exp_str_; + break; + default: + *os << "TBD"; + break; + } +} + +void RespMatcher::DescribeNegationTo(std::ostream* os) const { + *os << "is not "; +} + +bool RespTypeMatcher::MatchAndExplain(const RespExpr& e, MatchResultListener* listener) const { + if (e.type != type_) { + *listener << "\nWrong type: " << RespExpr::TypeName(e.type); + return false; + } + + return true; +} + +void RespTypeMatcher::DescribeTo(std::ostream* os) const { + *os << "is " << RespExpr::TypeName(type_); +} + +void RespTypeMatcher::DescribeNegationTo(std::ostream* os) const { + *os << "is not " << RespExpr::TypeName(type_); +} + +void PrintTo(const RespExpr::Vec& vec, std::ostream* os) { + *os << "Vec: ["; + if (!vec.empty()) { + for (size_t i = 0; i < vec.size() - 1; ++i) { + *os << vec[i] << ","; + } + *os << vec.back(); + } + *os << "]\n"; +} + +} // namespace facade diff --git a/src/facade/facade_test.h b/src/facade/facade_test.h new file mode 100644 index 0000000..986a661 --- /dev/null +++ b/src/facade/facade_test.h @@ -0,0 +1,83 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include "facade/resp_expr.h" + +namespace facade { + +class RespMatcher { + public: + RespMatcher(std::string_view val, RespExpr::Type t = RespExpr::STRING) : type_(t), exp_str_(val) { + } + + RespMatcher(int64_t val, RespExpr::Type t = RespExpr::INT64) : type_(t), exp_int_(val) { + } + + using is_gtest_matcher = void; + + bool MatchAndExplain(const RespExpr& e, testing::MatchResultListener*) const; + + void DescribeTo(std::ostream* os) const; + + void DescribeNegationTo(std::ostream* os) const; + + private: + RespExpr::Type type_; + + std::string exp_str_; + int64_t exp_int_; +}; + +class RespTypeMatcher { + public: + RespTypeMatcher(RespExpr::Type type) : type_(type) { + } + + using is_gtest_matcher = void; + + bool MatchAndExplain(const RespExpr& e, testing::MatchResultListener*) const; + + void DescribeTo(std::ostream* os) const; + + void DescribeNegationTo(std::ostream* os) const; + + private: + RespExpr::Type type_; +}; + +inline ::testing::PolymorphicMatcher StrArg(std::string_view str) { + return ::testing::MakePolymorphicMatcher(RespMatcher(str)); +} + +inline ::testing::PolymorphicMatcher ErrArg(std::string_view str) { + return ::testing::MakePolymorphicMatcher(RespMatcher(str, RespExpr::ERROR)); +} + +inline ::testing::PolymorphicMatcher IntArg(int64_t ival) { + return ::testing::MakePolymorphicMatcher(RespMatcher(ival)); +} + +inline ::testing::PolymorphicMatcher ArrLen(size_t len) { + return ::testing::MakePolymorphicMatcher(RespMatcher(len, RespExpr::ARRAY)); +} + +inline ::testing::PolymorphicMatcher ArgType(RespExpr::Type t) { + return ::testing::MakePolymorphicMatcher(RespTypeMatcher(t)); +} + +inline bool operator==(const RespExpr& left, const char* s) { + return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s; +} + +void PrintTo(const RespExpr::Vec& vec, std::ostream* os); + +MATCHER_P(RespEq, val, "") { + return ::testing::ExplainMatchResult(::testing::ElementsAre(StrArg(val)), arg, result_listener); +} + +} // namespace facade diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h new file mode 100644 index 0000000..88cbc28 --- /dev/null +++ b/src/facade/facade_types.h @@ -0,0 +1,58 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +namespace facade { + +enum class Protocol : uint8_t { + MEMCACHE = 1, + REDIS = 2 +}; + +using MutableSlice = absl::Span; +using CmdArgList = absl::Span; +using CmdArgVec = std::vector; + + +struct ConnectionStats { + uint32_t num_conns = 0; + uint32_t num_replicas = 0; + size_t read_buf_capacity = 0; + size_t io_read_cnt = 0; + size_t io_read_bytes = 0; + size_t io_write_cnt = 0; + size_t io_write_bytes = 0; + size_t command_cnt = 0; + size_t pipelined_cmd_cnt = 0; + + ConnectionStats& operator+=(const ConnectionStats& o); +}; + +inline MutableSlice ToMSS(absl::Span span) { + return MutableSlice{reinterpret_cast(span.data()), span.size()}; +} + +inline std::string_view ArgS(CmdArgList args, size_t i) { + auto arg = args[i]; + return std::string_view(arg.data(), arg.size()); +} + +constexpr inline unsigned long long operator""_MB(unsigned long long x) { + return 1024L * 1024L * x; +} + +constexpr inline unsigned long long operator""_KB(unsigned long long x) { + return 1024L * x; +} + +} // namespace facade + + +namespace std { +ostream& operator<<(ostream& os, facade::CmdArgList args); + +} // namespace std diff --git a/src/server/memcache_parser.cc b/src/facade/memcache_parser.cc similarity index 98% rename from src/server/memcache_parser.cc rename to src/facade/memcache_parser.cc index 2f04c9e..b12dada 100644 --- a/src/server/memcache_parser.cc +++ b/src/facade/memcache_parser.cc @@ -1,7 +1,7 @@ // Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // -#include "server/memcache_parser.h" +#include "facade/memcache_parser.h" #include #include @@ -9,7 +9,7 @@ #include "base/stl_util.h" -namespace dfly { +namespace facade { using namespace std; using MP = MemcacheParser; diff --git a/src/server/memcache_parser.h b/src/facade/memcache_parser.h similarity index 97% rename from src/server/memcache_parser.h rename to src/facade/memcache_parser.h index 34dcbad..fc3a6b7 100644 --- a/src/server/memcache_parser.h +++ b/src/facade/memcache_parser.h @@ -7,7 +7,7 @@ #include #include -namespace dfly { +namespace facade { // Memcache parser does not parse value blobs, only the commands. // The expectation is that the caller will parse the command and diff --git a/src/server/memcache_parser_test.cc b/src/facade/memcache_parser_test.cc similarity index 93% rename from src/server/memcache_parser_test.cc rename to src/facade/memcache_parser_test.cc index 0614c57..f8d8864 100644 --- a/src/server/memcache_parser_test.cc +++ b/src/facade/memcache_parser_test.cc @@ -2,23 +2,22 @@ // See LICENSE for licensing terms. // -#include "server/memcache_parser.h" +#include "facade/memcache_parser.h" #include #include "absl/strings/str_cat.h" #include "base/gtest.h" #include "base/logging.h" -#include "server/test_utils.h" +#include "facade/facade_test.h" using namespace testing; using namespace std; -namespace dfly { + +namespace facade { class MCParserTest : public testing::Test { protected: - RedisParser::Result Parse(std::string_view str); - MemcacheParser parser_; MemcacheParser::Command cmd_; uint32_t consumed_; @@ -26,7 +25,6 @@ class MCParserTest : public testing::Test { unique_ptr stash_; }; - TEST_F(MCParserTest, Basic) { MemcacheParser::Result st = parser_.Parse("set a 1 20 3\r\n", &consumed_, &cmd_); EXPECT_EQ(MemcacheParser::OK, st); @@ -80,4 +78,4 @@ TEST_F(MCParserTest, Stats) { EXPECT_EQ(MemcacheParser::PARSE_ERROR, st); } -} // namespace dfly \ No newline at end of file +} // namespace facade \ No newline at end of file diff --git a/src/facade/ok_main.cc b/src/facade/ok_main.cc new file mode 100644 index 0000000..7d49a61 --- /dev/null +++ b/src/facade/ok_main.cc @@ -0,0 +1,70 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "base/init.h" +#include "facade/conn_context.h" +#include "facade/dragonfly_listener.h" +#include "facade/service_interface.h" +#include "util/accept_server.h" +#include "util/uring/uring_pool.h" + +DEFINE_uint32(port, 6379, "server port"); + +using namespace util; +using namespace std; + +namespace facade { + +namespace { + +thread_local ConnectionStats tl_stats; + +class OkService : public ServiceInterface { + public: + void DispatchCommand(CmdArgList args, ConnectionContext* cntx) final { + (*cntx)->SendOk(); + } + + void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, + ConnectionContext* cntx) final { + cntx->reply_builder()->SendError(""); + } + + ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final { + return new ConnectionContext{peer, owner}; + } + + ConnectionStats* GetThreadLocalConnectionStats() final { + return &tl_stats; + } +}; + +void RunEngine(ProactorPool* pool, AcceptServer* acceptor) { + OkService service; + + acceptor->AddListener(FLAGS_port, new Listener{Protocol::REDIS, &service}); + + acceptor->Run(); + acceptor->Wait(); +} + +} // namespace + +} // namespace facade + +int main(int argc, char* argv[]) { + MainInitGuard guard(&argc, &argv); + + CHECK_GT(FLAGS_port, 0u); + + uring::UringPool pp{1024}; + pp.Run(); + + AcceptServer acceptor(&pp); + facade::RunEngine(&pp, &acceptor); + + pp.Stop(); + + return 0; +} diff --git a/src/core/op_status.h b/src/facade/op_status.h similarity index 86% rename from src/core/op_status.h rename to src/facade/op_status.h index 7d4a1ea..df5ea4e 100644 --- a/src/core/op_status.h +++ b/src/facade/op_status.h @@ -1,4 +1,4 @@ -// Copyright 2021, Roman Gershman. All rights reserved. +// Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // @@ -6,7 +6,7 @@ #include -namespace dfly { +namespace facade { enum class OpStatus : uint16_t { OK, @@ -84,16 +84,16 @@ inline bool operator==(OpStatus st, const OpResultBase& ob) { return ob.operator==(st); } -} // namespace dfly +} // namespace facade namespace std { -template std::ostream& operator<<(std::ostream& os, const dfly::OpResult& res) { +template std::ostream& operator<<(std::ostream& os, const facade::OpResult& res) { os << int(res.status()); return os; } -inline std::ostream& operator<<(std::ostream& os, const dfly::OpStatus op) { +inline std::ostream& operator<<(std::ostream& os, const facade::OpStatus op) { os << int(op); return os; } diff --git a/src/server/redis_parser.cc b/src/facade/redis_parser.cc similarity index 99% rename from src/server/redis_parser.cc rename to src/facade/redis_parser.cc index a5da30a..4d378ab 100644 --- a/src/server/redis_parser.cc +++ b/src/facade/redis_parser.cc @@ -1,13 +1,13 @@ // Copyright 2021, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // -#include "server/redis_parser.h" +#include "facade/redis_parser.h" #include #include "base/logging.h" -namespace dfly { +namespace facade { using namespace std; diff --git a/src/server/redis_parser.h b/src/facade/redis_parser.h similarity index 87% rename from src/server/redis_parser.h rename to src/facade/redis_parser.h index 3a52df2..ac40bef 100644 --- a/src/server/redis_parser.h +++ b/src/facade/redis_parser.h @@ -5,9 +5,9 @@ #include -#include "core/resp_expr.h" +#include "facade/resp_expr.h" -namespace dfly { +namespace facade { /** * @brief Zero-copy (best-effort) parser. @@ -15,14 +15,7 @@ namespace dfly { */ class RedisParser { public: - enum Result { - OK, - INPUT_PENDING, - BAD_ARRAYLEN, - BAD_BULKLEN, - BAD_STRING, - BAD_INT - }; + enum Result { OK, INPUT_PENDING, BAD_ARRAYLEN, BAD_BULKLEN, BAD_STRING, BAD_INT }; using Buffer = RespExpr::Buffer; explicit RedisParser(bool server_mode = true) : server_mode_(server_mode) { @@ -51,8 +44,12 @@ class RedisParser { return bulk_len_; } - size_t stash_size() const { return stash_.size(); } - const std::vector>& stash() const { return stash_;} + size_t stash_size() const { + return stash_.size(); + } + const std::vector>& stash() const { + return stash_; + } private: void InitStart(uint8_t prefix_b, RespVec* res); @@ -97,4 +94,4 @@ class RedisParser { bool server_mode_ = true; }; -} // namespace dfly +} // namespace facade diff --git a/src/server/redis_parser_test.cc b/src/facade/redis_parser_test.cc similarity index 86% rename from src/server/redis_parser_test.cc rename to src/facade/redis_parser_test.cc index b0e2c39..c691807 100644 --- a/src/server/redis_parser_test.cc +++ b/src/facade/redis_parser_test.cc @@ -2,12 +2,7 @@ // See LICENSE for licensing terms. // -#include "server/redis_parser.h" - -extern "C" { - #include "redis/sds.h" - #include "redis/zmalloc.h" -} +#include "facade/redis_parser.h" #include #include @@ -15,11 +10,11 @@ extern "C" { #include "absl/strings/str_cat.h" #include "base/gtest.h" #include "base/logging.h" -#include "server/test_utils.h" +#include "facade/facade_test.h" using namespace testing; using namespace std; -namespace dfly { +namespace facade { MATCHER_P(ArrArg, expected, absl::StrCat(negation ? "is not" : "is", " equal to:\n", expected)) { if (arg.type != RespExpr::ARRAY) { @@ -39,7 +34,6 @@ MATCHER_P(ArrArg, expected, absl::StrCat(negation ? "is not" : "is", " equal to: class RedisParserTest : public testing::Test { protected: static void SetUpTestSuite() { - init_zmalloc_threadlocal(); } RedisParser::Result Parse(std::string_view str); @@ -91,31 +85,8 @@ TEST_F(RedisParserTest, Inline) { EXPECT_EQ(2, consumed_); } -TEST_F(RedisParserTest, Sds) { - int argc; - sds* argv = sdssplitargs("\r\n",&argc); - EXPECT_EQ(0, argc); - sdsfreesplitres(argv,argc); - - argv = sdssplitargs("\026 \020 \200 \277 \r\n",&argc); - EXPECT_EQ(4, argc); - EXPECT_STREQ("\026", argv[0]); - sdsfreesplitres(argv,argc); - - argv = sdssplitargs(R"(abc "oops\n" )""\r\n",&argc); - EXPECT_EQ(2, argc); - EXPECT_STREQ("oops\n", argv[1]); - sdsfreesplitres(argv,argc); - - argv = sdssplitargs(R"( "abc\xf0" )" "\t'oops\n' \r\n",&argc); - ASSERT_EQ(2, argc); - EXPECT_STREQ("abc\xf0", argv[0]); - EXPECT_STREQ("oops\n", argv[1]); - sdsfreesplitres(argv,argc); -} - TEST_F(RedisParserTest, InlineEscaping) { - LOG(ERROR) << "TBD: to be compliant with sdssplitargs"; // TODO: + LOG(ERROR) << "TBD: to be compliant with sdssplitargs"; // TODO: } TEST_F(RedisParserTest, Multi1) { @@ -217,4 +188,4 @@ TEST_F(RedisParserTest, LargeBulk) { ASSERT_EQ(RedisParser::OK, Parse("\r\n")); } -} // namespace dfly +} // namespace facade diff --git a/src/server/reply_builder.cc b/src/facade/reply_builder.cc similarity index 98% rename from src/server/reply_builder.cc rename to src/facade/reply_builder.cc index 65dd19a..c070dac 100644 --- a/src/server/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -1,18 +1,18 @@ // Copyright 2021, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // -#include "server/reply_builder.h" +#include "facade/reply_builder.h" #include #include #include "base/logging.h" -#include "server/error.h" +#include "facade/error.h" using namespace std; using absl::StrAppend; -namespace dfly { +namespace facade { namespace { diff --git a/src/server/reply_builder.h b/src/facade/reply_builder.h similarity index 98% rename from src/server/reply_builder.h rename to src/facade/reply_builder.h index f1999fe..fca8780 100644 --- a/src/server/reply_builder.h +++ b/src/facade/reply_builder.h @@ -4,10 +4,10 @@ #include #include -#include "core/op_status.h" +#include "facade/op_status.h" #include "io/sync_stream_interface.h" -namespace dfly { +namespace facade { class ReplyBuilderInterface { public: diff --git a/src/core/resp_expr.h b/src/facade/resp_expr.h similarity index 80% rename from src/core/resp_expr.h rename to src/facade/resp_expr.h index 175b3d4..fe535db 100644 --- a/src/core/resp_expr.h +++ b/src/facade/resp_expr.h @@ -10,7 +10,7 @@ #include #include -namespace dfly { +namespace facade { class RespExpr { public: @@ -31,7 +31,9 @@ class RespExpr { return Buffer{reinterpret_cast(s->data()), s->size()}; } - Buffer GetBuf() const { return std::get(u); } + Buffer GetBuf() const { + return std::get(u); + } static const char* TypeName(Type t); }; @@ -43,11 +45,11 @@ inline std::string_view ToSV(const absl::Span& s) { return std::string_view{reinterpret_cast(s.data()), s.size()}; } -} // namespace dfly +} // namespace facade namespace std { -ostream& operator<<(ostream& os, const dfly::RespExpr& e); -ostream& operator<<(ostream& os, dfly::RespSpan rspan); +ostream& operator<<(ostream& os, const facade::RespExpr& e); +ostream& operator<<(ostream& os, facade::RespSpan rspan); } // namespace std \ No newline at end of file diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h new file mode 100644 index 0000000..68bcc64 --- /dev/null +++ b/src/facade/service_interface.h @@ -0,0 +1,31 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "facade/facade_types.h" +#include "facade/memcache_parser.h" + +#include "util/fiber_socket_base.h" + +namespace facade { + +class ConnectionContext; +class Connection; +class ConnectionStats; + +class ServiceInterface { + public: + virtual ~ServiceInterface() { + } + + virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0; + virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, + ConnectionContext* cntx) = 0; + + virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0; + virtual ConnectionStats* GetThreadLocalConnectionStats() = 0; +}; + +} // namespace facade diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9510630..c7035a6 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -2,25 +2,22 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc - conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc - dragonfly_connection.cc engine_shard_set.cc generic_family.cc hset_family.cc - list_family.cc main_service.cc memcache_parser.cc rdb_load.cc rdb_save.cc replica.cc - snapshot.cc redis_parser.cc reply_builder.cc script_mgr.cc server_family.cc + db_slice.cc debugcmd.cc + engine_shard_set.cc generic_family.cc hset_family.cc + list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc + snapshot.cc script_mgr.cc server_family.cc set_family.cc string_family.cc transaction.cc zset_family.cc) -cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib - fibers_ext strings_lib http_server_lib tls_lib) +cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib) add_library(dfly_test_lib test_utils.cc) -cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext) +cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext) cxx_test(dragonfly_test dfly_test_lib LABELS DFLY) cxx_test(generic_family_test dfly_test_lib LABELS DFLY) cxx_test(hset_family_test dfly_test_lib LABELS DFLY) cxx_test(list_family_test dfly_test_lib LABELS DFLY) -cxx_test(memcache_parser_test dfly_test_lib LABELS DFLY) -cxx_test(redis_parser_test dfly_test_lib LABELS DFLY) cxx_test(set_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY) cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/small.rdb LABELS DFLY) diff --git a/src/server/common.cc b/src/server/common.cc index bee449d..16ba9c7 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -38,41 +38,6 @@ Interpreter& ServerState::GetInterpreter() { return interpreter_.value(); } -#define ADD(x) (x) += o.x - -ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { - // To break this code deliberately if we add/remove a field to this struct. - static_assert(sizeof(ConnectionStats) == 64); - - ADD(num_conns); - ADD(num_replicas); - ADD(read_buf_capacity); - ADD(io_read_cnt); - ADD(io_read_bytes); - ADD(io_write_cnt); - ADD(io_write_bytes); - ADD(pipelined_cmd_cnt); - ADD(command_cnt); - - return *this; -} - -#undef ADD - -string WrongNumArgsError(std::string_view cmd) { - return absl::StrCat("wrong number of arguments for '", cmd, "' command"); -} - -const char kSyntaxErr[] = "syntax error"; -const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value"; -const char kKeyNotFoundErr[] = "no such key"; -const char kInvalidIntErr[] = "value is not an integer or out of range"; -const char kUintErr[] = "value is out of range, must be positive"; -const char kDbIndOutOfRangeErr[] = "DB index is out of range"; -const char kInvalidDbIndErr[] = "invalid DB index"; -const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL."; -const char kAuthRejected[] = "-WRONGPASS invalid username-password pair or user is disabled."; - const char* GlobalState::Name(S s) { switch (s) { case GlobalState::IDLE: @@ -88,20 +53,3 @@ const char* GlobalState::Name(S s) { } } // namespace dfly - -namespace std { - -ostream& operator<<(ostream& os, dfly::CmdArgList ras) { - os << "["; - if (!ras.empty()) { - for (size_t i = 0; i < ras.size() - 1; ++i) { - os << dfly::ArgS(ras, i) << ","; - } - os << dfly::ArgS(ras, ras.size() - 1); - } - os << "]"; - - return os; -} - -} // namespace std diff --git a/src/server/common_types.h b/src/server/common_types.h index 4e68bd6..760a140 100644 --- a/src/server/common_types.h +++ b/src/server/common_types.h @@ -1,4 +1,4 @@ -// Copyright 2021, Roman Gershman. All rights reserved. +// Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // @@ -9,25 +9,24 @@ #include #include +#include "facade/facade_types.h" namespace dfly { enum class ListDir : uint8_t { LEFT, RIGHT }; -enum class Protocol : uint8_t { - MEMCACHE = 1, - REDIS = 2 -}; using DbIndex = uint16_t; using ShardId = uint16_t; using TxId = uint64_t; using TxClock = uint64_t; +using facade::MutableSlice; +using facade::CmdArgList; +using facade::CmdArgVec; +using facade::ArgS; + using ArgSlice = absl::Span; -using MutableSlice = absl::Span; -using CmdArgList = absl::Span; -using CmdArgVec = std::vector; constexpr DbIndex kInvalidDbId = DbIndex(-1); constexpr ShardId kInvalidSid = ShardId(-1); @@ -50,42 +49,11 @@ struct KeyIndex { unsigned step; // 1 for commands like mget. 2 for commands like mset. }; -struct ConnectionStats { - uint32_t num_conns = 0; - uint32_t num_replicas = 0; - size_t read_buf_capacity = 0; - size_t io_read_cnt = 0; - size_t io_read_bytes = 0; - size_t io_write_cnt = 0; - size_t io_write_bytes = 0; - size_t command_cnt = 0; - size_t pipelined_cmd_cnt = 0; - - ConnectionStats& operator+=(const ConnectionStats& o); -}; - struct OpArgs { EngineShard* shard; DbIndex db_ind; }; -constexpr inline unsigned long long operator""_MB(unsigned long long x) { - return 1024L * 1024L * x; -} - -constexpr inline unsigned long long operator""_KB(unsigned long long x) { - return 1024L * x; -} - -inline std::string_view ArgS(CmdArgList args, size_t i) { - auto arg = args[i]; - return std::string_view(arg.data(), arg.size()); -} - -inline MutableSlice ToMSS(absl::Span span) { - return MutableSlice{reinterpret_cast(span.data()), span.size()}; -} - inline void ToUpper(const MutableSlice* val) { for (auto& c : *val) { c = absl::ascii_toupper(c); @@ -99,8 +67,3 @@ inline void ToLower(const MutableSlice* val) { } } // namespace dfly - -namespace std { -ostream& operator<<(ostream& os, dfly::CmdArgList args); - -} // namespace std diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc deleted file mode 100644 index dc6f8eb..0000000 --- a/src/server/conn_context.cc +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2021, Roman Gershman. All rights reserved. -// See LICENSE for licensing terms. -// - -#include "server/conn_context.h" - -#include "base/logging.h" -#include "server/dragonfly_connection.h" - -namespace dfly { - -ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) { - switch (owner->protocol()) { - case Protocol::REDIS: - rbuilder_.reset(new RedisReplyBuilder(stream)); - break; - case Protocol::MEMCACHE: - rbuilder_.reset(new MCReplyBuilder(stream)); - break; - } -} - -Protocol ConnectionContext::protocol() const { - return owner_->protocol(); -} - -RedisReplyBuilder* ConnectionContext::operator->() { - CHECK(Protocol::REDIS == protocol()); - - return static_cast(rbuilder_.get()); -} - -} // namespace dfly diff --git a/src/server/conn_context.h b/src/server/conn_context.h index ae98908..89ba60b 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -1,4 +1,4 @@ -// Copyright 2021, Roman Gershman. All rights reserved. +// Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // @@ -6,12 +6,11 @@ #include +#include "facade/conn_context.h" #include "server/common_types.h" -#include "server/reply_builder.h" namespace dfly { -class Connection; class EngineShardSet; struct StoredCmd { @@ -30,19 +29,6 @@ struct ConnectionState { ExecState exec_state = EXEC_INACTIVE; std::vector exec_body; - enum Mask : uint32_t { - ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch. - CONN_CLOSING = 2, // could be because of unrecoverable error or planned action. - - // Whether this connection belongs to replica, i.e. a dragonfly slave is connected to this - // host (master) via this connection to sync from it. - REPL_CONNECTION = 4, - REQ_AUTH = 8, - AUTHENTICATED = 0x10, - }; - - uint32_t mask = 0; // A bitmask of Mask values. - enum MCGetMask { FETCH_CAS_VER = 1, }; @@ -52,14 +38,6 @@ struct ConnectionState { // For get op - we use it as a mask of MCGetMask values. uint32_t memcache_flag = 0; - bool IsClosing() const { - return mask & CONN_CLOSING; - } - - bool IsRunViaDispatch() const { - return mask & ASYNC_DISPATCH; - } - // Lua-script related data. struct Script { bool is_write = true; @@ -69,10 +47,11 @@ struct ConnectionState { std::optional