Adding ZLEXCOUNT and ZRANGEBYLEX commands.

This commit is contained in:
Roman Gershman 2022-04-20 21:50:29 +03:00
parent 0f19e60a81
commit d8697463dc
6 changed files with 355 additions and 37 deletions

View File

@ -205,8 +205,8 @@ API 2.0
- [X] Sorted Set Family
- [X] ZCOUNT
- [ ] ZINTERSTORE
- [ ] ZLEXCOUNT
- [ ] ZRANGEBYLEX
- [X] ZLEXCOUNT
- [X] ZRANGEBYLEX
- [X] ZRANK
- [ ] ZREMRANGEBYLEX
- [X] ZREMRANGEBYRANK

View File

@ -74,10 +74,10 @@
// ROMAN: static representation of sds strings
static char kMinStrData[] = "\110" "minstring";
static char kMaxStrData[] = "\110" "minstring";
static char kMaxStrData[] = "\110" "maxstring";
#define cminstring (kMinStrData + 1)
#define cmaxstring (kMaxStrData + 1)
sds cminstring = kMinStrData + 1;
sds cmaxstring = kMaxStrData + 1;
int zslLexValueGteMin(sds value, const zlexrangespec *spec);
@ -685,7 +685,7 @@ int zslIsInLexRange(zskiplist *zsl, const zlexrangespec *range) {
/* Find the first node that is contained in the specified lex range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *zslFirstInLexRange(zskiplist *zsl, const zlexrangespec *range) {
zskiplistNode *x;
int i;
@ -711,7 +711,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *zslLastInLexRange(zskiplist *zsl, const zlexrangespec *range) {
zskiplistNode *x;
int i;
@ -947,7 +947,7 @@ int zzlLexValueLteMax(unsigned char *p, const zlexrangespec *spec) {
/* Returns if there is a part of the zset is in range. Should only be used
* internally by zzlFirstInRange and zzlLastInRange. */
int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
int zzlIsInLexRange(unsigned char *zl, const zlexrangespec *range) {
unsigned char *p;
/* Test for ranges that will always be empty. */
@ -970,7 +970,7 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
/* Find pointer to the first element contained in the specified lex range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *zzlFirstInLexRange(unsigned char *zl, const zlexrangespec *range) {
unsigned char *eptr = lpSeek(zl,0), *sptr;
/* If everything is out of range, return early. */
@ -995,7 +995,7 @@ unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
/* Find pointer to the last element contained in the specified lex range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *zzlLastInLexRange(unsigned char *zl, const zlexrangespec *range) {
unsigned char *eptr = lpSeek(zl,-2), *sptr;
/* If everything is out of range, return early. */

View File

@ -87,10 +87,10 @@ int zslValueGteMin(double value, const zrangespec* spec);
int zslValueLteMax(double value, const zrangespec* spec);
void zslFreeLexRange(zlexrangespec* spec);
int zslParseLexRange(robj* min, robj* max, zlexrangespec* spec);
unsigned char* zzlFirstInLexRange(unsigned char* zl, zlexrangespec* range);
unsigned char* zzlLastInLexRange(unsigned char* zl, zlexrangespec* range);
zskiplistNode* zslFirstInLexRange(zskiplist* zsl, zlexrangespec* range);
zskiplistNode* zslLastInLexRange(zskiplist* zsl, zlexrangespec* range);
unsigned char* zzlFirstInLexRange(unsigned char* zl, const zlexrangespec* range);
unsigned char* zzlLastInLexRange(unsigned char* zl, const zlexrangespec* range);
zskiplistNode* zslFirstInLexRange(zskiplist* zsl, const zlexrangespec* range);
zskiplistNode* zslLastInLexRange(zskiplist* zsl, const zlexrangespec* range);
int zzlLexValueGteMin(unsigned char* p, const zlexrangespec* spec);
int zzlLexValueLteMax(unsigned char* p, const zlexrangespec* spec);
int zslLexValueGteMin(sds value, const zlexrangespec* spec);
@ -102,4 +102,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
unsigned long zslDeleteRangeByScore(zskiplist *zsl, const zrangespec *range, dict *dict);
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, const zrangespec *range, unsigned long *deleted);
extern sds cmaxstring;
extern sds cminstring;
#endif

View File

@ -29,7 +29,8 @@ using CI = CommandId;
static const char kNxXxErr[] = "XX and NX options at the same time are not compatible";
static const char kScoreNaN[] = "resulting score is not a number (NaN)";
static const char kRangeErr[] = "min or max is not a float";
static const char kFloatRangeErr[] = "min or max is not a float";
static const char kLexRangeErr[] = "min or max not valid string range item";
constexpr unsigned kMaxListPackValue = 64;
@ -43,6 +44,32 @@ inline zrangespec GetZrangeSpec(const ZSetFamily::ScoreInterval& si) {
return range;
}
zlexrangespec GetLexRange(const ZSetFamily::LexInterval& li) {
zlexrangespec range;
range.minex = 0;
range.maxex = 0;
if (li.first.type == ZSetFamily::LexBound::MINUS_INF) {
range.min = cminstring;
} else if (li.first.type == ZSetFamily::LexBound::PLUS_INF) {
range.min = cmaxstring;
} else {
range.min = sdsnewlen(li.first.val.data(), li.first.val.size());
range.minex = (li.first.type == ZSetFamily::LexBound::OPEN);
}
if (li.second.type == ZSetFamily::LexBound::MINUS_INF) {
range.max = cminstring;
} else if (li.second.type == ZSetFamily::LexBound::PLUS_INF) {
range.max = cmaxstring;
} else {
range.max = sdsnewlen(li.second.val.data(), li.second.val.size());
range.maxex = (li.second.type == ZSetFamily::LexBound::OPEN);
}
return range;
}
OpResult<PrimeIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string_view key,
size_t member_len) {
auto& db_slice = op_args.shard->db_slice();
@ -71,7 +98,7 @@ OpResult<PrimeIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string
enum class Action {
RANGE = 0,
REM = 1,
REMOVE = 1,
};
class IntervalVisitor {
@ -84,6 +111,8 @@ class IntervalVisitor {
void operator()(const ZSetFamily::ScoreInterval& si);
void operator()(const ZSetFamily::LexInterval& li);
ZSetFamily::ScoredArray PopResult() {
return std::move(result_);
}
@ -95,10 +124,17 @@ class IntervalVisitor {
private:
void ExtractListPack(const zrangespec& range);
void ExtractSkipList(const zrangespec& range);
void ExtractListPack(const zlexrangespec& range);
void ExtractSkipList(const zlexrangespec& range);
void ActionRange(unsigned start, unsigned end); // rank
void ActionRange(const zrangespec& range); // score
void ActionRem(unsigned start, unsigned end); // rank
void ActionRem(const zrangespec& range); // score
void ActionRange(const zlexrangespec& range); // lex
void ActionRem(unsigned start, unsigned end); // rank
void ActionRem(const zrangespec& range); // score
void ActionRem(const zlexrangespec& range); // lex
void Next(uint8_t* zl, uint8_t** eptr, uint8_t** sptr) const {
if (params_.reverse) {
@ -108,6 +144,10 @@ class IntervalVisitor {
}
}
zskiplistNode* Next(zskiplistNode* ln) const {
return params_.reverse ? ln->backward : ln->level[0].forward;
}
bool IsUnder(double score, const zrangespec& spec) const {
return params_.reverse ? zslValueGteMin(score, &spec) : zslValueLteMax(score, &spec);
}
@ -140,11 +180,12 @@ void IntervalVisitor::operator()(const ZSetFamily::IndexInterval& ii) {
if (unsigned(end) >= llen)
end = llen - 1;
switch (action_) {
case Action::RANGE:
ActionRange(start, end);
break;
case Action::REM:
case Action::REMOVE:
ActionRem(start, end);
break;
}
@ -157,7 +198,20 @@ void IntervalVisitor::operator()(const ZSetFamily::ScoreInterval& si) {
case Action::RANGE:
ActionRange(range);
break;
case Action::REM:
case Action::REMOVE:
ActionRem(range);
break;
}
}
void IntervalVisitor::operator()(const ZSetFamily::LexInterval& li) {
zlexrangespec range = GetLexRange(li);
switch (action_) {
case Action::RANGE:
ActionRange(range);
break;
case Action::REMOVE:
ActionRem(range);
break;
}
@ -215,7 +269,7 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
DCHECK(ln != NULL);
sds ele = ln->ele;
result_.emplace_back(string(ele, sdslen(ele)), ln->score);
ln = params_.reverse ? ln->backward : ln->level[0].forward;
ln = Next(ln);
}
}
}
@ -229,6 +283,15 @@ void IntervalVisitor::ActionRange(const zrangespec& range) {
}
}
void IntervalVisitor::ActionRange(const zlexrangespec& range) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
ExtractListPack(range);
} else {
CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST);
ExtractSkipList(range);
}
}
void IntervalVisitor::ActionRem(unsigned start, unsigned end) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
@ -257,13 +320,16 @@ void IntervalVisitor::ActionRem(const zrangespec& range) {
}
}
void IntervalVisitor::ActionRem(const zlexrangespec& range) {
LOG(FATAL) << "TBD";
}
void IntervalVisitor::ExtractListPack(const zrangespec& range) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen = 0;
long long vlong = 0;
unsigned rangelen = 0;
unsigned offset = params_.offset;
unsigned limit = params_.limit;
@ -297,7 +363,6 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) {
AddResult(vstr, vlen, vlong, score);
rangelen++;
/* Move to next node */
Next(zl, &eptr, &sptr);
}
@ -309,7 +374,6 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
zskiplistNode* ln;
unsigned offset = params_.offset;
unsigned limit = params_.limit;
unsigned rangelen = 0;
/* If reversed, get the last node in range as starting point. */
if (params_.reverse) {
@ -321,11 +385,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
if (params_.reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
ln = Next(ln);
}
while (ln && limit--) {
@ -333,15 +393,95 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
if (!IsUnder(ln->score, range))
break;
rangelen++;
result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score);
/* Move to next node */
ln = Next(ln);
}
}
void IntervalVisitor::ExtractListPack(const zlexrangespec& range) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen;
long long vlong;
unsigned offset = params_.offset;
unsigned limit = params_.limit;
/* If reversed, get the last node in range as starting point. */
if (params_.reverse) {
eptr = zzlLastInLexRange(zl, &range);
} else {
eptr = zzlFirstInLexRange(zl, &range);
}
/* Get score pointer for the first element. */
if (eptr)
sptr = lpNext(zl, eptr);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (eptr && offset--) {
Next(zl, &eptr, &sptr);
}
while (eptr && limit--) {
double score = 0;
if (params_.with_scores) /* don't bother to extract the score if it's gonna be ignored. */
score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (params_.reverse) {
ln = ln->backward;
if (!zzlLexValueGteMin(eptr, &range))
break;
} else {
ln = ln->level[0].forward;
if (!zzlLexValueLteMax(eptr, &range))
break;
}
vstr = lpGetValue(eptr, &vlen, &vlong);
AddResult(vstr, vlen, vlong, score);
/* Move to next node */
Next(zl, &eptr, &sptr);
}
}
void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) {
zset* zs = (zset*)zobj_->ptr;
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
unsigned offset = params_.offset;
unsigned limit = params_.limit;
/* If reversed, get the last node in range as starting point. */
if (params_.reverse) {
ln = zslLastInLexRange(zsl, &range);
} else {
ln = zslFirstInLexRange(zsl, &range);
}
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
ln = Next(ln);
}
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (params_.reverse) {
if (!zslLexValueGteMin(ln->ele, &range))
break;
} else {
if (!zslLexValueLteMax(ln->ele, &range))
break;
}
result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score);
/* Move to next node */
ln = Next(ln);
}
}
@ -379,6 +519,29 @@ bool ParseBound(string_view src, ZSetFamily::Bound* bound) {
return ParseScore(src, &bound->val);
}
bool ParseLexBound(string_view src, ZSetFamily::LexBound* bound) {
if (src.empty())
return false;
if (src == "+") {
bound->type = ZSetFamily::LexBound::PLUS_INF;
} else if (src == "-") {
bound->type = ZSetFamily::LexBound::MINUS_INF;
} else if (src[0] == '(') {
bound->type = ZSetFamily::LexBound::OPEN;
src.remove_prefix(1);
bound->val = src;
} else if (src[0] == '[') {
bound->type = ZSetFamily::LexBound::CLOSED;
src.remove_prefix(1);
bound->val = src;
} else {
return false;
}
return true;
}
} // namespace
void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) {
@ -508,7 +671,7 @@ void ZSetFamily::ZCount(CmdArgList args, ConnectionContext* cntx) {
ScoreInterval si;
if (!ParseBound(min_s, &si.first) || !ParseBound(max_s, &si.second)) {
return (*cntx)->SendError(kRangeErr);
return (*cntx)->SendError(kFloatRangeErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
@ -565,6 +728,30 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendDouble(add_result.new_score);
}
void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);
LexInterval li;
if (!ParseLexBound(min_s, &li.first) || !ParseLexBound(max_s, &li.second)) {
return (*cntx)->SendError(kLexRangeErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpLexCount(op_args, key, li);
};
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) {
(*cntx)->SendError(kWrongTypeErr);
} else {
(*cntx)->SendLong(*result);
}
}
void ZSetFamily::ZRange(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(std::move(args), false, cntx);
}
@ -581,6 +768,46 @@ void ZSetFamily::ZRevRank(CmdArgList args, ConnectionContext* cntx) {
ZRankGeneric(std::move(args), true, cntx);
}
void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);
uint32_t offset = 0;
uint32_t count = kuint32max;
if (args.size() > 4) {
if (args.size() != 7)
return (*cntx)->SendError(kSyntaxErr);
ToUpper(&args[4]);
if (ArgS(args, 4) != "LIMIT")
return (*cntx)->SendError(kSyntaxErr);
string_view os = ArgS(args, 5);
string_view cs = ArgS(args, 6);
if (!absl::SimpleAtoi(os, &count) || !absl::SimpleAtoi(cs, &count)) {
return (*cntx)->SendError(kInvalidIntErr);
}
}
LexInterval li;
if (!ParseLexBound(min_s, &li.first) || !ParseLexBound(max_s, &li.second)) {
return (*cntx)->SendError(kLexRangeErr);
}
ZRangeSpec range_spec;
range_spec.params.offset = offset;
range_spec.params.limit = count;
range_spec.interval = li;
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRange(range_spec, op_args, key);
};
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OutputScoredArrayResult(result, range_spec.params, cntx);
}
void ZSetFamily::ZRangeByScore(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
@ -624,7 +851,7 @@ void ZSetFamily::ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx) {
ScoreInterval si;
if (!ParseBound(min_s, &si.first) || !ParseBound(max_s, &si.second)) {
return (*cntx)->SendError(kRangeErr);
return (*cntx)->SendError(kFloatRangeErr);
}
ZRangeSpec range_spec;
@ -681,7 +908,7 @@ void ZSetFamily::ZRangeByScoreInternal(string_view key, string_view min_s, strin
ScoreInterval si;
if (!ParseBound(min_s, &si.first) || !ParseBound(max_s, &si.second)) {
return (*cntx)->SendError(kRangeErr);
return (*cntx)->SendError(kFloatRangeErr);
}
range_spec.interval = si;
@ -913,7 +1140,7 @@ OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key
robj* zobj = res_it.value()->second.AsRObj();
IntervalVisitor iv{Action::REM, range_spec.params, zobj};
IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj};
std::visit(iv, range_spec.interval);
res_it.value()->second.SyncRObj();
@ -1011,6 +1238,69 @@ OpResult<unsigned> ZSetFamily::OpCount(const OpArgs& op_args, std::string_view k
return count;
}
OpResult<unsigned> ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key,
const ZSetFamily::LexInterval& interval) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
zlexrangespec range = GetLexRange(interval);
unsigned count = 0;
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj->ptr;
uint8_t *eptr, *sptr;
/* Use the first element in range as the starting point */
eptr = zzlFirstInLexRange(zl, &range);
/* No "first" element */
if (eptr) {
/* First element is in range */
sptr = lpNext(zl, eptr);
serverAssertWithInfo(c, zobj, zzlLexValueLteMax(eptr, &range));
/* Iterate over elements in range */
while (eptr) {
/* Abort when the node is no longer in range. */
if (!zzlLexValueLteMax(eptr, &range)) {
break;
} else {
count++;
zzlNext(zl, &eptr, &sptr);
}
}
}
} else {
DCHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj->encoding);
zset* zs = (zset*)zobj->ptr;
zskiplist* zsl = zs->zsl;
zskiplistNode* zn;
unsigned long rank;
/* Find first element in range */
zn = zslFirstInLexRange(zsl, &range);
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->ele);
count = (zsl->length - (rank - 1));
/* Find last element in range */
zn = zslLastInLexRange(zsl, &range);
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->ele);
count -= (zsl->length - rank);
}
}
}
zslFreeLexRange(&range);
return count;
}
#define HFUNC(x) SetHandler(&ZSetFamily::x)
void ZSetFamily::Register(CommandRegistry* registry) {
@ -1018,9 +1308,11 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"ZCARD", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(ZCard)
<< CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount)
<< CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy)
<< CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount)
<< CI{"ZREM", CO::FAST | CO::WRITE, -3, 1, 1, 1}.HFUNC(ZRem)
<< CI{"ZRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRange)
<< CI{"ZRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRank)
<< CI{"ZRANGEBYLEX", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByLex)
<< CI{"ZRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByScore)
<< CI{"ZSCORE", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZScore)
<< CI{"ZREMRANGEBYRANK", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByRank)

View File

@ -27,6 +27,13 @@ class ZSetFamily {
using ScoreInterval = std::pair<Bound, Bound>;
struct LexBound {
std::string_view val;
enum Type {PLUS_INF, MINUS_INF, OPEN, CLOSED} type = CLOSED;
};
using LexInterval = std::pair<LexBound, LexBound>;
struct RangeParams {
uint32_t offset = 0;
uint32_t limit = UINT32_MAX;
@ -35,7 +42,7 @@ class ZSetFamily {
};
struct ZRangeSpec {
std::variant<IndexInterval, ScoreInterval> interval;
std::variant<IndexInterval, ScoreInterval, LexInterval> interval;
RangeParams params;
};
@ -49,10 +56,12 @@ class ZSetFamily {
static void ZCard(CmdArgList args, ConnectionContext* cntx);
static void ZCount(CmdArgList args, ConnectionContext* cntx);
static void ZIncrBy(CmdArgList args, ConnectionContext* cntx);
static void ZLexCount(CmdArgList args, ConnectionContext* cntx);
static void ZRange(CmdArgList args, ConnectionContext* cntx);
static void ZRank(CmdArgList args, ConnectionContext* cntx);
static void ZRem(CmdArgList args, ConnectionContext* cntx);
static void ZScore(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByLex(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx);
@ -99,6 +108,10 @@ class ZSetFamily {
static OpResult<unsigned> OpCount(const OpArgs& op_args, std::string_view key,
const ScoreInterval& interval);
static OpResult<unsigned> OpLexCount(const OpArgs& op_args, std::string_view key,
const LexInterval& interval);
};
} // namespace dfly

View File

@ -113,4 +113,14 @@ TEST_F(ZSetFamilyTest, IncrBy) {
EXPECT_THAT(resp[0], ArgType(RespExpr::NIL));
}
TEST_F(ZSetFamilyTest, ByLex) {
Run({
"zadd", "key", "0", "alpha", "0", "bar", "0", "cool", "0", "down",
"0", "elephant", "0", "foo", "0", "great", "0", "hill", "0", "omega",
});
auto resp = Run({"zrangebylex", "key", "-", "[cool"});
EXPECT_THAT(resp, ElementsAre("alpha", "bar", "cool"));
EXPECT_EQ(3, CheckedInt({"ZLEXCOUNT", "key", "(foo", "+"}));
}
} // namespace dfly