diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9185846..17eb2bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: image: ghcr.io/${{ github.actor }}/ubuntu-dev:latest credentials: username: ${{ github.actor }} - password: ${{ secrets.GHCR_TOKEN }} + password: ${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v2 with: diff --git a/core/interpreter.cc b/core/interpreter.cc index 5b54c41..db33da4 100644 --- a/core/interpreter.cc +++ b/core/interpreter.cc @@ -634,7 +634,7 @@ int Interpreter::RedisGenericCommand(bool raise_error) { for (int j = 0; j < argc; j++) { unsigned idx = j + 1; - size_t len; + size_t len = 0; if (lua_isinteger(lua_, idx)) { char* next = absl::numbers_internal::FastIntToBuffer(lua_tointeger(lua_, idx), cur); len = next - cur; diff --git a/helio b/helio index 847c612..84c1b2c 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 847c612488e152e8e12df98c3c26f222d6e7ac13 +Subproject commit 84c1b2ca38d077eaae347098e56f451aad2caa2b diff --git a/server/generic_family.cc b/server/generic_family.cc index 950a77c..1efc030 100644 --- a/server/generic_family.cc +++ b/server/generic_family.cc @@ -23,8 +23,6 @@ using namespace std; namespace { -DEFINE_VARZ(VarzQps, ping_qps); - class Renamer { public: Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) { @@ -152,11 +150,9 @@ const char* ObjTypeName(int type) { } // namespace void GenericFamily::Init(util::ProactorPool* pp) { - ping_qps.Init(pp); } void GenericFamily::Shutdown() { - ping_qps.Shutdown(); } void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { @@ -184,7 +180,6 @@ void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) { if (args.size() > 2) { return (*cntx)->SendError("wrong number of arguments for 'ping' command"); } - ping_qps.Inc(); // We synchronously block here until the engine sends us the payload and notifies that // the I/O operation has been processed. diff --git a/server/main_service.cc b/server/main_service.cc index dc4c3bb..4774274 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -43,7 +43,6 @@ namespace this_fiber = ::boost::this_fiber; namespace { DEFINE_VARZ(VarzMapAverage, request_latency_usec); -DEFINE_VARZ(VarzQps, ping_qps); std::optional engine_varz; metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests"); @@ -311,7 +310,6 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) { }); request_latency_usec.Init(&pp_); - ping_qps.Init(&pp_); StringFamily::Init(&pp_); GenericFamily::Init(&pp_); cmd_req.Init(&pp_, {"type"}); @@ -327,7 +325,6 @@ void Service::Shutdown() { engine_varz.reset(); request_latency_usec.Shutdown(); - ping_qps.Shutdown(); pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); }); @@ -352,7 +349,8 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI"); const CommandId* cid = registry_.Find(cmd_str); ServerState& etl = *ServerState::tlocal(); - ++etl.connection_stats.command_cnt; + + etl.RecordCmd(); absl::Cleanup multi_error = [cntx] { if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { diff --git a/server/main_service.h b/server/main_service.h index 644fd1b..8af4957 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -86,10 +86,10 @@ class Service { util::ProactorPool& pp_; - CommandRegistry registry_; EngineShardSet shard_set_; - ServerFamily server_family_; + CommandRegistry registry_; + util::HttpListenerBase* http_listener_ = nullptr; }; diff --git a/server/server_family.cc b/server/server_family.cc index 5eb6182..4901da0 100644 --- a/server/server_family.cc +++ b/server/server_family.cc @@ -7,6 +7,7 @@ #include #include // for master_id_ generation. #include +#include #include @@ -15,6 +16,7 @@ extern "C" { } #include "base/logging.h" +#include "io/proc_reader.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/debugcmd.h" @@ -23,8 +25,8 @@ extern "C" { #include "server/main_service.h" #include "server/rdb_save.h" #include "server/replica.h" -#include "server/server_state.h" #include "server/script_mgr.h" +#include "server/server_state.h" #include "server/transaction.h" #include "strings/human_readable.h" #include "util/accept_server.h" @@ -40,8 +42,8 @@ namespace dfly { using namespace std; using namespace util; namespace fibers = ::boost::fibers; - namespace fs = std::filesystem; +using strings::HumanReadableNumBytes; namespace { @@ -69,7 +71,8 @@ error_code CreateDirs(fs::path dir_path) { ServerFamily::ServerFamily(Service* engine) : engine_(*engine), pp_(engine->proactor_pool()), ess_(engine->shard_set()) { - last_save_.store(time(NULL), memory_order_release); + start_time_ = time(NULL); + last_save_.store(start_time_, memory_order_release); script_mgr_.reset(new ScriptMgr(&engine->shard_set())); } @@ -230,16 +233,24 @@ Metrics ServerFamily::GetMetrics() const { fibers::mutex mu; - auto cb = [&](EngineShard* shard) { - auto local_stats = shard->db_slice().GetStats(); + auto cb = [&](ProactorBase* pb) { + EngineShard* shard = EngineShard::tlocal(); + ServerState* ss = ServerState::tlocal(); + lock_guard lk(mu); - result.db += local_stats.db; - result.events += local_stats.events; - result.conn_stats += *ServerState::tl_connection_stats(); + result.conn_stats += ss->connection_stats; + result.qps += uint64_t(ss->MovingSum6()); + + if (shard) { + auto shard_stats = shard->db_slice().GetStats(); + result.db += shard_stats.db; + result.events += shard_stats.events; + } }; - ess_.RunBlockingInParallel(std::move(cb)); + pp_.AwaitFiberOnAll(std::move(cb)); + result.qps /= 6; return result; } @@ -251,29 +262,53 @@ redis_version:1.9.9 redis_mode:standalone arch_bits:64 multiplexing_api:iouring -atomicvar_api:atomic-builtin tcp_port:)"; string info = absl::StrCat(kInfo1, FLAGS_port, "\n"); + size_t uptime = time(NULL) - start_time_; + absl::StrAppend(&info, "uptime_in_seconds:", uptime, "\n"); + absl::StrAppend(&info, "uptime_in_days:", uptime / (3600 * 24), "\n"); + + auto sdata_res = io::ReadStatusInfo(); + Metrics m = GetMetrics(); + + // Clients + absl::StrAppend(&info, "\n# Clients\n"); + absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n"); + absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n"); + absl::StrAppend(&info, "blocked_clients:", 0, "\n"); + + size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage; + absl::StrAppend(&info, "\n# Memory\n"); + + // TODO: should be extracted from mimalloc heaps instead of using table stats. + absl::StrAppend(&info, "used_memory:", db_size, "\n"); + absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n"); + if (sdata_res.has_value()) { + absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); + absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss), + "\n"); + } else { + LOG(ERROR) << "Error fetching /proc/self/status stats"; + } + + // TBD: should be the max of all seen used_memory values. + absl::StrAppend(&info, "used_memory_peak:", -1, "\n"); absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n"); absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); - absl::StrAppend(&info, "used_memory_human:", - strings::HumanReadableNumBytes(m.db.table_mem_usage + m.db.obj_memory_usage), - "\n"); absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); + // Stats absl::StrAppend(&info, "\n# Stats\n"); + absl::StrAppend(&info, "instantaneous_ops_per_sec:", m.qps, "\n"); absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n"); absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n"); absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_reads_cnt, "\n"); - absl::StrAppend(&info, "\n# Clients\n"); - absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n"); - absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n"); absl::StrAppend(&info, "\n# Replication\n"); ServerState& etl = *ServerState::tlocal(); @@ -412,8 +447,7 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) - << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script) - ; + << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script); } } // namespace dfly diff --git a/server/server_family.h b/server/server_family.h index 5b10cc3..4aa1c34 100644 --- a/server/server_family.h +++ b/server/server_family.h @@ -24,6 +24,8 @@ struct Metrics { DbStats db; SliceEvents events; + size_t qps = 0; + ConnectionStats conn_stats; }; @@ -79,6 +81,7 @@ class ServerFamily { std::atomic last_save_; // in seconds. GlobalState global_state_; + time_t start_time_ = 0; // in seconds, epoch time. }; } // namespace dfly diff --git a/server/server_state.h b/server/server_state.h index 0c353af..e8535f2 100644 --- a/server/server_state.h +++ b/server/server_state.h @@ -10,6 +10,7 @@ #include "core/interpreter.h" #include "server/common_types.h" #include "server/global_state.h" +#include "util/sliding_counter.h" namespace dfly { @@ -62,11 +63,23 @@ class ServerState { // public struct - to allow initialization. Interpreter& GetInterpreter(); + // Returns sum of all requests in the last 6 seconds + // (not including the current one). + uint32_t MovingSum6() const { return qps_.SumTail(); } + + void RecordCmd() { + ++connection_stats.command_cnt; + qps_.Inc(); + } + private: int64_t live_transactions_ = 0; std::optional interpreter_; GlobalState::S gstate_ = GlobalState::IDLE; + using Counter = util::SlidingCounter<7>; + Counter qps_; + static thread_local ServerState state_; };