Introduce some pipelining support.

Also some refactoring.
This commit is contained in:
Roman Gershman 2021-11-24 14:09:53 +02:00
parent 9a6e8e31be
commit 18525d2b5e
9 changed files with 240 additions and 62 deletions

View File

@ -1,8 +1,8 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib command_registry.cc config_flags.cc conn_context.cc db_slice.cc
dragonfly_listener.cc
add_library(dragonfly_lib command_registry.cc common_types.cc config_flags.cc
conn_context.cc db_slice.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc
main_service.cc memcache_parser.cc
redis_parser.cc resp_expr.cc reply_builder.cc)

View File

@ -10,6 +10,7 @@
#include <functional>
#include "base/function2.hpp"
#include "server/common_types.h"
namespace dfly {
@ -31,9 +32,6 @@ const char* OptName(CommandOpt fl);
}; // namespace CO
using MutableStrSpan = absl::Span<char>;
using CmdArgList = absl::Span<MutableStrSpan>;
class CommandId {
public:
using CmdFunc = std::function<void(CmdArgList, ConnectionContext*)>;
@ -143,9 +141,4 @@ class CommandRegistry {
void Command(CmdArgList args, ConnectionContext* cntx);
};
// Returns a read-only view over the i-th argument of `args`.
// No bounds checking is performed here beyond what the span's operator[] does.
inline std::string_view ArgS(CmdArgList args, size_t i) {
  const MutableStrSpan& span = args[i];
  return std::string_view(span.data(), span.size());
}
} // namespace dfly

48
server/common_types.cc Normal file
View File

@ -0,0 +1,48 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/common_types.h"
#include <absl/strings/str_cat.h>
#include "base/logging.h"
namespace dfly {
using std::string;
// Builds the "wrong number of arguments" error text for the command named `cmd`.
string WrongNumArgsError(std::string_view cmd) {
  string message = absl::StrCat("wrong number of arguments for '", cmd, "' command");
  return message;
}
// Canned error messages.
// NOTE(review): these are namespace-scope `const` arrays; unless a header declares them
// `extern` before this point they have internal linkage and are not visible to other
// translation units. common_types.h does not appear to declare them - confirm.
const char kSyntaxErr[] = "syntax error";
// Unlike the other messages, this one carries its own "-WRONGTYPE " prefix.
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
const char kKeyNotFoundErr[] = "no such key";
const char kInvalidIntErr[] = "value is not an integer or out of range";
const char kUintErr[] = "value is out of range, must be positive";
const char kInvalidFloatErr[] = "value is not a valid float";
const char kInvalidScoreErr[] = "resulting score is not a number (NaN)";
const char kDbIndOutOfRangeErr[] = "DB index is out of range";
const char kInvalidDbIndErr[] = "invalid DB index";
const char kSameObjErr[] = "source and destination objects are the same";
} // namespace dfly
namespace std {
// Streams an argument list as "[arg0,arg1,...]"; an empty list prints "[]".
ostream& operator<<(ostream& os, dfly::CmdArgList ras) {
  os << "[";
  const size_t count = ras.size();
  for (size_t idx = 0; idx < count; ++idx) {
    // The separator goes between elements only, never after the last one.
    if (idx != 0) {
      os << ",";
    }
    os << dfly::ArgS(ras, idx);
  }
  os << "]";
  return os;
}
} // namespace std

77
server/common_types.h Normal file
View File

@ -0,0 +1,77 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include <absl/container/flat_hash_map.h>
#include <absl/strings/ascii.h>
#include <absl/types/span.h>
#include <xxhash.h>
#include <string_view>
#include <vector>
namespace dfly {
// Integer id types for databases and shards.
using DbIndex = uint16_t;
using ShardId = uint16_t;
// A mutable view over a contiguous run of chars (a single command argument, see ArgS).
using MutableStrSpan = absl::Span<char>;
// A non-owning list of arguments.
using CmdArgList = absl::Span<MutableStrSpan>;
// A vector of argument views; the chars they point to are stored elsewhere.
using CmdArgVec = std::vector<MutableStrSpan>;
// Sentinel ids: -1 converted to uint16_t, i.e. the maximum representable value (65535).
constexpr DbIndex kInvalidDbId = DbIndex(-1);
constexpr ShardId kInvalidSid = ShardId(-1);
// Per-connection state flags stored as a bitmask.
struct ConnectionState {
  enum Mask : uint32_t {
    ASYNC_DISPATCH = 1 << 0,  // whether a command is handled via async dispatch.
    CONN_CLOSING = 1 << 1,    // could be because of unrecoverable error or planned action.
  };

