Add DISCARD and BRPOP commands

This commit is contained in:
Roman Gershman 2022-03-31 14:26:33 +03:00
parent 37f09f315e
commit 4938d8af63
5 changed files with 37 additions and 8 deletions

View File

@ -150,7 +150,7 @@ a distributed log format.
API 2.0
- [ ] List Family
- [X] BLPOP
- [ ] BRPOP
- [X] BRPOP
- [ ] BRPOPLPUSH
- [ ] BLMOVE
- [ ] LINSERT
@ -192,10 +192,10 @@ API 2.0
- [ ] PUBSUB CHANNELS
- [X] SUBSCRIBE
- [X] UNSUBSCRIBE
- [ ] Server Family
- [X] Server Family
- [ ] WATCH
- [ ] UNWATCH
- [ ] DISCARD
- [X] DISCARD
- [ ] CLIENT KILL/LIST/UNPAUSE/PAUSE/GETNAME/SETNAME/REPLY/TRACKINGINFO
- [X] COMMAND
- [ ] COMMAND COUNT/GETKEYS/INFO

View File

@ -102,7 +102,7 @@ string ListPop(ListDir dir, quicklist* ql) {
class BPopper {
public:
explicit BPopper();
explicit BPopper(ListDir dir);
// Returns WRONG_TYPE, OK.
// If OK is returned then use result() to fetch the value.
@ -119,6 +119,8 @@ class BPopper {
private:
OpStatus Pop(Transaction* t, EngineShard* shard);
ListDir dir_;
bool found_ = false;
PrimeIterator find_it_;
ShardId find_sid_ = std::numeric_limits<ShardId>::max();
@ -127,7 +129,7 @@ class BPopper {
string value_;
};
BPopper::BPopper() {
BPopper::BPopper(ListDir dir) : dir_(dir) {
}
OpStatus BPopper::Run(Transaction* t, unsigned msec) {
@ -186,7 +188,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
find_it_->first.GetString(&key_);
quicklist* ql = GetQL(find_it_->second);
value_ = ListPop(ListDir::LEFT, ql);
value_ = ListPop(dir_, ql);
if (quicklistCount(ql) == 0) {
CHECK(shard->db_slice().Del(t->db_index(), find_it_));
@ -343,6 +345,14 @@ void ListFamily::LSet(CmdArgList args, ConnectionContext* cntx) {
}
void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
BPopGeneric(ListDir::LEFT, std::move(args), cntx);
}
void ListFamily::BRPop(CmdArgList args, ConnectionContext* cntx) {
BPopGeneric(ListDir::RIGHT, std::move(args), cntx);
}
void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 3u);
float timeout;
@ -356,7 +366,7 @@ void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "BLPop start " << timeout;
Transaction* transaction = cntx->transaction;
BPopper popper;
BPopper popper(dir);
OpStatus result = popper.Run(transaction, unsigned(timeout * 1000));
switch (result) {
@ -704,6 +714,7 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex)
<< CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange)

View File

@ -27,6 +27,7 @@ class ListFamily {
static void LPop(CmdArgList args, ConnectionContext* cntx);
static void RPop(CmdArgList args, ConnectionContext* cntx);
static void BLPop(CmdArgList args, ConnectionContext* cntx);
static void BRPop(CmdArgList args, ConnectionContext* cntx);
static void LLen(CmdArgList args, ConnectionContext* cntx);
static void LIndex(CmdArgList args, ConnectionContext* cntx);
static void LTrim(CmdArgList args, ConnectionContext* cntx);
@ -38,6 +39,8 @@ class ListFamily {
static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args,
ConnectionContext* cntx);
static void BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);
static OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals);

View File

@ -360,7 +360,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
VLOG(2) << "Got: " << args;
string_view cmd_str = ArgS(args, 0);
bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI");
bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI" || cmd_str == "DISCARD");
const CommandId* cid = registry_.Find(cmd_str);
ServerState& etl = *ServerState::tlocal();
@ -813,6 +813,19 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
interpreter->ResetStack();
}
void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) {
return rb->SendError("DISCARD without MULTI");
}
cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE;
cntx->conn_state.exec_body.clear();
rb->SendOk();
}
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
@ -940,6 +953,7 @@ void Service::RegisterCommands() {
registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi)
<< CI{"DISCARD", CO::NOSCRIPT | CO::FAST| CO::LOADING, 1, 0, 0, 0}.MFUNC(Discard)
<< CI{"EVAL", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(Eval).SetValidator(&EvalValidator)
<< CI{"EVALSHA", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(EvalSha).SetValidator(&EvalValidator)
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec)

View File

@ -71,6 +71,7 @@ class Service : public facade::ServiceInterface {
static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx);
void Discard(CmdArgList args, ConnectionContext* cntx);
void Eval(CmdArgList args, ConnectionContext* cntx);
void EvalSha(CmdArgList args, ConnectionContext* cntx);
void Exec(CmdArgList args, ConnectionContext* cntx);