Fix reply batching.

This commit is contained in:
Roman Gershman 2021-11-30 21:01:09 +02:00
parent 6c5e4dad3b
commit 45294f8c2f
8 changed files with 46 additions and 17 deletions

View File

@ -26,5 +26,3 @@ cd build-opt && ninja midi-redis
``` ```
for more options, run `./midi-redis --help` for more options, run `./midi-redis --help`

2
helio

@ -1 +1 @@
Subproject commit 29e247c00c2f10798c78044607436953fb22b384 Subproject commit dab0fc768494d8e7cc42d77b08fbee1a81415072

View File

@ -26,8 +26,6 @@ namespace fibers = boost::fibers;
namespace dfly { namespace dfly {
namespace { namespace {
using CmdArgVec = std::vector<MutableStrSpan>;
void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) { void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
string res("-ERR Protocol error: "); string res("-ERR Protocol error: ");
if (pres == RedisParser::BAD_BULKLEN) { if (pres == RedisParser::BAD_BULKLEN) {
@ -43,10 +41,6 @@ void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
} }
} }
inline MutableStrSpan ToMSS(absl::Span<uint8_t> span) {
return MutableStrSpan{reinterpret_cast<char*>(span.data()), span.size()};
}
void RespToArgList(const RespVec& src, CmdArgVec* dest) { void RespToArgList(const RespVec& src, CmdArgVec* dest) {
dest->resize(src.size()); dest->resize(src.size());
for (size_t i = 0; i < src.size(); ++i) { for (size_t i = 0; i < src.size(); ++i) {
@ -311,6 +305,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
std::unique_ptr<Request> req{dispatch_q_.front()}; std::unique_ptr<Request> req{dispatch_q_.front()};
dispatch_q_.pop_front(); dispatch_q_.pop_front();
cc_->SetBatchMode(!dispatch_q_.empty());
cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH; cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH;
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get()); service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH; cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH;

View File

@ -119,11 +119,12 @@ ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) {
if (FLAGS_conn_use_incoming_cpu) { if (FLAGS_conn_use_incoming_cpu) {
int fd = sock->native_handle(); int fd = sock->native_handle();
int cpu; int cpu, napi_id;
socklen_t len = sizeof(cpu); socklen_t len = sizeof(cpu);
CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len)); CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len));
VLOG(1) << "CPU for connection " << fd << " is " << cpu; CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len));
VLOG(1) << "CPU/NAPI for connection " << fd << " is " << cpu << "/" << napi_id;
vector<unsigned> ids = pool()->MapCpuToThreads(cpu); vector<unsigned> ids = pool()->MapCpuToThreads(cpu);
if (!ids.empty()) { if (!ids.empty()) {

View File

@ -43,7 +43,7 @@ void EngineShard::DestroyThreadLocal() {
delete shard_; delete shard_;
shard_ = nullptr; shard_ = nullptr;
DVLOG(1) << "Shard reset " << index; VLOG(1) << "Shard reset " << index;
} }
void EngineShardSet::Init(uint32_t sz) { void EngineShardSet::Init(uint32_t sz) {

View File

@ -65,6 +65,8 @@ void Service::Init(util::AcceptServer* acceptor) {
} }
void Service::Shutdown() { void Service::Shutdown() {
VLOG(1) << "Service::Shutdown";
engine_varz.reset(); engine_varz.reset();
request_latency_usec.Shutdown(); request_latency_usec.Shutdown();
ping_qps.Shutdown(); ping_qps.Shutdown();

View File

@ -30,7 +30,30 @@ BaseSerializer::BaseSerializer(io::Sink* sink) : sink_(sink) {
} }
void BaseSerializer::Send(const iovec* v, uint32_t len) { void BaseSerializer::Send(const iovec* v, uint32_t len) {
error_code ec = sink_->Write(v, len); if (should_batch_) {
// TODO: to introduce flushing when too much data is batched.
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << sink_ << " " << src;
batch_.append(src.data(), src.size());
}
return;
}
error_code ec;
if (batch_.empty()) {
ec = sink_->Write(v, len);
} else {
DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_;
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
tmp[0].iov_len = batch_.size();
copy(v, v + len, tmp + 1);
ec = sink_->Write(tmp, len + 1);
batch_.clear();
}
if (ec) { if (ec) {
ec_ = ec; ec_ = ec;
} }

View File

@ -22,18 +22,24 @@ class BaseSerializer {
ec_ = std::make_error_code(std::errc::connection_aborted); ec_ = std::make_error_code(std::errc::connection_aborted);
} }
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
void SetBatchMode(bool batch) {
should_batch_ = batch;
}
//! Sends a string as is without any formatting. raw should be encoded according to the protocol. //! Sends a string as is without any formatting. raw should be encoded according to the protocol.
void SendDirect(std::string_view str); void SendDirect(std::string_view str);
::io::Sink* sink() {
return sink_;
}
void Send(const iovec* v, uint32_t len); void Send(const iovec* v, uint32_t len);
private: private:
::io::Sink* sink_; ::io::Sink* sink_;
std::error_code ec_; std::error_code ec_;
std::string batch_;
bool should_batch_ = false;
}; };
class RespSerializer : public BaseSerializer { class RespSerializer : public BaseSerializer {
@ -89,6 +95,10 @@ class ReplyBuilder {
void SendGetReply(std::string_view key, uint32_t flags, std::string_view value); void SendGetReply(std::string_view key, uint32_t flags, std::string_view value);
void SendGetNotFound(); void SendGetNotFound();
void SetBatchMode(bool mode) {
serializer_->SetBatchMode(mode);
}
private: private:
RespSerializer* as_resp() { RespSerializer* as_resp() {
return static_cast<RespSerializer*>(serializer_.get()); return static_cast<RespSerializer*>(serializer_.get());