Some small refactorings around DbSlice::AddOrFind.
Code clean-ups in string-family in preparation for the work on point in time snapshotting for other types.
This commit is contained in:
parent
afd52d5571
commit
f65d6308c7
|
@ -238,19 +238,23 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
|
|||
|
||||
PrimeEvictionPolicy evp{db_index, this, int64_t(memory_budget_ - key.size())};
|
||||
|
||||
// Fast-path - change_cb_ is empty so we Find or Add using
|
||||
// Fast-path if change_cb_ is empty so we Find or Add using
|
||||
// the insert operation: twice more efficient.
|
||||
CompactObj co_key{key};
|
||||
|
||||
auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{});
|
||||
auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{}, evp);
|
||||
if (inserted) { // new entry
|
||||
db->stats.inline_keys += it->first.IsInline();
|
||||
db->stats.obj_memory_usage += it->first.MallocUsed();
|
||||
|
||||
events_.garbage_collected += evp.gc_count();
|
||||
|
||||
it.SetVersion(NextVersion());
|
||||
memory_budget_ = evp.mem_budget();
|
||||
|
||||
return make_pair(it, true);
|
||||
}
|
||||
|
||||
auto& existing = it;
|
||||
|
||||
DCHECK(IsValid(existing));
|
||||
|
@ -391,42 +395,34 @@ PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
|
|||
ccb.second(db_ind, ChangeReq{key});
|
||||
}
|
||||
|
||||
auto [res, added] = AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms);
|
||||
auto [res, added] = AddOrFind(db_ind, key, std::move(obj), expire_at_ms);
|
||||
CHECK(added);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
pair<PrimeIterator, bool> DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) {
|
||||
pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_ind, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) {
|
||||
DCHECK_LT(db_ind, db_arr_.size());
|
||||
DCHECK(!obj.IsRef());
|
||||
|
||||
auto res = AddOrFind(db_ind, key);
|
||||
if (!res.second) // have not inserted.
|
||||
return res;
|
||||
|
||||
auto& db = *db_arr_[db_ind];
|
||||
CompactObj co_key{key};
|
||||
memory_budget_ -= key.size();
|
||||
auto& new_it = res.first;
|
||||
|
||||
PrimeEvictionPolicy evp{db_ind, this, memory_budget_};
|
||||
auto [new_entry, inserted] = db.prime_table.Insert(std::move(co_key), std::move(obj), evp);
|
||||
|
||||
// in this case obj won't be moved and will be destroyed during unwinding.
|
||||
if (!inserted)
|
||||
return make_pair(new_entry, false);
|
||||
|
||||
events_.garbage_collected += evp.gc_count();
|
||||
memory_budget_ = evp.mem_budget();
|
||||
new_entry.SetVersion(NextVersion());
|
||||
|
||||
db.stats.inline_keys += new_entry->first.IsInline();
|
||||
db.stats.obj_memory_usage += (new_entry->first.MallocUsed() + new_entry->second.MallocUsed());
|
||||
db.stats.obj_memory_usage += obj.MallocUsed();
|
||||
new_it->second = std::move(obj);
|
||||
|
||||
if (expire_at_ms) {
|
||||
new_entry->second.SetExpire(true);
|
||||
new_it->second.SetExpire(true);
|
||||
uint64_t delta = expire_at_ms - expire_base_[0];
|
||||
CHECK(db.expire_table.Insert(new_entry->first.AsRef(), ExpirePeriod(delta)).second);
|
||||
CHECK(db.expire_table.Insert(new_it->first.AsRef(), ExpirePeriod(delta)).second);
|
||||
}
|
||||
|
||||
return make_pair(new_entry, true);
|
||||
return res;
|
||||
}
|
||||
|
||||
size_t DbSlice::DbSize(DbIndex db_ind) const {
|
||||
|
|
|
@ -129,8 +129,15 @@ class DbSlice {
|
|||
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(DbIndex db_index, ArgSlice args);
|
||||
|
||||
// Return .second=true if insertion ocurred, false if we return the existing key.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
|
||||
|
||||
// Returns true if insertion took place, false otherwise.
|
||||
// expire_at_ms equal to 0 - means no expiry.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms);
|
||||
|
||||
// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
|
||||
// Does not change expiry if at != 0 and expiry already exists.
|
||||
bool Expire(DbIndex db_ind, PrimeIterator main_it, uint64_t at);
|
||||
|
@ -143,12 +150,6 @@ class DbSlice {
|
|||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
PrimeIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms);
|
||||
|
||||
// Adds a new entry if a key does not exists. Returns true if insertion took place,
|
||||
// false otherwise. expire_at_ms equal to 0 - means no expiry.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
std::pair<PrimeIterator, bool> AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms);
|
||||
|
||||
// Creates a database with index `db_ind`. If such database exists does nothing.
|
||||
void ActivateDb(DbIndex db_ind);
|
||||
|
||||
|
|
|
@ -557,7 +557,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
|||
DbSlice& db_slice = EngineShard::tlocal()->db_slice();
|
||||
for (const auto& item : ib) {
|
||||
std::string_view key{item.key, sdslen(item.key)};
|
||||
db_slice.AddIfNotExist(db_ind, key, PrimeValue{item.val}, item.expire_ms);
|
||||
db_slice.AddOrFind(db_ind, key, PrimeValue{item.val}, item.expire_ms);
|
||||
sdsfree(item.key);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ extern "C" {
|
|||
}
|
||||
|
||||
#include <absl/container/inlined_vector.h>
|
||||
|
||||
#include <double-conversion/string-to-double.h>
|
||||
|
||||
#include "base/logging.h"
|
||||
|
@ -36,58 +35,93 @@ DEFINE_VARZ(VarzQps, get_qps);
|
|||
|
||||
constexpr uint32_t kMaxStrLen = 1 << 28;
|
||||
|
||||
OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t start,
|
||||
string_view value) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
size_t range_len = start + value.size();
|
||||
|
||||
if (range_len == 0) {
|
||||
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (it_res) {
|
||||
return it_res.value()->second.Size();
|
||||
} else {
|
||||
return it_res.status();
|
||||
}
|
||||
}
|
||||
|
||||
auto [it, added] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
string s;
|
||||
|
||||
if (added) {
|
||||
s.resize(range_len);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
it->second.GetString(&s);
|
||||
if (s.size() < range_len)
|
||||
s.resize(range_len);
|
||||
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
}
|
||||
memcpy(s.data() + start, value.data(), value.size());
|
||||
it->second.SetString(s);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return it->second.Size();
|
||||
}
|
||||
|
||||
OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
const CompactObj& co = it_res.value()->second;
|
||||
size_t strlen = co.Size();
|
||||
|
||||
if (start < 0)
|
||||
start = strlen + start;
|
||||
if (end < 0)
|
||||
end = strlen + end;
|
||||
|
||||
if (start < 0)
|
||||
start = 0;
|
||||
if (end < 0)
|
||||
end = 0;
|
||||
|
||||
if (strlen == 0 || start > end || size_t(start) >= strlen) {
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
if (size_t(end) >= strlen)
|
||||
end = strlen - 1;
|
||||
|
||||
string tmp;
|
||||
string_view slice = co.GetSlice(&tmp);
|
||||
|
||||
return string(slice.substr(start, end - start + 1));
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
SetCmd::SetCmd(DbSlice* db_slice) : db_slice_(db_slice) {
|
||||
SetCmd::SetCmd(DbSlice* db_slice) : db_slice_(*db_slice) {
|
||||
}
|
||||
|
||||
SetCmd::~SetCmd() {
|
||||
}
|
||||
|
||||
OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::string_view value) {
|
||||
DCHECK_LT(params.db_index, db_slice_->db_array_size());
|
||||
DCHECK(db_slice_->IsDbValid(params.db_index));
|
||||
DCHECK_LT(params.db_index, db_slice_.db_array_size());
|
||||
DCHECK(db_slice_.IsDbValid(params.db_index));
|
||||
|
||||
VLOG(2) << "Set " << key << "(" << db_slice_->shard_id() << ") ";
|
||||
VLOG(2) << "Set " << key << "(" << db_slice_.shard_id() << ") ";
|
||||
|
||||
auto [it, expire_it] = db_slice_->FindExt(params.db_index, key);
|
||||
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_->Now() : 0;
|
||||
auto [it, expire_it] = db_slice_.FindExt(params.db_index, key);
|
||||
|
||||
if (IsValid(it)) { // existing
|
||||
if (params.how == SET_IF_NOTEXIST)
|
||||
return OpStatus::SKIPPED;
|
||||
|
||||
PrimeValue& prime_value = it->second;
|
||||
if (params.prev_val) {
|
||||
if (prime_value.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
string val;
|
||||
prime_value.GetString(&val);
|
||||
params.prev_val->emplace(move(val));
|
||||
}
|
||||
|
||||
if (IsValid(expire_it) && at_ms) {
|
||||
expire_it->second = db_slice_->FromAbsoluteTime(at_ms);
|
||||
} else {
|
||||
bool changed = db_slice_->Expire(params.db_index, it, at_ms);
|
||||
if (changed && at_ms == 0) // erased.
|
||||
return OpStatus::OK;
|
||||
}
|
||||
db_slice_->PreUpdate(params.db_index, it);
|
||||
|
||||
// Check whether we need to update flags table.
|
||||
bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag();
|
||||
if (req_flag_update) {
|
||||
prime_value.SetFlag(params.memcache_flags != 0);
|
||||
db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
}
|
||||
|
||||
// overwrite existing entry.
|
||||
prime_value.SetString(value);
|
||||
db_slice_->PostUpdate(params.db_index, it);
|
||||
|
||||
return OpStatus::OK;
|
||||
return SetExisting(params, it, expire_it, value);
|
||||
}
|
||||
|
||||
// New entry
|
||||
|
@ -96,9 +130,10 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
|
|||
|
||||
PrimeValue tvalue{value};
|
||||
tvalue.SetFlag(params.memcache_flags != 0);
|
||||
it = db_slice_->AddNew(params.db_index, key, std::move(tvalue), at_ms);
|
||||
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_.Now() : 0;
|
||||
it = db_slice_.AddNew(params.db_index, key, std::move(tvalue), at_ms);
|
||||
|
||||
EngineShard* shard = db_slice_->shard_owner();
|
||||
EngineShard* shard = db_slice_.shard_owner();
|
||||
|
||||
if (shard->tiered_storage()) { // external storage enabled.
|
||||
if (value.size() >= 64 && value.size() < 2_MB) {
|
||||
|
@ -107,7 +142,47 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
|
|||
}
|
||||
|
||||
if (params.memcache_flags)
|
||||
db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it,
|
||||
std::string_view value) {
|
||||
if (params.how == SET_IF_NOTEXIST)
|
||||
return OpStatus::SKIPPED;
|
||||
|
||||
PrimeValue& prime_value = it->second;
|
||||
if (params.prev_val) {
|
||||
if (prime_value.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
string val;
|
||||
prime_value.GetString(&val);
|
||||
params.prev_val->emplace(move(val));
|
||||
}
|
||||
|
||||
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_.Now() : 0;
|
||||
if (IsValid(e_it) && at_ms) {
|
||||
e_it->second = db_slice_.FromAbsoluteTime(at_ms);
|
||||
} else {
|
||||
bool changed = db_slice_.Expire(params.db_index, it, at_ms);
|
||||
if (changed && at_ms == 0) // erased.
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
db_slice_.PreUpdate(params.db_index, it);
|
||||
|
||||
// Check whether we need to update flags table.
|
||||
bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag();
|
||||
if (req_flag_update) {
|
||||
prime_value.SetFlag(params.memcache_flags != 0);
|
||||
db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
|
||||
}
|
||||
|
||||
// overwrite existing entry.
|
||||
prime_value.SetString(value);
|
||||
db_slice_.PostUpdate(params.db_index, it);
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
@ -545,35 +620,8 @@ void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
|
||||
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
const CompactObj& co = it_res.value()->second;
|
||||
size_t strlen = co.Size();
|
||||
|
||||
if (start < 0)
|
||||
start = strlen + start;
|
||||
if (end < 0)
|
||||
end = strlen + end;
|
||||
|
||||
if (start < 0)
|
||||
start = 0;
|
||||
if (end < 0)
|
||||
end = 0;
|
||||
|
||||
if (strlen == 0 || start > end || size_t(start) >= strlen) {
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
if (size_t(end) >= strlen)
|
||||
end = strlen - 1;
|
||||
|
||||
string tmp;
|
||||
string_view slice = co.GetSlice(&tmp);
|
||||
|
||||
return string(slice.substr(start, end - start + 1));
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGetRange(OpArgs{shard, t->db_index()}, key, start, end);
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
|
@ -606,34 +654,7 @@ void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
if (min_size == 0) {
|
||||
auto it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
if (it_res) {
|
||||
return it_res.value()->second.Size();
|
||||
} else {
|
||||
return it_res.status();
|
||||
}
|
||||
}
|
||||
|
||||
auto [it, added] = shard->db_slice().AddOrFind(t->db_index(), key);
|
||||
|
||||
string s;
|
||||
s.reserve(min_size);
|
||||
|
||||
if (added) {
|
||||
s.resize(min_size);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
it->second.GetString(&s);
|
||||
if (s.size() < min_size)
|
||||
s.resize(min_size);
|
||||
}
|
||||
memcpy(s.data() + start, value.data(), value.size());
|
||||
it->second.SetString(s);
|
||||
|
||||
return it->second.Size();
|
||||
return OpSetRange(OpArgs{shard, t->db_index()}, key, start, value);
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
|
@ -699,6 +720,8 @@ OpStatus StringFamily::OpMSet(const OpArgs& op_args, ArgSlice args) {
|
|||
OpResult<int64_t> StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr,
|
||||
bool skip_on_missing) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
|
||||
// we avoid using AddOrFind because of skip_on_missing option for memcache.
|
||||
auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key);
|
||||
|
||||
if (!IsValid(it)) {
|
||||
|
@ -709,6 +732,7 @@ OpResult<int64_t> StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view
|
|||
cobj.SetInt(incr);
|
||||
|
||||
db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0);
|
||||
|
||||
return incr;
|
||||
}
|
||||
|
||||
|
@ -744,6 +768,7 @@ OpResult<double> StringFamily::OpIncrFloat(const OpArgs& op_args, std::string_vi
|
|||
if (inserted) {
|
||||
char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf));
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return val;
|
||||
}
|
||||
|
@ -785,6 +810,8 @@ OpResult<uint32_t> StringFamily::ExtendOrSet(const OpArgs& op_args, std::string_
|
|||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
if (inserted) {
|
||||
it->second.SetString(val);
|
||||
db_slice.PostUpdate(op_args.db_ind, it);
|
||||
|
||||
return val.size();
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ using facade::OpResult;
|
|||
using facade::OpStatus;
|
||||
|
||||
class SetCmd {
|
||||
DbSlice* db_slice_;
|
||||
DbSlice& db_slice_;
|
||||
|
||||
public:
|
||||
explicit SetCmd(DbSlice* db_slice);
|
||||
|
@ -39,6 +39,10 @@ class SetCmd {
|
|||
};
|
||||
|
||||
OpResult<void> Set(const SetParams& params, std::string_view key, std::string_view value);
|
||||
|
||||
private:
|
||||
OpStatus SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it,
|
||||
std::string_view value);
|
||||
};
|
||||
|
||||
class StringFamily {
|
||||
|
@ -100,7 +104,6 @@ class StringFamily {
|
|||
std::string_view val, bool prepend);
|
||||
|
||||
static OpResult<std::string> OpGet(const OpArgs& op_args, std::string_view key);
|
||||
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Reference in New Issue