diff --git a/README.md b/README.md index 81dc8ef..7203499 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,11 @@ Commands that I prefer not implement before launch: Also, I would omit keyspace notifications. For that I would like to deep dive and learn exact use-cases for this API. +### Random commands we implemented along the way + + - [X] ROLE (2.8) decorator for master without replicas + - [X] UNLINK (4.0) decorator for DEL command + - [X] BGSAVE ## Milestone Nymph API 2,3,4 without cluster support, without modules, without memory inspection commands. Without support for keyspace notifications. diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index f06fc89..ac2fa61 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -19,6 +19,12 @@ extern "C" { #include "base/logging.h" #include "base/pod_array.h" +#if defined(__aarch64__) +#include "base/sse2neon.h" +#else +#include <emmintrin.h> +#endif + namespace dfly { using namespace std; @@ -26,22 +32,18 @@ namespace { constexpr XXH64_hash_t kHashSeed = 24061983; -size_t QlUsedSize(quicklist* ql) { +// Approximation, since it does not account for listpacks. +size_t QlMAllocSize(quicklist* ql) { size_t res = ql->len * sizeof(quicklistNode) + znallocx(sizeof(quicklist)); - quicklistNode* ptr = ql->head; - while (ptr) { - res += ptr->sz; - ptr = ptr->next; - } - return res; + return res + ql->count * 16; // we account 16 bytes for each member. } // Approximated dictionary size. size_t DictMallocSize(dict* d) { - size_t res = - zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) + sizeof(dict); - res += dictSize(d) * 8; // approximation. - return res; + size_t res = zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) + + znallocx(sizeof(dict)); + + return res + dictSize(d) * 16; // approximation: 16 bytes per entry on top of the tables. } // Deniel's Lemire function validate_ascii_fast() - under Apache/MIT license. 
@@ -175,7 +177,7 @@ size_t RobjWrapper::MallocUsed() const { break; case OBJ_LIST: CHECK_EQ(encoding, OBJ_ENCODING_QUICKLIST); - return QlUsedSize((quicklist*)ptr); + return QlMAllocSize((quicklist*)ptr); case OBJ_SET: switch (encoding) { case OBJ_ENCODING_HT: diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index a8ecc57..ab5854b 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -61,11 +61,15 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* }); if (update_db_time) { - periodic_task_ = pb->AddPeriodic(1, [] { - auto* shard = EngineShard::tlocal(); - DCHECK(shard); + constexpr uint32_t kClockCycleMs = 1; + + periodic_task_ = pb->AddPeriodic(kClockCycleMs, [this] { // absl::GetCurrentTimeNanos() returns current time since the Unix Epoch. - shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); + db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); + + if (task_iters_++ % 8 == 0) { + CacheStats(); + } }); } @@ -466,6 +470,34 @@ bool EngineShard::HasResultConverged(TxId notifyid) const { (committed_txid_ == notifyid || txq_.HeadScore() > notifyid)); } +void EngineShard::CacheStats() { + mi_heap_t* tlh = mi_heap_get_backing(); + struct Sum { + size_t used = 0; + size_t comitted = 0; + } sum; + + auto visit_cb = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block, + size_t block_size, void* arg) -> bool { + DCHECK(!block); + Sum* sum = (Sum*)arg; + + // mimalloc mistakenly exports used in blocks instead of bytes. 
+ sum->used += block_size * area->used; + sum->comitted += area->committed; + + DVLOG(1) << "block_size " << block_size << "/" << area->block_size << ", reserved " + << area->reserved << " comitted " << area->committed << " used: " << area->used; + + return true; // continue iteration + }; + + mi_heap_visit_blocks(tlh, false /* visit_all_blocks */, visit_cb, &sum); + + stats_.heap_used_bytes = sum.used; + stats_.heap_comitted_bytes = sum.comitted; +} + void EngineShardSet::Init(uint32_t sz) { CHECK_EQ(0u, size()); diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index bab27d6..373bc57 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -25,8 +25,17 @@ namespace dfly { class EngineShard { public: struct Stats { - uint64_t ooo_runs = 0; - uint64_t quick_runs = 0; + uint64_t ooo_runs = 0; // number of times transactions ran as OOO. + uint64_t quick_runs = 0; // number of times a single-shard "RunQuickie" transaction ran. + + // number of bytes that were allocated by the application (with mi_mallocxxx methods). + // should be less than or equal to heap_comitted_bytes. + // Cached every few millis. + size_t heap_used_bytes = 0; + + // number of bytes committed by the allocator library (i.e. mmapped into physical memory). + // + size_t heap_comitted_bytes = 0; }; // EngineShard() is private down below. @@ -128,6 +137,7 @@ class EngineShard { void OnTxFinish(); void NotifyConvergence(Transaction* tx); + void CacheStats(); /// Returns the notified transaction, /// or null if all transactions in the queue have expired.. 
@@ -167,6 +177,7 @@ class EngineShard { IntentLock shard_lock_; uint32_t periodic_task_ = 0; + uint64_t task_iters_ = 0; static thread_local EngineShard* shard_; }; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index e1a154a..4866d00 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -571,7 +571,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan) << CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl) << CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl) - << CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type); + << CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type) + << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del); } } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 968b28a..8bcc565 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -315,9 +315,13 @@ Metrics ServerFamily::GetMetrics() const { result.qps += uint64_t(ss->MovingSum6()); if (shard) { - auto shard_stats = shard->db_slice().GetStats(); - result.db += shard_stats.db; - result.events += shard_stats.events; + auto db_stats = shard->db_slice().GetStats(); + result.db += db_stats.db; + result.events += db_stats.events; + + EngineShard::Stats shard_stats = shard->stats(); + result.heap_comitted_bytes += shard_stats.heap_comitted_bytes; + result.heap_used_bytes += shard_stats.heap_used_bytes; } }; @@ -376,13 +380,12 @@ tcp_port:)"; } if (should_enter("MEMORY")) { - size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage; - absl::StrAppend(&info, "# Memory\n"); - // TODO: should be extracted from mimalloc heaps instead of using table stats. 
- absl::StrAppend(&info, "used_memory:", db_size, "\n"); - absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n"); + absl::StrAppend(&info, "used_memory:", m.heap_used_bytes, "\n"); + absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes), "\n"); + absl::StrAppend(&info, "comitted_memory:", m.heap_comitted_bytes, "\n"); + if (sdata_res.has_value()) { absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss), @@ -393,7 +396,12 @@ tcp_port:)"; // TBD: should be the max of all seen used_memory values. absl::StrAppend(&info, "used_memory_peak:", -1, "\n"); - absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n"); + + // Blob - all these cases where the key/objects are represented by a single blob allocated on + // heap. For example, strings or intsets. members of lists, sets, zsets etc + // are not accounted for to avoid complex computations. In some cases, when number of members is + // known we approximate their allocations by taking 16 bytes per member. 
+ absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n"); absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); @@ -507,6 +515,10 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; }); } +void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) { + (*cntx)->SendDirect("*3\r\n$6\r\nmaster\r\n:0\r\n*0\r\n"); +} + void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(1); ToUpper(&args.front()); @@ -549,6 +561,7 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs, void ServerFamily::Register(CommandRegistry* registry) { constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS; *registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth) + << CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) << CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) @@ -559,6 +572,7 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown) << CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) << CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) + << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role) << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script); diff --git a/src/server/server_family.h b/src/server/server_family.h index cd675d8..acbd423 100644 --- 
a/src/server/server_family.h +++ b/src/server/server_family.h @@ -25,6 +25,8 @@ struct Metrics { SliceEvents events; size_t qps = 0; + size_t heap_used_bytes = 0; + size_t heap_comitted_bytes = 0; ConnectionStats conn_stats; }; @@ -64,6 +66,7 @@ class ServerFamily { void LastSave(CmdArgList args, ConnectionContext* cntx); void Psync(CmdArgList args, ConnectionContext* cntx); void ReplicaOf(CmdArgList args, ConnectionContext* cntx); + void Role(CmdArgList args, ConnectionContext* cntx); void Save(CmdArgList args, ConnectionContext* cntx); void Script(CmdArgList args, ConnectionContext* cntx); void Sync(CmdArgList args, ConnectionContext* cntx);