Implement directory shrinkage when we flush the database

Roman Gershman 2022-03-09 09:06:11 +02:00
parent 5dcb50dbaa
commit 8054ed4f3a
2 changed files with 46 additions and 4 deletions


@@ -153,7 +153,6 @@ class DashTable : public detail::DashTableBase {
            seg->Value(bucket_id_, slot_id_)};
    }
    // Make it self-contained. Does not need container::end().
    bool is_done() const {
      return owner_ == nullptr;
@@ -398,6 +397,43 @@ void DashTable<_Key, _Value, Policy>::Clear() {
  IterateUnique(cb);
  size_ = 0;
  // Consider the following case: a table with 8 segments overall, 4 of them unique.
  // S1, S1, S1, S1, S2, S3, S4, S4
  /* This corresponds to the tree:
              R
             / \
           S1   /\
               /\ S4
              S2 S3
     We want to collapse this tree into, say, a 2-segment directory.
     That means we need to keep S1, S2 but delete S3, S4.
     In other words, we move the representative segments until we reach the desired
     directory size and then erase all remaining unique segments.
  **********/
  if (global_depth_ > initial_depth_) {
    std::pmr::polymorphic_allocator<SegmentType> pa(segment_.get_allocator());
    size_t dest = 0, src = 0;
    size_t new_size = (1 << initial_depth_);
    while (src < segment_.size()) {
      auto* seg = segment_[src];
      size_t next_src = NextSeg(src);  // must be computed before we overwrite or destroy seg, because NextSeg reads it.
      if (dest < new_size) {
        seg->set_local_depth(initial_depth_);
        segment_[dest++] = seg;
      } else {
        pa.destroy(seg);
        pa.deallocate(seg, 1);
      }
      src = next_src;
    }
    global_depth_ = initial_depth_;
    unique_segments_ = new_size;
    segment_.resize(new_size);
  }
}

template <typename _Key, typename _Value, typename Policy>
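For illustration, here is a minimal, self-contained sketch of the shrink pass above, decoupled from DashTable. It assumes, as the directory layout in the comment suggests, that a unique segment with local depth d owns 1 << (global_depth - d) consecutive directory entries, which is the stride NextSeg skips; ToySegment and ShrinkDirectory are illustrative names, not part of the real API.

#include <cstddef>
#include <cstdint>
#include <vector>

struct ToySegment {
  uint32_t local_depth;
};

// Collapses a directory of 1 << global_depth entries down to 1 << initial_depth,
// keeping the leading unique segments as representatives and deleting the rest.
void ShrinkDirectory(std::vector<ToySegment*>& dir, uint32_t global_depth,
                     uint32_t initial_depth) {
  size_t dest = 0, src = 0;
  size_t new_size = size_t(1) << initial_depth;
  while (src < dir.size()) {
    ToySegment* seg = dir[src];
    // Advance past every entry that aliases this unique segment; this mirrors
    // NextSeg and must be computed before seg is modified or deleted.
    size_t next_src = src + (size_t(1) << (global_depth - seg->local_depth));
    if (dest < new_size) {
      seg->local_depth = initial_depth;  // surviving representative
      dir[dest++] = seg;
    } else {
      delete seg;  // surplus unique segment
    }
    src = next_src;
  }
  dir.resize(new_size);
}

Running this on the comment's example (global depth 3; S1 with local depth 1, S2 and S3 with depth 3, S4 with depth 2) with initial_depth = 1 keeps S1 and S2 and deletes S3 and S4, matching the behaviour described above.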


@@ -382,7 +382,7 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
  void Delete(const Iterator& it, Hash_t key_hash);
  void Clear();  // clears the segment.
  size_t SlowSize() const;
@@ -394,6 +394,10 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
    return local_depth_;
  }

  void set_local_depth(uint32_t depth) {
    local_depth_ = depth;
  }

  template <bool B = Policy::USE_VERSION>
  std::enable_if_t<B, uint64_t> GetVersion(uint8_t bid, uint8_t slot_id) {
    return bucket_[bid].GetVersion(slot_id);
@@ -527,7 +531,8 @@ class DashTableBase {
  DashTableBase& operator=(const DashTableBase&) = delete;

 public:
  explicit DashTableBase(uint32_t gd)
      : initial_depth_(gd), global_depth_(gd), unique_segments_(1 << gd) {
  }

  uint32_t unique_segments() const {
@@ -551,6 +556,7 @@ class DashTableBase {
    return 0;
  }

  uint32_t initial_depth_;
  uint32_t global_depth_;
  uint32_t unique_segments_;
  size_t size_ = 0;
@@ -774,7 +780,7 @@ void VersionedBB<NUM_SLOTS, NUM_STASH_FPS>::SetVersion(unsigned slot_id, uint64_
  } else {
    if (nbv > obv) {  // We bump up the high part for the whole bucket and set low parts to 0.
      absl::little_endian::Store64(high_, nbv);  // We put garbage into 2 bytes of low_.
      low_.fill(0);                              // We do not mind because we reset low_ anyway.
    }
    low_[slot_id] = version & 0xFFFF;  // In any case we set slot version to lowest 2 bytes.
  }
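As a reading aid for the hunk above, here is a toy model of the version encoding the comments imply: one high part shared by the whole bucket plus a 2-byte low part per slot, so bumping the high part must reset every low part. The field widths, the slot count, and the GetVersion reconstruction are assumptions for illustration, not the real VersionedBB layout.

#include <array>
#include <cstdint>

struct ToyVersionedBucket {
  uint64_t high = 0;               // shared high part; its low 16 bits stay zero (assumption)
  std::array<uint16_t, 14> low{};  // per-slot low 2 bytes (slot count is an assumption)

  void SetVersion(unsigned slot_id, uint64_t version) {
    uint64_t nbv = version & ~uint64_t{0xFFFF};  // candidate bucket-wide high part
    if (nbv > high) {
      high = nbv;   // bump the high part for the whole bucket...
      low.fill(0);  // ...and reset the low parts, as the real code does
    }
    low[slot_id] = version & 0xFFFF;  // in any case, the slot keeps its lowest 2 bytes
  }

  uint64_t GetVersion(unsigned slot_id) const {
    return high | low[slot_id];  // recombine the shared high part with the slot's low part
  }
};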