diff --git a/src/server/list_family.cc b/src/server/list_family.cc index fa2b030..a3937cf 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -262,13 +262,15 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); - - auto it_res = shard->db_slice().Find(t->db_index(), key_, OBJ_LIST); + auto& db_slice = shard->db_slice(); + auto it_res = db_slice.Find(t->db_index(), key_, OBJ_LIST); CHECK(it_res); // must exist and must be ok. PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - value_ = ListPop(dir_, ql); + db_slice.PreUpdate(t->db_index(), it); + value_ = ListPop(dir_, ql); + db_slice.PostUpdate(t->db_index(), it); if (quicklistCount(ql) == 0) { CHECK(shard->db_slice().Del(t->db_index(), it)); } @@ -315,8 +317,10 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, } db_slice.PreUpdate(op_args.db_ind, src_it); + string val = ListPop(ListDir::RIGHT, src_ql); quicklistPushHead(dest_ql, val.data(), val.size()); + db_slice.PostUpdate(op_args.db_ind, src_it); db_slice.PostUpdate(op_args.db_ind, dest_it); @@ -327,11 +331,14 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, return val; } +// Read-only peek operation that determines wether the list exists and optionally +// returns the first from right value without popping it from the list. OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); if (!it_res) { return it_res.status(); } + if (!fetch) return OpStatus::OK; @@ -837,43 +844,49 @@ OpResult ListFamily::OpIndex(const OpArgs& op_args, std::string_view key OpResult ListFamily::OpInsert(const OpArgs& op_args, string_view key, string_view pivot, string_view elem, int insert_param) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); - if (!res) - return res.status(); + auto& db_slice = op_args.shard->db_slice(); + auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); - quicklist* ql = GetQL(res.value()->second); + quicklist* ql = GetQL(it_res.value()->second); quicklistEntry entry = QLEntry(); quicklistIter* qiter = quicklistGetIterator(ql, AL_START_HEAD); bool found = false; while (quicklistNext(qiter, &entry)) { if (ElemCompare(entry, pivot)) { - if (insert_param == LIST_TAIL) { - quicklistInsertAfter(qiter, &entry, elem.data(), elem.size()); - } else { - DCHECK_EQ(LIST_HEAD, insert_param); - - quicklistInsertBefore(qiter, &entry, elem.data(), elem.size()); - } found = true; break; } } - quicklistReleaseIterator(qiter); + int res = -1; if (found) { - return quicklistCount(ql); + db_slice.PreUpdate(op_args.db_ind, *it_res); + if (insert_param == LIST_TAIL) { + quicklistInsertAfter(qiter, &entry, elem.data(), elem.size()); + } else { + DCHECK_EQ(LIST_HEAD, insert_param); + quicklistInsertBefore(qiter, &entry, elem.data(), elem.size()); + } + db_slice.PostUpdate(op_args.db_ind, *it_res); + res = quicklistCount(ql); } - return -1; + quicklistReleaseIterator(qiter); + return res; } OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, string_view elem, long count) { DCHECK(!elem.empty()); - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); - if (!res) - return res.status(); - quicklist* ql = GetQL(res.value()->second); + auto& db_slice = op_args.shard->db_slice(); + auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); + + PrimeIterator it = *it_res; + quicklist* ql = GetQL(it->second); int iter_direction = AL_START_HEAD; long long index = 0; @@ -887,6 +900,8 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str quicklistEntry entry; unsigned removed = 0; const uint8_t* elem_ptr = reinterpret_cast(elem.data()); + + db_slice.PreUpdate(op_args.db_ind, it); while (quicklistNext(qiter, &entry)) { if (quicklistCompare(&entry, elem_ptr, elem.size())) { quicklistDelEntry(qiter, &entry); @@ -895,10 +910,12 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str break; } } + db_slice.PostUpdate(op_args.db_ind, it); + quicklistReleaseIterator(qiter); if (quicklistCount(ql) == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res.value())); + CHECK(db_slice.Del(op_args.db_ind, it)); } return removed; @@ -906,12 +923,18 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) { DCHECK(!elem.empty()); - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); - if (!res) - return res.status(); - quicklist* ql = GetQL(res.value()->second); + auto& db_slice = op_args.shard->db_slice(); + auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); + PrimeIterator it = *it_res; + quicklist* ql = GetQL(it->second); + + db_slice.PreUpdate(op_args.db_ind, it); int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size()); + db_slice.PostUpdate(op_args.db_ind, it); + if (!replaced) { return OpStatus::OUT_OF_RANGE; } @@ -919,10 +942,13 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view e } OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, long end) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); - if (!res) - return res.status(); - quicklist* ql = GetQL(res.value()->second); + auto& db_slice = op_args.shard->db_slice(); + auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); + + PrimeIterator it = *it_res; + quicklist* ql = GetQL(it->second); long llen = quicklistCount(ql); /* convert negative indexes */ @@ -947,11 +973,14 @@ OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, ltrim = start; rtrim = llen - end - 1; } + + db_slice.PreUpdate(op_args.db_ind, it); quicklistDelRange(ql, 0, ltrim); quicklistDelRange(ql, -rtrim, rtrim); + db_slice.PostUpdate(op_args.db_ind, it); if (quicklistCount(ql) == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res.value())); + CHECK(db_slice.Del(op_args.db_ind, it)); } return OpStatus::OK; }