Add ZREVRANGE and skeleton for PUBLISH/SUBSCRIBE

Fix a memory leak with dispatch queue in client connection.
This commit is contained in:
Roman Gershman 2022-03-23 20:45:19 +02:00
parent cb14df0e6b
commit 47caa972da
6 changed files with 106 additions and 66 deletions

View File

@ -112,7 +112,7 @@ API 1.0
- [X] ZRANGEBYSCORE
- [X] ZREM
- [X] ZREMRANGEBYSCORE
- [ ] ZREVRANGE
- [X] ZREVRANGE
- [X] ZSCORE
- [ ] Not sure whether these are required for the initial release.
- [X] AUTH
@ -224,7 +224,7 @@ API 2.0
- [ ] ZRANGEBYLEX
- [ ] ZRANK
- [ ] ZREMRANGEBYLEX
- [ ] ZREMRANGEBYRANK
- [X] ZREMRANGEBYRANK
- [ ] ZREVRANGEBYSCORE
- [ ] ZREVRANK
- [ ] ZUNIONSTORE

View File

@ -94,7 +94,8 @@ struct Connection::Request {
// I do not use mi_heap_t explicitly but mi_stl_allocator at the end does the same job
// of using the thread's heap.
absl::FixedArray<char, 256, mi_stl_allocator<char>> storage;
// The capacity is chosen so that we allocate a fully utilized (512 bytes) block.
absl::FixedArray<char, 190, mi_stl_allocator<char>> storage;
Request(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
}
@ -107,6 +108,9 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
protocol_ = protocol;
constexpr size_t kReqSz = sizeof(Connection::Request);
(void)kReqSz;
switch (protocol) {
case Protocol::REDIS:
redis_parser_.reset(new RedisParser);
@ -276,6 +280,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}
// After the client disconnected.
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
dispatch_fb.join();
@ -494,7 +499,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
while (!builder->GetError()) {
evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); });
if (cc_->conn_closing)
break; // TODO: We have a memory leak with pending requests in the queue.
break;
Request* req = dispatch_q_.front();
dispatch_q_.pop_front();
@ -510,6 +515,14 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
}
cc_->conn_closing = true;
// Clean up leftovers.
while (!dispatch_q_.empty()) {
Request* req = dispatch_q_.front();
dispatch_q_.pop_front();
req->~Request();
mi_free(req);
}
}
auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* {

View File

@ -855,6 +855,15 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Exec completed";
}
void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(0);
}
void Service::Subscribe(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendOk();
}
VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;
@ -883,7 +892,9 @@ void Service::RegisterCommands() {
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi)
<< CI{"EVAL", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(Eval).SetValidator(&EvalValidator)
<< CI{"EVALSHA", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(EvalSha).SetValidator(&EvalValidator)
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec);
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec)
<< CI{"PUBLISH", CO::LOADING| CO::FAST, 3, 0, 0, 0}.HFUNC(Publish)
<< CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Subscribe);
StringFamily::Register(&registry_);
GenericFamily::Register(&registry_);

View File

@ -70,6 +70,8 @@ class Service : public facade::ServiceInterface {
private:
static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx);
static void Publish(CmdArgList args, ConnectionContext* cntx);
static void Subscribe(CmdArgList args, ConnectionContext* cntx);
void Eval(CmdArgList args, ConnectionContext* cntx);
void EvalSha(CmdArgList args, ConnectionContext* cntx);

View File

