Introduce per-shard MiMemoryResource

This commit is contained in:
Roman Gershman 2022-01-25 21:52:29 +02:00
parent 7ee6bd8471
commit 254e640a19
5 changed files with 76 additions and 29 deletions

34
core/mi_memory_resource.h Normal file
View File

@ -0,0 +1,34 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <mimalloc.h>
#include <memory_resource>
namespace dfly {
class MiMemoryResource final : public std::pmr::memory_resource {
public:
explicit MiMemoryResource(mi_heap_t* heap) : heap_(heap) {
}
private:
void* do_allocate(std::size_t size, std::size_t align) {
return mi_heap_malloc_aligned(heap_, size, align);
}
void do_deallocate(void* ptr, std::size_t size, std::size_t align) {
mi_free_size_aligned(ptr, size, align);
}
bool do_is_equal(const std::pmr::memory_resource& o) const noexcept {
return this == &o;
}
mi_heap_t* heap_;
};
} // namespace dfly

View File

@ -68,12 +68,12 @@ auto DbSlice::GetStats() const -> Stats {
if (!db) if (!db)
continue; continue;
s.db.key_count += db->main_table.size(); s.db.key_count += db->prime_table.size();
s.db.bucket_count += db->main_table.bucket_count(); s.db.bucket_count += db->prime_table.bucket_count();
s.db.expire_count += db->expire_table.size(); s.db.expire_count += db->expire_table.size();
s.db.obj_memory_usage += db->stats.obj_memory_usage; s.db.obj_memory_usage += db->stats.obj_memory_usage;
s.db.inline_keys += db->stats.inline_keys; s.db.inline_keys += db->stats.inline_keys;
s.db.table_mem_usage += (db->main_table.mem_usage() + db->expire_table.mem_usage()); s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage());
} }
return s; return s;
@ -85,7 +85,7 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
auto& db = db_arr_[db_ind]; auto& db = db_arr_[db_ind];
DCHECK(db); DCHECK(db);
db->main_table.Reserve(key_size); db->prime_table.Reserve(key_size);
} }
auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const
@ -106,7 +106,7 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
DCHECK(IsDbValid(db_ind)); DCHECK(IsDbValid(db_ind));
auto& db = db_arr_[db_ind]; auto& db = db_arr_[db_ind];
MainIterator it = db->main_table.Find(key); MainIterator it = db->prime_table.Find(key);
if (!IsValid(it)) { if (!IsValid(it)) {
return make_pair(it, ExpireIterator{}); return make_pair(it, ExpireIterator{});
@ -156,7 +156,7 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
// Fast-path - change_cb_ is empty so we Find or Add using // Fast-path - change_cb_ is empty so we Find or Add using
// the insert operation: twice more efficient. // the insert operation: twice more efficient.
CompactObj co_key{key}; CompactObj co_key{key};
auto [it, inserted] = db->main_table.Insert(std::move(co_key), PrimeValue{}); auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{});
if (inserted) { // new entry if (inserted) { // new entry
db->stats.inline_keys += it->first.IsInline(); db->stats.inline_keys += it->first.IsInline();
db->stats.obj_memory_usage += it->first.MallocUsed(); db->stats.obj_memory_usage += it->first.MallocUsed();
@ -195,7 +195,7 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
void DbSlice::CreateDb(DbIndex index) { void DbSlice::CreateDb(DbIndex index) {
auto& db = db_arr_[index]; auto& db = db_arr_[index];
if (!db) { if (!db) {
db.reset(new DbWrapper); db.reset(new DbWrapper{owner_->memory_resource()});
} }
} }
@ -211,7 +211,7 @@ bool DbSlice::Del(DbIndex db_ind, MainIterator it) {
db->stats.inline_keys -= it->first.IsInline(); db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it); db->prime_table.Erase(it);
return true; return true;
} }
@ -222,8 +222,8 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
CHECK(db); CHECK(db);
size_t removed = db->main_table.size(); size_t removed = db->prime_table.size();
db->main_table.Clear(); db->prime_table.Clear();
db->expire_table.Clear(); db->expire_table.Clear();
db->stats.inline_keys = 0; db->stats.inline_keys = 0;
db->stats.obj_memory_usage = 0; db->stats.obj_memory_usage = 0;
@ -281,7 +281,7 @@ bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj,
auto& db = db_arr_[db_ind]; auto& db = db_arr_[db_ind];
CompactObj co_key{key}; CompactObj co_key{key};
auto [new_entry, success] = db->main_table.Insert(std::move(co_key), std::move(obj)); auto [new_entry, success] = db->prime_table.Insert(std::move(co_key), std::move(obj));
if (!success) if (!success)
return false; // in this case obj won't be moved and will be destroyed during unwinding. return false; // in this case obj won't be moved and will be destroyed during unwinding.
@ -303,7 +303,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
DCHECK_LT(db_ind, db_array_size()); DCHECK_LT(db_ind, db_array_size());
if (IsDbValid(db_ind)) { if (IsDbValid(db_ind)) {
return db_arr_[db_ind]->main_table.size(); return db_arr_[db_ind]->prime_table.size();
} }
return 0; return 0;
} }
@ -409,7 +409,7 @@ pair<MainIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, MainI
db->stats.inline_keys -= it->first.IsInline(); db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it); db->prime_table.Erase(it);
return make_pair(MainIterator{}, ExpireIterator{}); return make_pair(MainIterator{}, ExpireIterator{});
} }

