Add reference counting to DbTable. This will help us implement flushing the tables in parallel with snapshotting.
This commit is contained in:
parent
5849d09f73
commit
30cf9541c2
|
@ -249,7 +249,9 @@ class DashTable : public detail::DashTableBase {
|
|||
void IncreaseDepth(unsigned new_depth);
|
||||
void Split(uint32_t seg_id);
|
||||
|
||||
template <typename Cb> void IterateUnique(Cb&& cb);
|
||||
// Segment directory contains multiple segment pointers, some of them pointing to
|
||||
// the same object. IterateDistinct goes over all distinct segments in the table.
|
||||
template <typename Cb> void IterateDistinct(Cb&& cb);
|
||||
|
||||
size_t NextSeg(size_t sid) const {
|
||||
size_t delta = (1u << (global_depth_ - segment_[sid]->local_depth()));
|
||||
|
@ -453,7 +455,7 @@ DashTable<_Key, _Value, Policy>::~DashTable() {
|
|||
std::pmr::polymorphic_allocator<SegmentType> pa(resource);
|
||||
using alloc_traits = std::allocator_traits<decltype(pa)>;
|
||||
|
||||
IterateUnique([&](SegmentType* seg) {
|
||||
IterateDistinct([&](SegmentType* seg) {
|
||||
alloc_traits::destroy(pa, seg);
|
||||
alloc_traits::deallocate(pa, seg, 1);
|
||||
return false;
|
||||
|
@ -517,10 +519,10 @@ void DashTable<_Key, _Value, Policy>::Clear() {
|
|||
return false;
|
||||
};
|
||||
|
||||
IterateUnique(cb);
|
||||
IterateDistinct(cb);
|
||||
size_ = 0;
|
||||
|
||||
// Consider the following case: table with 8 segments overall, 4 unique.
|
||||
// Consider the following case: table with 8 segments overall, 4 distinct.
|
||||
// S1, S1, S1, S1, S2, S3, S4, S4
|
||||
/* This corresponds to the tree:
|
||||
R
|
||||
|
@ -531,7 +533,7 @@ void DashTable<_Key, _Value, Policy>::Clear() {
|
|||
We want to collapse this tree into, say, 2 segment directory.
|
||||
That means we need to keep S1, S2 but delete S3, S4.
|
||||
That means we need to move representative segments until we reach the desired size
|
||||
and the erase all other unique segments.
|
||||
and then erase all other distinct segments.
|
||||
**********/
|
||||
if (global_depth_ > initial_depth_) {
|
||||
std::pmr::polymorphic_allocator<SegmentType> pa(segment_.get_allocator());
|
||||
|
@ -582,7 +584,7 @@ bool DashTable<_Key, _Value, Policy>::ShiftRight(bucket_iterator it) {
|
|||
|
||||
template <typename _Key, typename _Value, typename Policy>
|
||||
template <typename Cb>
|
||||
void DashTable<_Key, _Value, Policy>::IterateUnique(Cb&& cb) {
|
||||
void DashTable<_Key, _Value, Policy>::IterateDistinct(Cb&& cb) {
|
||||
size_t i = 0;
|
||||
while (i < segment_.size()) {
|
||||
auto* seg = segment_[i];
|
||||
|
|
|
@ -7,7 +7,7 @@ add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_regist
|
|||
engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc
|
||||
list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
|
||||
snapshot.cc script_mgr.cc server_family.cc
|
||||
set_family.cc string_family.cc tiered_storage.cc
|
||||
set_family.cc string_family.cc table.cc tiered_storage.cc
|
||||
transaction.cc zset_family.cc)
|
||||
|
||||
cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib)
|
||||
|
|
|
@ -35,7 +35,7 @@ static_assert(kPrimeSegmentSize == 32288);
|
|||
// 24576
|
||||
static_assert(kExpireSegmentSize == 23528);
|
||||
|
||||
void UpdateStatsOnDeletion(PrimeIterator it, InternalDbStats* stats) {
|
||||
void UpdateStatsOnDeletion(PrimeIterator it, DbTableStats* stats) {
|
||||
size_t value_heap_size = it->second.MallocUsed();
|
||||
stats->inline_keys -= it->first.IsInline();
|
||||
stats->obj_memory_usage -= (it->first.MallocUsed() + value_heap_size);
|
||||
|
@ -108,7 +108,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
|||
// choose "randomly" a stash bucket to evict an item.
|
||||
auto bucket_it = eb.stash_buckets[eb.key_hash % kNumStashBuckets];
|
||||
auto last_slot_it = bucket_it;
|
||||
last_slot_it += (PrimeTable::kBucketWidth -1);
|
||||
last_slot_it += (PrimeTable::kBucketWidth - 1);
|
||||
if (!last_slot_it.is_done()) {
|
||||
UpdateStatsOnDeletion(last_slot_it, db_slice_->MutableStats(db_indx_));
|
||||
}
|
||||
|
@ -122,26 +122,11 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
|||
|
||||
#define ADD(x) (x) += o.x
|
||||
|
||||
InternalDbStats& InternalDbStats::operator+=(const InternalDbStats& o) {
|
||||
constexpr size_t kDbSz = sizeof(InternalDbStats);
|
||||
static_assert(kDbSz == 56);
|
||||
|
||||
ADD(inline_keys);
|
||||
ADD(obj_memory_usage);
|
||||
ADD(strval_memory_usage);
|
||||
ADD(listpack_blob_cnt);
|
||||
ADD(listpack_bytes);
|
||||
ADD(external_entries);
|
||||
ADD(external_size);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
DbStats& DbStats::operator+=(const DbStats& o) {
|
||||
constexpr size_t kDbSz = sizeof(DbStats);
|
||||
static_assert(kDbSz == 88);
|
||||
|
||||
InternalDbStats::operator+=(o);
|
||||
DbTableStats::operator+=(o);
|
||||
|
||||
ADD(key_count);
|
||||
ADD(expire_count);
|
||||
|
@ -165,12 +150,6 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
|
|||
|
||||
#undef ADD
|
||||
|
||||
DbSlice::DbWrapper::DbWrapper(std::pmr::memory_resource* mr)
|
||||
: prime_table(4, detail::PrimeTablePolicy{}, mr),
|
||||
expire_table(0, detail::ExpireTablePolicy{}, mr),
|
||||
mcflag_table(0, detail::ExpireTablePolicy{}, mr) {
|
||||
}
|
||||
|
||||
DbSlice::DbSlice(uint32_t index, bool caching_mode, EngineShard* owner)
|
||||
: shard_id_(index), caching_mode_(caching_mode), owner_(owner) {
|
||||
db_arr_.emplace_back();
|
||||
|
@ -200,10 +179,10 @@ auto DbSlice::GetStats() const -> Stats {
|
|||
const auto& db_wrap = *db_arr_[i];
|
||||
DbStats& stats = s.db_stats[i];
|
||||
stats = db_wrap.stats;
|
||||
stats.key_count = db_wrap.prime_table.size();
|
||||
stats.bucket_count = db_wrap.prime_table.bucket_count();
|
||||
stats.expire_count = db_wrap.expire_table.size();
|
||||
stats.table_mem_usage = (db_wrap.prime_table.mem_usage() + db_wrap.expire_table.mem_usage());
|
||||
stats.key_count = db_wrap.prime.size();
|
||||
stats.bucket_count = db_wrap.prime.bucket_count();
|
||||
stats.expire_count = db_wrap.expire.size();
|
||||
stats.table_mem_usage = (db_wrap.prime.mem_usage() + db_wrap.expire.mem_usage());
|
||||
}
|
||||
s.small_string_bytes = CompactObj::GetStats().small_string_bytes;
|
||||
|
||||
|
@ -216,7 +195,7 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
|
|||
auto& db = db_arr_[db_ind];
|
||||
DCHECK(db);
|
||||
|
||||
db->prime_table.Reserve(key_size);
|
||||
db->prime.Reserve(key_size);
|
||||
}
|
||||
|
||||
auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const
|
||||
|
@ -234,18 +213,20 @@ auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) con
|
|||
}
|
||||
|
||||
pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view key) const {
|
||||
DCHECK(IsDbValid(db_ind));
|
||||
pair<PrimeIterator, ExpireIterator> res;
|
||||
|
||||
if (!IsDbValid(db_ind))
|
||||
return res;
|
||||
|
||||
auto& db = *db_arr_[db_ind];
|
||||
PrimeIterator it = db.prime_table.Find(key);
|
||||
pair<PrimeIterator, ExpireIterator> res(it, ExpireIterator{});
|
||||
res.first = db.prime.Find(key);
|
||||
|
||||
if (!IsValid(it)) {
|
||||
if (!IsValid(res.first)) {
|
||||
return res;
|
||||
}
|
||||
|
||||
if (it->second.HasExpire()) { // check expiry state
|
||||
res = ExpireIfNeeded(db_ind, it);
|
||||
if (res.first->second.HasExpire()) { // check expiry state
|
||||
res = ExpireIfNeeded(db_ind, res.first);
|
||||
}
|
||||
|
||||
if (caching_mode_ && IsValid(res.first)) {
|
||||
|
@ -256,10 +237,10 @@ pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
|
|||
}
|
||||
};
|
||||
|
||||
db.prime_table.CVCUponBump(change_cb_.front().first, res.first, bump_cb);
|
||||
db.prime.CVCUponBump(change_cb_.front().first, res.first, bump_cb);
|
||||
}
|
||||
|
||||
res.first = db.prime_table.BumpUp(res.first);
|
||||
res.first = db.prime.BumpUp(res.first);
|
||||
++events_.bumpups;
|
||||
}
|
||||
|
||||
|
@ -308,13 +289,13 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
|
|||
// the insert operation: twice more efficient.
|
||||
CompactObj co_key{key};
|
||||
|
||||
auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{}, evp);
|
||||
auto [it, inserted] = db->prime.Insert(std::move(co_key), PrimeValue{}, evp);
|
||||
if (inserted) { // new entry
|
||||
db->stats.inline_keys += it->first.IsInline();
|
||||
db->stats.obj_memory_usage += it->first.MallocUsed();
|
||||
|
||||
events_.garbage_collected += db->prime_table.garbage_collected();
|
||||
events_.stash_unloaded = db->prime_table.stash_unloaded();
|
||||
events_.garbage_collected += db->prime.garbage_collected();
|
||||
events_.stash_unloaded = db->prime.stash_unloaded();
|
||||
events_.evicted_keys += evp.evicted();
|
||||
|
||||
it.SetVersion(NextVersion());
|
||||
|
@ -328,7 +309,7 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
|
|||
DCHECK(IsValid(existing));
|
||||
|
||||
if (existing->second.HasExpire()) {
|
||||
auto expire_it = db->expire_table.Find(existing->first);
|
||||
auto expire_it = db->expire.Find(existing->first);
|
||||
CHECK(IsValid(expire_it));
|
||||
|
||||
// TODO: to implement the incremental update of expiry values using multi-generation
|
||||
|
@ -336,10 +317,10 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
|
|||
uint64_t delta_ms = now_ms_ - expire_base_[0];
|
||||
|
||||
if (expire_it->second.duration_ms() <= delta_ms) {
|
||||
db->expire_table.Erase(expire_it);
|
||||
db->expire.Erase(expire_it);
|
||||
|
||||
if (existing->second.HasFlag()) {
|
||||
db->mcflag_table.Erase(existing->first);
|
||||
db->mcflag.Erase(existing->first);
|
||||
}
|
||||
|
||||
// Keep the entry but reset the object.
|
||||
|
@ -367,7 +348,7 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
|
|||
void DbSlice::CreateDb(DbIndex index) {
|
||||
auto& db = db_arr_[index];
|
||||
if (!db) {
|
||||
db.reset(new DbWrapper{owner_->memory_resource()});
|
||||
db.reset(new DbTable{owner_->memory_resource()});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -378,54 +359,53 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
|
|||
|
||||
auto& db = db_arr_[db_ind];
|
||||
if (it->second.HasExpire()) {
|
||||
CHECK_EQ(1u, db->expire_table.Erase(it->first));
|
||||
CHECK_EQ(1u, db->expire.Erase(it->first));
|
||||
}
|
||||
|
||||
if (it->second.HasFlag()) {
|
||||
CHECK_EQ(1u, db->mcflag_table.Erase(it->first));
|
||||
CHECK_EQ(1u, db->mcflag.Erase(it->first));
|
||||
}
|
||||
|
||||
UpdateStatsOnDeletion(it, &db->stats);
|
||||
db->prime_table.Erase(it);
|
||||
db->prime.Erase(it);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t DbSlice::FlushDb(DbIndex db_ind) {
|
||||
auto flush_single = [this](DbIndex id) {
|
||||
auto& db = db_arr_[id];
|
||||
|
||||
CHECK(db);
|
||||
|
||||
size_t removed = db->prime_table.size();
|
||||
db->prime_table.Clear();
|
||||
db->expire_table.Clear();
|
||||
db->mcflag_table.Clear();
|
||||
db->stats = InternalDbStats{};
|
||||
|
||||
return removed;
|
||||
};
|
||||
void DbSlice::FlushDb(DbIndex db_ind) {
|
||||
// TODO: to add preemptiveness by yielding inside clear.
|
||||
|
||||
if (db_ind != kDbAll) {
|
||||
CHECK_LT(db_ind, db_arr_.size());
|
||||
auto& db = db_arr_[db_ind];
|
||||
auto db_ptr = std::move(db);
|
||||
DCHECK(!db);
|
||||
CreateDb(db_ind);
|
||||
|
||||
return flush_single(db_ind);
|
||||
boost::fibers::fiber([db_ptr = std::move(db_ptr)]() mutable { db_ptr.reset(); }).detach();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
size_t removed = 0;
|
||||
auto all_dbs = std::move(db_arr_);
|
||||
db_arr_.resize(all_dbs.size());
|
||||
for (size_t i = 0; i < db_arr_.size(); ++i) {
|
||||
if (db_arr_[i]) {
|
||||
removed += flush_single(i);
|
||||
if (all_dbs[i]) {
|
||||
CreateDb(i);
|
||||
}
|
||||
}
|
||||
return removed;
|
||||
|
||||
boost::fibers::fiber([all_dbs = std::move(all_dbs)]() mutable {
|
||||
for (auto& db : all_dbs) {
|
||||
db.reset();
|
||||
}
|
||||
}).detach();
|
||||
}
|
||||
|
||||
// Returns true if a state has changed, false otherwise.
|
||||
bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) {
|
||||
auto& db = *db_arr_[db_ind];
|
||||
if (at == 0 && it->second.HasExpire()) {
|
||||
CHECK_EQ(1u, db.expire_table.Erase(it->first));
|
||||
CHECK_EQ(1u, db.expire.Erase(it->first));
|
||||
it->second.SetExpire(false);
|
||||
|
||||
return true;
|
||||
|
@ -434,7 +414,7 @@ bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) {
|
|||
if (!it->second.HasExpire() && at) {
|
||||
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
|
||||
|
||||
CHECK(db.expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
|
||||
CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
|
||||
it->second.SetExpire(true);
|
||||
|
||||
return true;
|
||||
|
@ -446,9 +426,9 @@ bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) {
|
|||
void DbSlice::SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag) {
|
||||
auto& db = *db_arr_[db_ind];
|
||||
if (flag == 0) {
|
||||
db.mcflag_table.Erase(key);
|
||||
db.mcflag.Erase(key);
|
||||
} else {
|
||||
auto [it, inserted] = db.mcflag_table.Insert(std::move(key), flag);
|
||||
auto [it, inserted] = db.mcflag.Insert(std::move(key), flag);
|
||||
if (!inserted)
|
||||
it->second = flag;
|
||||
}
|
||||
|
@ -456,7 +436,7 @@ void DbSlice::SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag) {
|
|||
|
||||
uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
|
||||
auto& db = *db_arr_[db_ind];
|
||||
auto it = db.mcflag_table.Find(key);
|
||||
auto it = db.mcflag.Find(key);
|
||||
return it.is_done() ? 0 : it->second;
|
||||
}
|
||||
|
||||
|
@ -490,7 +470,7 @@ pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_ind, string_view key, Pr
|
|||
if (expire_at_ms) {
|
||||
new_it->second.SetExpire(true);
|
||||
uint64_t delta = expire_at_ms - expire_base_[0];
|
||||
CHECK(db.expire_table.Insert(new_it->first.AsRef(), ExpirePeriod(delta)).second);
|
||||
CHECK(db.expire.Insert(new_it->first.AsRef(), ExpirePeriod(delta)).second);
|
||||
}
|
||||
|
||||
return res;
|
||||
|
@ -500,7 +480,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
|
|||
DCHECK_LT(db_ind, db_array_size());
|
||||
|
||||
if (IsDbValid(db_ind)) {
|
||||
return db_arr_[db_ind]->prime_table.size();
|
||||
return db_arr_[db_ind]->prime.size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -508,7 +488,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
|
|||
bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||
DCHECK(!lock_args.args.empty());
|
||||
|
||||
auto& lt = db_arr_[lock_args.db_index]->lock_table;
|
||||
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||
bool lock_acquired = true;
|
||||
|
||||
if (lock_args.args.size() == 1) {
|
||||
|
@ -538,7 +518,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
|||
if (lock_args.args.size() == 1) {
|
||||
Release(mode, lock_args.db_index, lock_args.args.front(), 1);
|
||||
} else {
|
||||
auto& lt = db_arr_[lock_args.db_index]->lock_table;
|
||||
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||
uniq_keys_.clear();
|
||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||
auto s = lock_args.args[i];
|
||||
|
@ -554,22 +534,10 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
|||
}
|
||||
}
|
||||
|
||||
void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key, unsigned count) {
|
||||
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;
|
||||
|
||||
auto& lt = db_arr_[db_index]->lock_table;
|
||||
auto it = lt.find(key);
|
||||
CHECK(it != lt.end()) << key;
|
||||
it->second.Release(mode, count);
|
||||
if (it->second.IsFree()) {
|
||||
lt.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
|
||||
DCHECK(!lock_args.args.empty());
|
||||
|
||||
const auto& lt = db_arr_[lock_args.db_index]->lock_table;
|
||||
const auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||
auto s = lock_args.args[i];
|
||||
auto it = lt.find(s);
|
||||
|
@ -614,7 +582,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
|
|||
DCHECK(it->second.HasExpire());
|
||||
auto& db = db_arr_[db_ind];
|
||||
|
||||
auto expire_it = db->expire_table.Find(it->first);
|
||||
auto expire_it = db->expire.Find(it->first);
|
||||
|
||||
CHECK(IsValid(expire_it));
|
||||
|
||||
|
@ -624,9 +592,9 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
|
|||
if (now_ms_ < expire_time)
|
||||
return make_pair(it, expire_it);
|
||||
|
||||
db->expire_table.Erase(expire_it);
|
||||
db->expire.Erase(expire_it);
|
||||
UpdateStatsOnDeletion(it, &db->stats);
|
||||
db->prime_table.Erase(it);
|
||||
db->prime.Erase(it);
|
||||
++events_.expired_keys;
|
||||
|
||||
return make_pair(PrimeIterator{}, ExpireIterator{});
|
||||
|
@ -656,7 +624,7 @@ pair<unsigned, unsigned> DbSlice::DeleteExpired(DbIndex db_ind) {
|
|||
auto cb = [&](ExpireIterator it) {
|
||||
candidates++;
|
||||
if (ExpireTime(it) <= Now()) {
|
||||
auto prime_it = db.prime_table.Find(it->first);
|
||||
auto prime_it = db.prime.Find(it->first);
|
||||
CHECK(!prime_it.is_done());
|
||||
ExpireIfNeeded(db_ind, prime_it);
|
||||
++deleted;
|
||||
|
@ -664,7 +632,7 @@ pair<unsigned, unsigned> DbSlice::DeleteExpired(DbIndex db_ind) {
|
|||
};
|
||||
|
||||
for (unsigned i = 0; i < 10; ++i) {
|
||||
db.expire_cursor = db.expire_table.Traverse(db.expire_cursor, cb);
|
||||
db.expire_cursor = db.expire.Traverse(db.expire_cursor, cb);
|
||||
if (deleted)
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -4,10 +4,8 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
#include <absl/container/flat_hash_set.h>
|
||||
|
||||
#include "core/intent_lock.h"
|
||||
#include "facade/op_status.h"
|
||||
#include "server/common.h"
|
||||
#include "server/table.h"
|
||||
|
@ -20,23 +18,7 @@ namespace dfly {
|
|||
|
||||
using facade::OpResult;
|
||||
|
||||
struct InternalDbStats {
|
||||
// Number of inline keys.
|
||||
uint64_t inline_keys = 0;
|
||||
|
||||
// Object memory usage besides hash-table capacity.
|
||||
// Applies for any non-inline objects.
|
||||
size_t obj_memory_usage = 0;
|
||||
size_t strval_memory_usage = 0;
|
||||
size_t listpack_blob_cnt = 0;
|
||||
size_t listpack_bytes = 0;
|
||||
size_t external_entries = 0;
|
||||
size_t external_size = 0;
|
||||
|
||||
InternalDbStats& operator+=(const InternalDbStats& o);
|
||||
};
|
||||
|
||||
struct DbStats : public InternalDbStats {
|
||||
struct DbStats : public DbTableStats {
|
||||
// number of active keys.
|
||||
size_t key_count = 0;
|
||||
|
||||
|
@ -49,8 +31,8 @@ struct DbStats : public InternalDbStats {
|
|||
// Memory used by dictionaries.
|
||||
size_t table_mem_usage = 0;
|
||||
|
||||
using InternalDbStats::operator+=;
|
||||
using InternalDbStats::operator=;
|
||||
using DbTableStats::operator+=;
|
||||
using DbTableStats::operator=;
|
||||
|
||||
DbStats& operator+=(const DbStats& o);
|
||||
};
|
||||
|
@ -84,8 +66,10 @@ class DbSlice {
|
|||
// Otherwise (string_view is set) then it's a new key that is going to be added to the table.
|
||||
std::variant<PrimeTable::bucket_iterator, std::string_view> change;
|
||||
|
||||
ChangeReq(PrimeTable::bucket_iterator it) : change(it) {}
|
||||
ChangeReq(std::string_view key) : change(key) {}
|
||||
ChangeReq(PrimeTable::bucket_iterator it) : change(it) {
|
||||
}
|
||||
ChangeReq(std::string_view key) : change(key) {
|
||||
}
|
||||
|
||||
const PrimeTable::bucket_iterator* update() const {
|
||||
return std::get_if<PrimeTable::bucket_iterator>(&change);
|
||||
|
@ -171,7 +155,7 @@ class DbSlice {
|
|||
* databases.
|
||||
*
|
||||
*/
|
||||
size_t FlushDb(DbIndex db_ind);
|
||||
void FlushDb(DbIndex db_ind);
|
||||
|
||||
EngineShard* shard_owner() {
|
||||
return owner_;
|
||||
|
@ -184,7 +168,10 @@ class DbSlice {
|
|||
bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
|
||||
|
||||
void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);
|
||||
void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count);
|
||||
|
||||
void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count) {
|
||||
db_arr_[db_index]->Release(m, key, count);
|
||||
}
|
||||
|
||||
// Returns true if all keys can be locked under m. Does not lock them though.
|
||||
bool CheckLock(IntentLock::Mode m, const KeyLockArgs& lock_args) const;
|
||||
|
@ -198,8 +185,7 @@ class DbSlice {
|
|||
}
|
||||
|
||||
std::pair<PrimeTable*, ExpireTable*> GetTables(DbIndex id) {
|
||||
return std::pair<PrimeTable*, ExpireTable*>(&db_arr_[id]->prime_table,
|
||||
&db_arr_[id]->expire_table);
|
||||
return std::pair<PrimeTable*, ExpireTable*>(&db_arr_[id]->prime, &db_arr_[id]->expire);
|
||||
}
|
||||
|
||||
// Returns existing keys count in the db.
|
||||
|
@ -209,7 +195,7 @@ class DbSlice {
|
|||
void PreUpdate(DbIndex db_ind, PrimeIterator it);
|
||||
void PostUpdate(DbIndex db_ind, PrimeIterator it);
|
||||
|
||||
InternalDbStats* MutableStats(DbIndex db_ind) {
|
||||
DbTableStats* MutableStats(DbIndex db_ind) {
|
||||
return &db_arr_[db_ind]->stats;
|
||||
}
|
||||
|
||||
|
@ -258,22 +244,7 @@ class DbSlice {
|
|||
|
||||
mutable SliceEvents events_; // we may change this even for const operations.
|
||||
|
||||
using LockTable = absl::flat_hash_map<std::string, IntentLock>;
|
||||
|
||||
struct DbWrapper {
|
||||
PrimeTable prime_table;
|
||||
ExpireTable expire_table;
|
||||
DashTable<PrimeKey, uint32_t, detail::ExpireTablePolicy> mcflag_table;
|
||||
|
||||
LockTable lock_table;
|
||||
|
||||
mutable InternalDbStats stats;
|
||||
ExpireTable::cursor expire_cursor;
|
||||
|
||||
explicit DbWrapper(std::pmr::memory_resource* mr);
|
||||
};
|
||||
|
||||
std::vector<std::unique_ptr<DbWrapper>> db_arr_;
|
||||
std::vector<boost::intrusive_ptr<DbTable>> db_arr_;
|
||||
|
||||
// Used in temporary computations in Acquire/Release.
|
||||
absl::flat_hash_set<std::string_view> uniq_keys_;
|
||||
|
|
|
@ -356,6 +356,25 @@ TEST_F(DflyEngineTest, LimitMemory) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(DflyEngineTest, FlushAll) {
|
||||
auto fb0 = pp_->at(0)->LaunchFiber([&] {
|
||||
Run({"flushall"});
|
||||
});
|
||||
|
||||
auto fb1 = pp_->at(1)->LaunchFiber([&] {
|
||||
Run({"select", "2"});
|
||||
|
||||
for (size_t i = 1; i < 100; ++i) {
|
||||
RespExpr resp = Run({"set", "foo", "bar"});
|
||||
ASSERT_EQ(resp, "OK");
|
||||
this_fiber::yield();
|
||||
}
|
||||
});
|
||||
|
||||
fb0.join();
|
||||
fb1.join();
|
||||
}
|
||||
|
||||
// TODO: to test transactions with a single shard since then all transactions become local.
|
||||
// To consider having a parameter in dragonfly engine controlling number of shards
|
||||
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
|
||||
|
|
|
@ -440,7 +440,7 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
|
|||
auto& db_slice = op_args.shard->db_slice();
|
||||
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
DbTableStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
robj* hset = nullptr;
|
||||
uint8_t* lp = nullptr;
|
||||
|
@ -525,7 +525,7 @@ OpResult<uint32_t> HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd
|
|||
robj* hset = co.AsRObj();
|
||||
unsigned deleted = 0;
|
||||
bool key_remove = false;
|
||||
InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
DbTableStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr);
|
||||
|
@ -755,7 +755,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
|
|||
auto& db_slice = op_args.shard->db_slice();
|
||||
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
DbTableStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
robj* hset = nullptr;
|
||||
size_t lpb = 0;
|
||||
|
|
|
@ -145,4 +145,9 @@ TEST_F(RdbTest, ReloadTtl) {
|
|||
EXPECT_LT(990, CheckedInt({"ttl", "key"}));
|
||||
}
|
||||
|
||||
TEST_F(RdbTest, SaveFlush) {
|
||||
Run({"debug", "populate", "1000000"});
|
||||
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/table.h"
|
||||
|
||||
#include "base/logging.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
#define ADD(x) (x) += o.x
|
||||
|
||||
DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
|
||||
constexpr size_t kDbSz = sizeof(DbTableStats);
|
||||
static_assert(kDbSz == 56);
|
||||
|
||||
ADD(inline_keys);
|
||||
ADD(obj_memory_usage);
|
||||
ADD(strval_memory_usage);
|
||||
ADD(listpack_blob_cnt);
|
||||
ADD(listpack_bytes);
|
||||
ADD(external_entries);
|
||||
ADD(external_size);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
DbTable::DbTable(std::pmr::memory_resource* mr)
|
||||
: prime(4, detail::PrimeTablePolicy{}, mr),
|
||||
expire(0, detail::ExpireTablePolicy{}, mr),
|
||||
mcflag(0, detail::ExpireTablePolicy{}, mr) {
|
||||
}
|
||||
|
||||
DbTable::~DbTable() {
|
||||
}
|
||||
|
||||
void DbTable::Clear() {
|
||||
prime.size();
|
||||
prime.Clear();
|
||||
expire.Clear();
|
||||
mcflag.Clear();
|
||||
stats = DbTableStats{};
|
||||
}
|
||||
|
||||
void DbTable::Release(IntentLock::Mode mode, std::string_view key, unsigned count) {
|
||||
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;
|
||||
|
||||
auto it = trans_locks.find(key);
|
||||
CHECK(it != trans_locks.end()) << key;
|
||||
it->second.Release(mode, count);
|
||||
if (it->second.IsFree()) {
|
||||
trans_locks.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace dfly
|
|
@ -4,8 +4,14 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "server/detail/table.h"
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
#include <boost/smart_ptr/intrusive_ptr.hpp>
|
||||
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
|
||||
|
||||
#include "core/expire_period.h"
|
||||
#include "core/intent_lock.h"
|
||||
#include "server/detail/table.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -28,4 +34,41 @@ inline bool IsValid(ExpireIterator it) {
|
|||
return !it.is_done();
|
||||
}
|
||||
|
||||
struct DbTableStats {
|
||||
// Number of inline keys.
|
||||
uint64_t inline_keys = 0;
|
||||
|
||||
// Object memory usage besides hash-table capacity.
|
||||
// Applies for any non-inline objects.
|
||||
size_t obj_memory_usage = 0;
|
||||
size_t strval_memory_usage = 0;
|
||||
size_t listpack_blob_cnt = 0;
|
||||
size_t listpack_bytes = 0;
|
||||
size_t external_entries = 0;
|
||||
size_t external_size = 0;
|
||||
|
||||
DbTableStats& operator+=(const DbTableStats& o);
|
||||
};
|
||||
|
||||
using LockTable = absl::flat_hash_map<std::string, IntentLock>;
|
||||
|
||||
// A single Db table that represents a table that can be chosen with "SELECT" command.
|
||||
struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_counter> {
|
||||
PrimeTable prime;
|
||||
ExpireTable expire;
|
||||
DashTable<PrimeKey, uint32_t, detail::ExpireTablePolicy> mcflag;
|
||||
|
||||
// Contains transaction locks
|
||||
LockTable trans_locks;
|
||||
|
||||
mutable DbTableStats stats;
|
||||
ExpireTable::cursor expire_cursor;
|
||||
|
||||
explicit DbTable(std::pmr::memory_resource* mr);
|
||||
~DbTable();
|
||||
|
||||
void Clear();
|
||||
void Release(IntentLock::Mode mode, std::string_view key, unsigned count);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Reference in New Issue