Add decorators for commands like ROLE,BGSAVE,UNLINK. Improve memory usage tracking

This commit is contained in:
Roman Gershman 2022-02-28 17:36:45 +02:00
parent 668a51cafa
commit f255d17a72
7 changed files with 96 additions and 28 deletions

View File

@ -166,6 +166,11 @@ Commands that I prefer not implement before launch:
Also, I would omit keyspace notifications. For that I would like to deep dive and learn Also, I would omit keyspace notifications. For that I would like to deep dive and learn
exact use-cases for this API. exact use-cases for this API.
### Random commands we implemented along the way
- [X] ROLE (2.8) decorator for master without replicas
- [X] UNLINK (4.0) decorator for DEL command
- [X] BGSAVE
## Milestone Nymph ## Milestone Nymph
API 2,3,4 without cluster support, without modules, without memory inspection commands. API 2,3,4 without cluster support, without modules, without memory inspection commands.
Without support for keyspace notifications. Without support for keyspace notifications.

View File

@ -19,6 +19,12 @@ extern "C" {
#include "base/logging.h" #include "base/logging.h"
#include "base/pod_array.h" #include "base/pod_array.h"
#if defined(__aarch64__)
#include "base/sse2neon.h"
#else
#include <emmintrin.h>
#endif
namespace dfly { namespace dfly {
using namespace std; using namespace std;
@ -26,22 +32,18 @@ namespace {
constexpr XXH64_hash_t kHashSeed = 24061983; constexpr XXH64_hash_t kHashSeed = 24061983;
size_t QlUsedSize(quicklist* ql) { // Approximation since does not account for listpacks.
size_t QlMAllocSize(quicklist* ql) {
size_t res = ql->len * sizeof(quicklistNode) + znallocx(sizeof(quicklist)); size_t res = ql->len * sizeof(quicklistNode) + znallocx(sizeof(quicklist));
quicklistNode* ptr = ql->head; return res + ql->count * 16; // we account for each member 16 bytes.
while (ptr) {
res += ptr->sz;
ptr = ptr->next;
}
return res;
} }
// Approximated dictionary size.
// Sums the usable size of both ht_table allocations, the allocator-rounded size of the
// dict header (znallocx(sizeof(dict))) and a flat estimate of 16 bytes per stored entry.
size_t DictMallocSize(dict* d) {
  size_t res = zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) +
               znallocx(sizeof(dict));

  // The per-entry estimate must be added to the table/header sizes, not assigned over them.
  return res + dictSize(d) * 16;  // approximation.
}
// Deniel's Lemire function validate_ascii_fast() - under Apache/MIT license. // Deniel's Lemire function validate_ascii_fast() - under Apache/MIT license.
@ -175,7 +177,7 @@ size_t RobjWrapper::MallocUsed() const {
break; break;
case OBJ_LIST: case OBJ_LIST:
CHECK_EQ(encoding, OBJ_ENCODING_QUICKLIST); CHECK_EQ(encoding, OBJ_ENCODING_QUICKLIST);
return QlUsedSize((quicklist*)ptr); return QlMAllocSize((quicklist*)ptr);
case OBJ_SET: case OBJ_SET:
switch (encoding) { switch (encoding) {
case OBJ_ENCODING_HT: case OBJ_ENCODING_HT:

View File

@ -61,11 +61,15 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
}); });
if (update_db_time) { if (update_db_time) {
periodic_task_ = pb->AddPeriodic(1, [] { constexpr uint32_t kClockCycleMs = 1;
auto* shard = EngineShard::tlocal();
DCHECK(shard); periodic_task_ = pb->AddPeriodic(kClockCycleMs, [this] {
// absl::GetCurrentTimeNanos() returns current time since the Unix Epoch. // absl::GetCurrentTimeNanos() returns current time since the Unix Epoch.
shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000);
if (task_iters_++ % 8 == 0) {
CacheStats();
}
}); });
} }
@ -466,6 +470,34 @@ bool EngineShard::HasResultConverged(TxId notifyid) const {
(committed_txid_ == notifyid || txq_.HeadScore() > notifyid)); (committed_txid_ == notifyid || txq_.HeadScore() > notifyid));
} }
// Refreshes the cached heap counters in stats_.
// Visits the heap returned by mi_heap_get_backing() with per-block visiting disabled and
// accumulates, over every callback invocation, the used bytes and the committed bytes.
void EngineShard::CacheStats() {
  struct Totals {
    size_t used_bytes = 0;
    size_t committed_bytes = 0;
  };

  // Captureless lambda so it can be passed to mimalloc as a plain callback; the running
  // totals travel through the opaque `arg` pointer.
  auto on_area = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
                    size_t block_size, void* arg) -> bool {
    DCHECK(!block);
    auto* totals = static_cast<Totals*>(arg);

    // mimalloc mistakenly exports used in blocks instead of bytes.
    totals->used_bytes += block_size * area->used;
    totals->committed_bytes += area->committed;

    DVLOG(1) << "block_size " << block_size << "/" << area->block_size << ", reserved "
             << area->reserved << " comitted " << area->committed << " used: " << area->used;
    return true;  // continue iteration
  };

  Totals totals;
  mi_heap_visit_blocks(mi_heap_get_backing(), false /* visit all blocks*/, on_area, &totals);

  stats_.heap_used_bytes = totals.used_bytes;
  stats_.heap_comitted_bytes = totals.committed_bytes;
}
void EngineShardSet::Init(uint32_t sz) { void EngineShardSet::Init(uint32_t sz) {
CHECK_EQ(0u, size()); CHECK_EQ(0u, size());

View File

@ -25,8 +25,17 @@ namespace dfly {
class EngineShard { class EngineShard {
public: public:
struct Stats { struct Stats {
uint64_t ooo_runs = 0; uint64_t ooo_runs = 0; // how many times transactions run as OOO.
uint64_t quick_runs = 0; uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run.
// number of bytes that were allocated by the application (with mi_mallocxxx methods).
// should be less than or equal to heap_comitted_bytes.
// Cached every few millis.
size_t heap_used_bytes = 0;
// number of bytes committed by the allocator library (i.e. mmapped into physical memory).
size_t heap_comitted_bytes = 0;
}; };
// EngineShard() is private down below. // EngineShard() is private down below.
@ -128,6 +137,7 @@ class EngineShard {
void OnTxFinish(); void OnTxFinish();
void NotifyConvergence(Transaction* tx); void NotifyConvergence(Transaction* tx);
void CacheStats();
/// Returns the notified transaction, /// Returns the notified transaction,
/// or null if all transactions in the queue have expired.. /// or null if all transactions in the queue have expired..
@ -167,6 +177,7 @@ class EngineShard {
IntentLock shard_lock_; IntentLock shard_lock_;
uint32_t periodic_task_ = 0; uint32_t periodic_task_ = 0;
uint64_t task_iters_ = 0;
static thread_local EngineShard* shard_; static thread_local EngineShard* shard_;
}; };

View File

@ -571,7 +571,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan) << CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan)
<< CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl) << CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl)
<< CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl) << CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type); << CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del);
} }
} // namespace dfly } // namespace dfly

