feat(server): Implement GETEX command #385 (#387)

feat(server): Implement GETEX command

Signed-off-by: matchyc <dawnlight.yc@protonmail.com>
This commit is contained in:
Match_yc 2022-10-16 19:51:30 +08:00 committed by GitHub
parent 15725f49b9
commit 8a2de6a1fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 179 additions and 11 deletions

View File

@ -3,6 +3,7 @@
* **[Amir Alperin](https://github.com/iko1)**
* **[Philipp Born](https://github.com/tamcore)**
* Helm Chart
* **[Meng Chen](https://github.com/matchyc)**
* **[Yuxuan Chen](https://github.com/YuxuanChen98)**
* **[Redha Lhimeur](https://github.com/redhal)**
* **[Braydn Moore](https://github.com/braydnm)**

View File

@ -562,14 +562,12 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
return OpStatus::OUT_OF_RANGE;
}
// TODO: to support persist.
if (rel_msec <= 0) {
if (rel_msec <= 0 && !params.persist) {
CHECK(Del(cntx.db_index, prime_it));
} else if (IsValid(expire_it)) {
expire_it->second = FromAbsoluteTime(now_msec + rel_msec);
} else {
UpdateExpire(cntx.db_index, prime_it, rel_msec + now_msec);
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : rel_msec + now_msec);
}
return OpStatus::OK;

View File

@ -191,12 +191,18 @@ OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view
return ExtendExisting(op_args, *it_res, key, val, prepend);
}
OpResult<string> OpGet(const OpArgs& op_args, string_view key, bool del_hit = false) {
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
OpResult<string> OpGet(const OpArgs& op_args, string_view key, bool del_hit = false,
const DbSlice::ExpireParams& exp_params = {}) {
/*Get primeIterator and ExpireIterator at the same time*/
auto [it, it_expire] = op_args.shard->db_slice().FindExt(op_args.db_cntx, key);
const PrimeValue& pv = it_res.value()->second;
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (it->second.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;
const PrimeValue& pv = it->second;
if (del_hit) {
string key_bearer = GetString(op_args.shard, pv);
@ -204,12 +210,23 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key, bool del_hit = fa
DVLOG(1) << "Del: " << key;
auto& db_slice = op_args.shard->db_slice();
CHECK(db_slice.Del(op_args.db_cntx.db_index, it_res.value()));
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
return key_bearer;
}
return GetString(op_args.shard, pv);
/*Get value before expire*/
string ret_val = GetString(op_args.shard, pv);
if (exp_params.IsDefined()) {
DVLOG(1) << "Expire: " << key;
auto& db_slice = op_args.shard->db_slice();
OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params);
if (status != OpStatus::OK)
return status;
}
return ret_val;
}
OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val) {
@ -666,6 +683,70 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendNull();
}
void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
DbSlice::ExpireParams exp_params;
int64_t int_arg = 0;
for (size_t i = 2; i < args.size(); i++) {
ToUpper(&args[i]);
string_view cur_arg = ArgS(args, i);
if (cur_arg == "EX" || cur_arg == "PX" || cur_arg == "EXAT" || cur_arg == "PXAT") {
i++;
if (i >= args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
string_view ex = ArgS(args, i);
if (!absl::SimpleAtoi(ex, &int_arg)) {
return (*cntx)->SendError(kInvalidIntErr);
}
if (int_arg <= 0) {
return (*cntx)->SendError(InvalidExpireTime("getex"));
}
if (cur_arg == "EXAT" || cur_arg == "PXAT") {
exp_params.absolute = true;
}
exp_params.value = int_arg;
if (cur_arg == "EX" || cur_arg == "EXAT") {
exp_params.unit = TimeUnit::SEC;
} else {
exp_params.unit = TimeUnit::MSEC;
}
} else if (cur_arg == "PERSIST") {
exp_params.persist = true;
} else {
return (*cntx)->SendError(kSyntaxErr);
}
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpGet(t->GetOpArgs(shard), key, false, exp_params);
};
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result)
return (*cntx)->SendBulkString(*result);
switch (result.status()) {
case OpStatus::WRONG_TYPE:
(*cntx)->SendError(kWrongTypeErr);
break;
default:
DVLOG(1) << "GET " << key << " nil";
(*cntx)->SendNull();
}
}
void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
return IncrByGeneric(key, 1, cntx);
@ -1080,6 +1161,7 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"DECRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(DecrBy)
<< CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get)
<< CI{"GETDEL", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(GetDel)
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST, -1, 1, 1, 1}.HFUNC(GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, 1}.HFUNC(MGet)
<< CI{"MSET", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSet)

View File

@ -60,6 +60,7 @@ class StringFamily {
static void GetDel(CmdArgList args, ConnectionContext* cntx);
static void GetRange(CmdArgList args, ConnectionContext* cntx);
static void GetSet(CmdArgList args, ConnectionContext* cntx);
static void GetEx(CmdArgList args, ConnectionContext* cntx);
static void Incr(CmdArgList args, ConnectionContext* cntx);
static void IncrBy(CmdArgList args, ConnectionContext* cntx);
static void IncrByFloat(CmdArgList args, ConnectionContext* cntx);

View File

@ -399,4 +399,90 @@ TEST_F(StringFamilyTest, GetDel) {
ASSERT_THAT(resp, ArgType(RespExpr::NIL));
}
TEST_F(StringFamilyTest, GetEx) {
auto resp = Run({"set", "foo", "bar"});
EXPECT_THAT(resp, "OK");
resp = Run({"getex", "foo", "EX"});
EXPECT_THAT(resp, ErrArg("syntax error"));
resp = Run({"getex", "foo", "bar", "EX"});
EXPECT_THAT(resp, ErrArg("syntax error"));
resp = Run({"getex", "foo", "PERSIST", "1"});
EXPECT_THAT(resp, ErrArg("syntax error"));
resp = Run({"getex", "foo", "PXAT"});
EXPECT_THAT(resp, ErrArg("syntax error"));
resp = Run({"getex", "foo", "EX", "0"});
EXPECT_THAT(resp, ErrArg("invalid expire time"));
resp = Run({"getex", "foo", "PXAT", "-1"});
EXPECT_THAT(resp, ErrArg("invalid expire time"));
EXPECT_EQ(Run({"getex", "foo"}), "bar");
resp = Run({"getex", "foo", "PERSIST"});
EXPECT_EQ(resp, "bar");
EXPECT_THAT(Run({"TTL", "foo"}), IntArg(-1));
resp = Run({"getex", "foo", "pxat", absl::StrCat(TEST_current_time_ms - 1)});
EXPECT_EQ(resp, "bar");
EXPECT_THAT(Run({"getex", "foo"}), ArgType(RespExpr::NIL));
Run({"set", "foo", "bar"});
resp = Run({"getex", "foo", "PXAT", absl::StrCat(TEST_current_time_ms + 10)});
EXPECT_EQ(resp, "bar");
AdvanceTime(9);
EXPECT_EQ(Run({"getex", "foo"}), "bar");
AdvanceTime(1);
EXPECT_THAT(Run({"getex", "foo"}), ArgType(RespExpr::NIL));
Run({"set", "foo", "bar"});
resp = Run({"getex", "foo", "exat", absl::StrCat(TEST_current_time_ms / 1000 - 1)});
EXPECT_EQ(resp, "bar");
EXPECT_THAT(Run({"getex", "foo"}), ArgType(RespExpr::NIL));
Run({"set", "foo", "bar"});
uint64_t next_two_seconds = TEST_current_time_ms + 2000;
uint64_t next_two_seconds_round_down = static_cast<uint64_t>(next_two_seconds / 1000);
uint64_t diff = next_two_seconds_round_down * 1000 - TEST_current_time_ms;
resp = Run({"getex", "foo", "EXAT", absl::StrCat(next_two_seconds_round_down)});
EXPECT_EQ(resp, "bar");
AdvanceTime(diff - 1);
EXPECT_EQ(Run({"getex", "foo"}), "bar");
AdvanceTime(1);
EXPECT_THAT(Run({"getex", "foo"}), ArgType(RespExpr::NIL));
Run({"set", "foo", "bar"});
resp = Run({"getex", "foo", "PX", "10"});
AdvanceTime(9);
EXPECT_EQ(Run({"getex", "foo"}), "bar");
AdvanceTime(1);
EXPECT_THAT(Run({"getex", "foo"}), ArgType(RespExpr::NIL));
Run({"set", "foo", "bar"});
resp = Run({"getex", "foo", "ex", "1"});
AdvanceTime(999);
EXPECT_EQ(Run({"getex", "foo"}), "bar");
AdvanceTime(1);
EXPECT_THAT(Run({"getex", "foo"}), ArgType(RespExpr::NIL));
}
} // namespace dfly