Add zcard, zscore, zrem methods

This commit is contained in:
Roman Gershman 2022-03-06 08:46:48 +02:00
parent 6b869b41a7
commit f09f516636
4 changed files with 110 additions and 18 deletions

View File

@ -104,16 +104,16 @@ API 1.0
- [X] RPOP
- [ ] RPOPLPUSH
- [X] RPUSH
- [ ] SortedSet Family
- [ ] ZADD
- [ ] ZCARD
- [X] SortedSet Family
- [X] ZADD
- [X] ZCARD
- [ ] ZINCRBY
- [ ] ZRANGE
- [ ] ZRANGEBYSCORE
- [ ] ZREM
- [X] ZREM
- [ ] ZREMRANGEBYSCORE
- [ ] ZREVRANGE
- [ ] ZSCORE
- [X] ZSCORE
- [ ] Not sure whether these are required for the initial release.
- [X] AUTH
- [ ] BGREWRITEAOF

View File

@ -28,7 +28,6 @@ using CI = CommandId;
static const char kNxXxErr[] = "XX and NX options at the same time are not compatible";
constexpr unsigned kMaxZiplistValue = 64;
OpResult<MainIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string_view key,
size_t member_len) {
auto& db_slice = op_args.shard->db_slice();
@ -168,15 +167,47 @@ void ZSetFamily::ZRangeByScore(CmdArgList args, ConnectionContext* cntx) {
}
void ZSetFamily::ZRem(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(0);
std::string_view key = ArgS(args, 1);
absl::InlinedVector<std::string_view, 8> members(args.size() - 2);
for (size_t i = 2; i < args.size(); ++i) {
members[i - 2] = ArgS(args, i);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRem(op_args, key, members);
};
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) {
(*cntx)->SendError(kWrongTypeErr);
} else {
(*cntx)->SendLong(result.value());
}
}
void ZSetFamily::ZScore(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendDouble(0);
std::string_view key = ArgS(args, 1);
std::string_view member = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpScore(op_args, key, member);
};
OpResult<double> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) {
(*cntx)->SendError(kWrongTypeErr);
} else if (!result) {
(*cntx)->SendNull();
} else {
(*cntx)->SendDouble(result.value());
}
}
OpResult<unsigned> ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string_view key,
const ScoredMemberSpan& members) {
ScoredMemberSpan members) {
DCHECK(!members.empty());
OpResult<MainIterator> res_it =
FindZEntry(zparams.flags, op_args, key, members.front().second.size());
@ -215,6 +246,44 @@ OpResult<unsigned> ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_ar
return zparams.ch ? added + updated : added;
}
OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) {
OpResult<MainIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
sds& tmp_str = op_args.shard->tmp_str1;
unsigned deleted = 0;
for (string_view member : members) {
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
deleted += zsetDel(zobj, tmp_str);
}
auto zlen = zsetLength(zobj);
res_it.value()->second.SyncRObj();
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value()));
}
return deleted;
}
OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, string_view member) {
OpResult<MainIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
sds& tmp_str = op_args.shard->tmp_str1;
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
double score;
int retval = zsetScore(zobj, tmp_str, &score);
if (retval != C_OK) {
return OpStatus::KEY_NOTFOUND;
}
return score;
}
#define HFUNC(x) SetHandler(&ZSetFamily::x)
void ZSetFamily::Register(CommandRegistry* registry) {

View File

@ -19,13 +19,13 @@ class ZSetFamily {
static void Register(CommandRegistry* registry);
private:
static void ZCard(CmdArgList args, ConnectionContext* cntx);
static void ZAdd(CmdArgList args, ConnectionContext* cntx);
static void ZIncrBy(CmdArgList args, ConnectionContext* cntx);
static void ZRange(CmdArgList args, ConnectionContext* cntx);
static void ZRem(CmdArgList args, ConnectionContext* cntx);
static void ZScore(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZCard(CmdArgList args, ConnectionContext* cntx);
static void ZAdd(CmdArgList args, ConnectionContext* cntx);
static void ZIncrBy(CmdArgList args, ConnectionContext* cntx);
static void ZRange(CmdArgList args, ConnectionContext* cntx);
static void ZRem(CmdArgList args, ConnectionContext* cntx);
static void ZScore(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
struct ZParams {
unsigned flags = 0; // mask of ZADD_IN_ macros.
@ -37,8 +37,10 @@ class ZSetFamily {
template <typename T> using OpResult = facade::OpResult<T>;
static OpResult<unsigned> OpAdd(const ZParams& zparams, const OpArgs& op_args,
std::string_view key, const ScoredMemberSpan& members);
std::string_view key, ScoredMemberSpan members);
static OpResult<unsigned> OpRem(const OpArgs& op_args, std::string_view key, ArgSlice members);
static OpResult<double> OpScore(const OpArgs& op_args, std::string_view key,
std::string_view member);
};
} // namespace dfly

View File

@ -24,11 +24,32 @@ class ZSetFamilyTest : public BaseFamilyTest {
TEST_F(ZSetFamilyTest, Add) {
auto resp = Run({"zadd", "x", "1.1", "a"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"zscore", "x", "a"});
EXPECT_THAT(resp[0], StrArg("1.1"));
resp = Run({"zadd", "x", "2", "a"});
EXPECT_THAT(resp[0], IntArg(0));
resp = Run({"zscore", "x", "a"});
EXPECT_THAT(resp[0], StrArg("2"));
resp = Run({"zadd", "x", "ch", "3", "a"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"zscore", "x", "a"});
EXPECT_THAT(resp[0], StrArg("3"));
resp = Run({"zcard", "x"});
EXPECT_THAT(resp[0], IntArg(1));
}
TEST_F(ZSetFamilyTest, ZRem) {
auto resp = Run({"zadd", "x", "1.1", "b", "2.1", "a"});
EXPECT_THAT(resp[0], IntArg(2));
resp = Run({"zrem", "x", "b", "c"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"zcard", "x"});
EXPECT_THAT(resp[0], IntArg(1));
}
} // namespace dfly