chore(server): Refactor expire functionality and move it to DbSlice (#388)

This is preparation for https://github.com/dragonflydb/dragonfly/pull/387

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-10-14 13:14:09 +03:00 committed by GitHub
parent 2e875c81c7
commit ffc25e6ac2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 38 deletions

View File

@ -105,6 +105,11 @@ enum class GlobalState : uint8_t {
SHUTTING_DOWN,
};
enum class TimeUnit : uint8_t {
SEC,
MSEC
};
inline void ToUpper(const MutableSlice* val) {
for (auto& c : *val) {
c = absl::ascii_toupper(c);

View File

@ -550,6 +550,31 @@ PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue o
return it;
}
OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator expire_it, const ExpireParams& params) {
DCHECK(params.IsDefined());
DCHECK(IsValid(prime_it));
int64_t msec = (params.unit == TimeUnit::SEC) ? params.value * 1000 : params.value;
int64_t now_msec = cntx.time_now_ms;
int64_t rel_msec = params.absolute ? msec - now_msec : msec;
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
return OpStatus::OUT_OF_RANGE;
}
// TODO: to support persist.
if (rel_msec <= 0) {
CHECK(Del(cntx.db_index, prime_it));
} else if (IsValid(expire_it)) {
expire_it->second = FromAbsoluteTime(now_msec + rel_msec);
} else {
UpdateExpire(cntx.db_index, prime_it, rel_msec + now_msec);
}
return OpStatus::OK;
}
pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
DCHECK(!obj.IsRef());

View File

@ -83,6 +83,18 @@ class DbSlice {
}
};
struct ExpireParams {
int64_t value = INT64_MIN; // undefined
bool absolute = false;
TimeUnit unit = TimeUnit::SEC;
bool persist = false;
bool IsDefined() const {
return persist || value > INT64_MIN;
}
};
DbSlice(uint32_t index, bool caching_mode, EngineShard* owner);
~DbSlice();
@ -150,6 +162,9 @@ class DbSlice {
PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);
facade::OpStatus UpdateExpire(const Context& cntx, PrimeIterator prime_it, ExpireIterator exp_it,
const ExpireParams& params);
// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
// Does not change expiry if at != 0 and expiry already exists.
bool UpdateExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at);

View File

@ -350,6 +350,15 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
return cursor;
}
OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireParams& params) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
return db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params);
}
} // namespace
void GenericFamily::Init(util::ProactorPool* pp) {
@ -454,7 +463,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
}
int_arg = std::max(int_arg, -1L);
ExpireParams params{.ts = int_arg};
DbSlice::ExpireParams params{.value = int_arg};
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params);
@ -472,8 +481,9 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
if (!absl::SimpleAtoi(sec, &int_arg)) {
return (*cntx)->SendError(kInvalidIntErr);
}
int_arg = std::max(int_arg, 0L);
ExpireParams params{.ts = int_arg, .absolute = true};
DbSlice::ExpireParams params{.value = int_arg, .absolute = true};
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params);
@ -517,7 +527,7 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(kInvalidIntErr);
}
int_arg = std::max(int_arg, 0L);
ExpireParams params{.ts = int_arg, .absolute = true, .unit = MSEC};
DbSlice::ExpireParams params{.value = int_arg, .absolute = true, .unit = TimeUnit::MSEC};
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params);
@ -953,31 +963,6 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
}
}
OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
const ExpireParams& params) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
int64_t msec = (params.unit == TimeUnit::SEC) ? params.ts * 1000 : params.ts;
int64_t now_msec = op_args.db_cntx.time_now_ms;
int64_t rel_msec = params.absolute ? msec - now_msec : msec;
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
return OpStatus::OUT_OF_RANGE;
}
if (rel_msec <= 0) {
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
} else if (IsValid(expire_it)) {
expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec);
} else {
db_slice.UpdateExpire(op_args.db_cntx.db_index, it, rel_msec + now_msec);
}
return OpStatus::OK;
}
OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) {
auto& db_slice = shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(t->db_context(), key);

View File

@ -32,14 +32,6 @@ class GenericFamily {
static OpResult<uint32_t> OpExists(const OpArgs& op_args, ArgSlice keys);
private:
enum TimeUnit { SEC, MSEC };
struct ExpireParams {
int64_t ts;
bool absolute = false;
TimeUnit unit = SEC;
};
static void Del(CmdArgList args, ConnectionContext* cntx);
static void Ping(CmdArgList args, ConnectionContext* cntx);
@ -69,8 +61,6 @@ class GenericFamily {
ConnectionContext* cntx);
static void TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit);
static OpStatus OpExpire(const OpArgs& op_args, std::string_view key, const ExpireParams& params);
static OpResult<uint64_t> OpTtl(Transaction* t, EngineShard* shard, std::string_view key);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, ArgSlice keys);
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,