Introduce versioned bucket support for MVCC operations

This commit is contained in:
Roman Gershman 2022-01-22 12:21:08 +02:00
parent 069ed12c68
commit 8d19a6211d
3 changed files with 241 additions and 6 deletions

View File

@ -12,6 +12,7 @@ namespace dfly {
struct BasicDashPolicy {
enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 };
static constexpr bool kUseVersion = false;
template <typename U> static void DestroyValue(const U&) {
}
@ -32,6 +33,7 @@ class DashTable : public detail::DashTableBase {
static constexpr unsigned NUM_SLOTS = Policy::kSlotNum;
static constexpr unsigned BUCKET_CNT = Policy::kBucketNum;
static constexpr unsigned STASH_BUCKET_NUM = Policy::kStashBucketNum;
static constexpr bool USE_VERSION = Policy::kUseVersion;
};
using Base = detail::DashTableBase;
@ -52,6 +54,7 @@ class DashTable : public detail::DashTableBase {
static constexpr double kTaxAmount = SegmentType::kTaxSize;
static constexpr size_t kSegBytes = sizeof(SegmentType);
static constexpr size_t kSegCapacity = SegmentType::capacity();
static constexpr bool kUseVersion = Policy::kUseVersion;
// if IsSingleBucket is true - iterates only over a single bucket.
template <bool IsConst, bool IsSingleBucket = false> class Iterator {
@ -152,6 +155,20 @@ class DashTable : public detail::DashTableBase {
return owner_ == nullptr;
}
template <bool B = Policy::kUseVersion> std::enable_if_t<B, uint64_t> GetVersion() const {
return owner_->segment_[seg_id_]->GetVersion(bucket_id_, slot_id_);
}
// Returns the minimum version of the physical bucket that this iterator points to.
// Note: In an ideal world I would introduce a bucket iterator...
template <bool B = Policy::kUseVersion> std::enable_if_t<B, uint64_t> MinVersion() const {
return owner_->segment_[seg_id_]->MinVersion(bucket_id_);
}
template <bool B = Policy::kUseVersion> std::enable_if_t<B> SetVersion(uint64_t v) {
return owner_->segment_[seg_id_]->SetVersion(bucket_id_, slot_id_, v);
}
friend bool operator==(const Iterator& lhs, const Iterator& rhs) {
if (lhs.owner_ == nullptr && rhs.owner_ == nullptr)
return true;

View File

@ -4,6 +4,8 @@
#pragma once
#include <absl/base/internal/endian.h>
#include <array>
#include <cassert>
#include <cstdint>
@ -226,21 +228,79 @@ static_assert(sizeof(BucketBase<12, 4>) == 24, "");
static_assert(alignof(BucketBase<14, 4>) == 1, "");
static_assert(alignof(BucketBase<12, 4>) == 1, "");
// Optional version support as part of DashTable.
// This works like this: each slot has 2 bytes for version and a bucket has another 6.
// therefore all slots in the bucket shared the same 6 high bytes of 8-byte version.
// In order to achieve this we store high6(max{version(entry)}) for every entry.
// Hence our version control may have false positives, i.e. signal that an entry has changed
// when in practice its neighbour incremented the high6 part of its bucket.
template <unsigned NUM_SLOTS, unsigned NUM_STASH_FPS>
class VersionedBB : public BucketBase<NUM_SLOTS, NUM_STASH_FPS> {
using Base = BucketBase<NUM_SLOTS, NUM_STASH_FPS>;
public:
// invariant: slot.version = max(bucket.version, version)
// invariant: bucket.version = max(high6(all slots))
// In other words, if version is less than BaseVersion,
// then we set slot version to be BaseVersion - we never decrease the base version.
// If high6(version) is greater then we update the bucket version.
// We update the slots version accordingly.
void SetVersion(unsigned slot_id, uint64_t version);
uint64_t GetVersion(unsigned slot_id) const {
uint64_t c = BaseVersion();
c |= low_[slot_id];
return c;
}
// Returns 64 bit bucket version of 2 low bytes zeroed.
uint64_t BaseVersion() const {
// high_ is followed by low array.
// Hence little endian load from high_ ptr copies 2 bytes from low_ into 2 highest bytes of c.
uint64_t c = absl::little_endian::Load64(high_);
// Fix the version by getting rid of 2 garbage bytes.
return c << 16;
}
uint64_t MinVersion() const;
void Clear() {
Base::Clear();
low_.fill(0);
memset(high_, 0, 6);
}
private:
using Byte6 = uint8_t[6];
Byte6 high_ = {0};
std::array<uint16_t, NUM_SLOTS> low_ = {0};
};
static_assert(alignof(VersionedBB<14, 4>) == 2, "");
static_assert(sizeof(VersionedBB<12, 4>) == 12 * 4 + 6, "");
static_assert(sizeof(VersionedBB<14, 4>) <= 14 * 4 + 6, "");
// Segment - static-hashtable of size NUM_SLOTS*(BUCKET_CNT + STASH_BUCKET_NUM).
struct DefaultSegmentPolicy {
static constexpr unsigned NUM_SLOTS = 12;
static constexpr unsigned BUCKET_CNT = 64;
static constexpr unsigned STASH_BUCKET_NUM = 2;
static constexpr bool USE_VERSION = true;
};
template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy> class Segment {
static constexpr unsigned BUCKET_CNT = Policy::BUCKET_CNT;
static constexpr unsigned STASH_BUCKET_NUM = Policy::STASH_BUCKET_NUM;
static constexpr unsigned NUM_SLOTS = Policy::NUM_SLOTS;
static constexpr bool USE_VERSION = Policy::USE_VERSION;
static_assert(BUCKET_CNT + STASH_BUCKET_NUM < 255);
static constexpr unsigned kFingerBits = 8;
using BucketType = BucketBase<NUM_SLOTS, 4>;
using BucketType =
std::conditional_t<USE_VERSION, VersionedBB<NUM_SLOTS, 4>, BucketBase<NUM_SLOTS, 4>>;
struct Bucket : public BucketType {
using BucketType::kNanSlot;
@ -334,6 +394,20 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
return local_depth_;
}
template <bool B = Policy::USE_VERSION>
std::enable_if_t<B, uint64_t> GetVersion(uint8_t bid, uint8_t slot_id) {
return bucket_[bid].GetVersion(slot_id);
}
template <bool B = Policy::USE_VERSION> std::enable_if_t<B, uint64_t> MinVersion(uint8_t bid) {
return bucket_[bid].MinVersion();
}
template <bool B = Policy::USE_VERSION>
std::enable_if_t<B> SetVersion(uint8_t bid, uint8_t slot_id, uint64_t v) {
return bucket_[bid].SetVersion(slot_id, v);
}
// Traverses over Segment's bucket bid and calls cb(const Iterator& it) 0 or more times
// for each slot in the bucket. returns false if bucket is empty.
// Please note that `it` will not necessary point to bid due to probing and stash buckets
@ -386,6 +460,13 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
// Requires: key should be not present in the segment.
template <typename U, typename V> Iterator InsertUniq(U&& key, V&& value, Hash_t key_hash);
// capture version change in case of insert.
// Returns ids of buckets that would be modified upon insertion of key_hash into the segment.
// Returns 0 if segment is full. Otherwise, returns number of touched bucket ids (1 or 2)
// if the insertion would happen. The ids are put into bid array that should have at least 2
// spaces.
unsigned CVCOnInsert(Hash_t key_hash, uint8_t bid[2]) const;
// Finds a valid entry going from specified indices up.
Iterator FindValidStartingFrom(unsigned bid, unsigned slot) const;
@ -472,7 +553,8 @@ class DashTableBase {
template <typename _Key, typename _Value> class IteratorPair {
public:
IteratorPair(_Key& k, _Value& v) : first(k), second(v) {}
IteratorPair(_Key& k, _Value& v) : first(k), second(v) {
}
IteratorPair* operator->() {
return this;
@ -676,6 +758,40 @@ auto BucketBase<NUM_SLOTS, NUM_OVR>::IterateStash(uint8_t fp, bool is_probe, F&&
return std::pair<unsigned, SlotId>(0, BucketBase::kNanSlot);
}
template <unsigned NUM_SLOTS, unsigned NUM_STASH_FPS>
void VersionedBB<NUM_SLOTS, NUM_STASH_FPS>::SetVersion(unsigned slot_id, uint64_t version) {
uint64_t nbv = version >> 16; // possible new bucket version
uint64_t obv = BaseVersion() >> 16; // old bucket version
if (nbv < obv) {
// nbv is too low, instead we upgrade the slot to higher BaseVersion.
low_[slot_id] = 0; // we increase the slot version to bigger than "version".
} else {
if (nbv > obv) { // We bump up the high part for the whole bucket and set low parts to 0.
absl::little_endian::Store64(high_, nbv); // We put garbage into 2 bytes of low_.
low_.fill(0); // We do not mind because we reset low_ anyway.
}
low_[slot_id] = version & 0xFFFF; // In any case we set slot version to lowest 2 bytes.
}
}
template <unsigned NUM_SLOTS, unsigned NUM_STASH_FPS>
uint64_t VersionedBB<NUM_SLOTS, NUM_STASH_FPS>::MinVersion() const {
uint32_t mask = this->GetBusy();
if (mask == 0)
return 0;
// it's enough to compare low_ parts since base version is the same for all of them.
uint16_t res = 0xFFFF;
for (unsigned j = 0; j < NUM_SLOTS; ++j) {
if ((mask & 1) && low_[j] < res) {
res = low_[j];
}
mask >>= 1;
}
return BaseVersion() + res;
}
/*
____ ____ ____ _ _ ____ _ _ ___
[__ |___ | __ |\/| |___ |\ | |
@ -711,6 +827,11 @@ bool Segment<Key, Value, Policy>::TryMoveFromStash(unsigned stash_id, unsigned s
}
if (reg_slot >= 0) {
if constexpr (USE_VERSION) {
// We maintain the invariant for the physical bucket by updating the version when
// the entries move between buckets.
bucket_[bid].SetVersion(reg_slot, bucket_[stash_bid].GetVersion(stash_slot_id));
}
RemoveStashReference(stash_id, key_hash);
return true;
}
@ -854,7 +975,12 @@ void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
invalid_mask |= (1u << slot);
dest_right->InsertUniq(std::forward<Key_t>(key), std::forward<Value_t>(Value(i, slot)), hash);
auto it = dest_right->InsertUniq(std::forward<Key_t>(key),
std::forward<Value_t>(Value(i, slot)), hash);
if constexpr (USE_VERSION) {
// Maintaining consistent versioning.
dest_right->bucket_[it.index].SetVersion(it.slot, bucket_[i].GetVersion(slot));
}
};
bucket_[i].ForEachSlot(std::move(cb));
@ -881,8 +1007,13 @@ void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
}
invalid_mask |= (1u << slot);
dest_right->InsertUniq(std::forward<Key_t>(Key(bid, slot)),
std::forward<Value_t>(Value(bid, slot)), hash);
auto it = dest_right->InsertUniq(std::forward<Key_t>(Key(bid, slot)),
std::forward<Value_t>(Value(bid, slot)), hash);
if constexpr (USE_VERSION) {
// Update the version in the destination bucket.
dest_right->bucket_[it.index].SetVersion(it.slot, stash.GetVersion(slot));
}
// Remove stash reference pointing to stach bucket i.
RemoveStashReference(i, hash);
@ -908,6 +1039,12 @@ int Segment<Key, Value, Policy>::MoveToOther(bool own_items, unsigned from, unsi
if (dst_slot < 0)
return -1;
// We never decrease the version of the entry.
if constexpr (USE_VERSION) {
auto& dst = bucket_[to];
dst.SetVersion(dst_slot, src.GetVersion(src_slot));
}
src.Delete(src_slot);
return src_slot;
@ -979,6 +1116,47 @@ auto Segment<Key, Value, Policy>::InsertUniq(U&& key, V&& value, Hash_t key_hash
return Iterator{};
}
template <typename Key, typename Value, typename Policy>
unsigned Segment<Key, Value, Policy>::CVCOnInsert(Hash_t key_hash, uint8_t bid_res[2]) const {
const uint8_t bid = BucketIndex(key_hash);
const uint8_t nid = (bid + 1) % kNumBuckets;
const Bucket& target = bucket_[bid];
const Bucket& neighbor = bucket_[nid];
uint8_t first = target.Size() > neighbor.Size() ? nid : bid;
if (!bucket_[first].IsFull()) {
bid_res[0] = first;
return 1;
}
const uint8_t after_next = (nid + 1) % kNumBuckets;
if (CheckIfMovesToOther(true, nid, after_next)) {
bid_res[0] = nid;
bid_res[1] = after_next;
return 2;
}
const uint8_t prev_bid = (bid == 0) ? kNumBuckets - 1 : bid - 1;
if (CheckIfMovesToOther(false, bid, prev_bid)) {
bid_res[0] = bid;
bid_res[1] = prev_bid;
return 2;
}
for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) {
unsigned stash_bid = kNumBuckets + (bid + i) % STASH_BUCKET_NUM;
const Bucket& stash = bucket_[stash_bid];
if (!stash.IsFull()) {
bid_res[0] = stash_bid;
return 1;
}
}
return 0;
}
template <typename Key, typename Value, typename Policy>
template <typename Cb, typename HashFn>
bool Segment<Key, Value, Policy>::TraverseLogicalBucket(uint8_t bid, HashFn&& hfun, Cb&& cb) const {

View File

@ -174,6 +174,7 @@ TEST_F(DashTest, Basic) {
ASSERT_EQ(1, has_called);
ASSERT_EQ(0, segment_.TraverseLogicalBucket(cursor, hfun, cb));
ASSERT_EQ(1, has_called);
EXPECT_EQ(0, segment_.GetVersion(0, 0));
}
TEST_F(DashTest, Segment) {
@ -347,7 +348,7 @@ TEST_F(DashTest, Insert) {
Dash64::const_iterator it = dt_.Find(i);
ASSERT_TRUE(it != dt_.end());
ASSERT_EQ(it.value(), i);
ASSERT_EQ(it->second, i);
ASSERT_LE(dt_.load_factor(), 1) << i;
}
@ -355,6 +356,7 @@ TEST_F(DashTest, Insert) {
Dash64::const_iterator it = dt_.Find(i);
ASSERT_TRUE(it == dt_.end());
}
EXPECT_EQ(kNumItems, dt_.size());
EXPECT_EQ(1, dt_.Erase(0));
EXPECT_EQ(0, dt_.Erase(0));
@ -440,6 +442,43 @@ TEST_F(DashTest, Eviction) {
EXPECT_EQ(bucket_cnt, dt_.bucket_count());
}
struct VersionPolicy : public BasicDashPolicy {
static constexpr bool kUseVersion = true;
static uint64_t HashFn(int v) {
return XXH3_64bits(&v, sizeof(v));
}
};
TEST_F(DashTest, Version) {
DashTable<int, int, VersionPolicy> dt;
auto [it, inserted] = dt.Insert(1, 1);
EXPECT_EQ(0, it.GetVersion());
it.SetVersion(5);
EXPECT_EQ(5, it.GetVersion());
dt.Clear();
constexpr int kNum = 68000;
for (int i = 0; i < kNum; ++i) {
auto it = dt.Insert(i, 0).first;
it.SetVersion(i + 65000);
if (i) {
auto p = dt.Find(i - 1);
ASSERT_GE(p.GetVersion(), i - 1 + 65000) << i;
}
}
unsigned items = 0;
for (auto it = dt.begin(); it != dt.end(); ++it) {
ASSERT_FALSE(it.is_done());
ASSERT_GE(it.GetVersion(), it->first + 65000)
<< it.segment_id() << " " << it.bucket_id() << " " << it.slot_id();
++items;
}
ASSERT_EQ(kNum, items);
}
struct A {
int a = 0;
unsigned moved = 0;
@ -484,6 +523,7 @@ TEST_F(DashTest, Moveable) {
struct SdsDashPolicy {
enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 };
static constexpr bool kUseVersion = false;
static uint64_t HashFn(sds u) {
return XXH3_64bits(reinterpret_cast<const uint8_t*>(u), sdslen(u));