Add more documentation about dashtable. Tune expiry heuristics a bit

This commit is contained in:
Roman Gershman 2022-05-21 20:35:19 +03:00
parent 543979875a
commit c7d1893478
12 changed files with 245 additions and 48 deletions

doc/dashsegment.svg Normal file

File diff suppressed because one or more lines are too long

doc/dashtable.md Normal file

@@ -0,0 +1,129 @@
# Dashtable in Dragonfly
Dashtable is a very important data structure in Dragonfly. This document explains
how it fits inside the engine.
Each selectable database holds a primary dashtable that contains all its entries. Another instance of Dashtable holds optional expiry information for keys that have a TTL set on them. Dashtable is equivalent to the Redis dictionary but has some wonderful properties that make Dragonfly memory efficient in various situations.
![Database Overview](./db.svg)
## Redis dictionary
*“All problems in computer science can be solved by another level of indirection”*
This section is a brief refresher on how the Redis dictionary (RD) is implemented.
We shamelessly "borrowed" a diagram from [this blogpost](https://codeburst.io/a-closer-look-at-redis-dictionary-implementation-internals-3fd815aae535), so if you want a deep-dive, you can read the original article.
Each `RD` is in fact two hash-tables (see `ht` field in the diagram below). The second instance is used for incremental resizes of the dictionary.
Each hash-table `dictht` is implemented as a [classic hashtable with separate chaining](https://en.wikipedia.org/wiki/Hash_table#Separate_chaining). `dictEntry` is the linked-list entry that wraps each key/value pair inside the table. Each `dictEntry` has three pointers and takes up 24 bytes of space. The bucket array of `dictht` is resized at powers of two, so usually its utilization is in the [50%, 100%] range.
![RD structure](https://miro.medium.com/max/1400/1*gNc8VzCknWRxXTBP9cVEHQ.png)
<br>
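For reference, here is the (simplified) shape of that entry as it appears in Redis 6's `dict.h`; the three 8-byte fields are where the 24-byte figure below comes from:

```cpp
// Simplified from Redis 6 dict.h: one heap allocation per key/value pair.
typedef struct dictEntry {
    void *key;                 // 8 bytes: pointer to the key object
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;                       // 8 bytes: the value (pointer or inlined number)
    struct dictEntry *next;    // 8 bytes: chaining pointer to the next entry
} dictEntry;
```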
Let's estimate the overhead of a `dictht` table inside RD.<br>
*Case 1*: it has `N` items at a 100% load factor, in other words, the bucket count equals the number of items. Each bucket holds a pointer to a dictEntry, i.e. 8 bytes. In total we need: $8N + 24N = 32N$ bytes of overhead, or 32 bytes per item. <br>
*Case 2*: `N` items at a 75% load factor, in other words, the number of buckets is 1.33 times the number of items. In total we need: $1.33 \cdot 8N + 24N \approx 34N$ bytes, or roughly 34 bytes per item. <br>
*Case 3*: `N` items at a 50% load factor, say right after table growth. The number of buckets is twice the number of items, hence we need $2 \cdot 8N + 24N = 40N$ bytes, or 40 bytes per item.
As you can see, a `dictht` table requires 32-40 bytes per item inserted.
Now let's take incremental growth into account. When `ht[0]` is full (i.e. RD needs to migrate data to a bigger table), it will instantiate a second temporary instance `ht[1]` that holds an additional `2N` buckets. Both instances live in parallel until all the data is migrated to `ht[1]`, and then the `ht[0]` bucket array is deleted. All this complexity is hidden from the user by the well-engineered API of RD. Let's combine case 1 with the new allocation to analyze the memory spike at this point: `ht[0]` holds `N` items and is fully utilized, contributing $32N$ bytes; `ht[1]` is allocated with `2N` buckets, contributing another $16N$ bytes. Overall, the overhead at that moment is $32N + 16N = 48N$ bytes, a temporary spike of $16N$ bytes.
To summarize, RD requires between 32 and 40 bytes per item, with an occasional spike of another 16 bytes per item during resizes.
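These numbers are easy to sanity-check; below is a minimal standalone sketch (not Redis or Dragonfly code) that recomputes the three cases from the 24-byte `dictEntry` and the 8-byte bucket pointer:

```cpp
#include <cstdio>
#include <initializer_list>

int main() {
  constexpr double kDictEntryBytes = 24.0;  // three 8-byte fields, see above
  constexpr double kBucketPtrBytes = 8.0;   // one pointer per dictht bucket
  // Cases 1-3 above: load factors of 100%, 75% and 50%.
  for (double load_factor : {1.0, 0.75, 0.5}) {
    double overhead = kBucketPtrBytes / load_factor + kDictEntryBytes;
    std::printf("load factor %.0f%% -> ~%.1f bytes of overhead per item\n",
                load_factor * 100, overhead);
  }
  // Prints ~32.0, ~34.7 and ~40.0 bytes respectively.
  return 0;
}
```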
## Dash table
[Dashtable](https://arxiv.org/abs/2003.07302) is an evolution of an algorithm from 1979 called [extendible hashing](https://en.wikipedia.org/wiki/Extendible_hashing).
Similarly to a classic hashtable, dashtable (DT) also holds an array of pointers at the front. However, unlike classic tables, its pointers point to `segments` and not to linked lists of items. Each `segment` is, in fact, a mini-hashtable of constant size. The front array of pointers to segments is called the `directory`. Similarly to a classic table, when an item is inserted into a DT, it first determines, based on the item's hash value, the segment the item must go to. Then it tries to insert the item into that segment. The segment is implemented as a hashtable with an open-addressing scheme and, as said above, is constant in size. If the item was inserted successfully, we are done; otherwise, the segment is "full" and needs splitting. The DT splits the full segment in two, the additional segment is added to the directory, and then the insertion is retried. To summarize: a classic chaining hash-table is built upon a dynamic array of linked lists, while a dashtable is more like a dynamic array of flat hash-tables of constant size.
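The directory/segment/split flow condenses into a small, runnable sketch of extendible hashing. This is illustrative only: the names, the tiny 4-entry capacity and the flat `std::vector` segment are simplifications, not Dragonfly's actual implementation.

```cpp
#include <cstdint>
#include <cstdio>
#include <memory>
#include <random>
#include <utility>
#include <vector>

struct Segment {
  static constexpr size_t kCapacity = 4;  // Dragonfly's real segments hold 840
  unsigned local_depth = 0;               // how many hash bits this segment "owns"
  std::vector<std::pair<uint64_t, uint64_t>> items;  // (hash, value)

  bool TryInsert(uint64_t hash, uint64_t value) {
    if (items.size() == kCapacity) return false;  // segment is full
    items.emplace_back(hash, value);
    return true;
  }
};

class DashSketch {
 public:
  void Insert(uint64_t hash, uint64_t value) {
    while (!dir_[SegId(hash)]->TryInsert(hash, value))
      Split(SegId(hash));  // only the full segment and its new sibling are touched
  }
  size_t DirSize() const { return dir_.size(); }

 private:
  size_t SegId(uint64_t hash) const {  // top `global_depth_` bits pick the segment
    return global_depth_ ? hash >> (64 - global_depth_) : 0;
  }

  void Split(size_t id) {
    auto old = dir_[id];
    if (old->local_depth == global_depth_) {  // no spare bit: double the directory
      std::vector<std::shared_ptr<Segment>> bigger(2 * dir_.size());
      for (size_t i = 0; i < dir_.size(); ++i)
        bigger[2 * i] = bigger[2 * i + 1] = dir_[i];
      dir_ = std::move(bigger);
      ++global_depth_;
    }
    auto s0 = std::make_shared<Segment>(), s1 = std::make_shared<Segment>();
    s0->local_depth = s1->local_depth = old->local_depth + 1;
    for (auto& [h, v] : old->items)  // redistribute items by the next hash bit
      ((h >> (64 - s0->local_depth)) & 1 ? s1 : s0)->items.emplace_back(h, v);
    for (size_t i = 0; i < dir_.size(); ++i)  // re-point affected directory slots
      if (dir_[i] == old)
        dir_[i] = ((i >> (global_depth_ - s0->local_depth)) & 1) ? s1 : s0;
  }

  std::vector<std::shared_ptr<Segment>> dir_{std::make_shared<Segment>()};
  unsigned global_depth_ = 0;
};

int main() {
  DashSketch dt;
  std::mt19937_64 rng(42);
  for (uint64_t i = 0; i < 1000; ++i) dt.Insert(rng(), i);
  std::printf("directory size after 1000 inserts: %zu\n", dt.DirSize());
  return 0;
}
```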
![Dashtable Diagram](./dashtable.svg)
In the diagram above you can see what a dashtable looks like. Each segment is comprised of `N` buckets. For example, in our implementation a dashtable has 60 buckets per segment (this is a compile-time parameter that can be configured).
### Segment zoom-in
Below you can see the diagram of a segment. It is comprised of regular buckets and stash buckets. Each bucket has `k` slots and each slot can host a key-value entry.
![Segment](./dashsegment.svg)
In our implementation, each segment has 56 regular buckets and 4 stash buckets, and each bucket contains 14 slots. Overall, each dashtable segment has the capacity to host 840 entries. When an item is inserted into a segment, DT first determines its home bucket based on the item's hash value. The home bucket is one of the 56 regular buckets residing in the table. Each bucket has 14 available slots, and the item can reside in any free slot. If the home bucket is full,
then DT tries to insert into the regular bucket on the right. If that bucket is also full,
it tries to insert into one of the 4 stash buckets. These are kept deliberately aside to gather
spillovers from the regular buckets. The segment is "full" when such an insertion fails, i.e. when the home bucket, its neighbour and all 4 stash buckets are full. Note that the segment is not necessarily at full capacity - other buckets may still have free slots - but since this item can go only into those 6 buckets,
the segment contents must be split. Upon a split event, DT creates a new segment and
adds it to the directory; the items from the old segment are partly moved to the new one
and partly rebalanced within the old one. Only two segments are touched during a split event.
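Here is a compact sketch of that probe order. The constants mirror the defaults described above; `is_full` is a stand-in for the real bucket bitmap checks, so this is illustrative rather than Dragonfly's actual code.

```cpp
#include <cstdint>
#include <functional>
#include <optional>

constexpr unsigned kRegularBuckets = 56;
constexpr unsigned kStashBuckets = 4;  // 56 + 4 = 60 buckets, 14 slots each -> 840 entries

// Returns the bucket id chosen for insertion, or nullopt when the segment is
// "full" for this key: home, right neighbour and all stash buckets are full.
std::optional<unsigned> PickBucket(uint64_t hash,
                                   const std::function<bool(unsigned)>& is_full) {
  unsigned home = hash % kRegularBuckets;  // home bucket
  if (!is_full(home)) return home;
  unsigned next = (home + 1) % kRegularBuckets;  // neighbour on the right
  if (!is_full(next)) return next;
  for (unsigned i = 0; i < kStashBuckets; ++i) {  // spillover buckets
    if (!is_full(kRegularBuckets + i)) return kRegularBuckets + i;
  }
  return std::nullopt;  // caller must split the segment
}
```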
Now we can explain why this seemingly similar data structure has an advantage over a classic hashtable
in terms of memory and CPU.
1. Memory: to host `N` items, the dashtable directory needs `~N/840` segment pointers, i.e. `8N/840` bytes, on average.
Basically, the overhead of the directory almost disappears in DT. Say, for 1M items we will
need ~1200 segments, or 9600 bytes, for the main array. That is in contrast to RD, where
we need a solid `8N` bytes of bucket-array overhead - no matter what.
For 1M items that is 8MB. In addition, dash segments use an open-addressing collision
scheme with probing, which means they do not need anything like `dictEntry`.
Dashtable uses lots of tricks to keep its own metadata small. In our implementation,
the average `tax` per entry is just short of 20 bits, compared to 64 bits in RD (`dictEntry.next`).
Also, DT incremental resize happens when a single segment is full,
so it always adds constant space per split event. Assuming that a key/value entry is two 8-byte
pointers as in RD, we can say that DT requires $16N + 8N/840 + 2.5N + O(1) \approx 19N$
bytes at 100% utilization (the sketch after this list works through these numbers). This is very close to the optimum.
In the unlikely case where all segments grow together by a factor of 2, i.e.
DT is at 50% utilization, we will need $38N$ bytes. In practice, each segment grows independently of the others,
so the table has smooth memory usage of 20-30 bytes per item.
2. Speed: RD requires an allocation for a dictEntry per insertion and a deallocation per deletion. In addition, RD uses chaining, which is cache-unfriendly on modern hardware. There is a consensus in the engineering and research communities that classic chaining schemes are much slower than open-addressing alternatives.
Having said that, DT also needs to go through a single level of indirection when
fetching a segment pointer. However, DT's directory is relatively small:
in the example above, all 9600 bytes of it could reside in the L1 cache. Once the segment is determined,
the rest of the insertion is very fast and mostly operates on 1-3 memory cache lines.
Finally, during resizes, RD needs to allocate a bucket array of size `2N`.
That can be time consuming - imagine an allocation of 100M buckets, for example.
DT, on the other hand, requires an allocation of constant size per new segment.
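Here are the per-item numbers from point 1 worked through in code (a back-of-the-envelope sketch; the 2.5-byte metadata figure is the ~20-bit `tax` mentioned above):

```cpp
#include <cstdio>

int main() {
  constexpr double kEntriesPerSegment = 840;  // 60 buckets * 14 slots
  constexpr double kEntryBytes = 16;          // key/value as two 8-byte pointers
  constexpr double kDirPtrBytes = 8;          // one directory pointer per segment
  constexpr double kMetadataBytes = 2.5;      // ~20 bits of per-entry metadata

  double dt = kEntryBytes + kDirPtrBytes / kEntriesPerSegment + kMetadataBytes;
  double rd = kEntryBytes + 32.0;  // RD case 1: 8-byte bucket ptr + 24-byte dictEntry
  std::printf("DT bytes per item at 100%% utilization: ~%.1f\n", dt);  // ~18.5
  std::printf("RD bytes per item at 100%% load factor: ~%.1f\n", rd);  // ~48.0
  return 0;
}
```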
## Comparison
There are many other improvements in dragonfly that save memory besides DT, and I will not be
able to cover them all here. The results below show the state as of May 2022.
To compare RD vs DT I use the internal debugging command "debug populate", which quickly fills
both datastores with small data. It saves time and gives more consistent results compared to memtier_benchmark.
It also shows the raw speed at which each dictionary gets filled, without intermediary factors
like networking, parsing etc.
I deliberately fill the datasets with small data to show how the metadata overhead differs between the two data structures.
I run "debug populate 20000000" (20M keys) on both engines on my home machine, an "AMD Ryzen 5 3400G with 8 cores".
### Single-threaded scenario
| | Dragonfly | Redis 6 |
|-------------|-----------|---------|
| Time | 10.8s | 16.0s |
| Memory used | 1GB       | 1.73GB  |
Looking at Redis 6 "info memory" stats, you can see that the `used_memory_overhead` field equals
`1.0GB`. That means that out of 1.73GB allocated, a whopping 1.0GB is used for
metadata. For small-data use cases, the cost of metadata in Redis is larger than the data itself.
### Multi-threaded scenario
Now I run Dragonfly on all 8 cores.
| | Dragonfly | Redis 6 |
|-------------|-----------|---------|
| Time | 2.43s | 16.0s |
| Memory used | 896MB     | 1.73GB  |
Due to its shared-nothing architecture, dragonfly maintains a dashtable per thread, each with its own slice of data.
Each thread fills the 1/8th of the 20M range it owns - and it is much faster, almost 8 times faster.
You can see that the total memory usage is even smaller, because now we maintain smaller tables in each
thread (though this is not always the case - we could get slightly worse memory usage than in the
single-threaded case, depending on where we stand relative to hash-table utilization).
*TODO - more benchmarks.*
<br>
<em>All diagrams in this doc are created in the [drawio app](https://app.diagrams.net/).</em>

doc/dashtable.svg Normal file

File diff suppressed because one or more lines are too long

doc/db.svg Normal file

File diff suppressed because one or more lines are too long


@@ -72,14 +72,24 @@ class DashTable : public detail::DashTableBase {
using cursor = detail::DashCursor;
struct HotspotBuckets {
static constexpr size_t kNumBuckets = 2 + Policy::kStashBucketNum;
static constexpr size_t kRegularBuckets = 4;
static constexpr size_t kNumBuckets = kRegularBuckets + Policy::kStashBucketNum;
bucket_iterator regular_buckets[2];
bucket_iterator stash_buckets[Policy::kStashBucketNum];
struct ByType {
bucket_iterator regular_buckets[kRegularBuckets];
bucket_iterator stash_buckets[Policy::kStashBucketNum];
};
union Probes {
ByType by_type;
bucket_iterator arr[kNumBuckets];
Probes() : arr() {}
} probes;
// id must be in the range [0, kNumBuckets).
bucket_iterator at(unsigned id) const {
return id < 2 ? regular_buckets[id] : stash_buckets[id - 2];
return probes.arr[id];
}
// key_hash of a key that we try to insert.
@@ -418,6 +428,10 @@ class DashTable<_Key, _Value, Policy>::Iterator {
template <typename _Key, typename _Value, typename Policy>
template <bool IsConst, bool IsSingleBucket>
void DashTable<_Key, _Value, Policy>::Iterator<IsConst, IsSingleBucket>::Seek2Occupied() {
if (owner_ == nullptr)
return;
assert(seg_id_ < owner_->segment_.size());
if constexpr (IsSingleBucket) {
const auto& b = owner_->segment_[seg_id_]->GetBucket(bucket_id_);
uint32_t mask = b.GetBusy() >> slot_id_;
@@ -709,15 +723,18 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
// try garbage collect or evict.
if constexpr (EvictionPolicy::can_evict || EvictionPolicy::can_gc) {
// Try gc.
uint8_t bid[2];
uint8_t bid[HotspotBuckets::kRegularBuckets];
SegmentType::FillProbeArray(key_hash, bid);
HotspotBuckets buckets;
buckets.key_hash = key_hash;
buckets.regular_buckets[0] = bucket_iterator{this, seg_id, bid[0], 0};
buckets.regular_buckets[1] = bucket_iterator{this, seg_id, bid[1], 0};
for (unsigned j = 0; j < HotspotBuckets::kRegularBuckets; ++j) {
buckets.probes.by_type.regular_buckets[j] = bucket_iterator{this, seg_id, bid[j]};
}
for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) {
buckets.stash_buckets[i] = bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0};
buckets.probes.by_type.stash_buckets[i] =
bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0};
}
// The difference between gc and eviction is that gc can be applied even if
@@ -726,8 +743,18 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
if constexpr (EvictionPolicy::can_gc) {
unsigned res = ev.GarbageCollect(buckets, this);
garbage_collected_ += res;
if (res)
if (res) {
// We succeeded to gc. Let's continue with the momentum.
// In terms of API abuse it's an awful hack, just to see if it works.
/*unsigned start = (bid[HotspotBuckets::kNumBuckets - 1] + 1) % kLogicalBucketNum;
for (unsigned i = 0; i < HotspotBuckets::kNumBuckets; ++i) {
uint8_t id = (start + i) % kLogicalBucketNum;
buckets.probes.arr[i] = bucket_iterator{this, seg_id, id};
}
garbage_collected_ += ev.GarbageCollect(buckets, this);
*/
continue;
}
}
auto hash_fn = [this](const auto& k) { return policy_.HashFn(k); };


@@ -477,9 +477,11 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
// fill bucket ids that may be used for probing this key_hash.
// The order is: exact, neighbour buckets.
static void FillProbeArray(Hash_t key_hash, uint8_t dest[2]) {
dest[0] = BucketIndex(key_hash);
dest[1] = NextBid(dest[0]);
static void FillProbeArray(Hash_t key_hash, uint8_t dest[4]) {
dest[1] = BucketIndex(key_hash);
dest[0] = PrevBid(dest[1]);
dest[2] = NextBid(dest[1]);
dest[3] = NextBid(dest[2]);
}
template <typename U, typename Pred> Iterator FindIt(U&& key, Hash_t key_hash, Pred&& cf) const;


@@ -532,7 +532,7 @@ struct TestEvictionPolicy {
if (!evict_enabled)
return 0;
auto it = hotb.regular_buckets[0];
auto it = hotb.probes.by_type.regular_buckets[0];
unsigned res = 0;
for (; !it.is_done(); ++it) {
LOG(INFO) << "Deleting " << it->first;
@@ -586,7 +586,7 @@ TEST_F(DashTest, Eviction) {
while (!dt_.GetSegment(0)->GetBucket(0).IsFull()) {
try {
dt_.Insert(num++, 0, ev);
} catch(bad_alloc&) {
} catch (bad_alloc&) {
}
}
@@ -594,7 +594,7 @@ TEST_F(DashTest, Eviction) {
keys.clear();
uint64_t last_key = dt_.GetSegment(0)->Key(0, Dash64::kBucketWidth - 1);
for (Dash64::bucket_iterator bit = dt_.begin(); !bit.is_done(); ++bit) {
keys.insert(bit->first);
}
bit = dt_.begin();
@@ -655,7 +655,6 @@ TEST_F(DashTest, Version) {
ASSERT_EQ(kNum, items);
}
TEST_F(DashTest, CVCUponInsert) {
VersionDT dt;
auto [it, added] = dt.Insert(10, 20); // added to slot 0
@@ -825,15 +824,22 @@ struct SimpleEvictPolicy {
constexpr unsigned kNumBuckets = U64Dash::HotspotBuckets::kNumBuckets;
uint32_t bid = hotb.key_hash % kNumBuckets;
auto it = hotb.at(bid);
unsigned slot_index = (hotb.key_hash >> 32) % U64Dash::kBucketWidth;
it += slot_index;
DCHECK(!it.is_done());
me->Erase(it);
++evicted;
for (unsigned i = 0; i < kNumBuckets; ++i) {
auto it = hotb.at((bid + i) % kNumBuckets);
it += slot_index;
return 1;
if (it.is_done())
continue;
me->Erase(it);
++evicted;
return 1;
}
return 0;
}
size_t max_capacity = SIZE_MAX;
@@ -857,10 +863,10 @@ struct ShiftRightPolicy {
}
unsigned Evict(const U64Dash::HotspotBuckets& hotb, U64Dash* me) {
constexpr unsigned kNumStashBuckets = ABSL_ARRAYSIZE(hotb.stash_buckets);
constexpr unsigned kNumStashBuckets = ABSL_ARRAYSIZE(hotb.probes.by_type.stash_buckets);
unsigned stash_pos = hotb.key_hash % kNumStashBuckets;
auto stash_it = hotb.stash_buckets[stash_pos];
auto stash_it = hotb.probes.by_type.stash_buckets[stash_pos];
stash_it += (U64Dash::kBucketWidth - 1); // go to the last slot.
uint64_t k = stash_it->first;


@@ -27,6 +27,7 @@ namespace {
constexpr auto kPrimeSegmentSize = PrimeTable::kSegBytes;
constexpr auto kExpireSegmentSize = ExpireTable::kSegBytes;
constexpr auto kTaxSize = PrimeTable::kTaxAmount;
// mi_malloc good size is 32768, i.e. we have a malloc waste of ~1.5%.
static_assert(kPrimeSegmentSize == 32288);
@@ -103,10 +104,10 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
if (!can_evict_)
return 0;
constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.stash_buckets);
constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets);
// choose "randomly" a stash bucket to evict an item.
auto bucket_it = eb.stash_buckets[eb.key_hash % kNumStashBuckets];
auto bucket_it = eb.probes.by_type.stash_buckets[eb.key_hash % kNumStashBuckets];
auto last_slot_it = bucket_it;
last_slot_it += (PrimeTable::kBucketWidth - 1);
if (!last_slot_it.is_done()) {
@@ -150,7 +151,6 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
#undef ADD
DbSlice::DbSlice(uint32_t index, bool caching_mode, EngineShard* owner)
: shard_id_(index), caching_mode_(caching_mode), owner_(owner) {
db_arr_.emplace_back();
@@ -304,7 +304,7 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false)
db->stats.inline_keys += it->first.IsInline();
db->stats.obj_memory_usage += it->first.MallocUsed();
events_.garbage_collected += db->prime.garbage_collected();
events_.garbage_collected = db->prime.garbage_collected();
events_.stash_unloaded = db->prime.stash_unloaded();
events_.evicted_keys += evp.evicted();
@@ -452,7 +452,6 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
return it.is_done() ? 0 : it->second;
}
PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
auto [it, added] = AddOrFind(db_ind, key, std::move(obj), expire_at_ms);
@@ -461,7 +460,6 @@ PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
return it;
}
pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_ind, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
DCHECK_LT(db_ind, db_arr_.size());
@@ -490,7 +488,6 @@ pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_ind, string_view key, Pr
return res;
}
size_t DbSlice::DbSize(DbIndex db_ind) const {
DCHECK_LT(db_ind, db_array_size());
@@ -632,27 +629,36 @@ void DbSlice::UnregisterOnChange(uint64_t id) {
LOG(DFATAL) << "Could not find " << id << " to unregister";
}
pair<unsigned, unsigned> DbSlice::DeleteExpired(DbIndex db_ind) {
auto DbSlice::DeleteExpired(DbIndex db_ind, unsigned count) -> DeleteExpiredStats {
auto& db = *db_arr_[db_ind];
unsigned deleted = 0, candidates = 0;
DeleteExpiredStats result;
auto cb = [&](ExpireIterator it) {
candidates++;
if (ExpireTime(it) <= Now()) {
result.traversed++;
time_t ttl = ExpireTime(it) - Now();
if (ttl <= 0) {
auto prime_it = db.prime.Find(it->first);
CHECK(!prime_it.is_done());
ExpireIfNeeded(db_ind, prime_it);
++deleted;
++result.deleted;
} else {
result.survivor_ttl_sum += ttl;
}
};
for (unsigned i = 0; i < 10; ++i) {
unsigned i = 0;
for (; i < count / 3; ++i) {
db.expire_cursor = db.expire.Traverse(db.expire_cursor, cb);
if (deleted)
break;
}
return make_pair(candidates, deleted);
// continue traversing only if we had a strong deletion rate based on the first sample.
if (result.deleted * 4 > result.traversed) {
for (; i < count; ++i) {
db.expire_cursor = db.expire.Traverse(db.expire_cursor, cb);
}
}
return result;
}
} // namespace dfly


@@ -220,10 +220,14 @@ class DbSlice {
//! Unregisters the callback.
void UnregisterOnChange(uint64_t id);
struct DeleteExpiredStats {
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
uint32_t traversed = 0; // number of traversed items that have ttl bit
size_t survivor_ttl_sum = 0; // total sum of ttl of survivors (traversed - deleted).
};
// Deletes some amount of possibly expired items.
// Returns a pair where first denotes number of traversed items that have ttl bit
// and second denotes number of deleted items due to expiration. (second <= first).
std::pair<unsigned, unsigned> DeleteExpired(DbIndex db_indx);
DeleteExpiredStats DeleteExpired(DbIndex db_indx, unsigned count);
const DbTableArray& databases() const {
return db_arr_;


@@ -283,14 +283,23 @@ void EngineShard::Heartbeat() {
if (task_iters_++ % 8 == 0) {
CacheStats();
uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
unsigned count = 5;
if (deleted > 10) {
// deleted should be less than or equal to traversed.
// hence we map our deleted/traversed ratio into a range [0, 100).
count = 200.0 * double(deleted) / (double(traversed) + 10);
}
for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) {
if (db_slice_.IsDbValid(i)) {
auto [pt, expt] = db_slice_.GetTables(i);
if (expt->size() > pt->size() / 4) {
auto [trav, del] = db_slice_.DeleteExpired(i);
DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpired(i, count);
counter_[TTL_TRAVERSE].IncBy(trav);
counter_[TTL_DELETE].IncBy(del);
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
}
}
}


@@ -126,6 +126,7 @@ class EngineShard {
enum MovingCnt {
TTL_TRAVERSE,
TTL_DELETE,
COUNTER_TOTAL
};
// Returns moving sum over the last 6 seconds.
@@ -168,7 +169,8 @@
using Counter = util::SlidingCounter<7>;
Counter counter_[2];
Counter counter_[COUNTER_TOTAL];
std::vector<Counter> ttl_survivor_sum_; // we need it per db.
static thread_local EngineShard* shard_;
};


@@ -775,7 +775,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
bool show = (i == 0) || (stats.key_count > 0);
if (show) {
string val = StrCat("keys=", stats.key_count, ",expires=", stats.expire_count,
",avg_ttl=todo"); // TODO
",avg_ttl=-1"); // TODO
append(StrCat("db", i), val);
}
}