Introduce dbslice versioning

This commit is contained in:
Roman Gershman 2022-01-22 22:09:01 +02:00
parent ce46ba7cb1
commit af1fe6e114
7 changed files with 82 additions and 29 deletions

View File

@ -120,6 +120,7 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
if (expire_it->second <= now_ms_) {
db->expire_table.Erase(expire_it);
db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it);
return make_pair(MainIterator{}, ExpireIterator{});
@ -150,15 +151,16 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
auto& db = db_arr_[db_index];
MainIterator existing;
CompactObj co_key{key};
pair<MainIterator, bool> res = db->main_table.Insert(std::move(co_key), PrimeValue{});
if (res.second) { // new entry
db->stats.obj_memory_usage += res.first->first.MallocUsed();
auto [it, inserted] = db->main_table.Insert(std::move(co_key), PrimeValue{});
if (inserted) { // new entry
db->stats.inline_keys += it->first.IsInline();
db->stats.obj_memory_usage += it->first.MallocUsed();
it.SetVersion(NextVersion());
return make_pair(res.first, true);
return make_pair(it, true);
}
existing = res.first;
auto& existing = it;
DCHECK(IsValid(existing));
@ -193,7 +195,7 @@ void DbSlice::CreateDb(DbIndex index) {
}
}
bool DbSlice::Del(DbIndex db_ind, const MainIterator& it) {
bool DbSlice::Del(DbIndex db_ind, MainIterator it) {
auto& db = db_arr_[db_ind];
if (!IsValid(it)) {
return false;
@ -203,6 +205,7 @@ bool DbSlice::Del(DbIndex db_ind, const MainIterator& it) {
CHECK_EQ(1u, db->expire_table.Erase(it->first));
}
db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it);
@ -218,7 +221,7 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
size_t removed = db->main_table.size();
db->main_table.Clear();
db->expire_table.Clear();
db->stats.inline_keys = 0;
db->stats.obj_memory_usage = 0;
return removed;
@ -274,6 +277,9 @@ bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj,
if (!success)
return false; // in this case obj won't be moved and will be destroyed during unwinding.
new_entry.SetVersion(NextVersion());
db->stats.inline_keys += new_entry->first.IsInline();
db->stats.obj_memory_usage += (new_entry->first.MallocUsed() + new_entry->second.MallocUsed());
if (expire_at_ms) {
@ -367,4 +373,15 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) con
return true;
}
void DbSlice::PreUpdate(DbIndex db_ind, MainIterator it) {
auto& db = db_arr_[db_ind];
db->stats.obj_memory_usage -= it->second.MallocUsed();
it.SetVersion(NextVersion());
}
void DbSlice::PostUpdate(DbIndex db_ind, MainIterator it) {
auto& db = db_arr_[db_ind];
db->stats.obj_memory_usage += it->second.MallocUsed();
}
} // namespace dfly

View File

@ -112,7 +112,7 @@ class DbSlice {
// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);
bool Del(DbIndex db_ind, const MainIterator& it);
bool Del(DbIndex db_ind, MainIterator it);
constexpr static DbIndex kDbAll = 0xFFFF;
@ -151,15 +151,27 @@ class DbSlice {
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;
// Callback functions called upon writing to the existing key.
void PreUpdate(DbIndex db_ind, MainIterator it);
void PostUpdate(DbIndex db_ind, MainIterator it);
// Current version of this slice.
// We maintain a shared versioning scheme for all databases in the slice.
uint64_t version() const { return version_; }
private:
void CreateDb(DbIndex index);
uint64_t NextVersion() {
return version_++;
}
ShardId shard_id_;
EngineShard* owner_;
uint64_t now_ms_ = 0; // Used for expire logic, represents a real clock.
uint64_t version_ = 1; // Used to version entries in the PrimeTable.
SliceEvents events_;
using LockTable = absl::flat_hash_map<std::string, IntentLock>;

View File

@ -60,8 +60,13 @@ using namespace std;
namespace {
quicklistEntry QLEntry() {
quicklistEntry res{.quicklist = NULL, .node = NULL, .zi = NULL, .value = NULL,
.longval = 0, .sz = 0, .offset = 0};
quicklistEntry res{.quicklist = NULL,
.node = NULL,
.zi = NULL,
.value = NULL,
.longval = 0,
.sz = 0,
.offset = 0};
return res;
}
@ -336,6 +341,7 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
} else {
if (it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
es->db_slice().PreUpdate(op_args.db_ind, it);
ql = GetQL(it->second);
}
@ -352,7 +358,10 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->AwakeWatched(op_args.db_ind, key);
} else {
es->db_slice().PostUpdate(op_args.db_ind, it);
}
return quicklistCount(ql);
}
@ -362,11 +371,17 @@ OpResult<string> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListD
if (!it_res)
return it_res.status();
quicklist* ql = GetQL((*it_res)->second);
MainIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
string res = ListPop(dir, ql);
db_slice.PostUpdate(op_args.db_ind, it);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, *it_res));
CHECK(db_slice.Del(op_args.db_ind, it));
}
return res;
}

