Add misssing update hooks for list family

This commit is contained in:
Roman Gershman 2022-04-29 20:48:25 +03:00
parent 362f1f4ec4
commit 0582c5750a
1 changed files with 60 additions and 31 deletions

View File

@ -262,13 +262,15 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);
auto it_res = shard->db_slice().Find(t->db_index(), key_, OBJ_LIST);
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_index(), key_, OBJ_LIST);
CHECK(it_res); // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
value_ = ListPop(dir_, ql);
db_slice.PreUpdate(t->db_index(), it);
value_ = ListPop(dir_, ql);
db_slice.PostUpdate(t->db_index(), it);
if (quicklistCount(ql) == 0) {
CHECK(shard->db_slice().Del(t->db_index(), it));
}
@ -315,8 +317,10 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
}
db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(dest_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);
db_slice.PostUpdate(op_args.db_ind, dest_it);
@ -327,11 +331,14 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
return val;
}
// Read-only peek operation that determines wether the list exists and optionally
// returns the first from right value without popping it from the list.
OpResult<string> RPeek(const OpArgs& op_args, string_view key, bool fetch) {
auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res) {
return it_res.status();
}
if (!fetch)
return OpStatus::OK;
@ -837,43 +844,49 @@ OpResult<string> ListFamily::OpIndex(const OpArgs& op_args, std::string_view key
OpResult<int> ListFamily::OpInsert(const OpArgs& op_args, string_view key, string_view pivot,
string_view elem, int insert_param) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
quicklist* ql = GetQL(res.value()->second);
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = QLEntry();
quicklistIter* qiter = quicklistGetIterator(ql, AL_START_HEAD);
bool found = false;
while (quicklistNext(qiter, &entry)) {
if (ElemCompare(entry, pivot)) {
if (insert_param == LIST_TAIL) {
quicklistInsertAfter(qiter, &entry, elem.data(), elem.size());
} else {
DCHECK_EQ(LIST_HEAD, insert_param);
quicklistInsertBefore(qiter, &entry, elem.data(), elem.size());
}
found = true;
break;
}
}
quicklistReleaseIterator(qiter);
int res = -1;
if (found) {
return quicklistCount(ql);
db_slice.PreUpdate(op_args.db_ind, *it_res);
if (insert_param == LIST_TAIL) {
quicklistInsertAfter(qiter, &entry, elem.data(), elem.size());
} else {
DCHECK_EQ(LIST_HEAD, insert_param);
quicklistInsertBefore(qiter, &entry, elem.data(), elem.size());
}
db_slice.PostUpdate(op_args.db_ind, *it_res);
res = quicklistCount(ql);
}
return -1;
quicklistReleaseIterator(qiter);
return res;
}
OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, string_view elem,
long count) {
DCHECK(!elem.empty());
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
int iter_direction = AL_START_HEAD;
long long index = 0;
@ -887,6 +900,8 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
quicklistEntry entry;
unsigned removed = 0;
const uint8_t* elem_ptr = reinterpret_cast<const uint8_t*>(elem.data());
db_slice.PreUpdate(op_args.db_ind, it);
while (quicklistNext(qiter, &entry)) {
if (quicklistCompare(&entry, elem_ptr, elem.size())) {
quicklistDelEntry(qiter, &entry);
@ -895,10 +910,12 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
break;
}
}
db_slice.PostUpdate(op_args.db_ind, it);
quicklistReleaseIterator(qiter);
if (quicklistCount(ql) == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res.value()));
CHECK(db_slice.Del(op_args.db_ind, it));
}
return removed;
@ -906,12 +923,18 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) {
DCHECK(!elem.empty());
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size());
db_slice.PostUpdate(op_args.db_ind, it);
if (!replaced) {
return OpStatus::OUT_OF_RANGE;
}
@ -919,10 +942,13 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view e
}
OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
long llen = quicklistCount(ql);
/* convert negative indexes */
@ -947,11 +973,14 @@ OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start,
ltrim = start;
rtrim = llen - end - 1;
}
db_slice.PreUpdate(op_args.db_ind, it);
quicklistDelRange(ql, 0, ltrim);
quicklistDelRange(ql, -rtrim, rtrim);
db_slice.PostUpdate(op_args.db_ind, it);
if (quicklistCount(ql) == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res.value()));
CHECK(db_slice.Del(op_args.db_ind, it));
}
return OpStatus::OK;
}