  // Bitwise OR of Mask values; zero means no flag is set.
  uint32_t mask = 0;

  // True when the CONN_CLOSING bit is set.
  bool IsClosing() const {
    return (mask & CONN_CLOSING) != 0;
  }

  // True when the ASYNC_DISPATCH bit is set.
  bool IsRunViaDispatch() const {
    return (mask & ASYNC_DISPATCH) != 0;
  }
};
// Maps a key-like view (anything exposing data() and size()) to a shard id in
// [0, shard_num) by hashing its bytes with XXH64 and a fixed seed.
// shard_num must be non-zero because it is used as a modulus.
template <typename View> inline ShardId Shard(const View& v, ShardId shard_num) {
  const XXH64_hash_t digest = XXH64(v.data(), v.size(), 120577);
  const auto bucket = digest % shard_num;
  return bucket;
}
// Value type stored in the main table.
using MainValue = std::string;
// String-keyed hash table mapping keys to MainValue.
using MainTable = absl::flat_hash_map<std::string, MainValue>;
using MainIterator = MainTable::iterator;
// Forward declaration; the class is defined elsewhere.
class EngineShard;
// Returns a read-only view over the i-th argument of `args`.
// No bounds checking is performed here beyond what the span's operator[] does.
inline std::string_view ArgS(CmdArgList args, size_t i) {
  const MutableStrSpan& span = args[i];
  return std::string_view(span.data(), span.size());
}
inline void ToUpper(const MutableStrSpan* val) {
for (auto& c : *val) {
c = absl::ascii_toupper(c);
}
}
// Reinterprets a span of bytes as a span of chars over the same memory (no copy).
inline MutableStrSpan ToMSS(absl::Span<uint8_t> span) {
  char* chars = reinterpret_cast<char*>(span.data());
  return MutableStrSpan(chars, span.size());
}
std::string WrongNumArgsError(std::string_view cmd);
} // namespace dfly
namespace std {
ostream& operator<<(ostream& os, dfly::CmdArgList args);
} // namespace std

View File

