From 770fe0fe473256d1db59c4fd6aafa2679e9e6c7a Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 11 Mar 2022 01:25:01 +0200 Subject: [PATCH] Memory accounting Bring back application level used-memory tracking. Use internal mimalloc api for extracting comitted memory stats. This is much better performance-wise because calling mi_heap_visit_blocks becomes very slow for larger database sizes. --- helio | 2 +- src/core/dash_test.cc | 4 ++-- src/core/mi_memory_resource.h | 9 +++++++- src/core/segment_allocator.h | 8 ++++++- src/core/small_string.cc | 4 ++++ src/core/small_string.h | 1 + src/redis/zmalloc.h | 1 + src/redis/zmalloc_mi.c | 24 +++++++++---------- src/server/dfly_main.cc | 1 - src/server/engine_shard_set.cc | 30 ++++++++++++++++++------ src/server/engine_shard_set.h | 18 ++++++++++++--- src/server/main_service.cc | 5 ++++ src/server/server_family.cc | 42 +++++++++++++++++++++++++--------- src/server/server_family.h | 3 +++ 14 files changed, 113 insertions(+), 39 deletions(-) diff --git a/helio b/helio index 6a755e1..2a6629d 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 6a755e1babf7383f90948c1d630728585ea6f10e +Subproject commit 2a6629d7e47da545bede22507d04bd2de409b1cb diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index 36bd30c..29a57b7 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -7,7 +7,7 @@ #include "core/dash.h" #include -#include +#include #include #include @@ -82,7 +82,7 @@ constexpr size_t foo = Segment::kBucketSz; class DashTest : public testing::Test { protected: static void SetUpTestCase() { - init_zmalloc_threadlocal(); + init_zmalloc_threadlocal(mi_heap_get_backing()); } DashTest() : segment_(1) { diff --git a/src/core/mi_memory_resource.h b/src/core/mi_memory_resource.h index f590804..572bfd2 100644 --- a/src/core/mi_memory_resource.h +++ b/src/core/mi_memory_resource.h @@ -17,13 +17,19 @@ class MiMemoryResource final : public std::pmr::memory_resource { mi_heap_t* heap() { return heap_;} + size_t used() const { return used_;} + private: void* do_allocate(std::size_t size, std::size_t align) { - return mi_heap_malloc_aligned(heap_, size, align); + void* res = mi_heap_malloc_aligned(heap_, size, align); + if (res) + used_ += mi_good_size(size); + return res; } void do_deallocate(void* ptr, std::size_t size, std::size_t align) { mi_free_size_aligned(ptr, size, align); + used_ -= mi_good_size(size); } bool do_is_equal(const std::pmr::memory_resource& o) const noexcept { @@ -31,6 +37,7 @@ class MiMemoryResource final : public std::pmr::memory_resource { } mi_heap_t* heap_; + size_t used_ = 0; }; } // namespace dfly \ No newline at end of file diff --git a/src/core/segment_allocator.h b/src/core/segment_allocator.h index d3468f7..b252e98 100644 --- a/src/core/segment_allocator.h +++ b/src/core/segment_allocator.h @@ -38,13 +38,17 @@ class SegmentAllocator { std::pair Allocate(uint32_t size); void Free(Ptr ptr) { - mi_free(Translate(ptr)); + void* p = Translate(ptr); + used_ -= mi_usable_size(p); + mi_free(p); } mi_heap_t* heap() { return heap_; } + size_t used() const { return used_; } + private: static uint32_t Offset(Ptr p) { return (p >> kSegmentIdBits) * 8; @@ -55,6 +59,7 @@ class SegmentAllocator { std::vector address_table_; absl::flat_hash_map rev_indx_; mi_heap_t* heap_; + size_t used_ = 0; }; inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pair { @@ -69,6 +74,7 @@ inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pairsecond; + used_ += mi_good_size(size); return std::make_pair(res, (uint8_t*)ptr); } diff --git a/src/core/small_string.cc b/src/core/small_string.cc index 80dedc7..3a7495f 100644 --- a/src/core/small_string.cc +++ b/src/core/small_string.cc @@ -42,6 +42,10 @@ void SmallString::InitThreadLocal(void * heap) { XXH3_64bits_reset_withSeed(tl.xxh_state.get(), kHashSeed); } +size_t SmallString::UsedThreadLocal() { + return tl.seg_alloc ? tl.seg_alloc->used() : 0; +} + static_assert(sizeof(SmallString) == 16); // we should use only for sizes greater than kPrefLen diff --git a/src/core/small_string.h b/src/core/small_string.h index 7d745aa..0462605 100644 --- a/src/core/small_string.h +++ b/src/core/small_string.h @@ -19,6 +19,7 @@ class SmallString { public: static void InitThreadLocal(void * heap); + static size_t UsedThreadLocal(); void Reset() { size_ = 0; diff --git a/src/redis/zmalloc.h b/src/redis/zmalloc.h index bb9704c..2eb9e46 100644 --- a/src/redis/zmalloc.h +++ b/src/redis/zmalloc.h @@ -114,6 +114,7 @@ size_t zmalloc_usable_size(const void* p); // roman: void zlibc_free(void *ptr); void init_zmalloc_threadlocal(void* heap); +extern __thread ssize_t zmalloc_used_memory_tl; #undef __zm_str #undef __xstr diff --git a/src/redis/zmalloc_mi.c b/src/redis/zmalloc_mi.c index 8e34935..196179b 100644 --- a/src/redis/zmalloc_mi.c +++ b/src/redis/zmalloc_mi.c @@ -9,7 +9,7 @@ #include "atomicvar.h" #include "zmalloc.h" -// __thread ssize_t used_memory_tl = 0; +__thread ssize_t zmalloc_used_memory_tl = 0; __thread mi_heap_t* zmalloc_heap = NULL; /* Allocate memory or panic */ @@ -27,10 +27,9 @@ size_t zmalloc_usable_size(const void* p) { } void zfree(void* ptr) { - // size_t usable = mi_usable_size(ptr); - // used_memory_tl -= usable; - mi_free(ptr); - // return mi_free_size(ptr, usable); + size_t usable = mi_usable_size(ptr); + zmalloc_used_memory_tl -= usable; + mi_free_size(ptr, usable); } void* zrealloc(void* ptr, size_t size) { @@ -39,9 +38,9 @@ void* zrealloc(void* ptr, size_t size) { } void* zcalloc(size_t size) { - // size_t usable = mi_good_size(size); + size_t usable = mi_good_size(size); - // used_memory_tl += usable; + zmalloc_used_memory_tl += usable; return mi_heap_calloc(zmalloc_heap, 1, size); } @@ -50,17 +49,17 @@ void* zmalloc_usable(size_t size, size_t* usable) { size_t g = mi_good_size(size); *usable = g; - // used_memory_tl += g; + zmalloc_used_memory_tl += g; assert(zmalloc_heap); return mi_heap_malloc(zmalloc_heap, g); } void* zrealloc_usable(void* ptr, size_t size, size_t* usable) { size_t g = mi_good_size(size); - // size_t prev = mi_usable_size(ptr); + size_t prev = mi_usable_size(ptr); *usable = g; - // used_memory_tl += (g - prev); + zmalloc_used_memory_tl += (g - prev); return mi_heap_realloc(zmalloc_heap, ptr, g); } @@ -69,6 +68,7 @@ size_t znallocx(size_t size) { } void zfree_size(void* ptr, size_t size) { + zmalloc_used_memory_tl -= size; mi_free_size(ptr, size); } @@ -78,8 +78,8 @@ void* ztrymalloc(size_t size) { } void* ztrycalloc(size_t size) { - // size_t g = mi_good_size(size); - // used_memory_tl += g; + size_t g = mi_good_size(size); + zmalloc_used_memory_tl += g; return mi_heap_calloc(zmalloc_heap, 1, size); } diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 8e49853..db386bc 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -51,7 +51,6 @@ int main(int argc, char* argv[]) { mi_option_enable(mi_option_large_os_pages); mi_option_enable(mi_option_show_errors); mi_option_set(mi_option_max_warnings, 0); - _mi_options_init(); uring::UringPool pp{1024}; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 449a1a8..230b9ae 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -20,6 +20,12 @@ using namespace util; namespace this_fiber = ::boost::this_fiber; namespace fibers = ::boost::fibers; +namespace { + +vector cached_stats; // initialized in EngineShardSet::Init + +} // namespace + thread_local EngineShard* EngineShard::shard_ = nullptr; constexpr size_t kQueueLen = 64; @@ -107,10 +113,12 @@ void EngineShard::DestroyThreadLocal() { return; uint32_t index = shard_->db_slice_.shard_id(); + mi_heap_t* tlh = shard_->mi_resource_.heap(); shard_->~EngineShard(); mi_free(shard_); shard_ = nullptr; CompactObj::InitThreadLocal(nullptr); + mi_heap_delete(tlh); VLOG(1) << "Shard reset " << index; } @@ -476,6 +484,7 @@ bool EngineShard::HasResultConverged(TxId notifyid) const { } void EngineShard::CacheStats() { +#if 0 mi_heap_t* tlh = mi_resource_.heap(); struct Sum { size_t used = 0; @@ -493,19 +502,22 @@ void EngineShard::CacheStats() { DVLOG(1) << "block_size " << block_size << "/" << area->block_size << ", reserved " << area->reserved << " comitted " << area->committed << " used: " << area->used; - return true; // continue iteration }; - - mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum); - - stats_.heap_used_bytes = sum.used; - stats_.heap_comitted_bytes = sum.comitted; +#endif + // mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum); + mi_stats_merge(); + // stats_.heap_used_bytes = sum.used; + stats_.heap_used_bytes = + mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); + cached_stats[db_slice_.shard_id()].used_memory.store(stats_.heap_used_bytes, + memory_order_relaxed); + // stats_.heap_comitted_bytes = sum.comitted; } void EngineShardSet::Init(uint32_t sz) { CHECK_EQ(0u, size()); - + cached_stats.resize(sz); shard_queue_.resize(sz); } @@ -515,4 +527,8 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) { shard_queue_[es->shard_id()] = es->GetFiberQueue(); } +const vector& EngineShardSet::GetCachedStats() { + return cached_stats; +} + } // namespace dfly diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 818c55c..cc0f8f7 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -13,8 +13,8 @@ extern "C" { #include #include "base/string_view_sso.h" -#include "core/tx_queue.h" #include "core/mi_memory_resource.h" +#include "core/tx_queue.h" #include "server/db_slice.h" #include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/fibers_ext.h" @@ -25,7 +25,7 @@ namespace dfly { class EngineShard { public: struct Stats { - uint64_t ooo_runs = 0; // how many times transactions run as OOO. + uint64_t ooo_runs = 0; // how many times transactions run as OOO. uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run. // number of bytes that were allocated by the application (with mi_mallocxxx methods). @@ -35,7 +35,7 @@ class EngineShard { // number of bytes comitted by the allocator library (i.e. mmapped into physical memory). // - size_t heap_comitted_bytes = 0; + // size_t heap_comitted_bytes = 0; }; // EngineShard() is private down below. @@ -184,6 +184,16 @@ class EngineShard { class EngineShardSet { public: + struct CachedStats { + std::atomic_uint64_t used_memory; + + CachedStats() : used_memory(0) { + } + + CachedStats(const CachedStats& o) : used_memory(o.used_memory.load()) { + } + }; + explicit EngineShardSet(util::ProactorPool* pp) : pp_(pp) { } @@ -198,6 +208,8 @@ class EngineShardSet { void Init(uint32_t size); void InitThreadLocal(util::ProactorBase* pb, bool update_db_time); + static const std::vector& GetCachedStats(); + // Uses a shard queue to dispatch. Callback runs in a dedicated fiber. template auto Await(ShardId sid, F&& f) { return shard_queue_[sid]->Await(std::forward(f)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 98155c4..dd02822 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -308,6 +308,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) { request_latency_usec.Init(&pp_); StringFamily::Init(&pp_); GenericFamily::Init(&pp_); + server_family_.Init(acceptor); cmd_req.Init(&pp_, {"type"}); } @@ -322,6 +323,7 @@ void Service::Shutdown() { engine_varz.reset(); request_latency_usec.Shutdown(); + // We mark that we are shuttind down. pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); }); // to shutdown all the runtime components that depend on EngineShard. @@ -331,6 +333,9 @@ void Service::Shutdown() { cmd_req.Shutdown(); shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); }); + + // wait for all the pending callbacks to stop. + boost::this_fiber::sleep_for(10ms); } void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index d65837a..82397ff 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -7,7 +7,7 @@ #include #include // for master_id_ generation. #include -#include +#include #include #include @@ -39,6 +39,8 @@ DEFINE_string(requirepass, "", "password for AUTH authentication"); DECLARE_uint32(port); +extern "C" mi_stats_t _mi_stats_main; + namespace dfly { using namespace std; @@ -70,6 +72,8 @@ error_code CreateDirs(fs::path dir_path) { return ec; } +atomic_uint64_t used_mem_peak(0); + } // namespace ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) { @@ -84,12 +88,29 @@ ServerFamily::~ServerFamily() { void ServerFamily::Init(util::AcceptServer* acceptor) { CHECK(acceptor_ == nullptr); acceptor_ = acceptor; + + pb_task_ = ess_.pool()->GetNextProactor(); + auto cache_cb = [] { + uint64_t sum = 0; + const auto& stats = EngineShardSet::GetCachedStats(); + for (const auto& s : stats) + sum += s.used_memory.load(memory_order_relaxed); + + // Single writer, so no races. + if (sum > used_mem_peak.load(memory_order_relaxed)) + used_mem_peak.store(sum, memory_order_relaxed); + }; + + task_10ms_ = pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(10, cache_cb); }); } void ServerFamily::Shutdown() { VLOG(1) << "ServerFamily::Shutdown"; - service_.proactor_pool().GetNextProactor()->Await([this] { + pb_task_->Await([this] { + pb_task_->CancelPeriodic(task_10ms_); + task_10ms_ = 0; + unique_lock lk(replica_of_mu_); if (replica_) { replica_->Stop(); @@ -321,7 +342,7 @@ Metrics ServerFamily::GetMetrics() const { result.events += db_stats.events; EngineShard::Stats shard_stats = shard->stats(); - result.heap_comitted_bytes += shard_stats.heap_comitted_bytes; + // result.heap_comitted_bytes += shard_stats.heap_comitted_bytes; result.heap_used_bytes += shard_stats.heap_used_bytes; } }; @@ -385,7 +406,7 @@ tcp_port:)"; absl::StrAppend(&info, "used_memory:", m.heap_used_bytes, "\n"); absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes), "\n"); - absl::StrAppend(&info, "comitted_memory:", m.heap_comitted_bytes, "\n"); + absl::StrAppend(&info, "comitted_memory:", _mi_stats_main.committed.current, "\n"); if (sdata_res.has_value()) { absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); @@ -395,13 +416,12 @@ tcp_port:)"; LOG(ERROR) << "Error fetching /proc/self/status stats"; } - // TBD: should be the max of all seen used_memory values. - absl::StrAppend(&info, "used_memory_peak:", -1, "\n"); + absl::StrAppend(&info, "used_memory_peak:", used_mem_peak.load(memory_order_relaxed), "\n"); // Blob - all these cases where the key/objects are represented by a single blob allocated on // heap. For example, strings or intsets. members of lists, sets, zsets etc - // are not accounted for to avoid complex computations. In some cases, when number of members is - // known we approximate their allocations by taking 16 bytes per member. + // are not accounted for to avoid complex computations. In some cases, when number of members + // is known we approximate their allocations by taking 16 bytes per member. absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n"); absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); absl::StrAppend(&info, "num_buckets:", m.db.bucket_count, "\n"); @@ -440,9 +460,9 @@ tcp_port:)"; } else { absl::StrAppend(&info, "role:slave\n"); - // it's safe to access replica_ because replica_ is created before etl.is_master set to false - // and cleared after etl.is_master is set to true. And since the code here that checks for - // is_master and copies shared_ptr is atomic, it1 should be correct. + // it's safe to access replica_ because replica_ is created before etl.is_master set to + // false and cleared after etl.is_master is set to true. And since the code here that checks + // for is_master and copies shared_ptr is atomic, it1 should be correct. auto replica_ptr = replica_; Replica::Info rinfo = replica_ptr->GetInfo(); absl::StrAppend(&info, "master_host:", rinfo.host, "\n"); diff --git a/src/server/server_family.h b/src/server/server_family.h index 293dbbb..c50ef34 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -77,10 +77,12 @@ class ServerFamily { void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx); + uint32_t task_10ms_ = 0; Service& service_; EngineShardSet& ess_; util::AcceptServer* acceptor_ = nullptr; + util::ProactorBase* pb_task_ = nullptr; ::boost::fibers::mutex replica_of_mu_; std::shared_ptr replica_; // protected by replica_of_mu_ @@ -89,6 +91,7 @@ class ServerFamily { std::atomic last_save_; // in seconds. GlobalState global_state_; time_t start_time_ = 0; // in seconds, epoch time. + }; } // namespace dfly