Improve memory allocations with pipeline mode

This commit is contained in:
Roman Gershman 2022-02-23 22:18:05 +02:00
parent edff35ae3e
commit fcb58efe15
2 changed files with 35 additions and 17 deletions

View File

@ -6,6 +6,7 @@
#include <absl/container/flat_hash_map.h>
#include <absl/strings/match.h>
#include <mimalloc.h>
#include <boost/fiber/operations.hpp>
@ -78,6 +79,19 @@ struct Connection::Shutdown {
}
};
struct Connection::Request {
absl::FixedArray<MutableSlice> args;
// I do not use m_heap_t explicitly but mi_stl_allocator at the end does the same job
// of using the thread's heap.
absl::FixedArray<char, 256, mi_stl_allocator<char>> storage;
Request(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
}
Request(const Request&) = delete;
};
Connection::Connection(Protocol protocol, Service* service, SSL_CTX* ctx)
: io_buf_{kMinReadSize}, service_(service), ctx_(ctx) {
protocol_ = protocol;
@ -129,7 +143,7 @@ void Connection::HandleRequests() {
auto remote_ep = lsb->RemoteEndpoint();
std::unique_ptr<tls::TlsSocket> tls_sock;
unique_ptr<tls::TlsSocket> tls_sock;
if (ctx_) {
tls_sock.reset(new tls::TlsSocket(socket_.get()));
tls_sock->InitSSL(ctx_);
@ -233,7 +247,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}
std::error_code ec;
error_code ec;
// Main loop.
if (parse_status != ERROR) {
@ -287,6 +301,7 @@ auto Connection::ParseRedis() -> ParserStatus {
RedisParser::Result result = RedisParser::OK;
ReplyBuilderInterface* builder = cc_->reply_builder();
mi_heap_t* tlh = mi_heap_get_backing();
do {
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &args);
@ -306,8 +321,10 @@ auto Connection::ParseRedis() -> ParserStatus {
RespToArgList(args, &arg_vec);
service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get());
} else {
// Dispatch via queue to speedup input reading,
Request* req = FromArgs(std::move(args));
// Dispatch via queue to speedup input reading
// We could use
Request* req = FromArgs(std::move(args), tlh);
dispatch_q_.emplace_back(req);
if (dispatch_q_.size() == 1) {
evc_.notify();
@ -466,9 +483,9 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
while (!builder->GetError()) {
evc_.await([this] { return cc_->conn_state.IsClosing() || !dispatch_q_.empty(); });
if (cc_->conn_state.IsClosing())
break;
break; // TODO: We have a memory leak with pending requests in the queue.
std::unique_ptr<Request> req{dispatch_q_.front()};
Request* req = dispatch_q_.front();
dispatch_q_.pop_front();
++stats->pipelined_cmd_cnt;
@ -477,12 +494,14 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH;
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH;
req->~Request();
mi_free(req);
}
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING;
}
auto Connection::FromArgs(RespVec args) -> Request* {
auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* {
DCHECK(!args.empty());
size_t backed_sz = 0;
for (const auto& arg : args) {
@ -491,7 +510,12 @@ auto Connection::FromArgs(RespVec args) -> Request* {
}
DCHECK(backed_sz);
Request* req = new Request{args.size(), backed_sz};
constexpr auto kReqSz = sizeof(Request);
static_assert(kReqSz < MI_SMALL_SIZE_MAX);
static_assert(alignof(Request) == 8);
void* ptr = mi_heap_malloc_small(heap, kReqSz);
Request* req = new (ptr) Request{args.size(), backed_sz};
auto* next = req->storage.data();
for (size_t i = 0; i < args.size(); ++i) {

View File

@ -16,6 +16,7 @@
#include "util/fibers/event_count.h"
typedef struct ssl_ctx_st SSL_CTX;
typedef struct mi_heap_s mi_heap_t;
namespace dfly {
@ -66,16 +67,9 @@ class Connection : public util::Connection {
SSL_CTX* ctx_;
std::unique_ptr<ConnectionContext> cc_;
struct Request {
absl::FixedArray<MutableSlice> args;
absl::FixedArray<char> storage;
struct Request;
Request(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
}
Request(const Request&) = delete;
};
static Request* FromArgs(RespVec args);
static Request* FromArgs(RespVec args, mi_heap_t* heap);
std::deque<Request*> dispatch_q_; // coordinated via evc_.
util::fibers_ext::EventCount evc_;