diff --git a/core/dash.h b/core/dash.h index e78cfbd..e9331a0 100644 --- a/core/dash.h +++ b/core/dash.h @@ -12,6 +12,7 @@ namespace dfly { struct BasicDashPolicy { enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 }; + static constexpr bool kUseVersion = false; template static void DestroyValue(const U&) { } @@ -32,6 +33,7 @@ class DashTable : public detail::DashTableBase { static constexpr unsigned NUM_SLOTS = Policy::kSlotNum; static constexpr unsigned BUCKET_CNT = Policy::kBucketNum; static constexpr unsigned STASH_BUCKET_NUM = Policy::kStashBucketNum; + static constexpr bool USE_VERSION = Policy::kUseVersion; }; using Base = detail::DashTableBase; @@ -52,6 +54,7 @@ class DashTable : public detail::DashTableBase { static constexpr double kTaxAmount = SegmentType::kTaxSize; static constexpr size_t kSegBytes = sizeof(SegmentType); static constexpr size_t kSegCapacity = SegmentType::capacity(); + static constexpr bool kUseVersion = Policy::kUseVersion; // if IsSingleBucket is true - iterates only over a single bucket. template class Iterator { @@ -152,6 +155,20 @@ class DashTable : public detail::DashTableBase { return owner_ == nullptr; } + template std::enable_if_t GetVersion() const { + return owner_->segment_[seg_id_]->GetVersion(bucket_id_, slot_id_); + } + + // Returns the minimum version of the physical bucket that this iterator points to. + // Note: In an ideal world I would introduce a bucket iterator... + template std::enable_if_t MinVersion() const { + return owner_->segment_[seg_id_]->MinVersion(bucket_id_); + } + + template std::enable_if_t SetVersion(uint64_t v) { + return owner_->segment_[seg_id_]->SetVersion(bucket_id_, slot_id_, v); + } + friend bool operator==(const Iterator& lhs, const Iterator& rhs) { if (lhs.owner_ == nullptr && rhs.owner_ == nullptr) return true; diff --git a/core/dash_internal.h b/core/dash_internal.h index e8c1862..51daf19 100644 --- a/core/dash_internal.h +++ b/core/dash_internal.h @@ -4,6 +4,8 @@ #pragma once +#include + #include #include #include @@ -226,21 +228,79 @@ static_assert(sizeof(BucketBase<12, 4>) == 24, ""); static_assert(alignof(BucketBase<14, 4>) == 1, ""); static_assert(alignof(BucketBase<12, 4>) == 1, ""); +// Optional version support as part of DashTable. +// This works like this: each slot has 2 bytes for version and a bucket has another 6. +// therefore all slots in the bucket shared the same 6 high bytes of 8-byte version. +// In order to achieve this we store high6(max{version(entry)}) for every entry. +// Hence our version control may have false positives, i.e. signal that an entry has changed +// when in practice its neighbour incremented the high6 part of its bucket. +template +class VersionedBB : public BucketBase { + using Base = BucketBase; + + public: + // invariant: slot.version = max(bucket.version, version) + // invariant: bucket.version = max(high6(all slots)) + // In other words, if version is less than BaseVersion, + // then we set slot version to be BaseVersion - we never decrease the base version. + // If high6(version) is greater then we update the bucket version. + // We update the slots version accordingly. + void SetVersion(unsigned slot_id, uint64_t version); + + uint64_t GetVersion(unsigned slot_id) const { + uint64_t c = BaseVersion(); + c |= low_[slot_id]; + return c; + } + + // Returns 64 bit bucket version of 2 low bytes zeroed. + uint64_t BaseVersion() const { + // high_ is followed by low array. + // Hence little endian load from high_ ptr copies 2 bytes from low_ into 2 highest bytes of c. + uint64_t c = absl::little_endian::Load64(high_); + + // Fix the version by getting rid of 2 garbage bytes. + return c << 16; + } + + uint64_t MinVersion() const; + + void Clear() { + Base::Clear(); + low_.fill(0); + memset(high_, 0, 6); + } + + private: + using Byte6 = uint8_t[6]; + + Byte6 high_ = {0}; + std::array low_ = {0}; +}; + +static_assert(alignof(VersionedBB<14, 4>) == 2, ""); +static_assert(sizeof(VersionedBB<12, 4>) == 12 * 4 + 6, ""); +static_assert(sizeof(VersionedBB<14, 4>) <= 14 * 4 + 6, ""); + // Segment - static-hashtable of size NUM_SLOTS*(BUCKET_CNT + STASH_BUCKET_NUM). struct DefaultSegmentPolicy { static constexpr unsigned NUM_SLOTS = 12; static constexpr unsigned BUCKET_CNT = 64; static constexpr unsigned STASH_BUCKET_NUM = 2; + static constexpr bool USE_VERSION = true; }; + template class Segment { static constexpr unsigned BUCKET_CNT = Policy::BUCKET_CNT; static constexpr unsigned STASH_BUCKET_NUM = Policy::STASH_BUCKET_NUM; static constexpr unsigned NUM_SLOTS = Policy::NUM_SLOTS; + static constexpr bool USE_VERSION = Policy::USE_VERSION; static_assert(BUCKET_CNT + STASH_BUCKET_NUM < 255); static constexpr unsigned kFingerBits = 8; - using BucketType = BucketBase; + using BucketType = + std::conditional_t, BucketBase>; struct Bucket : public BucketType { using BucketType::kNanSlot; @@ -334,6 +394,20 @@ template + std::enable_if_t GetVersion(uint8_t bid, uint8_t slot_id) { + return bucket_[bid].GetVersion(slot_id); + } + + template std::enable_if_t MinVersion(uint8_t bid) { + return bucket_[bid].MinVersion(); + } + + template + std::enable_if_t SetVersion(uint8_t bid, uint8_t slot_id, uint64_t v) { + return bucket_[bid].SetVersion(slot_id, v); + } + // Traverses over Segment's bucket bid and calls cb(const Iterator& it) 0 or more times // for each slot in the bucket. returns false if bucket is empty. // Please note that `it` will not necessary point to bid due to probing and stash buckets @@ -386,6 +460,13 @@ template Iterator InsertUniq(U&& key, V&& value, Hash_t key_hash); + // capture version change in case of insert. + // Returns ids of buckets that would be modified upon insertion of key_hash into the segment. + // Returns 0 if segment is full. Otherwise, returns number of touched bucket ids (1 or 2) + // if the insertion would happen. The ids are put into bid array that should have at least 2 + // spaces. + unsigned CVCOnInsert(Hash_t key_hash, uint8_t bid[2]) const; + // Finds a valid entry going from specified indices up. Iterator FindValidStartingFrom(unsigned bid, unsigned slot) const; @@ -472,7 +553,8 @@ class DashTableBase { template class IteratorPair { public: - IteratorPair(_Key& k, _Value& v) : first(k), second(v) {} + IteratorPair(_Key& k, _Value& v) : first(k), second(v) { + } IteratorPair* operator->() { return this; @@ -676,6 +758,40 @@ auto BucketBase::IterateStash(uint8_t fp, bool is_probe, F&& return std::pair(0, BucketBase::kNanSlot); } +template +void VersionedBB::SetVersion(unsigned slot_id, uint64_t version) { + uint64_t nbv = version >> 16; // possible new bucket version + uint64_t obv = BaseVersion() >> 16; // old bucket version + + if (nbv < obv) { + // nbv is too low, instead we upgrade the slot to higher BaseVersion. + low_[slot_id] = 0; // we increase the slot version to bigger than "version". + } else { + if (nbv > obv) { // We bump up the high part for the whole bucket and set low parts to 0. + absl::little_endian::Store64(high_, nbv); // We put garbage into 2 bytes of low_. + low_.fill(0); // We do not mind because we reset low_ anyway. + } + low_[slot_id] = version & 0xFFFF; // In any case we set slot version to lowest 2 bytes. + } +} + +template +uint64_t VersionedBB::MinVersion() const { + uint32_t mask = this->GetBusy(); + if (mask == 0) + return 0; + + // it's enough to compare low_ parts since base version is the same for all of them. + uint16_t res = 0xFFFF; + for (unsigned j = 0; j < NUM_SLOTS; ++j) { + if ((mask & 1) && low_[j] < res) { + res = low_[j]; + } + mask >>= 1; + } + return BaseVersion() + res; +} + /* ____ ____ ____ _ _ ____ _ _ ___ [__ |___ | __ |\/| |___ |\ | | @@ -711,6 +827,11 @@ bool Segment::TryMoveFromStash(unsigned stash_id, unsigned s } if (reg_slot >= 0) { + if constexpr (USE_VERSION) { + // We maintain the invariant for the physical bucket by updating the version when + // the entries move between buckets. + bucket_[bid].SetVersion(reg_slot, bucket_[stash_bid].GetVersion(stash_slot_id)); + } RemoveStashReference(stash_id, key_hash); return true; } @@ -854,7 +975,12 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { invalid_mask |= (1u << slot); - dest_right->InsertUniq(std::forward(key), std::forward(Value(i, slot)), hash); + auto it = dest_right->InsertUniq(std::forward(key), + std::forward(Value(i, slot)), hash); + if constexpr (USE_VERSION) { + // Maintaining consistent versioning. + dest_right->bucket_[it.index].SetVersion(it.slot, bucket_[i].GetVersion(slot)); + } }; bucket_[i].ForEachSlot(std::move(cb)); @@ -881,8 +1007,13 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { } invalid_mask |= (1u << slot); - dest_right->InsertUniq(std::forward(Key(bid, slot)), - std::forward(Value(bid, slot)), hash); + auto it = dest_right->InsertUniq(std::forward(Key(bid, slot)), + std::forward(Value(bid, slot)), hash); + + if constexpr (USE_VERSION) { + // Update the version in the destination bucket. + dest_right->bucket_[it.index].SetVersion(it.slot, stash.GetVersion(slot)); + } // Remove stash reference pointing to stach bucket i. RemoveStashReference(i, hash); @@ -908,6 +1039,12 @@ int Segment::MoveToOther(bool own_items, unsigned from, unsi if (dst_slot < 0) return -1; + // We never decrease the version of the entry. + if constexpr (USE_VERSION) { + auto& dst = bucket_[to]; + dst.SetVersion(dst_slot, src.GetVersion(src_slot)); + } + src.Delete(src_slot); return src_slot; @@ -979,6 +1116,47 @@ auto Segment::InsertUniq(U&& key, V&& value, Hash_t key_hash return Iterator{}; } +template +unsigned Segment::CVCOnInsert(Hash_t key_hash, uint8_t bid_res[2]) const { + const uint8_t bid = BucketIndex(key_hash); + const uint8_t nid = (bid + 1) % kNumBuckets; + + const Bucket& target = bucket_[bid]; + const Bucket& neighbor = bucket_[nid]; + uint8_t first = target.Size() > neighbor.Size() ? nid : bid; + + if (!bucket_[first].IsFull()) { + bid_res[0] = first; + return 1; + } + + const uint8_t after_next = (nid + 1) % kNumBuckets; + if (CheckIfMovesToOther(true, nid, after_next)) { + bid_res[0] = nid; + bid_res[1] = after_next; + return 2; + } + + const uint8_t prev_bid = (bid == 0) ? kNumBuckets - 1 : bid - 1; + if (CheckIfMovesToOther(false, bid, prev_bid)) { + bid_res[0] = bid; + bid_res[1] = prev_bid; + + return 2; + } + + for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) { + unsigned stash_bid = kNumBuckets + (bid + i) % STASH_BUCKET_NUM; + const Bucket& stash = bucket_[stash_bid]; + if (!stash.IsFull()) { + bid_res[0] = stash_bid; + return 1; + } + } + + return 0; +} + template template bool Segment::TraverseLogicalBucket(uint8_t bid, HashFn&& hfun, Cb&& cb) const { diff --git a/core/dash_test.cc b/core/dash_test.cc index 2d0c896..c76fc3b 100644 --- a/core/dash_test.cc +++ b/core/dash_test.cc @@ -174,6 +174,7 @@ TEST_F(DashTest, Basic) { ASSERT_EQ(1, has_called); ASSERT_EQ(0, segment_.TraverseLogicalBucket(cursor, hfun, cb)); ASSERT_EQ(1, has_called); + EXPECT_EQ(0, segment_.GetVersion(0, 0)); } TEST_F(DashTest, Segment) { @@ -347,7 +348,7 @@ TEST_F(DashTest, Insert) { Dash64::const_iterator it = dt_.Find(i); ASSERT_TRUE(it != dt_.end()); - ASSERT_EQ(it.value(), i); + ASSERT_EQ(it->second, i); ASSERT_LE(dt_.load_factor(), 1) << i; } @@ -355,6 +356,7 @@ TEST_F(DashTest, Insert) { Dash64::const_iterator it = dt_.Find(i); ASSERT_TRUE(it == dt_.end()); } + EXPECT_EQ(kNumItems, dt_.size()); EXPECT_EQ(1, dt_.Erase(0)); EXPECT_EQ(0, dt_.Erase(0)); @@ -440,6 +442,43 @@ TEST_F(DashTest, Eviction) { EXPECT_EQ(bucket_cnt, dt_.bucket_count()); } +struct VersionPolicy : public BasicDashPolicy { + static constexpr bool kUseVersion = true; + + static uint64_t HashFn(int v) { + return XXH3_64bits(&v, sizeof(v)); + } +}; + +TEST_F(DashTest, Version) { + DashTable dt; + auto [it, inserted] = dt.Insert(1, 1); + + EXPECT_EQ(0, it.GetVersion()); + it.SetVersion(5); + EXPECT_EQ(5, it.GetVersion()); + + dt.Clear(); + constexpr int kNum = 68000; + for (int i = 0; i < kNum; ++i) { + auto it = dt.Insert(i, 0).first; + it.SetVersion(i + 65000); + if (i) { + auto p = dt.Find(i - 1); + ASSERT_GE(p.GetVersion(), i - 1 + 65000) << i; + } + } + + unsigned items = 0; + for (auto it = dt.begin(); it != dt.end(); ++it) { + ASSERT_FALSE(it.is_done()); + ASSERT_GE(it.GetVersion(), it->first + 65000) + << it.segment_id() << " " << it.bucket_id() << " " << it.slot_id(); + ++items; + } + ASSERT_EQ(kNum, items); +} + struct A { int a = 0; unsigned moved = 0; @@ -484,6 +523,7 @@ TEST_F(DashTest, Moveable) { struct SdsDashPolicy { enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 }; + static constexpr bool kUseVersion = false; static uint64_t HashFn(sds u) { return XXH3_64bits(reinterpret_cast(u), sdslen(u));