@ -5,6 +5,7 @@
#pragma once
#include "server/reply_builder.h"
#include "server/common_types.h"
namespace dfly {
@ -26,6 +27,8 @@ class ConnectionContext : public ReplyBuilder {
Protocol protocol() const;
ConnectionState conn_state;
private:
Connection* owner_;
};

View File

@ -4,9 +4,7 @@
#pragma once
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include "server/common_types.h"
#include "server/op_status.h"
namespace util {
@ -15,8 +13,6 @@ class ProactorBase;
namespace dfly {
class EngineShard;
class DbSlice {
struct InternalDbStats {
// Object memory usage besides hash-table capacity.
@ -24,12 +20,6 @@ class DbSlice {
};
public:
using MainValue = std::string;
using MainTable = absl::flat_hash_map<std::string, MainValue>;
using MainIterator = MainTable::iterator;
using ShardId = uint16_t;
using DbIndex = uint16_t;
DbSlice(uint32_t index, EngineShard* owner);
~DbSlice();

View File

@ -4,6 +4,8 @@
#include "server/dragonfly_connection.h"
#include <absl/container/flat_hash_map.h>
#include <boost/fiber/operations.hpp>
#include "base/io_buf.h"
@ -76,7 +78,7 @@ Connection::Connection(Protocol protocol, Service* service, SSL_CTX* ctx)
switch (protocol) {
case Protocol::REDIS:
redis_parser_.reset(new RedisParser);
redis_parser_.reset(new RedisParser);
break;
case Protocol::MEMCACHE:
memcache_parser_.reset(new MemcacheParser);
@ -140,6 +142,8 @@ void Connection::HandleRequests() {
void Connection::InputLoop(FiberSocketBase* peer) {
base::IoBuf io_buf{kMinReadSize};
auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); });
ParserStatus status = OK;
std::error_code ec;
@ -156,7 +160,7 @@ void Connection::InputLoop(FiberSocketBase* peer) {
io_buf.CommitWrite(*recv_sz);
if (redis_parser_)
status = ParseRedis(&io_buf);
status = ParseRedis(&io_buf);
else {
DCHECK(memcache_parser_);
status = ParseMemcache(&io_buf);
@ -169,6 +173,10 @@ void Connection::InputLoop(FiberSocketBase* peer) {
}
} while (peer->IsOpen() && !cc_->ec());
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; // Signal dispatch to close.
evc_.notify();
dispatch_fb.join();
if (cc_->ec()) {
ec = cc_->ec();
} else {
@ -208,8 +216,24 @@ auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus {
DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
}
RespToArgList(args, &arg_vec);
service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get());
// An optimization to skip dispatch_q_ if no pipelining is identified.
// We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the
// dispatch fiber pulls the last record but is still processing the command and then this
// fiber enters the condition below and executes out of order.
bool is_sync_dispatch = !cc_->conn_state.IsRunViaDispatch();
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf->InputLen()) {
RespToArgList(args, &arg_vec);
service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get());
} else {
// Dispatch via queue to speed up input reading.
Request* req = FromArgs(std::move(args));
dispatch_q_.emplace_back(req);
if (dispatch_q_.size() == 1) {
evc_.notify();
} else if (dispatch_q_.size() > 10) {
this_fiber::yield();
}
}
}
io_buf->ConsumeInput(consumed);
} while (RedisParser::OK == result && !cc_->ec());
@ -249,8 +273,15 @@ auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus {
}
}
service_->DispatchMC(cmd, value, cc_.get());
io_buf->ConsumeInput(total_len);
// An optimization to skip dispatch_q_ if no pipelining is identified.
// We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the
// dispatch fiber pulls the last record but is still processing the command and then this
// fiber enters the condition below and executes out of order.
bool is_sync_dispatch = (cc_->conn_state.mask & ConnectionState::ASYNC_DISPATCH) == 0;
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf->InputLen()) {
service_->DispatchMC(cmd, value, cc_.get());
}
io_buf->ConsumeInput(consumed);
} while (!cc_->ec());
parser_error_ = result;
@ -264,4 +295,50 @@ auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus {
return ERROR;
}
// DispatchFiber handles commands coming from the InputLoop.
// Thus, InputLoop can quickly read data from the input buffer, parse it and push
// into the dispatch queue and DispatchFiber will run those commands asynchronously with InputLoop.
// Note: in some cases, InputLoop may decide to dispatch directly and bypass the DispatchFiber.
// Runs until the connection context reports an error or CONN_CLOSING is set.
// On exit it marks the connection as closing and frees any requests that were
// queued but never executed, so they do not leak.
// `peer` is currently unused.
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
  this_fiber::properties<FiberProps>().set_name("DispatchFiber");

  while (!cc_->ec()) {
    // Sleep until there is something to run or we are asked to shut down.
    evc_.await([this] { return cc_->conn_state.IsClosing() || !dispatch_q_.empty(); });
    if (cc_->conn_state.IsClosing())
      break;

    // Take ownership of the queued request; it is freed at the end of this iteration.
    std::unique_ptr<Request> req{dispatch_q_.front()};
    dispatch_q_.pop_front();

    // ASYNC_DISPATCH stays set while the command runs so that the input fiber does not
    // dispatch a newer command directly and reply out of order.
    cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH;
    service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
    cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH;
  }

  cc_->conn_state.mask |= ConnectionState::CONN_CLOSING;

  // Clean up leftovers: the queue holds raw owning pointers, so whatever was not
  // dispatched must be deleted explicitly.
  while (!dispatch_q_.empty()) {
    delete dispatch_q_.front();
    dispatch_q_.pop_front();
  }
}
// Builds a heap-allocated Request that owns a private copy of every argument in `args`.
// All payloads are packed back to back into Request::storage, and Request::args[i]
// points at the i-th copy. Every argument must be of type RespExpr::STRING.
// The caller is responsible for deleting the returned object.
auto Connection::FromArgs(RespVec args) -> Request* {
  DCHECK(!args.empty());

  // First pass: validate argument types and compute the total payload size.
  size_t total_bytes = 0;
  for (const auto& expr : args) {
    CHECK_EQ(RespExpr::STRING, expr.type);
    total_bytes += expr.GetBuf().size();
  }
  DCHECK(total_bytes);

  Request* req = new Request{args.size(), total_bytes};

  // Second pass: copy each payload into the backing storage and record its span.
  auto* cursor = req->storage.data();
  size_t slot = 0;
  for (auto& expr : args) {
    auto payload = expr.GetBuf();
    const size_t len = payload.size();
    memcpy(cursor, payload.data(), len);
    req->args[slot] = MutableStrSpan(cursor, len);
    cursor += len;
    ++slot;
  }
  return req;
}
} // namespace dfly

