Finish RPOPLPUSH

This commit is contained in:
Roman Gershman 2022-04-28 20:16:56 +03:00
parent b36c16b314
commit 1481cb9d57
2 changed files with 201 additions and 141 deletions

View File

@ -191,7 +191,6 @@ class BPopper {
// If OK is returned then use result() to fetch the value.
OpStatus Run(Transaction* t, unsigned msec);
// returns (key, value) pair.
auto result() const {
return make_pair<string_view, string_view>(key_, value_);
@ -265,7 +264,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
ff_result_.key.GetString(&key_);
auto it_res = shard->db_slice().Find(t->db_index(), key_, OBJ_LIST);
CHECK(it_res); // must exist and must be ok.
CHECK(it_res); // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
value_ = ListPop(dir_, ql);
@ -278,6 +277,161 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
}
OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) {
auto& db_slice = op_args.shard->db_slice();
auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST);
if (!src_res)
return src_res.status();
PrimeIterator src_it = *src_res;
quicklist* src_ql = GetQL(src_it->second);
if (src == dest) { // simple case.
db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(src_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);
return val;
}
quicklist* dest_ql = nullptr;
auto [dest_it, created] = db_slice.AddOrFind(op_args.db_ind, dest);
if (created) {
robj* obj = createQuicklistObject();
dest_ql = (quicklist*)obj->ptr;
quicklistSetOptions(dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth);
dest_it->second.ImportRObj(obj);
// Insertion of dest could invalidate src_it. Find it again.
src_it = db_slice.GetTables(op_args.db_ind).first->Find(src);
} else {
if (dest_it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
dest_ql = GetQL(dest_it->second);
db_slice.PreUpdate(op_args.db_ind, dest_it);
}
db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(dest_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);
db_slice.PostUpdate(op_args.db_ind, dest_it);
if (quicklistCount(src_ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, src_it));
}
return val;
}
OpResult<string> RPeek(const OpArgs& op_args, string_view key, bool fetch) {
auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res) {
return it_res.status();
}
if (!fetch)
return OpStatus::OK;
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = QLEntry();
quicklistIter* iter = quicklistGetIterator(ql, AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);
if (entry.value)
return string(reinterpret_cast<char*>(entry.value), entry.sz);
else
return absl::StrCat(entry.longval);
}
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals) {
EngineShard* es = op_args.shard;
PrimeIterator it;
bool new_key = false;
if (skip_notexist) {
auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
it = *it_res;
} else {
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key);
}
quicklist* ql = nullptr;
if (new_key) {
robj* o = createQuicklistObject();
ql = (quicklist*)o->ptr;
quicklistSetOptions(ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth);
it->second.ImportRObj(o);
} else {
if (it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
es->db_slice().PreUpdate(op_args.db_ind, it);
ql = GetQL(it->second);
}
// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
for (auto v : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size());
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
}
if (new_key) {
if (es->blocking_controller()) {
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->blocking_controller()->AwakeWatched(op_args.db_ind, key);
}
} else {
es->db_slice().PostUpdate(op_args.db_ind, it);
}
return quicklistCount(ql);
}
OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count,
bool return_results) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
StringVec res;
if (quicklistCount(ql) < count) {
count = quicklistCount(ql);
}
res.reserve(count);
if (return_results) {
for (unsigned i = 0; i < count; ++i) {
res.push_back(ListPop(dir, ql));
}
} else {
for (unsigned i = 0; i < count; ++i) {
ListPop(dir, ql);
}
}
db_slice.PostUpdate(op_args.db_ind, it);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
}
return res;
}
} // namespace
void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) {
@ -317,7 +471,50 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
return (*cntx)->SendError("tbd: not_implemented");
CHECK_EQ(2u, cntx->transaction->unique_shard_cnt());
OpResult<string> find_res[2];
// Transaction is comprised of 2 hops:
// 1 - check for entries existence, their types and if possible -
// read the value we may rpop from the source list.
// 2. If everything is ok, rpop from source and lpush the peeked value into
// the destination.
//
cntx->transaction->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = RPeek(OpArgs{shard, t->db_index()}, args.front(), !is_dest);
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), false);
if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->Execute(move(cb), true);
result = find_res[0] ? find_res[1] : find_res[0];
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args{shard, t->db_index()};
if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), ListDir::LEFT, false, span);
} else {
OpPop(op_args, args.front(), ListDir::RIGHT, 1, false);
}
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), true);
result = std::move(find_res[0].value());
}
}
if (result) {
@ -577,7 +774,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPop(OpArgs{shard, t->db_index()}, key, dir, count);
return OpPop(OpArgs{shard, t->db_index()}, key, dir, count, true);
};
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -605,84 +802,6 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
}
}
OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals) {
EngineShard* es = op_args.shard;
PrimeIterator it;
bool new_key = false;
if (skip_notexist) {
auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
it = *it_res;
} else {
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key);
}
quicklist* ql = nullptr;
if (new_key) {
robj* o = createQuicklistObject();
ql = (quicklist*)o->ptr;
quicklistSetOptions(ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth);
it->second.ImportRObj(o);
} else {
if (it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
es->db_slice().PreUpdate(op_args.db_ind, it);
ql = GetQL(it->second);
}
// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
for (auto v : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size());
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
}
if (new_key) {
if (es->blocking_controller()) {
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->blocking_controller()->AwakeWatched(op_args.db_ind, key);
}
} else {
es->db_slice().PostUpdate(op_args.db_ind, it);
}
return quicklistCount(ql);
}
OpResult<StringVec> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir,
uint32_t count) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
StringVec res;
if (quicklistCount(ql) < count) {
count = quicklistCount(ql);
}
res.reserve(count);
for (unsigned i = 0; i < count; ++i) {
res.push_back(ListPop(dir, ql));
}
db_slice.PostUpdate(op_args.db_ind, it);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
}
return res;
}
OpResult<uint32_t> ListFamily::OpLen(const OpArgs& op_args, std::string_view key) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
@ -882,57 +1001,6 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
return str_vec;
}
OpResult<string> ListFamily::OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
string_view dest) {
auto& db_slice = op_args.shard->db_slice();
auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST);
if (!src_res)
return src_res.status();
PrimeIterator src_it = *src_res;
quicklist* src_ql = GetQL(src_it->second);
if (src == dest) { // simple case.
db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(src_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);
return val;
}
quicklist* dest_ql = nullptr;
auto [dest_it, created] = db_slice.AddOrFind(op_args.db_ind, dest);
if (created) {
robj* obj = createQuicklistObject();
dest_ql = (quicklist*)obj->ptr;
quicklistSetOptions(dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth);
dest_it->second.ImportRObj(obj);
// Insertion of dest could invalidate src_it. Find it again.
src_it = db_slice.GetTables(op_args.db_ind).first->Find(src);
} else {
if (dest_it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
dest_ql = GetQL(dest_it->second);
db_slice.PreUpdate(op_args.db_ind, dest_it);
}
db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(dest_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);
db_slice.PostUpdate(op_args.db_ind, dest_it);
if (quicklistCount(src_ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, src_it));
}
return val;
}
using CI = CommandId;
#define HFUNC(x) SetHandler(&ListFamily::x)

View File

@ -43,12 +43,6 @@ class ListFamily {
static void BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);
static OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals);
static OpResult<StringVec> OpPop(const OpArgs& op_args, std::string_view key, ListDir dir,
uint32_t count);
static OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpIndex(const OpArgs& op_args, std::string_view key, long index);
static OpResult<int> OpInsert(const OpArgs& op_args, std::string_view key, std::string_view pivot,
@ -63,8 +57,6 @@ class ListFamily {
static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long start,
long end);
static OpResult<std::string> OpRPopLPushSingleShard(const OpArgs& op_args, std::string_view src,
std::string_view dest);
};
} // namespace dfly