From 254e640a19bd68bda74f2236ca71ba14fd73bc60 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 25 Jan 2022 21:52:29 +0200 Subject: [PATCH] Introduce per-shard MiMemoryResource --- core/mi_memory_resource.h | 34 ++++++++++++++++++++++++++++++++++ server/db_slice.cc | 26 +++++++++++++------------- server/db_slice.h | 8 ++++++-- server/engine_shard_set.cc | 21 ++++++++++++--------- server/engine_shard_set.h | 16 +++++++++++----- 5 files changed, 76 insertions(+), 29 deletions(-) create mode 100644 core/mi_memory_resource.h diff --git a/core/mi_memory_resource.h b/core/mi_memory_resource.h new file mode 100644 index 0000000..9cc0d9b --- /dev/null +++ b/core/mi_memory_resource.h @@ -0,0 +1,34 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include + +namespace dfly { + +class MiMemoryResource final : public std::pmr::memory_resource { + public: + explicit MiMemoryResource(mi_heap_t* heap) : heap_(heap) { + } + + private: + void* do_allocate(std::size_t size, std::size_t align) { + return mi_heap_malloc_aligned(heap_, size, align); + } + + void do_deallocate(void* ptr, std::size_t size, std::size_t align) { + mi_free_size_aligned(ptr, size, align); + } + + bool do_is_equal(const std::pmr::memory_resource& o) const noexcept { + return this == &o; + } + + mi_heap_t* heap_; +}; + +} // namespace dfly \ No newline at end of file diff --git a/server/db_slice.cc b/server/db_slice.cc index 3f3c1c7..ada37e0 100644 --- a/server/db_slice.cc +++ b/server/db_slice.cc @@ -68,12 +68,12 @@ auto DbSlice::GetStats() const -> Stats { if (!db) continue; - s.db.key_count += db->main_table.size(); - s.db.bucket_count += db->main_table.bucket_count(); + s.db.key_count += db->prime_table.size(); + s.db.bucket_count += db->prime_table.bucket_count(); s.db.expire_count += db->expire_table.size(); s.db.obj_memory_usage += db->stats.obj_memory_usage; s.db.inline_keys += db->stats.inline_keys; - s.db.table_mem_usage += (db->main_table.mem_usage() + db->expire_table.mem_usage()); + s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage()); } return s; @@ -85,7 +85,7 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { auto& db = db_arr_[db_ind]; DCHECK(db); - db->main_table.Reserve(key_size); + db->prime_table.Reserve(key_size); } auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const @@ -106,7 +106,7 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view DCHECK(IsDbValid(db_ind)); auto& db = db_arr_[db_ind]; - MainIterator it = db->main_table.Find(key); + MainIterator it = db->prime_table.Find(key); if (!IsValid(it)) { return make_pair(it, ExpireIterator{}); @@ -156,7 +156,7 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairmain_table.Insert(std::move(co_key), PrimeValue{}); + auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{}); if (inserted) { // new entry db->stats.inline_keys += it->first.IsInline(); db->stats.obj_memory_usage += it->first.MallocUsed(); @@ -195,7 +195,7 @@ void DbSlice::ActivateDb(DbIndex db_ind) { void DbSlice::CreateDb(DbIndex index) { auto& db = db_arr_[index]; if (!db) { - db.reset(new DbWrapper); + db.reset(new DbWrapper{owner_->memory_resource()}); } } @@ -211,7 +211,7 @@ bool DbSlice::Del(DbIndex db_ind, MainIterator it) { db->stats.inline_keys -= it->first.IsInline(); db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); - db->main_table.Erase(it); + db->prime_table.Erase(it); return true; } @@ -222,8 +222,8 @@ size_t DbSlice::FlushDb(DbIndex db_ind) { CHECK(db); - size_t removed = db->main_table.size(); - db->main_table.Clear(); + size_t removed = db->prime_table.size(); + db->prime_table.Clear(); db->expire_table.Clear(); db->stats.inline_keys = 0; db->stats.obj_memory_usage = 0; @@ -281,7 +281,7 @@ bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj, auto& db = db_arr_[db_ind]; CompactObj co_key{key}; - auto [new_entry, success] = db->main_table.Insert(std::move(co_key), std::move(obj)); + auto [new_entry, success] = db->prime_table.Insert(std::move(co_key), std::move(obj)); if (!success) return false; // in this case obj won't be moved and will be destroyed during unwinding. @@ -303,7 +303,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const { DCHECK_LT(db_ind, db_array_size()); if (IsDbValid(db_ind)) { - return db_arr_[db_ind]->main_table.size(); + return db_arr_[db_ind]->prime_table.size(); } return 0; } @@ -409,7 +409,7 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, MainI db->stats.inline_keys -= it->first.IsInline(); db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); - db->main_table.Erase(it); + db->prime_table.Erase(it); return make_pair(MainIterator{}, ExpireIterator{}); } diff --git a/server/db_slice.h b/server/db_slice.h index 1e01dc7..a0a388b 100644 --- a/server/db_slice.h +++ b/server/db_slice.h @@ -144,7 +144,7 @@ class DbSlice { } std::pair GetTables(DbIndex id) { - return std::pair(&db_arr_[id]->main_table, + return std::pair(&db_arr_[id]->prime_table, &db_arr_[id]->expire_table); } @@ -196,11 +196,15 @@ class DbSlice { using LockTable = absl::flat_hash_map; struct DbWrapper { - PrimeTable main_table; + PrimeTable prime_table; ExpireTable expire_table; LockTable lock_table; mutable InternalDbStats stats; + + explicit DbWrapper(std::pmr::memory_resource* mr) + : prime_table(4, detail::PrimeTablePolicy{}, mr) { + } }; std::vector> db_arr_; diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc index e1c2942..0086504 100644 --- a/server/engine_shard_set.cc +++ b/server/engine_shard_set.cc @@ -1,11 +1,11 @@ -// Copyright 2021, Roman Gershman. All rights reserved. +// Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // #include "server/engine_shard_set.h" extern "C" { - #include "redis/zmalloc.h" +#include "redis/zmalloc.h" } #include "base/logging.h" @@ -52,8 +52,8 @@ bool EngineShard::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) { return queue_map.empty(); } -EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time) - : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), +EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) + : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), db_slice_(pb->GetIndex(), this) { fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] { this_fiber::properties().set_name(absl::StrCat("shard_queue", index)); @@ -85,7 +85,10 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { CHECK(shard_ == nullptr) << pb->GetIndex(); init_zmalloc_threadlocal(); - shard_ = new EngineShard(pb, update_db_time); + + mi_heap_t* tlh = mi_heap_get_backing(); + void* ptr = mi_heap_malloc_aligned(tlh, sizeof(EngineShard), alignof(EngineShard)); + shard_ = new (ptr) EngineShard(pb, update_db_time, tlh); } void EngineShard::DestroyThreadLocal() { @@ -93,7 +96,8 @@ void EngineShard::DestroyThreadLocal() { return; uint32_t index = shard_->db_slice_.shard_id(); - delete shard_; + shard_->~EngineShard(); + mi_free(shard_); shard_ = nullptr; VLOG(1) << "Shard reset " << index; @@ -237,7 +241,7 @@ Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) { // Processes potentially awakened keys and verifies that these are indeed // awakened to eliminate false positives. -// In addition it, optionally removes completed_t from watch queues. +// In addition, optionally removes completed_t from the watch queues. void EngineShard::ProcessAwakened(Transaction* completed_t) { for (DbIndex index : awakened_indices_) { DbWatchTable& wt = watched_dbs_[index]; @@ -376,7 +380,7 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) { } // Called from commands like lpush. -void EngineShard::AwakeWatched(DbIndex db_index, std::string_view db_key) { +void EngineShard::AwakeWatched(DbIndex db_index, string_view db_key) { auto it = watched_dbs_.find(db_index); if (it == watched_dbs_.end()) return; @@ -384,7 +388,6 @@ void EngineShard::AwakeWatched(DbIndex db_index, std::string_view db_key) { DbWatchTable& wt = it->second; DCHECK(!wt.queue_map.empty()); - string tmp; auto wit = wt.queue_map.find(db_key); if (wit == wt.queue_map.end()) diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h index eec6a7e..bab27d6 100644 --- a/server/engine_shard_set.h +++ b/server/engine_shard_set.h @@ -1,4 +1,4 @@ -// Copyright 2021, Roman Gershman. All rights reserved. +// Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // @@ -14,6 +14,7 @@ extern "C" { #include "base/string_view_sso.h" #include "core/tx_queue.h" +#include "core/mi_memory_resource.h" #include "server/db_slice.h" #include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/fibers_ext.h" @@ -53,6 +54,10 @@ class EngineShard { return db_slice_; } + std::pmr::memory_resource* memory_resource() { + return &mi_resource_; + } + ::util::fibers_ext::FiberQueue* GetFiberQueue() { return &queue_; } @@ -94,7 +99,7 @@ class EngineShard { bool RemovedWatched(std::string_view key, Transaction* me); void GCWatched(const KeyLockArgs& lock_args); - void AwakeWatched(DbIndex db_index, std::string_view key); + void AwakeWatched(DbIndex db_index, std::string_view db_key); bool HasAwakedTransaction() const { return !awakened_transactions_.empty(); @@ -117,7 +122,7 @@ class EngineShard { sds tmp_str; private: - EngineShard(util::ProactorBase* pb, bool update_db_time); + EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap); struct WatchQueue; @@ -129,7 +134,8 @@ class EngineShard { Transaction* NotifyWatchQueue(WatchQueue* wq); using WatchQueueMap = absl::flat_hash_map>; - // Watch state per db slice. + + // Watch state per db. struct DbWatchTable { WatchQueueMap queue_map; @@ -137,7 +143,6 @@ class EngineShard { // they reference key objects in queue_map. absl::flat_hash_set awakened_keys; - // Returns true if queue_map is empty and DbWatchTable can be removed as well. bool RemoveEntry(WatchQueueMap::iterator it); }; @@ -152,6 +157,7 @@ class EngineShard { ::boost::fibers::fiber fiber_q_; TxQueue txq_; + MiMemoryResource mi_resource_; DbSlice db_slice_; Stats stats_;