Expose some connection-level stats

This commit is contained in:
Roman Gershman 2022-01-17 09:20:35 +02:00
parent 0d57b25124
commit 5923c22d99
7 changed files with 171 additions and 3 deletions

View File

@ -8,11 +8,36 @@
#include "base/logging.h"
#include "server/error.h"
#include "server/server_state.h"
namespace dfly {
using std::string;
thread_local ServerState ServerState::state_;
ServerState::ServerState() {
}
ServerState::~ServerState() {
}
#define ADD(x) (x) += o.x
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
static_assert(sizeof(ConnectionStats) == 40);
ADD(num_conns);
ADD(num_replicas);
ADD(read_buf_capacity);
ADD(io_reads_cnt);
ADD(pipelined_cmd_cnt);
ADD(command_cnt);
return *this;
}
#undef ADD
string WrongNumArgsError(std::string_view cmd) {
return absl::StrCat("wrong number of arguments for '", cmd, "' command");

View File

@ -42,11 +42,31 @@ struct KeyLockArgs {
unsigned key_step;
};
struct ConnectionStats {
uint32_t num_conns = 0;
uint32_t num_replicas = 0;
size_t read_buf_capacity = 0;
size_t io_reads_cnt = 0;
size_t command_cnt = 0;
size_t pipelined_cmd_cnt = 0;
ConnectionStats& operator+=(const ConnectionStats& o);
};
struct OpArgs {
EngineShard* shard;
DbIndex db_ind;
};
constexpr inline unsigned long long operator""_MB(unsigned long long x) {
return 1024L * 1024L * x;
}
constexpr inline unsigned long long operator""_KB(unsigned long long x) {
return 1024L * x;
}
inline std::string_view ArgS(CmdArgList args, size_t i) {
auto arg = args[i];
return std::string_view(arg.data(), arg.size());

View File

@ -15,6 +15,7 @@
#include "server/main_service.h"
#include "server/memcache_parser.h"
#include "server/redis_parser.h"
#include "server/server_state.h"
#include "util/fiber_sched_algo.h"
#include "util/tls/tls_socket.h"
@ -49,6 +50,7 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) {
}
constexpr size_t kMinReadSize = 256;
constexpr size_t kMaxReadSize = 32_KB;
} // namespace
@ -85,6 +87,7 @@ Connection::~Connection() {
void Connection::OnShutdown() {
VLOG(1) << "Connection::OnShutdown";
if (shutdown_) {
for (const auto& k_v : shutdown_->map) {
k_v.second();
@ -112,7 +115,7 @@ void Connection::HandleRequests() {
int val = 1;
CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
auto ep = socket_->RemoteEndpoint();
auto remote_ep = socket_->RemoteEndpoint();
std::unique_ptr<tls::TlsSocket> tls_sock;
if (ctx_) {
@ -126,25 +129,31 @@ void Connection::HandleRequests() {
}
VLOG(1) << "TLS handshake succeeded";
}
FiberSocketBase* peer = tls_sock ? (FiberSocketBase*)tls_sock.get() : socket_.get();
cc_.reset(new ConnectionContext(peer, this));
cc_->shard_set = &service_->shard_set();
InputLoop(peer);
VLOG(1) << "Closed connection for peer " << ep;
VLOG(1) << "Closed connection for peer " << remote_ep;
}
void Connection::InputLoop(FiberSocketBase* peer) {
base::IoBuf io_buf{kMinReadSize};
auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); });
ConnectionStats* stats = ServerState::tl_connection_stats();
stats->num_conns++;
stats->read_buf_capacity += io_buf.Capacity();
ParserStatus status = OK;
std::error_code ec;
do {
auto buf = io_buf.AppendBuffer();
::io::Result<size_t> recv_sz = peer->Recv(buf);
++stats->io_reads_cnt;
if (!recv_sz) {
ec = recv_sz.error();
@ -163,6 +172,22 @@ void Connection::InputLoop(FiberSocketBase* peer) {
if (status == NEED_MORE) {
status = OK;
size_t capacity = io_buf.Capacity();
if (capacity < kMaxReadSize) {
size_t parser_hint = redis_parser_->parselen_hint();
if (parser_hint > capacity) {
io_buf.Reserve(std::min(kMaxReadSize, parser_hint));
} else if (buf.size() == *recv_sz && buf.size() > capacity / 2) {
// Last io used most of the io_buf to the end.
io_buf.Reserve(capacity * 2); // Valid growth range.
}
if (capacity < io_buf.Capacity()) {
VLOG(1) << "Growing io_buf to " << io_buf.Capacity();
stats->read_buf_capacity += (io_buf.Capacity() - capacity);
}
}
} else if (status != OK) {
break;
}
@ -172,6 +197,8 @@ void Connection::InputLoop(FiberSocketBase* peer) {
evc_.notify();
dispatch_fb.join();
stats->read_buf_capacity -= io_buf.Capacity();
if (cc_->ec()) {
ec = cc_->ec();
} else {
@ -193,6 +220,8 @@ void Connection::InputLoop(FiberSocketBase* peer) {
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
LOG(WARNING) << "Socket error " << ec;
}
--stats->num_conns;
}
auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus {
@ -297,6 +326,8 @@ auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus {
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
this_fiber::properties<FiberProps>().set_name("DispatchFiber");
ConnectionStats* stats = ServerState::tl_connection_stats();
while (!cc_->ec()) {
evc_.await([this] { return cc_->conn_state.IsClosing() || !dispatch_q_.empty(); });
if (cc_->conn_state.IsClosing())
@ -304,6 +335,8 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
std::unique_ptr<Request> req{dispatch_q_.front()};
dispatch_q_.pop_front();
++stats->pipelined_cmd_cnt;
cc_->SetBatchMode(!dispatch_q_.empty());
cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH;

View File

@ -20,6 +20,7 @@ extern "C" {
#include "server/error.h"
#include "server/generic_family.h"
#include "server/list_family.h"
#include "server/server_state.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/metrics/metrics.h"
@ -109,6 +110,8 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
string_view cmd_str = ArgS(args, 0);
bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI");
const CommandId* cid = registry_.Find(cmd_str);
ServerState& etl = *ServerState::tlocal();
++etl.connection_stats.command_cnt;
absl::Cleanup multi_error = [cntx] {
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) {

View File

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
@ -21,6 +21,7 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/main_service.h"
#include "server/server_state.h"
#include "server/transaction.h"
#include "util/accept_server.h"
@ -118,6 +119,22 @@ void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
return dbg_cmd.Run(args);
}
Metrics ServerFamily::GetMetrics() const {
Metrics result;
fibers::mutex mu;
auto cb = [&](EngineShard* shard) {
lock_guard<fibers::mutex> lk(mu);
result.conn_stats += *ServerState::tl_connection_stats();
};
ess_.RunBlockingInParallel(std::move(cb));
return result;
}
void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
const char kInfo1[] =
R"(# Server
@ -130,6 +147,15 @@ tcp_port:)";
string info = absl::StrCat(kInfo1, FLAGS_port, "\n");
Metrics m = GetMetrics();
absl::StrAppend(&info, "\n# Stats\n");
absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n");
absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n");
absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_reads_cnt, "\n");
absl::StrAppend(&info, "\n# Clients\n");
absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n");
absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n");
cntx->SendBulkString(info);
}

View File

@ -4,6 +4,7 @@
#pragma once
#include "server/common_types.h"
#include "server/engine_shard_set.h"
#include "util/proactor_pool.h"
@ -17,6 +18,10 @@ class ConnectionContext;
class CommandRegistry;
class Service;
struct Metrics {
ConnectionStats conn_stats;
};
class ServerFamily {
public:
ServerFamily(Service* engine);
@ -26,6 +31,8 @@ class ServerFamily {
void Register(CommandRegistry* registry);
void Shutdown();
Metrics GetMetrics() const;
private:
uint32_t shard_count() const {
return ess_.size();

54
server/server_state.h Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <vector>
#include "server/common_types.h"
namespace dfly {
// Present in every server thread. This class differs from EngineShard. The latter manages
// state around engine shards while the former represents coordinator/connection state.
// There may be threads that handle engine shards but not IO, there may be threads that handle IO
// but not engine shards and there can be threads that handle both. This class is present only
// for threads that handle IO and owne coordination fibers.
class ServerState { // public struct - to allow initialization.
ServerState(const ServerState&) = delete;
void operator=(const ServerState&) = delete;
public:
static ServerState* tlocal() {
return &state_;
}
static ConnectionStats* tl_connection_stats() {
return &state_.connection_stats;
}
ServerState();
~ServerState();
ConnectionStats connection_stats;
void TxCountInc() {
++live_transactions_;
}
void TxCountDec() {
--live_transactions_; // can go negative since we can start on one thread and end on another.
}
int64_t live_transactions() const {
return live_transactions_;
}
private:
int64_t live_transactions_ = 0;
static thread_local ServerState state_;
};
} // namespace dfly