From 7ee6bd8471007512ef7184cacbfe4b6c78de5f42 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 25 Jan 2022 07:51:15 +0200 Subject: [PATCH] Implement snapshot consistency under the write load --- core/dash.h | 37 +++++++++++++++++++++++++++++ server/snapshot.cc | 58 ++++++++++++++++++++++++++++++++-------------- server/snapshot.h | 8 +++++-- 3 files changed, 83 insertions(+), 20 deletions(-) diff --git a/core/dash.h b/core/dash.h index 318c694..5811002 100644 --- a/core/dash.h +++ b/core/dash.h @@ -298,6 +298,17 @@ class DashTable : public detail::DashTableBase { return const_bucket_iterator{it.owner_, it.seg_id_, it.bucket_id_, 0}; } + // Capture Version Change. Runs cb(it) on every bucket! (not entry) in the table whose version + // would potentially change upon insertion of 'k'. + // In practice traversal is limited to a single segment. The operation is read-only and + // simulates insertion process. 'cb' must accept const_iterator. Note: the interface is a bit hacky + // since the iterator may point to a non-existing slot. In practice it can only be sent to + // the TraverseBucket function. + // The function returns any bucket that is touched by the insertion flow. In practice, we + // could tighten estimates by taking the new version into account. + // Not sure how important this is though. 
+ template void CVCUponInsert(const U& key, Cb&& cb) const; + void Clear(); private: @@ -349,6 +360,32 @@ DashTable<_Key, _Value, Policy>::~DashTable() { }); } +template +template +void DashTable<_Key, _Value, Policy>::CVCUponInsert(const U& key, Cb&& cb) const { + uint64_t key_hash = DoHash(key); + uint32_t seg_id = SegmentId(key_hash); + assert(seg_id < segment_.size()); + const SegmentType* target = segment_[seg_id]; + + uint8_t bids[2]; + unsigned num_touched = target->CVCOnInsert(key_hash, bids); + if (num_touched > 0) { + for (unsigned i = 0; i < num_touched; ++i) { + cb(const_iterator{this, seg_id, bids[i], 0}); + } + return; + } + + static_assert(kPhysicalBucketNum < 0xFF, ""); + + // Segment is full, we need to return the whole segment, because it can be split + // and its entries can be reshuffled into different buckets. + for (uint8_t i = 0; i < kPhysicalBucketNum; ++i) { + cb(const_iterator{this, seg_id, i, 0}); + } +} + template void DashTable<_Key, _Value, Policy>::Clear() { auto cb = [this](SegmentType* seg) { diff --git a/server/snapshot.cc b/server/snapshot.cc index 8099225..60d1a8a 100644 --- a/server/snapshot.cc +++ b/server/snapshot.cc @@ -33,9 +33,26 @@ SliceSnapshot::~SliceSnapshot() { void SliceSnapshot::Start(DbSlice* slice) { DCHECK(!fb_.joinable()); + db_slice_ = slice; - snapshot_version_ = - slice->RegisterOnChange([this](DbIndex index, const DbSlice::ChangeReq& req) {}); + auto on_change = [this, slice](DbIndex db_index, const DbSlice::ChangeReq& req) { + PrimeTable* table = slice->GetTables(db_index).first; + + if (const MainIterator* it = get_if(&req)) { + if (it->GetVersion() < snapshot_version_) { + side_saved_ += SerializePhysicalBucket(table, *it); + } + } else { + string_view key = get(req); + table->CVCUponInsert(key, [this, table](PrimeTable::const_iterator it) { + if (it.MinVersion() < snapshot_version_) { + side_saved_ += SerializePhysicalBucket(table, it); + } + }); + } + }; + + snapshot_version_ = 
slice->RegisterOnChange(move(on_change)); VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; sfile_.reset(new io::StringFile); @@ -53,7 +70,7 @@ void SliceSnapshot::Join() { static_assert(sizeof(PrimeTable::const_iterator) == 16); -void SliceSnapshot::SerializeCb(MainIterator it) { +void SliceSnapshot::SerializeSingleEntry(MainIterator it) { error_code ec; string tmp; @@ -163,23 +180,28 @@ bool SliceSnapshot::SaveCb(MainIterator it) { } physical_mask_.set(it.bucket_id()); - - // Both traversals below execute atomically. - // traverse physical bucket and write into string file. - prime_table_->TraverseBucket(it, [this](auto entry_it) { this->SerializeCb(move(entry_it)); }); - - // Theoretically we could merge version_cb into the traversal above but then would would need - // to give up on DCHECK. - auto version_cb = [this](MainIterator entry_it) { - DCHECK_LE(entry_it.GetVersion(), snapshot_version_); - DVLOG(3) << "Bumping up version " << entry_it.bucket_id() << ":" << entry_it.slot_id(); - - entry_it.SetVersion(snapshot_version_); - }; - - prime_table_->TraverseBucket(it, version_cb); + SerializePhysicalBucket(prime_table_, it); return false; } +unsigned SliceSnapshot::SerializePhysicalBucket(PrimeTable* table, PrimeTable::const_iterator it) { + // Both traversals below execute atomically. + // traverse physical bucket and write into string file. 
+ unsigned result = 0; + table->TraverseBucket(it, [&](auto entry_it) { + ++result; + SerializeSingleEntry(move(entry_it)); + }); + + table->TraverseBucket(it, [this](MainIterator entry_it) { + DCHECK_LE(entry_it.GetVersion(), snapshot_version_); + DVLOG(3) << "Bumping up version " << entry_it.bucket_id() << ":" << entry_it.slot_id(); + + entry_it.SetVersion(snapshot_version_); + }); + + return result; +} + } // namespace dfly diff --git a/server/snapshot.h b/server/snapshot.h index e0f0ae6..57be594 100644 --- a/server/snapshot.h +++ b/server/snapshot.h @@ -33,9 +33,12 @@ class SliceSnapshot { private: void FiberFunc(); bool FlushSfile(bool force); - void SerializeCb(MainIterator it); + void SerializeSingleEntry(MainIterator it); bool SaveCb(MainIterator it); + // Returns number of entries serialized. + unsigned SerializePhysicalBucket(PrimeTable* table, PrimeTable::const_iterator it); + ::boost::fibers::fiber fb_; enum {PHYSICAL_LEN = 128}; @@ -47,9 +50,10 @@ class SliceSnapshot { // version upper bound for entries that should be saved (not included). uint64_t snapshot_version_ = 0; PrimeTable* prime_table_; + DbSlice* db_slice_ = nullptr; StringChannel* dest_; - size_t serialized_ = 0, skipped_ = 0; + size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0; }; } // namespace dfly