@ -87,7 +87,7 @@ class IntervalVisitor {
void ActionRem(const zrangespec& range); // score
void Next(uint8_t* zl, uint8_t** eptr, uint8_t** sptr) const {
if (reverse_) {
if (params_.reverse) {
zzlPrev(zl, eptr, sptr);
} else {
zzlNext(zl, eptr, sptr);
@ -95,7 +95,7 @@ class IntervalVisitor {
}
bool IsUnder(double score, const zrangespec& spec) const {
return reverse_ ? zslValueGteMin(score, &spec) : zslValueLteMax(score, &spec);
return params_.reverse ? zslValueGteMin(score, &spec) : zslValueLteMax(score, &spec);
}
void AddResult(const uint8_t* vstr, unsigned vlen, long long vlon, double score);
@ -104,7 +104,6 @@ class IntervalVisitor {
ZSetFamily::RangeParams params_;
robj* zobj_;
bool reverse_ = false;
ZSetFamily::ScoredArray result_;
unsigned removed_ = 0;
};
@ -165,10 +164,11 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
long long vlong;
double score = 0.0;
if (reverse_)
eptr = lpSeek(zl, -2 - (2 * start));
if (params_.reverse)
eptr = lpSeek(zl, -2 - long(2 * start));
else
eptr = lpSeek(zl, 2 * start);
DCHECK(eptr);
sptr = lpNext(zl, eptr);
@ -189,7 +189,7 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
zskiplistNode* ln;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse_) {
if (params_.reverse) {
ln = zsl->tail;
unsigned long llen = zsetLength(zobj_);
if (start > 0)
@ -204,7 +204,7 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
DCHECK(ln != NULL);
sds ele = ln->ele;
result_.emplace_back(string(ele, sdslen(ele)), ln->score);
ln = reverse_ ? ln->backward : ln->level[0].forward;
ln = params_.reverse ? ln->backward : ln->level[0].forward;
}
} else {
LOG(FATAL) << "Unknown sorted set encoding" << zobj_->encoding;
@ -262,7 +262,7 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) {
unsigned limit = params_.limit;
/* If reversed, get the last node in range as starting point. */
if (reverse_) {
if (params_.reverse) {
eptr = zzlLastInRange(zl, &range);
} else {
eptr = zzlFirstInRange(zl, &range);
@ -306,7 +306,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
unsigned rangelen = 0;
/* If reversed, get the last node in range as starting point. */
if (reverse_) {
if (params_.reverse) {
ln = zslLastInRange(zsl, &range);
} else {
ln = zslFirstInRange(zsl, &range);
@ -315,7 +315,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
if (reverse_) {
if (params_.reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
@ -331,7 +331,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score);
/* Move to next node */
if (reverse_) {
if (params_.reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
@ -479,49 +479,11 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) {
}
void ZSetFamily::ZRange(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
std::string_view min_s = ArgS(args, 2);
std::string_view max_s = ArgS(args, 3);
ZRangeGeneric(std::move(args), false, cntx);
}
bool parse_score = false;
RangeParams range_params;
for (size_t i = 4; i < args.size(); ++i) {
ToUpper(&args[i]);
string_view cur_arg = ArgS(args, i);
if (cur_arg == "BYSCORE") {
parse_score = true;
} else if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else {
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
}
}
if (parse_score) {
ZRangeByScoreInternal(key, min_s, max_s, range_params, cntx);
return;
}
IndexInterval ii;
if (!absl::SimpleAtoi(min_s, &ii.first) || !absl::SimpleAtoi(max_s, &ii.second)) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
ZRangeSpec range_spec;
range_spec.params = range_params;
range_spec.interval = ii;
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRange(range_spec, op_args, key);
};
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OutputScoredArrayResult(result, range_params.with_scores, cntx);
void ZSetFamily::ZRevRange(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(std::move(args), true, cntx);
}
void ZSetFamily::ZRangeByScore(CmdArgList args, ConnectionContext* cntx) {
@ -635,11 +597,11 @@ void ZSetFamily::ZRangeByScoreInternal(std::string_view key, std::string_view mi
};
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OutputScoredArrayResult(result, params.with_scores, cntx);
OutputScoredArrayResult(result, params, cntx);
}
void ZSetFamily::OutputScoredArrayResult(const OpResult<ScoredArray>& result, bool with_scores,
ConnectionContext* cntx) {
void ZSetFamily::OutputScoredArrayResult(const OpResult<ScoredArray>& result,
const RangeParams& params, ConnectionContext* cntx) {
if (result.status() == OpStatus::WRONG_TYPE) {
return (*cntx)->SendError(kWrongTypeErr);
}
@ -647,11 +609,12 @@ void ZSetFamily::OutputScoredArrayResult(const OpResult<ScoredArray>& result, bo
LOG_IF(WARNING, !result && result.status() != OpStatus::KEY_NOTFOUND)
<< "Unexpected status " << result.status();
(*cntx)->StartArray(result->size() * (with_scores ? 2 : 1));
for (const auto& p : result.value()) {
(*cntx)->StartArray(result->size() * (params.with_scores ? 2 : 1));
const ScoredArray& array = result.value();
for (const auto& p : array) {
(*cntx)->SendBulkString(p.first);
if (with_scores) {
if (params.with_scores) {
(*cntx)->SendDouble(p.second);
}
}
@ -672,6 +635,53 @@ void ZSetFamily::ZRemRangeGeneric(std::string_view key, const ZRangeSpec& range_
}
}
void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
std::string_view min_s = ArgS(args, 2);
std::string_view max_s = ArgS(args, 3);
bool parse_score = false;
RangeParams range_params;
range_params.reverse = reverse;
for (size_t i = 4; i < args.size(); ++i) {
ToUpper(&args[i]);
string_view cur_arg = ArgS(args, i);
if (!reverse && cur_arg == "BYSCORE") {
parse_score = true;
} else if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else {
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
}
}
if (parse_score) {
ZRangeByScoreInternal(key, min_s, max_s, range_params, cntx);
return;
}
IndexInterval ii;
if (!absl::SimpleAtoi(min_s, &ii.first) || !absl::SimpleAtoi(max_s, &ii.second)) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
ZRangeSpec range_spec;
range_spec.params = range_params;
range_spec.interval = ii;
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRange(range_spec, op_args, key);
};
OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OutputScoredArrayResult(result, range_params, cntx);
}
OpResult<unsigned> ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string_view key,
ScoredMemberSpan members) {
DCHECK(!members.empty());
@ -798,7 +808,8 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"ZRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByScore)
<< CI{"ZSCORE", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZScore)
<< CI{"ZREMRANGEBYRANK", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByRank)
<< CI{"ZREMRANGEBYSCORE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByScore);
<< CI{"ZREMRANGEBYSCORE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByScore)
<< CI{"ZREVRANGE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRevRange);
}
} // namespace dfly

View File

@ -31,6 +31,7 @@ class ZSetFamily {
uint32_t offset = 0;
uint32_t limit = UINT32_MAX;
bool with_scores = false;
bool reverse = false;
};
struct ZRangeSpec {
@ -53,14 +54,16 @@ class ZSetFamily {
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRevRange(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByScoreInternal(std::string_view key, std::string_view min_s,
std::string_view max_s, const RangeParams& params,
ConnectionContext* cntx);
static void OutputScoredArrayResult(const OpResult<ScoredArray>& arr, bool with_scores,
static void OutputScoredArrayResult(const OpResult<ScoredArray>& arr, const RangeParams& params,
ConnectionContext* cntx);
static void ZRemRangeGeneric(std::string_view key, const ZRangeSpec& range_spec,
ConnectionContext* cntx);
static void ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext* cntx);
struct ZParams {
unsigned flags = 0; // mask of ZADD_IN_ macros.