Upon full segment try unloading stash buckets back to regular ones

This commit is contained in:
Roman Gershman 2022-05-12 13:43:03 +03:00
parent 4be8c25711
commit 2b553535b2
9 changed files with 170 additions and 72 deletions
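In short: a dash-table segment parks overflow entries in stash buckets; once a segment looks full, this commit first tries to move stash entries back into regular buckets that earlier deletions may have freed, before falling back to eviction or a segment split. A minimal self-contained sketch of the idea (ToySegment, HomeSlot and the slot counts are illustrative stand-ins, not the actual DashTable code):

// Toy model of "unload stash before splitting": entries that could not fit
// their home slot go to a small stash; after deletions, stash entries whose
// home slot is free again can be moved back, reclaiming stash capacity.
#include <array>
#include <cstdint>
#include <functional>
#include <iostream>
#include <optional>

struct ToySegment {
  static constexpr unsigned kRegularSlots = 4;
  static constexpr unsigned kStashSlots = 2;

  std::array<std::optional<uint64_t>, kRegularSlots> regular;
  std::array<std::optional<uint64_t>, kStashSlots> stash;

  // Home slot of a key inside the segment.
  static unsigned HomeSlot(uint64_t key) {
    return std::hash<uint64_t>{}(key) % kRegularSlots;
  }

  bool Insert(uint64_t key) {
    unsigned home = HomeSlot(key);
    if (!regular[home]) {  // preferred: the key's home slot
      regular[home] = key;
      return true;
    }
    for (auto& s : stash) {  // overflow: any free stash slot
      if (!s) {
        s = key;
        return true;
      }
    }
    return false;  // segment is effectively full
  }

  // Analogue of Segment::UnloadStash below: move stash entries whose home
  // slot has been freed by a deletion back into the regular area.
  unsigned UnloadStash() {
    unsigned moved = 0;
    for (auto& s : stash) {
      if (s && !regular[HomeSlot(*s)]) {
        regular[HomeSlot(*s)] = *s;
        s.reset();
        ++moved;
      }
    }
    return moved;
  }
};

int main() {
  ToySegment seg;
  for (uint64_t k = 1; k <= 6; ++k)
    seg.Insert(k);  // fills the regular slots; the overflow lands in the stash
  for (auto& r : seg.regular) {  // simulate one deletion
    if (r) {
      r.reset();
      break;
    }
  }
  // Depending on hash placement this may now reclaim stash capacity, which is
  // exactly what the table tries before splitting a segment.
  std::cout << "unloaded " << seg.UnloadStash() << " stash entries\n";
}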

helio (submodule, 2 lines changed)

@@ -1 +1 @@
- Subproject commit 0420a22a085f09b01ff9a0849f70779492988039
+ Subproject commit 84c11521f7df58427b3a4e1706518babaff06bef

View File

@@ -47,6 +47,7 @@ class DashTable : public detail::DashTableBase {
public:
using Key_t = _Key;
using Value_t = _Value;
using Segment_t = SegmentType;
//! Number of "official" buckets that are used to position a key. In other words, does not include
//! stash buckets.
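//! (Layout note: the stash buckets are stored after the kNumBuckets regular
//! buckets inside each segment and hold entries that overflowed their home bucket.)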
@@ -75,29 +76,7 @@ class DashTable : public detail::DashTableBase {
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(sid) {
}
- void FindValid() {
- if constexpr (IsSingleBucket) {
- const auto& b = owner_->segment_[seg_id_]->GetBucket(bucket_id_);
- uint32_t mask = b.GetBusy() >> slot_id_;
- if (mask) {
- int slot = __builtin_ctz(mask);
- slot_id_ += slot;
- return;
- }
- } else {
- while (seg_id_ < owner_->segment_.size()) {
- auto seg_it = owner_->segment_[seg_id_]->FindValidStartingFrom(bucket_id_, slot_id_);
- if (seg_it.found()) {
- bucket_id_ = seg_it.index;
- slot_id_ = seg_it.slot;
- return;
- }
- seg_id_ = owner_->NextSeg(seg_id_);
- bucket_id_ = slot_id_ = 0;
- }
- }
- owner_ = nullptr;
- }
+ void FindValid();
public:
using iterator_category = std::forward_iterator_tag;
@@ -197,7 +176,7 @@ class DashTable : public detail::DashTableBase {
unsigned slot_id() const {
return slot_id_;
}
unsigned segment_id() const {
return seg_id_;
}
@@ -223,22 +202,23 @@ class DashTable : public detail::DashTableBase {
return true;
}
- void RecordSplit() {}
- /*
- /// Required interface in case can_gc is true
- // Returns number of garbage collected items deleted. 0 - means nothing has been
- // deleted.
- unsigned GarbageCollect(const EvictionBuckets& eb, DashTable* me) const {
- return 0;
- }
+ void RecordSplit(SegmentType* segment) {
+ }
+ /*
+ /// Required interface in case can_gc is true
+ // Returns number of garbage collected items deleted. 0 - means nothing has been
+ // deleted.
+ unsigned GarbageCollect(const EvictionBuckets& eb, DashTable* me) const {
+ return 0;
+ }
- // Required interface in case can_gc is true
- // returns number of items evicted from the table.
- // 0 means - nothing has been evicted.
- unsigned Evict(const EvictionBuckets& eb, DashTable* me) {
- return 0;
- }
- */
+ // Required interface in case can_gc is true
+ // returns number of items evicted from the table.
+ // 0 means - nothing has been evicted.
+ unsigned Evict(const EvictionBuckets& eb, DashTable* me) {
+ return 0;
+ }
+ */
};
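The commented-out block above doubles as the policy contract. A hedged sketch of a user-supplied policy satisfying it, written as a template over the table type so it compiles stand-alone (the struct name and trivial bodies are illustrative; with the real header one would instantiate it with a concrete table such as the Dash64 alias used in the tests):

// Sketch of an eviction policy implementing the interface documented above.
template <typename Table> struct ExamplePolicy {
  static constexpr bool can_evict = true;  // enables the Evict() hook
  static constexpr bool can_gc = true;     // enables the GarbageCollect() hook

  bool CanGrow(const Table&) const {
    return true;  // e.g. consult a memory budget here
  }
  void RecordSplit(typename Table::Segment_t*) {
    // e.g. account for the memory cost of the new segment
  }
  unsigned GarbageCollect(const typename Table::EvictionBuckets&, Table*) {
    return 0;  // entries reclaimed from the candidate buckets
  }
  unsigned Evict(const typename Table::EvictionBuckets&, Table*) {
    return 0;  // entries forcefully evicted
  }
};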
DashTable(size_t capacity_log = 1, const Policy& policy = Policy{},
@@ -288,6 +268,11 @@ class DashTable : public detail::DashTableBase {
using Base::size;
using Base::unique_segments;
// Direct access to the segment for debugging purposes.
Segment_t* GetSegment(unsigned segment_id) {
return segment_[segment_id];
}
template <typename U> uint64_t DoHash(const U& k) const {
return policy_.HashFn(k);
}
@@ -341,8 +326,17 @@ class DashTable : public detail::DashTableBase {
void Clear();
- uint64_t garbage_collected() const { return garbage_collected_;}
- uint64_t evicted() const { return evicted_;}
+ uint64_t garbage_collected() const {
+ return garbage_collected_;
+ }
+ uint64_t evicted() const {
+ return evicted_;
+ }
+ uint64_t stash_unloaded() const {
+ return stash_unloaded_;
+ }
private:
template <typename U, typename V, typename EvictionPolicy>
@@ -367,7 +361,46 @@ class DashTable : public detail::DashTableBase {
uint64_t garbage_collected_ = 0;
uint64_t evicted_ = 0;
- };
+ uint64_t stash_unloaded_ = 0;
+ }; // DashTable
/**
_____ _ _ _ _
|_ _| | | | | | | (_)
| | _ __ ___ _ __ | | ___ _ __ ___ ___ _ __ | |_ __ _| |_ _ ___ _ __
| | | '_ ` _ \| '_ \| |/ _ \ '_ ` _ \ / _ \ '_ \| __/ _` | __| |/ _ \| '_ \
_| |_| | | | | | |_) | | __/ | | | | | __/ | | | || (_| | |_| | (_) | | | |
|_____|_| |_| |_| .__/|_|\___|_| |_| |_|\___|_| |_|\__\__,_|\__|_|\___/|_| |_|
| |
|_|
**/
template <typename _Key, typename _Value, typename Policy>
template <bool IsConst, bool IsSingleBucket>
void DashTable<_Key, _Value, Policy>::Iterator<IsConst, IsSingleBucket>::FindValid() {
if constexpr (IsSingleBucket) {
const auto& b = owner_->segment_[seg_id_]->GetBucket(bucket_id_);
uint32_t mask = b.GetBusy() >> slot_id_;
if (mask) {
int slot = __builtin_ctz(mask);
slot_id_ += slot;
return;
}
} else {
while (seg_id_ < owner_->segment_.size()) {
auto seg_it = owner_->segment_[seg_id_]->FindValidStartingFrom(bucket_id_, slot_id_);
if (seg_it.found()) {
bucket_id_ = seg_it.index;
slot_id_ = seg_it.slot;
return;
}
seg_id_ = owner_->NextSeg(seg_id_);
bucket_id_ = slot_id_ = 0;
}
}
owner_ = nullptr;
}
template <typename _Key, typename _Value, typename Policy>
DashTable<_Key, _Value, Policy>::DashTable(size_t capacity_log, const Policy& policy,
@@ -593,9 +626,10 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
return std::make_pair(iterator{this, seg_id, it.index, it.slot}, false);
}
// At this point we must split the segment.
+ // try garbage collect or evict.
if constexpr (ev.can_evict || ev.can_gc) {
- // Try eviction.
+ // Try gc.
uint8_t bid[2];
SegmentType::FillProbeArray(key_hash, bid);
EvictionBuckets buckets;
@@ -617,6 +651,13 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
continue;
}
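// Before considering eviction or a split, try to drain the stash buckets
// back into the regular buckets.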
auto hash_fn = [this](const auto& k) { return policy_.HashFn(k); };
unsigned moved = target->UnloadStash(hash_fn);
if (moved > 0) {
stash_unloaded_ += moved;
continue;
}
// We evict only if our policy says we can not grow
if constexpr (ev.can_evict) {
bool can_grow = ev.CanGrow(*this);
@@ -641,8 +682,8 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
assert(seg_id < segment_.size() && segment_[seg_id] == target);
}
+ ev.RecordSplit(target);
Split(seg_id);
- ev.RecordSplit();
}
return std::make_pair(iterator{}, false);
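Taken together, the slow path of InsertInternal is now a ladder of progressively more expensive remedies, each of which retries the insert on success. A compilable restatement with stand-in std::function hooks (the hook names paraphrase the calls above; they are not DashTable methods):

#include <cstdio>
#include <functional>

// Stand-in hooks; in DashTable these map to the segment and policy calls
// visible in the hunks above.
struct InsertHooks {
  std::function<bool()> try_insert;
  std::function<unsigned()> garbage_collect;
  std::function<unsigned()> unload_stash;  // the new step added by this commit
  std::function<bool()> can_grow;
  std::function<unsigned()> evict;
  std::function<void()> split;
};

// Each remedy that makes progress retries the insert; a split is the last resort.
bool InsertWithRemedies(InsertHooks& h) {
  for (int round = 0; round < 8; ++round) {  // bounded only for this sketch
    if (h.try_insert()) return true;
    if (h.garbage_collect() > 0) continue;         // 1. reclaim expired entries
    if (h.unload_stash() > 0) continue;            // 2. drain stash buckets
    if (!h.can_grow() && h.evict() > 0) continue;  // 3. evict only if growth is barred
    h.split();                                     // 4. split, then retry
  }
  return false;
}

int main() {
  int capacity = 0, used = 1;
  InsertHooks h{
      [&] { return used < capacity; },  // try_insert
      [] { return 0u; },                // garbage_collect: nothing expired
      [] { return 0u; },                // unload_stash: stash already empty
      [] { return true; },              // can_grow
      [] { return 0u; },                // evict (not reached while we can grow)
      [&] { capacity += 2; }};          // split doubles the toy capacity
  std::printf("inserted: %d\n", InsertWithRemedies(h));
}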
@@ -674,9 +715,9 @@ void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id) {
SegmentType* target = alloc.allocate(1);
alloc.construct(target, source->local_depth() + 1);
- auto cb = [this](const auto& k) { return policy_.HashFn(k); };
+ auto hash_fn = [this](const auto& k) { return policy_.HashFn(k); };
- source->Split(std::move(cb), target); // increases the depth.
+ source->Split(std::move(hash_fn), target); // increases the depth.
++unique_segments_;
for (size_t i = start_idx + chunk_size / 2; i < start_idx + chunk_size; ++i) {

View File

@@ -474,6 +474,10 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
// Finds a valid entry going from specified indices up.
Iterator FindValidStartingFrom(unsigned bid, unsigned slot) const;
// Tries to move stash entries back to their normal buckets (exact or neighbour).
// Returns the number of entries that were successfully unloaded.
template <typename HFunc> unsigned UnloadStash(HFunc&& hfunc);
private:
static constexpr uint8_t kNanBid = 0xFF;
using SlotId = typename BucketType::SlotId;
@@ -1229,6 +1233,7 @@ bool Segment<Key, Value, Policy>::TraverseLogicalBucket(uint8_t bid, HashFn&& hf
uint8_t nid = (bid + 1) % kNumBuckets;
const Bucket& next = bucket_[nid];
// check for probing entries in the next bucket, i.e. those that should reside in b.
if (next.GetProbe(true)) {
next.ForEachSlot([&](SlotId slot, bool probe) {
@@ -1285,5 +1290,32 @@ auto Segment<Key, Value, Policy>::FindValidStartingFrom(unsigned bid, unsigned s
return Iterator{};
}
template <typename Key, typename Value, typename Policy>
template <typename HFunc>
unsigned Segment<Key, Value, Policy>::UnloadStash(HFunc&& hfunc) {
unsigned moved = 0;
for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) {
unsigned bid = kNumBuckets + i;
Bucket& stash = bucket_[bid];
uint32_t invalid_mask = 0;
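// Slots whose entries get moved out are recorded in this mask and cleared
// only after the scan, so the bucket is not mutated while ForEachSlot runs.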
auto cb = [&](unsigned slot, bool probe) {
auto& key = Key(bid, slot);
Hash_t hash = hfunc(key);
bool res = TryMoveFromStash(i, slot, hash);
if (res) {
++moved;
invalid_mask |= (1u << slot);
}
};
stash.ForEachSlot(cb);
stash.ClearSlots(invalid_mask);
}
return moved;
}
} // namespace detail
} // namespace dfly

View File

@@ -474,7 +474,7 @@ struct TestEvictionPolicy {
return tbl.bucket_count() < max_capacity;
}
- void RecordSplit() {}
+ void RecordSplit(Dash64::Segment_t*) {}
unsigned Evict(const Dash64::EvictionBuckets& eb, Dash64* me) const {
if (!evict_enabled)

View File

@@ -373,7 +373,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
- LOG(WARNING) << "Socket error " << ec;
+ LOG(WARNING) << "Socket error " << ec << " " << ec.message();
}
--stats->num_conns;

View File

@@ -45,45 +45,44 @@ class PrimeEvictionPolicy {
db_indx_ = db_indx;
}
- void RecordSplit() {
+ void RecordSplit(PrimeTable::Segment_t* segment) {
mem_budget_ -= PrimeTable::kSegBytes;
+ DVLOG(1) << "split: " << segment->SlowSize() << "/" << segment->capacity();
}
bool CanGrow(const PrimeTable& tbl) const {
return mem_budget_ > int64_t(PrimeTable::kSegBytes);
}
- unsigned GarbageCollect(const PrimeTable::EvictionBuckets& eb, PrimeTable* me) {
- unsigned res = 0;
- for (unsigned i = 0; i < ABSL_ARRAYSIZE(eb.iter); ++i) {
- auto it = eb.iter[i];
- for (; !it.is_done(); ++it) {
- if (it->second.HasExpire()) {
- auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, it);
- if (prime_it.is_done())
- ++res;
- }
- }
- }
- gc_count_ += res;
- return res;
- }
+ unsigned GarbageCollect(const PrimeTable::EvictionBuckets& eb, PrimeTable* me);
int64_t mem_budget() const {
return mem_budget_;
}
- unsigned gc_count() const {
- return gc_count_;
- }
private:
DbSlice* db_slice_;
int64_t mem_budget_;
DbIndex db_indx_;
- unsigned gc_count_ = 0;
};
+ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::EvictionBuckets& eb,
+ PrimeTable* me) {
+ unsigned res = 0;
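// eb.iter appears to hold iterators over the candidate buckets of the full
// segment: each is walked and entries carrying an expiry are lazily expired.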
+ for (unsigned i = 0; i < ABSL_ARRAYSIZE(eb.iter); ++i) {
+ auto it = eb.iter[i];
+ for (; !it.is_done(); ++it) {
+ if (it->second.HasExpire()) {
+ auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, it);
+ if (prime_it.is_done())
+ ++res;
+ }
+ }
+ }
+ return res;
+ }
} // namespace
#define ADD(x) (x) += o.x
@@ -118,11 +117,12 @@ DbStats& DbStats::operator+=(const DbStats& o) {
}
SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
- static_assert(sizeof(SliceEvents) == 24, "You should update this function with new fields");
+ static_assert(sizeof(SliceEvents) == 32, "You should update this function with new fields");
ADD(evicted_keys);
ADD(expired_keys);
ADD(garbage_collected);
+ ADD(stash_unloaded);
return *this;
}
@@ -258,7 +258,8 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
db->stats.inline_keys += it->first.IsInline();
db->stats.obj_memory_usage += it->first.MallocUsed();
- events_.garbage_collected += evp.gc_count();
+ events_.garbage_collected += db->prime_table.garbage_collected();
+ events_.stash_unloaded = db->prime_table.stash_unloaded();
it.SetVersion(NextVersion());
memory_budget_ = evp.mem_budget();

View File

@@ -60,6 +60,7 @@ struct SliceEvents {
size_t evicted_keys = 0;
size_t expired_keys = 0;
size_t garbage_collected = 0;
size_t stash_unloaded = 0;
SliceEvents& operator+=(const SliceEvents& o);
};

View File

@@ -567,7 +567,8 @@ tcp_port:)";
append("instantaneous_output_kbps", -1);
append("rejected_connections", -1);
append("expired_keys", m.events.expired_keys);
- append("gc_keys", m.events.garbage_collected);
+ append("gc_entries", m.events.garbage_collected);
+ append("stash_unloaded", m.events.stash_unloaded);
append("keyspace_hits", -1);
append("keyspace_misses", -1);
append("total_reads_processed", m.conn_stats.io_read_cnt);

tests/stress_shutdown.sh (new executable file, 22 lines)
View File

@@ -0,0 +1,22 @@
#!/bin/bash
while true; do
./dragonfly --vmodule=accept_server=1,listener_interface=1 --logbuflevel=-1 &
DRAGON_PID=$!
echo "dragonfly pid $DRAGON_PID"
sleep 0.5
memtier_benchmark -p 6379 --ratio 1:0 -n 100000 --threads=2 --expiry-range=15-25 --distinct-client-seed \
--hide-histogram 2> /dev/null > /dev/null &
MEMT_ID=$!
echo "memtier pid $MEMT_ID"
echo "Running.............."
sleep 5
echo "killing dragonfly"
kill $DRAGON_PID
wait $DRAGON_PID
done