View File

@ -285,7 +285,7 @@ error_code RdbSerializer::FlushMem() {
if (sz == 0)
return error_code{};
DVLOG(1) << "Write file " << sz << " bytes";
DVLOG(2) << "FlushMem " << sz << " bytes";
// interrupt point.
RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer()));
@ -403,6 +403,7 @@ error_code RdbSaver::SaveBody() {
ivec[i].iov_len = vals[i].size();
}
RETURN_ON_ERR(sink_->Write(ivec.data(), ivec.size()));
num_written += vals.size();
vals.clear();
}
@ -421,7 +422,7 @@ void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
auto pair = shard->db_slice().GetTables(0);
auto s = make_unique<RdbSnapshot>(pair.first, pair.second, &impl_->channel);
s->Start(666); // TODO: to introduce slice versioning.
s->Start(shard->db_slice().version());
impl_->handles[shard->shard_id()] = move(s);
}

View File

@ -142,34 +142,40 @@ void RdbSnapshot::FiberFunc() {
physical_list.clear();
// Flush if needed.
FlushSfile();
FlushSfile(false);
if (processed_ >= last_yield + 200) {
this_fiber::yield();
last_yield = processed_;
// flush in case other fibers (writes commands that pushed previous values) filled the file.
FlushSfile();
FlushSfile(false);
}
} while (cursor > 0);
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec);
FlushSfile();
FlushSfile(true);
dest_->StartClosing();
VLOG(1) << "Exit RdbProducer fiber with " << processed_ << " processed";
}
void RdbSnapshot::FlushSfile() {
// Flush the string file if needed.
if (!sfile_->val.empty()) {
bool RdbSnapshot::FlushSfile(bool force) {
if (force) {
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec);
}
if (sfile_->val.empty() || (!force && sfile_->val.size() < 4096))
return false;
if (!force) {
// Make sure we flush everything from membuffer in order to preserve the atomicity of keyvalue
// serializations.
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec); // stringfile always succeeds.
string tmp = std::move(sfile_->val); // important to move before pushing!
dest_->Push(std::move(tmp));
}
string tmp = std::move(sfile_->val); // important to move before pushing!
dest_->Push(std::move(tmp));
return true;
}
} // namespace dfly

View File

@ -29,7 +29,7 @@ class RdbSnapshot {
private:
void FiberFunc();
void FlushSfile();
bool FlushSfile(bool force);
void PhysicalCb(MainIterator it);
::boost::fibers::fiber fb_;

View File

@ -78,8 +78,9 @@ OpResult<void> SetCmd::SetExisting(DbIndex db_ind, std::string_view value, uint6
} else {
db_slice_->Expire(db_ind, dest, expire_at_ms);
}
db_slice_->PreUpdate(db_ind, dest);
dest->second.SetString(value);
db_slice_->PostUpdate(db_ind, dest);
return OpStatus::OK;
}
@ -379,8 +380,9 @@ OpResult<int64_t> StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view
}
int64_t new_val = prev + incr;
db_slice.PreUpdate(op_args.db_ind, it);
it->second.SetInt(new_val);
db_slice.PostUpdate(op_args.db_ind, it);
return new_val;
}