Hook snapshot with db_slice so that it would be possible to maintain snapshot isolation during concurrent writes

This commit is contained in:
Roman Gershman 2022-01-24 21:42:22 +02:00
parent f119e66199
commit a8a05949b0
5 changed files with 69 additions and 7 deletions

View File

@ -140,6 +140,21 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
auto& db = db_arr_[db_index];
// If we have some registered onchange callbacks, we must know in advance whether its Find or Add.
if (!change_cb_.empty()) {
auto it = FindExt(db_index, key).first;
if (IsValid(it)) {
return make_pair(it, true);
}
// It's a new entry.
for (const auto& ccb : change_cb_) {
ccb.second(db_index, ChangeReq{key});
}
}
// Fast-path - change_cb_ is empty so we Find or Add using
// the insert operation: twice more efficient.
CompactObj co_key{key};
auto [it, inserted] = db->main_table.Insert(std::move(co_key), PrimeValue{});
if (inserted) { // new entry
@ -252,6 +267,10 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
}
void DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj, uint64_t expire_at_ms) {
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{key});
}
CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms));
}
@ -364,7 +383,9 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) con
void DbSlice::PreUpdate(DbIndex db_ind, MainIterator it) {
auto& db = db_arr_[db_ind];
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{it});
}
db->stats.obj_memory_usage -= it->second.MallocUsed();
it.SetVersion(NextVersion());
}
@ -392,4 +413,21 @@ pair<MainIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, MainI
return make_pair(MainIterator{}, ExpireIterator{});
}
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
uint64_t ver = NextVersion();
change_cb_.emplace_back(ver, std::move(cb));
return ver;
}
//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
if (it->first == id) {
change_cb_.erase(it);
return;
}
}
LOG(DFATAL) << "Could not find " << id << " to unregister";
}
} // namespace dfly

View File

@ -163,6 +163,21 @@ class DbSlice {
// We maintain a shared versioning scheme for all databases in the slice.
uint64_t version() const { return version_; }
// ChangeReq - describes the change to the table. If MainIterator is defined then
// it's an update on the existing entry, otherwise if string_view is defined then
// it's a new key that is going to be added to the table.
using ChangeReq = std::variant<MainIterator, std::string_view>;
using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;
//! Registers the callback to be called for each change.
//! Returns the registration id which is also the unique version of the dbslice
//! at a time of the call.
uint64_t RegisterOnChange(ChangeCallback cb);
//! Unregisters the callback.
void UnregisterOnChange(uint64_t id);
private:
void CreateDb(DbIndex index);
@ -192,6 +207,8 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
absl::flat_hash_set<std::string_view> uniq_keys_;
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
};
} // namespace dfly

View File

@ -421,7 +421,7 @@ void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
auto pair = shard->db_slice().GetTables(0);
auto s = make_unique<SliceSnapshot>(pair.first, pair.second, &impl_->channel);
s->Start(shard->db_slice().version());
s->Start(&shard->db_slice());
impl_->shard_snapshots[shard->shard_id()] = move(s);
}

View File

@ -11,6 +11,7 @@ extern "C" {
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "server/db_slice.h"
#include "server/rdb_save.h"
#include "util/fiber_sched_algo.h"
#include "util/proactor_base.h"
@ -30,15 +31,20 @@ SliceSnapshot::SliceSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel*
SliceSnapshot::~SliceSnapshot() {
}
void SliceSnapshot::Start(uint64_t version) {
void SliceSnapshot::Start(DbSlice* slice) {
DCHECK(!fb_.joinable());
VLOG(1) << "DbSaver::Start - saving entries with version less than " << version;
snapshot_version_ =
slice->RegisterOnChange([this](DbIndex index, const DbSlice::ChangeReq& req) {});
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
sfile_.reset(new io::StringFile);
rdb_serializer_.reset(new RdbSerializer(sfile_.get()));
snapshot_version_ = version;
fb_ = fiber([this] { FiberFunc(); });
fb_ = fiber([slice, this] {
FiberFunc();
slice->UnregisterOnChange(snapshot_version_);
});
}
void SliceSnapshot::Join() {

View File

@ -13,6 +13,7 @@
namespace dfly {
class RdbSerializer;
class DbSlice;
class SliceSnapshot {
public:
@ -22,7 +23,7 @@ class SliceSnapshot {
SliceSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest);
~SliceSnapshot();
void Start(uint64_t version);
void Start(DbSlice* slice);
void Join();
uint64_t snapshot_version() const {