View File

@ -144,7 +144,7 @@ class DbSlice {
} }
std::pair<PrimeTable*, ExpireTable*> GetTables(DbIndex id) { std::pair<PrimeTable*, ExpireTable*> GetTables(DbIndex id) {
return std::pair<PrimeTable*, ExpireTable*>(&db_arr_[id]->main_table, return std::pair<PrimeTable*, ExpireTable*>(&db_arr_[id]->prime_table,
&db_arr_[id]->expire_table); &db_arr_[id]->expire_table);
} }
@ -196,11 +196,15 @@ class DbSlice {
using LockTable = absl::flat_hash_map<std::string, IntentLock>; using LockTable = absl::flat_hash_map<std::string, IntentLock>;
struct DbWrapper { struct DbWrapper {
PrimeTable main_table; PrimeTable prime_table;
ExpireTable expire_table; ExpireTable expire_table;
LockTable lock_table; LockTable lock_table;
mutable InternalDbStats stats; mutable InternalDbStats stats;
explicit DbWrapper(std::pmr::memory_resource* mr)
: prime_table(4, detail::PrimeTablePolicy{}, mr) {
}
}; };
std::vector<std::unique_ptr<DbWrapper>> db_arr_; std::vector<std::unique_ptr<DbWrapper>> db_arr_;

View File

@ -1,11 +1,11 @@
// Copyright 2021, Roman Gershman. All rights reserved. // Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
extern "C" { extern "C" {
#include "redis/zmalloc.h" #include "redis/zmalloc.h"
} }
#include "base/logging.h" #include "base/logging.h"
@ -52,8 +52,8 @@ bool EngineShard::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) {
return queue_map.empty(); return queue_map.empty();
} }
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time) EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), this) { db_slice_(pb->GetIndex(), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] { fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index)); this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
@ -85,7 +85,10 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CHECK(shard_ == nullptr) << pb->GetIndex(); CHECK(shard_ == nullptr) << pb->GetIndex();
init_zmalloc_threadlocal(); init_zmalloc_threadlocal();
shard_ = new EngineShard(pb, update_db_time);
mi_heap_t* tlh = mi_heap_get_backing();
void* ptr = mi_heap_malloc_aligned(tlh, sizeof(EngineShard), alignof(EngineShard));
shard_ = new (ptr) EngineShard(pb, update_db_time, tlh);
} }
void EngineShard::DestroyThreadLocal() { void EngineShard::DestroyThreadLocal() {
@ -93,7 +96,8 @@ void EngineShard::DestroyThreadLocal() {
return; return;
uint32_t index = shard_->db_slice_.shard_id(); uint32_t index = shard_->db_slice_.shard_id();
delete shard_; shard_->~EngineShard();
mi_free(shard_);
shard_ = nullptr; shard_ = nullptr;
VLOG(1) << "Shard reset " << index; VLOG(1) << "Shard reset " << index;
@ -237,7 +241,7 @@ Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) {
// Processes potentially awakened keys and verifies that these are indeed // Processes potentially awakened keys and verifies that these are indeed
// awakened to eliminate false positives. // awakened to eliminate false positives.
// In addition it, optionally removes completed_t from watch queues. // In addition, optionally removes completed_t from the watch queues.
void EngineShard::ProcessAwakened(Transaction* completed_t) { void EngineShard::ProcessAwakened(Transaction* completed_t) {
for (DbIndex index : awakened_indices_) { for (DbIndex index : awakened_indices_) {
DbWatchTable& wt = watched_dbs_[index]; DbWatchTable& wt = watched_dbs_[index];
@ -376,7 +380,7 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) {
} }
// Called from commands like lpush. // Called from commands like lpush.
void EngineShard::AwakeWatched(DbIndex db_index, std::string_view db_key) { void EngineShard::AwakeWatched(DbIndex db_index, string_view db_key) {
auto it = watched_dbs_.find(db_index); auto it = watched_dbs_.find(db_index);
if (it == watched_dbs_.end()) if (it == watched_dbs_.end())
return; return;
@ -384,7 +388,6 @@ void EngineShard::AwakeWatched(DbIndex db_index, std::string_view db_key) {
DbWatchTable& wt = it->second; DbWatchTable& wt = it->second;
DCHECK(!wt.queue_map.empty()); DCHECK(!wt.queue_map.empty());
string tmp;
auto wit = wt.queue_map.find(db_key); auto wit = wt.queue_map.find(db_key);
if (wit == wt.queue_map.end()) if (wit == wt.queue_map.end())

View File

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved. // Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
@ -14,6 +14,7 @@ extern "C" {
#include "base/string_view_sso.h" #include "base/string_view_sso.h"
#include "core/tx_queue.h" #include "core/tx_queue.h"
#include "core/mi_memory_resource.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers_ext.h" #include "util/fibers/fibers_ext.h"
@ -53,6 +54,10 @@ class EngineShard {
return db_slice_; return db_slice_;
} }
std::pmr::memory_resource* memory_resource() {
return &mi_resource_;
}
::util::fibers_ext::FiberQueue* GetFiberQueue() { ::util::fibers_ext::FiberQueue* GetFiberQueue() {
return &queue_; return &queue_;
} }
@ -94,7 +99,7 @@ class EngineShard {
bool RemovedWatched(std::string_view key, Transaction* me); bool RemovedWatched(std::string_view key, Transaction* me);
void GCWatched(const KeyLockArgs& lock_args); void GCWatched(const KeyLockArgs& lock_args);
void AwakeWatched(DbIndex db_index, std::string_view key); void AwakeWatched(DbIndex db_index, std::string_view db_key);
bool HasAwakedTransaction() const { bool HasAwakedTransaction() const {
return !awakened_transactions_.empty(); return !awakened_transactions_.empty();
@ -117,7 +122,7 @@ class EngineShard {
sds tmp_str; sds tmp_str;
private: private:
EngineShard(util::ProactorBase* pb, bool update_db_time); EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);
struct WatchQueue; struct WatchQueue;
@ -129,7 +134,8 @@ class EngineShard {
Transaction* NotifyWatchQueue(WatchQueue* wq); Transaction* NotifyWatchQueue(WatchQueue* wq);
using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>; using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>;
// Watch state per db slice.
// Watch state per db.
struct DbWatchTable { struct DbWatchTable {
WatchQueueMap queue_map; WatchQueueMap queue_map;
@ -137,7 +143,6 @@ class EngineShard {
// they reference key objects in queue_map. // they reference key objects in queue_map.
absl::flat_hash_set<base::string_view_sso> awakened_keys; absl::flat_hash_set<base::string_view_sso> awakened_keys;
// Returns true if queue_map is empty and DbWatchTable can be removed as well. // Returns true if queue_map is empty and DbWatchTable can be removed as well.
bool RemoveEntry(WatchQueueMap::iterator it); bool RemoveEntry(WatchQueueMap::iterator it);
}; };
@ -152,6 +157,7 @@ class EngineShard {
::boost::fibers::fiber fiber_q_; ::boost::fibers::fiber fiber_q_;
TxQueue txq_; TxQueue txq_;
MiMemoryResource mi_resource_;
DbSlice db_slice_; DbSlice db_slice_;
Stats stats_; Stats stats_;