View File

@ -315,9 +315,13 @@ Metrics ServerFamily::GetMetrics() const {
result.qps += uint64_t(ss->MovingSum6()); result.qps += uint64_t(ss->MovingSum6());
if (shard) { if (shard) {
auto shard_stats = shard->db_slice().GetStats(); auto db_stats = shard->db_slice().GetStats();
result.db += shard_stats.db; result.db += db_stats.db;
result.events += shard_stats.events; result.events += db_stats.events;
EngineShard::Stats shard_stats = shard->stats();
result.heap_comitted_bytes += shard_stats.heap_comitted_bytes;
result.heap_used_bytes += shard_stats.heap_used_bytes;
} }
}; };
@ -376,13 +380,12 @@ tcp_port:)";
} }
if (should_enter("MEMORY")) { if (should_enter("MEMORY")) {
size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage;
absl::StrAppend(&info, "# Memory\n"); absl::StrAppend(&info, "# Memory\n");
// TODO: should be extracted from mimalloc heaps instead of using table stats. absl::StrAppend(&info, "used_memory:", m.heap_used_bytes, "\n");
absl::StrAppend(&info, "used_memory:", db_size, "\n"); absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes), "\n");
absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n"); absl::StrAppend(&info, "comitted_memory:", m.heap_comitted_bytes, "\n");
if (sdata_res.has_value()) { if (sdata_res.has_value()) {
absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n");
absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss), absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss),
@ -393,7 +396,12 @@ tcp_port:)";
// TBD: should be the max of all seen used_memory values. // TBD: should be the max of all seen used_memory values.
absl::StrAppend(&info, "used_memory_peak:", -1, "\n"); absl::StrAppend(&info, "used_memory_peak:", -1, "\n");
absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n");
// Blob - all the cases where a key/object is represented by a single blob allocated on the
// heap, for example, strings or intsets. Members of lists, sets, zsets etc.
// are not accounted for, to avoid complex computations. In some cases, when the number of
// members is known, we approximate their allocations by taking 16 bytes per member.
absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n");
absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n");
absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n");
absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n");
@ -507,6 +515,10 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
[&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; }); [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
} }
// ROLE handler. Always sends the same fixed RESP reply: a 3-element array holding the bulk
// string "master", the integer 0 and an empty array. `args` is not inspected.
void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
  static constexpr char kMasterReply[] = "*3\r\n$6\r\nmaster\r\n:0\r\n*0\r\n";
  (*cntx)->SendDirect(kMasterReply);
}
void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) {
args.remove_prefix(1); args.remove_prefix(1);
ToUpper(&args.front()); ToUpper(&args.front());
@ -549,6 +561,7 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
void ServerFamily::Register(CommandRegistry* registry) { void ServerFamily::Register(CommandRegistry* registry) {
constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS; constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS;
*registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth) *registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth)
<< CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) << CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
@ -559,6 +572,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown) << CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown)
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) << CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) << CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)
<< CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync)
<< CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync)
<< CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script); << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script);

View File

@ -25,6 +25,8 @@ struct Metrics {
SliceEvents events; SliceEvents events;
size_t qps = 0; size_t qps = 0;
size_t heap_used_bytes = 0;
size_t heap_comitted_bytes = 0;
ConnectionStats conn_stats; ConnectionStats conn_stats;
}; };
@ -64,6 +66,7 @@ class ServerFamily {
void LastSave(CmdArgList args, ConnectionContext* cntx); void LastSave(CmdArgList args, ConnectionContext* cntx);
void Psync(CmdArgList args, ConnectionContext* cntx); void Psync(CmdArgList args, ConnectionContext* cntx);
void ReplicaOf(CmdArgList args, ConnectionContext* cntx); void ReplicaOf(CmdArgList args, ConnectionContext* cntx);
void Role(CmdArgList args, ConnectionContext* cntx);
void Save(CmdArgList args, ConnectionContext* cntx); void Save(CmdArgList args, ConnectionContext* cntx);
void Script(CmdArgList args, ConnectionContext* cntx); void Script(CmdArgList args, ConnectionContext* cntx);
void Sync(CmdArgList args, ConnectionContext* cntx); void Sync(CmdArgList args, ConnectionContext* cntx);