Add RPUSHX/LPUSHX commands

This commit is contained in:
Roman Gershman 2022-03-24 23:04:02 +02:00
parent b2c50a6fe3
commit 09101c70a5
3 changed files with 41 additions and 21 deletions

View File

@ -154,8 +154,8 @@ API 2.0
- [ ] BRPOPLPUSH
- [ ] BLMOVE
- [ ] LINSERT
- [ ] LPUSHX
- [ ] RPUSHX
- [X] LPUSHX
- [X] RPUSHX
- [X] String Family
- [X] SETEX
- [X] APPEND
@ -165,12 +165,12 @@ API 2.0
- [ ] BITOP
- [ ] BITPOS
- [ ] GETBIT
- [ ] GETRANGE
- [X] GETRANGE
- [ ] INCRBYFLOAT
- [ ] PSETEX
- [ ] SETBIT
- [ ] SETRANGE
- [ ] STRLEN
- [X] STRLEN
- [X] HashSet Family
- [X] HSET
- [X] HMSET
@ -215,7 +215,7 @@ API 2.0
- [X] EVALSHA
- [ ] OBJECT
- [ ] PERSIST
- [ ] PTTL
- [X] PTTL
- [ ] RESTORE
- [X] SCRIPT LOAD
- [ ] SCRIPT DEBUG/KILL/FLUSH/EXISTS

View File

@ -198,7 +198,11 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
} // namespace
void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) {
return PushGeneric(ListDir::LEFT, std::move(args), cntx);
return PushGeneric(ListDir::LEFT, false, std::move(args), cntx);
}
void ListFamily::LPushX(CmdArgList args, ConnectionContext* cntx) {
return PushGeneric(ListDir::LEFT, true, std::move(args), cntx);
}
void ListFamily::LPop(CmdArgList args, ConnectionContext* cntx) {
@ -206,7 +210,11 @@ void ListFamily::LPop(CmdArgList args, ConnectionContext* cntx) {
}
void ListFamily::RPush(CmdArgList args, ConnectionContext* cntx) {
return PushGeneric(ListDir::RIGHT, std::move(args), cntx);
return PushGeneric(ListDir::RIGHT, false, std::move(args), cntx);
}
void ListFamily::RPushX(CmdArgList args, ConnectionContext* cntx) {
return PushGeneric(ListDir::RIGHT, true, std::move(args), cntx);
}
void ListFamily::RPop(CmdArgList args, ConnectionContext* cntx) {
@ -370,7 +378,8 @@ void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendStringArr(str_arr);
}
void ListFamily::PushGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) {
void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args,
ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
vector<std::string_view> vals(args.size() - 2);
for (size_t i = 2; i < args.size(); ++i) {
@ -378,16 +387,12 @@ void ListFamily::PushGeneric(ListDir dir, const CmdArgList& args, ConnectionCont
}
absl::Span<std::string_view> span{vals.data(), vals.size()};
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPush(OpArgs{shard, t->db_index()}, key, dir, span);
return OpPush(OpArgs{shard, t->db_index()}, key, dir, skip_notexists, span);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
return (*cntx)->SendNull();
case OpStatus::WRONG_TYPE:
return (*cntx)->SendError(kWrongTypeErr);
default:;
if (result.status() == OpStatus::WRONG_TYPE) {
return (*cntx)->SendError(kWrongTypeErr);
}
return (*cntx)->SendLong(result.value());
@ -414,9 +419,19 @@ void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionConte
}
OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
const absl::Span<std::string_view>& vals) {
bool skip_notexist, absl::Span<std::string_view> vals) {
EngineShard* es = op_args.shard;
auto [it, new_key] = es->db_slice().AddOrFind(op_args.db_ind, key);
MainIterator it;
bool new_key = false;
if (skip_notexist) {
auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
it = *it_res;
} else {
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key);
}
quicklist* ql;
if (new_key) {
@ -645,8 +660,10 @@ using CI = CommandId;
void ListFamily::Register(CommandRegistry* registry) {
*registry << CI{"LPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPush)
<< CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPushX)
<< CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(LPop)
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(RPop)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)

View File

@ -21,7 +21,9 @@ class ListFamily {
private:
static void LPush(CmdArgList args, ConnectionContext* cntx);
static void LPushX(CmdArgList args, ConnectionContext* cntx);
static void RPush(CmdArgList args, ConnectionContext* cntx);
static void RPushX(CmdArgList args, ConnectionContext* cntx);
static void LPop(CmdArgList args, ConnectionContext* cntx);
static void RPop(CmdArgList args, ConnectionContext* cntx);
static void BLPop(CmdArgList args, ConnectionContext* cntx);
@ -33,10 +35,11 @@ class ListFamily {
static void LSet(CmdArgList args, ConnectionContext* cntx);
static void PopGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx);
static void PushGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx);
static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args,
ConnectionContext* cntx);
static OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
const absl::Span<std::string_view>& vals);
bool skip_notexist, absl::Span<std::string_view> vals);
static OpResult<std::string> OpPop(const OpArgs& op_args, std::string_view key, ListDir dir);
static OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpIndex(const OpArgs& op_args, std::string_view key, long index);
@ -47,8 +50,8 @@ class ListFamily {
long count);
static facade::OpStatus OpTrim(const OpArgs& op_args, std::string_view key, long start, long end);
static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key,
long start, long end);
static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long start,
long end);
};
} // namespace dfly