Support for COUNT argument in RPOP/LPOP

This commit is contained in:
Roman Gershman 2022-03-28 20:13:36 +03:00
parent efbdebaf94
commit 7595ff6236
2 changed files with 50 additions and 9 deletions

View File

@ -399,13 +399,30 @@ void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args,
} }
void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) { void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
int32_t count = 1;
bool return_arr = false;
if (args.size() > 2) {
if (args.size() > 3)
return (*cntx)->SendError(kSyntaxErr);
string_view count_s = ArgS(args, 2);
if (!absl::SimpleAtoi(count_s, &count)) {
return (*cntx)->SendError(kInvalidIntErr);
}
if (count < 0) {
return (*cntx)->SendError(facade::kUintErr);
}
return_arr = true;
}
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPop(OpArgs{shard, t->db_index()}, key, dir); return OpPop(OpArgs{shard, t->db_index()}, key, dir, count);
}; };
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
switch (result.status()) { switch (result.status()) {
case OpStatus::KEY_NOTFOUND: case OpStatus::KEY_NOTFOUND:
@ -415,7 +432,19 @@ void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionConte
default:; default:;
} }
return (*cntx)->SendBulkString(result.value()); if (return_arr) {
if (result->empty()) {
(*cntx)->SendNullArray();
} else {
(*cntx)->StartArray(result->size());
for (const auto& k : *result) {
(*cntx)->SendBulkString(k);
}
}
} else {
DCHECK_EQ(1u, result->size());
(*cntx)->SendBulkString(result->front());
}
} }
OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
@ -466,7 +495,8 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
return quicklistCount(ql); return quicklistCount(ql);
} }
OpResult<string> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir) { OpResult<StringVec> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir,
uint32_t count) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
OpResult<MainIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); OpResult<MainIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res) if (!it_res)
@ -476,7 +506,15 @@ OpResult<string> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListD
quicklist* ql = GetQL(it->second); quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it); db_slice.PreUpdate(op_args.db_ind, it);
string res = ListPop(dir, ql); StringVec res;
if (quicklistCount(ql) < count) {
count = quicklistCount(ql);
}
res.reserve(count);
for (unsigned i = 0; i < count; ++i) {
res.push_back(ListPop(dir, ql));
}
db_slice.PostUpdate(op_args.db_ind, it); db_slice.PostUpdate(op_args.db_ind, it);
if (quicklistCount(ql) == 0) { if (quicklistCount(ql) == 0) {
@ -661,10 +699,10 @@ using CI = CommandId;
void ListFamily::Register(CommandRegistry* registry) { void ListFamily::Register(CommandRegistry* registry) {
*registry << CI{"LPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPush) *registry << CI{"LPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPush)
<< CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPushX) << CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPushX)
<< CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(LPop) << CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(LPop)
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush) << CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX) << CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(RPop) << CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop) << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex) << CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex)

View File

@ -40,7 +40,10 @@ class ListFamily {
static OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, static OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals); bool skip_notexist, absl::Span<std::string_view> vals);
static OpResult<std::string> OpPop(const OpArgs& op_args, std::string_view key, ListDir dir);
static OpResult<StringVec> OpPop(const OpArgs& op_args, std::string_view key, ListDir dir,
uint32_t count);
static OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key); static OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpIndex(const OpArgs& op_args, std::string_view key, long index); static OpResult<std::string> OpIndex(const OpArgs& op_args, std::string_view key, long index);