From 1481cb9d5739d9309f944a50d20bf3bb82c974db Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 28 Apr 2022 20:16:56 +0300 Subject: [PATCH] Finish RPOPLPUSH --- src/server/list_family.cc | 334 +++++++++++++++++++++++--------------- src/server/list_family.h | 8 - 2 files changed, 201 insertions(+), 141 deletions(-) diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 261d891..fa2b030 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -191,7 +191,6 @@ class BPopper { // If OK is returned then use result() to fetch the value. OpStatus Run(Transaction* t, unsigned msec); - // returns (key, value) pair. auto result() const { return make_pair(key_, value_); @@ -265,7 +264,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { ff_result_.key.GetString(&key_); auto it_res = shard->db_slice().Find(t->db_index(), key_, OBJ_LIST); - CHECK(it_res); // must exist and must be ok. + CHECK(it_res); // must exist and must be ok. PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); value_ = ListPop(dir_, ql); @@ -278,6 +277,161 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { return OpStatus::OK; } +OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) { + auto& db_slice = op_args.shard->db_slice(); + auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST); + if (!src_res) + return src_res.status(); + + PrimeIterator src_it = *src_res; + quicklist* src_ql = GetQL(src_it->second); + + if (src == dest) { // simple case. + db_slice.PreUpdate(op_args.db_ind, src_it); + string val = ListPop(ListDir::RIGHT, src_ql); + + quicklistPushHead(src_ql, val.data(), val.size()); + db_slice.PostUpdate(op_args.db_ind, src_it); + + return val; + } + + quicklist* dest_ql = nullptr; + auto [dest_it, created] = db_slice.AddOrFind(op_args.db_ind, dest); + if (created) { + robj* obj = createQuicklistObject(); + dest_ql = (quicklist*)obj->ptr; + quicklistSetOptions(dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth); + dest_it->second.ImportRObj(obj); + + // Insertion of dest could invalidate src_it. Find it again. + src_it = db_slice.GetTables(op_args.db_ind).first->Find(src); + } else { + if (dest_it->second.ObjType() != OBJ_LIST) + return OpStatus::WRONG_TYPE; + + dest_ql = GetQL(dest_it->second); + db_slice.PreUpdate(op_args.db_ind, dest_it); + } + + db_slice.PreUpdate(op_args.db_ind, src_it); + string val = ListPop(ListDir::RIGHT, src_ql); + quicklistPushHead(dest_ql, val.data(), val.size()); + db_slice.PostUpdate(op_args.db_ind, src_it); + db_slice.PostUpdate(op_args.db_ind, dest_it); + + if (quicklistCount(src_ql) == 0) { + CHECK(db_slice.Del(op_args.db_ind, src_it)); + } + + return val; +} + +OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { + auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) { + return it_res.status(); + } + if (!fetch) + return OpStatus::OK; + + quicklist* ql = GetQL(it_res.value()->second); + quicklistEntry entry = QLEntry(); + quicklistIter* iter = quicklistGetIterator(ql, AL_START_TAIL); + CHECK(quicklistNext(iter, &entry)); + quicklistReleaseIterator(iter); + + if (entry.value) + return string(reinterpret_cast(entry.value), entry.sz); + else + return absl::StrCat(entry.longval); +} + +OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, + bool skip_notexist, absl::Span vals) { + EngineShard* es = op_args.shard; + PrimeIterator it; + bool new_key = false; + + if (skip_notexist) { + auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); + it = *it_res; + } else { + tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key); + } + quicklist* ql = nullptr; + + if (new_key) { + robj* o = createQuicklistObject(); + ql = (quicklist*)o->ptr; + quicklistSetOptions(ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth); + it->second.ImportRObj(o); + } else { + if (it->second.ObjType() != OBJ_LIST) + return OpStatus::WRONG_TYPE; + es->db_slice().PreUpdate(op_args.db_ind, it); + ql = GetQL(it->second); + } + + // Left push is LIST_HEAD. + int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL; + + for (auto v : vals) { + es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size()); + quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos); + } + + if (new_key) { + if (es->blocking_controller()) { + string tmp; + string_view key = it->first.GetSlice(&tmp); + es->blocking_controller()->AwakeWatched(op_args.db_ind, key); + } + } else { + es->db_slice().PostUpdate(op_args.db_ind, it); + } + + return quicklistCount(ql); +} + +OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count, + bool return_results) { + auto& db_slice = op_args.shard->db_slice(); + OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); + + PrimeIterator it = *it_res; + quicklist* ql = GetQL(it->second); + db_slice.PreUpdate(op_args.db_ind, it); + + StringVec res; + if (quicklistCount(ql) < count) { + count = quicklistCount(ql); + } + res.reserve(count); + + if (return_results) { + for (unsigned i = 0; i < count; ++i) { + res.push_back(ListPop(dir, ql)); + } + } else { + for (unsigned i = 0; i < count; ++i) { + ListPop(dir, ql); + } + } + + db_slice.PostUpdate(op_args.db_ind, it); + + if (quicklistCount(ql) == 0) { + CHECK(db_slice.Del(op_args.db_ind, it)); + } + + return res; +} + } // namespace void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) { @@ -317,7 +471,50 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); } else { - return (*cntx)->SendError("tbd: not_implemented"); + CHECK_EQ(2u, cntx->transaction->unique_shard_cnt()); + + OpResult find_res[2]; + + // Transaction is comprised of 2 hops: + // 1 - check for entries existence, their types and if possible - + // read the value we may rpop from the source list. + // 2. If everything is ok, rpop from source and lpush the peeked value into + // the destination. + // + cntx->transaction->Schedule(); + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + DCHECK_EQ(1u, args.size()); + bool is_dest = args.front() == dest; + find_res[is_dest] = RPeek(OpArgs{shard, t->db_index()}, args.front(), !is_dest); + return OpStatus::OK; + }; + + cntx->transaction->Execute(move(cb), false); + + if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { + auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + cntx->transaction->Execute(move(cb), true); + result = find_res[0] ? find_res[1] : find_res[0]; + } else { + // Everything is ok, lets proceed with the mutations. + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + bool is_dest = args.front() == dest; + OpArgs op_args{shard, t->db_index()}; + + if (is_dest) { + string_view val{find_res[0].value()}; + absl::Span span{&val, 1}; + OpPush(op_args, args.front(), ListDir::LEFT, false, span); + } else { + OpPop(op_args, args.front(), ListDir::RIGHT, 1, false); + } + return OpStatus::OK; + }; + cntx->transaction->Execute(move(cb), true); + result = std::move(find_res[0].value()); + } } if (result) { @@ -577,7 +774,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPop(OpArgs{shard, t->db_index()}, key, dir, count); + return OpPop(OpArgs{shard, t->db_index()}, key, dir, count, true); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -605,84 +802,6 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } } -OpResult ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, - bool skip_notexist, absl::Span vals) { - EngineShard* es = op_args.shard; - PrimeIterator it; - bool new_key = false; - - if (skip_notexist) { - auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST); - if (!it_res) - return it_res.status(); - it = *it_res; - } else { - tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key); - } - quicklist* ql = nullptr; - - if (new_key) { - robj* o = createQuicklistObject(); - ql = (quicklist*)o->ptr; - quicklistSetOptions(ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth); - it->second.ImportRObj(o); - } else { - if (it->second.ObjType() != OBJ_LIST) - return OpStatus::WRONG_TYPE; - es->db_slice().PreUpdate(op_args.db_ind, it); - ql = GetQL(it->second); - } - - // Left push is LIST_HEAD. - int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL; - - for (auto v : vals) { - es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size()); - quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos); - } - - if (new_key) { - if (es->blocking_controller()) { - string tmp; - string_view key = it->first.GetSlice(&tmp); - es->blocking_controller()->AwakeWatched(op_args.db_ind, key); - } - } else { - es->db_slice().PostUpdate(op_args.db_ind, it); - } - - return quicklistCount(ql); -} - -OpResult ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir, - uint32_t count) { - auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); - if (!it_res) - return it_res.status(); - - PrimeIterator it = *it_res; - quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(op_args.db_ind, it); - - StringVec res; - if (quicklistCount(ql) < count) { - count = quicklistCount(ql); - } - res.reserve(count); - - for (unsigned i = 0; i < count; ++i) { - res.push_back(ListPop(dir, ql)); - } - db_slice.PostUpdate(op_args.db_ind, it); - - if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); - } - - return res; -} - OpResult ListFamily::OpLen(const OpArgs& op_args, std::string_view key) { auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); if (!res) @@ -882,57 +1001,6 @@ OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view return str_vec; } -OpResult ListFamily::OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, - string_view dest) { - auto& db_slice = op_args.shard->db_slice(); - auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST); - if (!src_res) - return src_res.status(); - - PrimeIterator src_it = *src_res; - quicklist* src_ql = GetQL(src_it->second); - - if (src == dest) { // simple case. - db_slice.PreUpdate(op_args.db_ind, src_it); - string val = ListPop(ListDir::RIGHT, src_ql); - - quicklistPushHead(src_ql, val.data(), val.size()); - db_slice.PostUpdate(op_args.db_ind, src_it); - - return val; - } - - quicklist* dest_ql = nullptr; - auto [dest_it, created] = db_slice.AddOrFind(op_args.db_ind, dest); - if (created) { - robj* obj = createQuicklistObject(); - dest_ql = (quicklist*)obj->ptr; - quicklistSetOptions(dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth); - dest_it->second.ImportRObj(obj); - - // Insertion of dest could invalidate src_it. Find it again. - src_it = db_slice.GetTables(op_args.db_ind).first->Find(src); - } else { - if (dest_it->second.ObjType() != OBJ_LIST) - return OpStatus::WRONG_TYPE; - - dest_ql = GetQL(dest_it->second); - db_slice.PreUpdate(op_args.db_ind, dest_it); - } - - db_slice.PreUpdate(op_args.db_ind, src_it); - string val = ListPop(ListDir::RIGHT, src_ql); - quicklistPushHead(dest_ql, val.data(), val.size()); - db_slice.PostUpdate(op_args.db_ind, src_it); - db_slice.PostUpdate(op_args.db_ind, dest_it); - - if (quicklistCount(src_ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, src_it)); - } - - return val; -} - using CI = CommandId; #define HFUNC(x) SetHandler(&ListFamily::x) diff --git a/src/server/list_family.h b/src/server/list_family.h index c32f75a..87778c0 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -43,12 +43,6 @@ class ListFamily { static void BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx); - static OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, - bool skip_notexist, absl::Span vals); - - static OpResult OpPop(const OpArgs& op_args, std::string_view key, ListDir dir, - uint32_t count); - static OpResult OpLen(const OpArgs& op_args, std::string_view key); static OpResult OpIndex(const OpArgs& op_args, std::string_view key, long index); static OpResult OpInsert(const OpArgs& op_args, std::string_view key, std::string_view pivot, @@ -63,8 +57,6 @@ class ListFamily { static OpResult OpRange(const OpArgs& op_args, std::string_view key, long start, long end); - static OpResult OpRPopLPushSingleShard(const OpArgs& op_args, std::string_view src, - std::string_view dest); }; } // namespace dfly