View File

@ -4,10 +4,16 @@
#pragma once
#include "util/connection.h"
#include <absl/container/fixed_array.h>
#include <deque>
#include "base/io_buf.h"
#include "server/common_types.h"
#include "server/dfly_protocol.h"
#include "server/resp_expr.h"
#include "util/connection.h"
#include "util/fibers/event_count.h"
typedef struct ssl_ctx_st SSL_CTX;
@ -30,7 +36,9 @@ class Connection : public util::Connection {
ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
void UnregisterShutdownHook(ShutdownHandle id);
Protocol protocol() const { return protocol_;}
Protocol protocol() const {
return protocol_;
}
protected:
void OnShutdown() override;
@ -41,6 +49,7 @@ class Connection : public util::Connection {
void HandleRequests() final;
void InputLoop(util::FiberSocketBase* peer);
void DispatchFiber(util::FiberSocketBase* peer);
ParserStatus ParseRedis(base::IoBuf* buf);
ParserStatus ParseMemcache(base::IoBuf* buf);
@ -51,6 +60,19 @@ class Connection : public util::Connection {
SSL_CTX* ctx_;
std::unique_ptr<ConnectionContext> cc_;
// A queued command: `args` holds one span per argument and `storage` is the
// char buffer sized to back them. Not copy-constructible.
struct Request {
  absl::FixedArray<MutableStrSpan> args;  // one entry per command argument.
  absl::FixedArray<char> storage;         // backing bytes for the argument payloads.

  // Allocates room for `nargs` argument spans and `capacity` payload bytes.
  Request(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {}

  Request(const Request&) = delete;
};
static Request* FromArgs(RespVec args);
std::deque<Request*> dispatch_q_; // coordinated via evc_.
util::fibers_ext::EventCount evc_;
unsigned parser_error_ = 0;
Protocol protocol_;
struct Shutdown;

View File

@ -18,23 +18,6 @@
DEFINE_uint32(port, 6380, "Redis port");
DEFINE_uint32(memcache_port, 0, "Memcached port");
namespace std {
// Streams an argument list as "[arg0,arg1,...]"; an empty list prints "[]".
ostream& operator<<(ostream& os, dfly::CmdArgList args) {
  os << "[";
  const size_t count = args.size();
  for (size_t idx = 0; idx < count; ++idx) {
    // The separator goes between elements only, never after the last one.
    if (idx != 0) {
      os << ",";
    }
    os << dfly::ArgS(args, idx);
  }
  os << "]";
  return os;
}
} // namespace std
namespace dfly {
using namespace std;
@ -52,21 +35,6 @@ DEFINE_VARZ(VarzQps, set_qps);
std::optional<VarzFunction> engine_varz;
// Maps `sv` to a shard id in [0, shard_num) by hashing its bytes with XXH64 and a
// fixed seed. shard_num must be non-zero because it is used as a modulus.
inline ShardId Shard(string_view sv, ShardId shard_num) {
  const XXH64_hash_t digest = XXH64(sv.data(), sv.size(), 24061983);
  const auto bucket = digest % shard_num;
  return bucket;
}
inline void ToUpper(const MutableStrSpan* val) {
for (auto& c : *val) {
c = absl::ascii_toupper(c);
}
}
// Builds the "wrong number of arguments" error text for the command named `cmd`.
string WrongNumArgsError(string_view cmd) {
  string message = absl::StrCat("wrong number of arguments for '", cmd, "' command");
  return message;
}
} // namespace
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp) {
@ -215,7 +183,7 @@ void Service::Get(CmdArgList args, ConnectionContext* cntx) {
shard_set_.Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
OpResult<DbSlice::MainIterator> res = es->db_slice.Find(0, key);
OpResult<MainIterator> res = es->db_slice.Find(0, key);
if (res) {
opres.value() = res.value()->second;
} else {