feat(server): Introduce transaction clock.

Partially implements #6.
Previously, each shard lazily updated the clock it used for expiry evaluation.
Now, the clock value is set during the transaction scheduling phase and is assigned
to each transaction. DbSlice methods now use this value, passed via the DbContext
argument, when checking whether an entry has expired.
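
A minimal sketch of the new plumbing, matching the DbContext/OpArgs definitions that
appear further down in this diff (server/common.h):

    struct DbContext {
      DbIndex db_index = 0;
      uint64_t time_now_ms = 0;  // wall clock captured when the transaction is scheduled
    };

    struct OpArgs {
      EngineShard* shard;
      TxId txid;
      DbContext db_cntx;  // replaces the bare DbIndex db_ind
    };

Expiry checks then compare against the transaction clock rather than the per-shard now_ms_,
e.g. in DbSlice::ExpireIfNeeded: if (time_t(cntx.time_now_ms) < expire_time) the entry is
still considered alive.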

Also implemented a transactionally consistent TIME command and verified that
the time stays the same throughout a transaction. See
https://ably.com/blog/redis-keys-do-not-expire-atomically for motivation.
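
Within a transaction, TIME now reads the clock captured at scheduling time instead of the
wall clock, so repeated calls return the same value. A sketch mirroring the change in
generic_family.cc below:

    void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) {
      uint64_t now_usec;
      if (cntx->transaction) {
        // All shards observe the value assigned during transaction scheduling.
        now_usec = cntx->transaction->db_context().time_now_ms * 1000;
      } else {
        now_usec = absl::GetCurrentTimeNanos() / 1000;
      }
      (*cntx)->StartArray(2);
      (*cntx)->SendLong(now_usec / 1000000);
      (*cntx)->SendLong(now_usec % 1000000);
    }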

Lamport-style clock updates for background processes are still not implemented
(it is not yet clear whether that is the right approach).
Roman Gershman 2022-09-25 16:18:59 +03:00 committed by Roman Gershman
parent 347a619b44
commit 0925829afb
28 changed files with 448 additions and 387 deletions


@ -39,4 +39,4 @@ cxx_test(json_family_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
generic_family_test memcache_parser_test rdb_test
redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test)
redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test)


@ -184,25 +184,23 @@ bool SetBitValue(uint32_t offset, bool bit_value, std::string* entry) {
// Helper functions to access the data or change it
class OverrideValue {
EngineShard* shard_ = nullptr;
DbIndex index_ = 0;
const OpArgs& args_;
public:
explicit OverrideValue(const OpArgs& args) : shard_{args.shard}, index_{args.db_ind} {
explicit OverrideValue(const OpArgs& args) : args_{args} {
}
OpResult<bool> Set(std::string_view key, uint32_t offset, bool bit_value);
};
OpResult<bool> OverrideValue::Set(std::string_view key, uint32_t offset, bool bit_value) {
auto& db_slice = shard_->db_slice();
DbIndex index = index_;
auto& db_slice = args_.shard->db_slice();
DCHECK(db_slice.IsDbValid(index_));
DCHECK(db_slice.IsDbValid(args_.db_cntx.db_index));
std::pair<PrimeIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind(index_, key);
add_res = db_slice.AddOrFind(args_.db_cntx, key);
} catch (const std::bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
@ -210,9 +208,9 @@ OpResult<bool> OverrideValue::Set(std::string_view key, uint32_t offset, bool bi
PrimeIterator& it = add_res.first;
bool added = add_res.second;
auto UpdateBitMapValue = [&](std::string_view value) {
db_slice.PreUpdate(index, it);
db_slice.PreUpdate(args_.db_cntx.db_index, it);
it->second.SetString(value);
db_slice.PostUpdate(index, it, key, !added);
db_slice.PostUpdate(args_.db_cntx.db_index, it, key, !added);
};
if (added) { // this is a new entry in the "table"
@ -224,7 +222,7 @@ OpResult<bool> OverrideValue::Set(std::string_view key, uint32_t offset, bool bi
return OpStatus::WRONG_TYPE;
}
bool reset = false;
std::string existing_entry{GetString(shard_, it->second)};
std::string existing_entry{GetString(args_.shard, it->second)};
if ((existing_entry.size() * OFFSET_FACTOR) <= offset) { // need to resize first
existing_entry.resize(GetByteIndex(offset) + 1, 0);
reset = true;
@ -398,7 +396,7 @@ OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui
}
OpResult<std::string> ReadValue(const OpArgs& op_args, std::string_view key) {
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING);
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING);
if (!it_res.ok()) {
return it_res.status();
}


@ -106,18 +106,22 @@ void BlockingController::RunStep(Transaction* completed_t) {
}
}
DbContext context;
context.time_now_ms = GetCurrentTimeMs();
for (DbIndex index : awakened_indices_) {
auto dbit = watched_dbs_.find(index);
if (dbit == watched_dbs_.end())
continue;
context.db_index = index;
DbWatchTable& wt = *dbit->second;
for (auto key : wt.awakened_keys) {
string_view sv_key = static_cast<string_view>(key);
DVLOG(1) << "Processing awakened key " << sv_key;
// Double verify we still got the item.
auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key);
auto [it, exp_it] = owner_->db_slice().FindExt(context, sv_key);
if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block.
continue;


@ -69,15 +69,20 @@ struct KeyIndex {
}
};
struct DbContext {
DbIndex db_index = 0;
uint64_t time_now_ms = 0;
};
struct OpArgs {
EngineShard* shard;
TxId txid;
DbIndex db_ind;
DbContext db_cntx;
OpArgs() : shard(nullptr), txid(0), db_ind(0) {
OpArgs() : shard(nullptr), txid(0) {
}
OpArgs(EngineShard* s, TxId i, DbIndex d) : shard(s), txid(i), db_ind(d) {
OpArgs(EngineShard* s, TxId i, const DbContext& cntx) : shard(s), txid(i), db_cntx(cntx) {
}
};


@ -61,9 +61,9 @@ class PrimeEvictionPolicy {
static constexpr bool can_evict = true; // we implement eviction functionality.
static constexpr bool can_gc = true;
PrimeEvictionPolicy(DbIndex db_indx, bool can_evict, ssize_t mem_budget, ssize_t soft_limit,
DbSlice* db_slice)
: db_slice_(db_slice), mem_budget_(mem_budget), soft_limit_(soft_limit), db_indx_(db_indx),
PrimeEvictionPolicy(const DbContext& cntx, bool can_evict, ssize_t mem_budget,
ssize_t soft_limit, DbSlice* db_slice)
: db_slice_(db_slice), mem_budget_(mem_budget), soft_limit_(soft_limit), cntx_(cntx),
can_evict_(can_evict) {
}
@ -94,9 +94,10 @@ class PrimeEvictionPolicy {
DbSlice* db_slice_;
ssize_t mem_budget_;
ssize_t soft_limit_ = 0;
const DbContext cntx_;
unsigned evicted_ = 0;
unsigned checked_ = 0;
const DbIndex db_indx_;
// unlike static constexpr can_evict, this parameter tells whether we can evict
// items in runtime.
@ -138,7 +139,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
for (; !bucket_it.is_done(); ++bucket_it) {
if (bucket_it->second.HasExpire()) {
++checked_;
auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, bucket_it);
auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(cntx_, bucket_it);
if (prime_it.is_done())
++res;
}
@ -164,7 +165,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
return 0;
}
DbTable* table = db_slice_->GetDBTable(db_indx_);
DbTable* table = db_slice_->GetDBTable(cntx_.db_index);
EvictItemFun(last_slot_it, table);
++evicted_;
}
@ -256,9 +257,9 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
db->prime.Reserve(key_size);
}
auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const
auto DbSlice::Find(const Context& cntx, string_view key, unsigned req_obj_type) const
-> OpResult<PrimeIterator> {
auto it = FindExt(db_index, key).first;
auto it = FindExt(cntx, key).first;
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
@ -270,13 +271,13 @@ auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) con
return it;
}
pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view key) const {
pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(const Context& cntx, string_view key) const {
pair<PrimeIterator, ExpireIterator> res;
if (!IsDbValid(db_ind))
if (!IsDbValid(cntx.db_index))
return res;
auto& db = *db_arr_[db_ind];
auto& db = *db_arr_[cntx.db_index];
res.first = db.prime.Find(key);
if (!IsValid(res.first)) {
@ -284,14 +285,14 @@ pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
}
if (res.first->second.HasExpire()) { // check expiry state
res = ExpireIfNeeded(db_ind, res.first);
res = ExpireIfNeeded(cntx, res.first);
}
if (caching_mode_ && IsValid(res.first)) {
if (!change_cb_.empty()) {
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, bit);
ccb.second(cntx.db_index, bit);
}
};
@ -306,12 +307,12 @@ pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
return res;
}
OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, ArgSlice args) {
OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(const Context& cntx, ArgSlice args) {
DCHECK(!args.empty());
for (unsigned i = 0; i < args.size(); ++i) {
string_view s = args[i];
OpResult<PrimeIterator> res = Find(db_index, s, OBJ_LIST);
OpResult<PrimeIterator> res = Find(cntx, s, OBJ_LIST);
if (res)
return make_pair(res.value(), i);
if (res.status() != OpStatus::KEY_NOTFOUND)
@ -322,20 +323,20 @@ OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, Arg
return OpStatus::KEY_NOTFOUND;
}
pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false) {
auto res = AddOrFind2(db_index, key);
pair<PrimeIterator, bool> DbSlice::AddOrFind(const Context& cntx, string_view key) noexcept(false) {
auto res = AddOrFind2(cntx, key);
return make_pair(get<0>(res), get<2>(res));
}
tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(DbIndex db_index,
tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(const Context& cntx,
string_view key) noexcept(false) {
DCHECK(IsDbValid(db_index));
DCHECK(IsDbValid(cntx.db_index));
DbTable& db = *db_arr_[db_index];
DbTable& db = *db_arr_[cntx.db_index];
// If we have some registered onchange callbacks, we must know in advance whether its Find or Add.
if (!change_cb_.empty()) {
auto res = FindExt(db_index, key);
auto res = FindExt(cntx, key);
if (IsValid(res.first)) {
return tuple_cat(res, make_tuple(true));
@ -343,11 +344,11 @@ tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(DbIndex db_index,
// It's a new entry.
for (const auto& ccb : change_cb_) {
ccb.second(db_index, key);
ccb.second(cntx.db_index, key);
}
}
PrimeEvictionPolicy evp{db_index, bool(caching_mode_), int64_t(memory_budget_ - key.size()),
PrimeEvictionPolicy evp{cntx, bool(caching_mode_), int64_t(memory_budget_ - key.size()),
ssize_t(soft_budget_limit_), this};
// If we are over limit in non-cache scenario, just be conservative and throw.
@ -405,7 +406,7 @@ tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(DbIndex db_index,
// TODO: to implement the incremental update of expiry values using multi-generation
// expire_base_ update. Right now we use only index 0.
uint64_t delta_ms = now_ms_ - expire_base_[0];
uint64_t delta_ms = cntx.time_now_ms - expire_base_[0];
if (expire_it->second.duration_ms() <= delta_ms) {
db.expire.Erase(expire_it);
@ -535,28 +536,27 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
return it.is_done() ? 0 : it->second;
}
PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
auto [it, added] = AddEntry(db_ind, key, std::move(obj), expire_at_ms);
auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms);
CHECK(added);
return it;
}
pair<PrimeIterator, bool> DbSlice::AddEntry(DbIndex db_ind, string_view key, PrimeValue obj,
pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
DCHECK_LT(db_ind, db_arr_.size());
DCHECK(!obj.IsRef());
pair<PrimeIterator, bool> res = AddOrFind(db_ind, key);
pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
if (!res.second) // have not inserted.
return res;
auto& db = *db_arr_[db_ind];
auto& db = *db_arr_[cntx.db_index];
auto& it = res.first;
it->second = std::move(obj);
PostUpdate(db_ind, it, key, false);
PostUpdate(cntx.db_index, it, key, false);
if (expire_at_ms) {
it->second.SetExpire(true);
@ -685,10 +685,10 @@ void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key,
}
}
pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx,
PrimeIterator it) const {
DCHECK(it->second.HasExpire());
auto& db = db_arr_[db_ind];
auto& db = db_arr_[cntx.db_index];
auto expire_it = db->expire.Find(it->first);
@ -697,7 +697,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
// TODO: to employ multi-generation update of expire-base and the underlying values.
time_t expire_time = ExpireTime(expire_it);
if (now_ms_ < expire_time)
if (time_t(cntx.time_now_ms) < expire_time)
return make_pair(it, expire_it);
db->expire.Erase(expire_it);
@ -725,17 +725,17 @@ void DbSlice::UnregisterOnChange(uint64_t id) {
LOG(DFATAL) << "Could not find " << id << " to unregister";
}
auto DbSlice::DeleteExpiredStep(DbIndex db_ind, unsigned count) -> DeleteExpiredStats {
auto& db = *db_arr_[db_ind];
auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats {
auto& db = *db_arr_[cntx.db_index];
DeleteExpiredStats result;
auto cb = [&](ExpireIterator it) {
result.traversed++;
time_t ttl = ExpireTime(it) - Now();
time_t ttl = ExpireTime(it) - cntx.time_now_ms;
if (ttl <= 0) {
auto prime_it = db.prime.Find(it->first);
CHECK(!prime_it.is_done());
ExpireIfNeeded(db_ind, prime_it);
ExpireIfNeeded(cntx, prime_it);
++result.deleted;
} else {
result.survivor_ttl_sum += ttl;
@ -853,7 +853,8 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t
return freed_memory_fun();
};
void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info) {
void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
ConnectionState::ExecInfo* exec_info) {
db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
}


@ -8,8 +8,8 @@
#include "facade/op_status.h"
#include "server/common.h"
#include "server/table.h"
#include "server/conn_context.h"
#include "server/table.h"
namespace util {
class ProactorBase;
@ -65,6 +65,8 @@ class DbSlice {
size_t small_string_bytes = 0;
};
using Context = DbContext;
// ChangeReq - describes the change to the table.
struct ChangeReq {
// If iterator is set then it's an update to the existing bucket.
@ -90,12 +92,6 @@ class DbSlice {
// Returns statistics for the whole db slice. A bit heavy operation.
Stats GetStats() const;
//! UpdateExpireClock updates the expire clock for this db slice.
//! Must be a wall clock so we could replicate it between machines.
void UpdateExpireClock(uint64_t now_ms) {
now_ms_ = now_ms;
}
void UpdateExpireBase(uint64_t now, unsigned generation) {
expire_base_[generation & 1] = now;
}
@ -124,37 +120,34 @@ class DbSlice {
return ExpirePeriod{time_ms - expire_base_[0]};
}
// returns wall clock in millis as it has been set via UpdateExpireClock.
time_t Now() const {
return now_ms_;
}
OpResult<PrimeIterator> Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const;
OpResult<PrimeIterator> Find(const Context& cntx, std::string_view key,
unsigned req_obj_type) const;
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<PrimeIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
std::pair<PrimeIterator, ExpireIterator> FindExt(const Context& cntx, std::string_view key) const;
// Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise.
// If multiple keys are found, returns the first index in the ArgSlice.
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(DbIndex db_index, ArgSlice args);
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(const Context& cntx, ArgSlice args);
// Return .second=true if insertion ocurred, false if we return the existing key.
// throws: bad_alloc is insertion could not happen due to out of memory.
std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_index, std::string_view key) noexcept(false);
std::pair<PrimeIterator, bool> AddOrFind(const Context& cntx,
std::string_view key) noexcept(false);
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(DbIndex db_index,
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
std::string_view key) noexcept(false);
// Returns second=true if insertion took place, false otherwise.
// expire_at_ms equal to 0 - means no expiry.
// throws: bad_alloc is insertion could not happen due to out of memory.
std::pair<PrimeIterator, bool> AddEntry(DbIndex db_ind, std::string_view key, PrimeValue obj,
std::pair<PrimeIterator, bool> AddEntry(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);
// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
// throws: bad_alloc is insertion could not happen due to out of memory.
PrimeIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj,
PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);
// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
@ -218,7 +211,8 @@ class DbSlice {
// Callback functions called upon writing to the existing key.
void PreUpdate(DbIndex db_ind, PrimeIterator it);
void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, bool existing_entry = true);
void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key,
bool existing_entry = true);
DbTableStats* MutableStats(DbIndex db_ind) {
return &db_arr_[db_ind]->stats;
@ -226,7 +220,8 @@ class DbSlice {
// Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it
// from both tables and return PrimeIterator{}.
std::pair<PrimeIterator, ExpireIterator> ExpireIfNeeded(DbIndex db_ind, PrimeIterator it) const;
std::pair<PrimeIterator, ExpireIterator> ExpireIfNeeded(const Context& cntx,
PrimeIterator it) const;
// Current version of this slice.
// We maintain a shared versioning scheme for all databases in the slice.
@ -251,7 +246,7 @@ class DbSlice {
};
// Deletes some amount of possible expired items.
DeleteExpiredStats DeleteExpiredStep(DbIndex db_indx, unsigned count);
DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count);
void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes);
const DbTableArray& databases() const {
@ -262,7 +257,8 @@ class DbSlice {
caching_mode_ = 1;
}
void RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info);
void RegisterWatchedKey(DbIndex db_indx, std::string_view key,
ConnectionState::ExecInfo* exec_info);
// Unregisted all watched key entries for connection.
void UnregisterConnectionWatches(ConnectionState::ExecInfo* exec_info);
@ -284,7 +280,6 @@ class DbSlice {
EngineShard* owner_;
time_t now_ms_ = 0; // Used for expire logic, represents a real clock.
time_t expire_base_[2]; // Used for expire logic, represents a real clock.
uint64_t version_ = 1; // Used to version entries in the PrimeTable.


@ -60,7 +60,8 @@ struct ObjInfo {
void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params,
const PopulateBatch& batch) {
OpArgs op_args(EngineShard::tlocal(), 0, params.db_index);
DbContext db_cntx{batch.dbid, 0};
OpArgs op_args(EngineShard::tlocal(), 0, db_cntx);
SetCmd sg(op_args);
for (unsigned i = 0; i < batch.sz; ++i) {
@ -277,7 +278,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
DbIndex db_indx = cntx_->db_index();
EngineShardSet& ess = *shard_set;
std::vector<PopulateBatch> ps(ess.size(), PopulateBatch{db_indx});
SetCmd::SetParams params{db_indx};
SetCmd::SetParams params;
for (uint64_t i = from; i < from + len; ++i) {
StrAppend(&key, i);
@ -324,7 +325,7 @@ void DebugCmd::Inspect(string_view key) {
CHECK(!exp_it.is_done());
time_t exp_time = db_slice.ExpireTime(exp_it);
oinfo.ttl = exp_time - db_slice.Now();
oinfo.ttl = exp_time - GetCurrentTimeMs();
oinfo.has_sec_precision = exp_it->second.is_second_precision();
}
}


@ -612,7 +612,9 @@ TEST_F(DflyEngineTest, Watch) {
// Check EXEC doesn't miss watched key expiration.
Run({"watch", "a"});
Run({"expire", "a", "1"});
UpdateTime(expire_now_ + 1000);
AdvanceTime(1000);
Run({"multi"});
ASSERT_THAT(Run({"exec"}), kExecFail);
@ -637,7 +639,9 @@ TEST_F(DflyEngineTest, Watch) {
Run({"watch", "a"});
Run({"set", "c", "1"});
Run({"expire", "a", "1"}); // a existed
UpdateTime(expire_now_ + 1000);
AdvanceTime(1000);
Run({"multi"});
ASSERT_THAT(Run({"exec"}), kExecFail);


@ -22,10 +22,9 @@ using namespace std;
ABSL_FLAG(string, backing_prefix, "", "");
ABSL_FLAG(uint32_t, hz, 1000,
"Base frequency at which the server updates its expiry clock "
"and performs other background tasks. Warning: not advised to decrease in production, "
"because it can affect expiry precision for PSETEX etc.");
ABSL_FLAG(uint32_t, hz, 100,
"Base frequency at which the server performs other background tasks. "
"Warning: not advised to decrease in production.");
ABSL_FLAG(bool, cache_mode, false,
"If true, the backend behaves like a cache, "
@ -48,6 +47,7 @@ constexpr size_t kQueueLen = 64;
thread_local EngineShard* EngineShard::shard_ = nullptr;
EngineShardSet* shard_set = nullptr;
uint64_t TEST_current_time_ms = 0;
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
ooo_runs += o.ooo_runs;
@ -284,43 +284,41 @@ bool EngineShard::HasResultConverged(TxId notifyid) const {
#endif
void EngineShard::Heartbeat() {
// absl::GetCurrentTimeNanos() returns current time since the Unix Epoch.
db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000);
CacheStats();
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
if (task_iters_++ % 8 == 0) {
CacheStats();
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
unsigned ttl_delete_target = 5;
uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
unsigned ttl_delete_target = 5;
if (deleted > 10) {
// deleted should be <= traversed.
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
// The higher t
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}
if (deleted > 10) {
// deleted should be <= traversed.
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
// The higher t
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) {
if (!db_slice_.IsDbValid(i))
continue;
db_cntx.db_index = i;
auto [pt, expt] = db_slice_.GetTables(i);
if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpiredStep(db_cntx, ttl_delete_target);
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
}
ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size();
for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) {
if (!db_slice_.IsDbValid(i))
continue;
auto [pt, expt] = db_slice_.GetTables(i);
if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpiredStep(i, ttl_delete_target);
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
}
// if our budget is below the limit
if (db_slice_.memory_budget() < redline) {
db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget());
}
// if our budget is below the limit
if (db_slice_.memory_budget() < redline) {
db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget());
}
}
}


@ -176,7 +176,6 @@ class EngineShard {
IntentLock shard_lock_;
uint32_t periodic_task_ = 0;
uint64_t task_iters_ = 0;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;
@ -287,6 +286,16 @@ inline ShardId Shard(std::string_view v, ShardId shard_num) {
}
// absl::GetCurrentTimeNanos is twice faster than clock_gettime(CLOCK_REALTIME) on my laptop
// and 4 times faster than on a VM. it takes 5-10ns to do a call.
extern uint64_t TEST_current_time_ms;
inline uint64_t GetCurrentTimeMs() {
return TEST_current_time_ms ? TEST_current_time_ms : absl::GetCurrentTimeNanos() / 1000000;
}
extern EngineShardSet* shard_set;
} // namespace dfly


@ -31,7 +31,7 @@ namespace {
class Renamer {
public:
Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) {
Renamer(ShardId source_id) : src_sid_(source_id) {
}
void Find(Transaction* t);
@ -46,7 +46,6 @@ class Renamer {
OpStatus MoveSrc(Transaction* t, EngineShard* es);
OpStatus UpdateDest(Transaction* t, EngineShard* es);
DbIndex db_indx_;
ShardId src_sid_;
struct FindResult {
@ -73,7 +72,7 @@ void Renamer::Find(Transaction* t) {
res->key = args.front();
auto& db_slice = EngineShard::tlocal()->db_slice();
auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key);
auto [it, exp_it] = db_slice.FindExt(t->db_context(), res->key);
res->found = IsValid(it);
if (IsValid(it)) {
@ -116,7 +115,7 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) {
OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
if (es->shard_id() == src_sid_) { // Handle source key.
// TODO: to call PreUpdate/PostUpdate.
auto it = es->db_slice().FindExt(db_indx_, src_res_.key).first;
auto it = es->db_slice().FindExt(t->db_context(), src_res_.key).first;
CHECK(IsValid(it));
// We distinguish because of the SmallString that is pinned to its thread by design,
@ -129,7 +128,7 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
pv_ = std::move(it->second);
it->second.SetExpire(has_expire);
}
CHECK(es->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it.
CHECK(es->db_slice().Del(t->db_index(), it)); // delete the entry with empty value in it.
}
return OpStatus::OK;
@ -139,7 +138,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
if (es->shard_id() != src_sid_) {
auto& db_slice = es->db_slice();
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first;
PrimeIterator dest_it = db_slice.FindExt(t->db_context(), dest_key).first;
bool is_prior_list = false;
if (IsValid(dest_it)) {
@ -152,18 +151,18 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
dest_it->second = std::move(pv_);
}
dest_it->second.SetExpire(has_expire); // preserve expire flag.
db_slice.UpdateExpire(db_indx_, dest_it, src_res_.expire_ts);
db_slice.UpdateExpire(t->db_index(), dest_it, src_res_.expire_ts);
} else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
pv_.SetString(str_val_);
}
dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts);
dest_it = db_slice.AddNew(t->db_context(), dest_key, std::move(pv_), src_res_.expire_ts);
}
dest_it->first.SetSticky(src_res_.sticky);
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(db_indx_, dest_key);
es->blocking_controller()->AwakeWatched(t->db_index(), dest_key);
}
}
@ -181,7 +180,7 @@ struct ScanOpts {
bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) {
auto& db_slice = op_args.shard->db_slice();
if (it->second.HasExpire()) {
it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first;
it = db_slice.ExpireIfNeeded(op_args.db_cntx, it).first;
}
if (!IsValid(it))
@ -211,15 +210,15 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, Strin
void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, StringVec* vec) {
auto& db_slice = op_args.shard->db_slice();
DCHECK(db_slice.IsDbValid(op_args.db_ind));
DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index));
unsigned cnt = 0;
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has "
<< db_slice.DbSize(op_args.db_ind);
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has "
<< db_slice.DbSize(op_args.db_cntx.db_index);
PrimeTable::Cursor cur = *cursor;
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index);
do {
cur = prime_table->Traverse(
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, vec); });
@ -245,10 +244,11 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
}
cursor >>= 10;
DbContext db_cntx{.db_index = cntx->conn_state.db_index, .time_now_ms = GetCurrentTimeMs()};
do {
ess->Await(sid, [&] {
OpArgs op_args{EngineShard::tlocal(), 0, cntx->conn_state.db_index};
OpArgs op_args{EngineShard::tlocal(), 0, db_cntx};
OpScan(op_args, scan_opts, &cursor, keys);
});
@ -543,7 +543,7 @@ OpResult<SortEntryList> OpFetchSortEntries(const OpArgs& op_args, std::string_vi
bool alpha) {
using namespace container_utils;
auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_ind, key);
auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_cntx, key);
if (!IsValid(it) || !IsContainer(it->second)) {
return OpStatus::KEY_NOTFOUND;
}
@ -731,7 +731,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<int> {
auto it = shard->db_slice().FindExt(t->db_index(), key).first;
auto it = shard->db_slice().FindExt(t->db_context(), key).first;
if (!it.is_done()) {
return it->second.ObjType();
} else {
@ -746,6 +746,19 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
}
}
void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) {
uint64_t now_usec;
if (cntx->transaction) {
now_usec = cntx->transaction->db_context().time_now_ms * 1000;
} else {
now_usec = absl::GetCurrentTimeNanos() / 1000;
}
(*cntx)->StartArray(2);
(*cntx)->SendLong(now_usec / 1000000);
(*cntx)->SendLong(now_usec % 1000000);
}
OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_dest,
ConnectionContext* cntx) {
string_view key[2] = {ArgS(args, 1), ArgS(args, 2)};
@ -763,7 +776,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
transaction->Schedule();
unsigned shard_count = shard_set->size();
Renamer renamer{transaction->db_index(), Shard(key[0], shard_count)};
Renamer renamer{Shard(key[0], shard_count)};
// Phase 1 -> Fetch keys from both shards.
// Phase 2 -> If everything is ok, clone the source object, delete the destination object, and
@ -835,23 +848,23 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
const ExpireParams& params) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key);
auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
int64_t msec = (params.unit == TimeUnit::SEC) ? params.ts * 1000 : params.ts;
int64_t now_msec = db_slice.Now();
int64_t now_msec = op_args.db_cntx.time_now_ms;
int64_t rel_msec = params.absolute ? msec - now_msec : msec;
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
return OpStatus::OUT_OF_RANGE;
}
if (rel_msec <= 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
} else if (IsValid(expire_it)) {
expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec);
} else {
db_slice.UpdateExpire(op_args.db_ind, it, rel_msec + now_msec);
db_slice.UpdateExpire(op_args.db_cntx.db_index, it, rel_msec + now_msec);
}
return OpStatus::OK;
@ -859,14 +872,14 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) {
auto& db_slice = shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(t->db_index(), key);
auto [it, expire_it] = db_slice.FindExt(t->db_context(), key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (!IsValid(expire_it))
return OpStatus::SKIPPED;
int64_t ttl_ms = db_slice.ExpireTime(expire_it) - db_slice.Now();
int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->db_context().time_now_ms;
DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null.
return ttl_ms;
}
@ -878,10 +891,10 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, ArgSlice keys) {
uint32_t res = 0;
for (uint32_t i = 0; i < keys.size(); ++i) {
auto fres = db_slice.FindExt(op_args.db_ind, keys[i]);
auto fres = db_slice.FindExt(op_args.db_cntx, keys[i]);
if (!IsValid(fres.first))
continue;
res += int(db_slice.Del(op_args.db_ind, fres.first));
res += int(db_slice.Del(op_args.db_cntx.db_index, fres.first));
}
return res;
@ -893,7 +906,7 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys)
uint32_t res = 0;
for (uint32_t i = 0; i < keys.size(); ++i) {
auto find_res = db_slice.FindExt(op_args.db_ind, keys[i]);
auto find_res = db_slice.FindExt(op_args.db_cntx, keys[i]);
res += IsValid(find_res.first);
}
return res;
@ -903,12 +916,12 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
bool skip_exists) {
auto* es = op_args.shard;
auto& db_slice = es->db_slice();
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key);
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_cntx, from_key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;
bool is_prior_list = false;
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to_key);
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_cntx, to_key);
if (IsValid(to_it)) {
if (skip_exists)
return OpStatus::KEY_EXISTS;
@ -932,20 +945,20 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
// It is guaranteed that UpdateExpire() call does not erase the element because then
// from_it would be invalid. Therefore, UpdateExpire does not invalidate any iterators,
// therefore we can delete 'from_it'.
db_slice.UpdateExpire(op_args.db_ind, to_it, exp_ts);
CHECK(db_slice.Del(op_args.db_ind, from_it));
db_slice.UpdateExpire(op_args.db_cntx.db_index, to_it, exp_ts);
CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it));
} else {
// Here we first delete from_it because AddNew below could invalidate from_it.
// On the other hand, AddNew does not rely on the iterators - this is why we keep
// the value in `from_obj`.
CHECK(db_slice.Del(op_args.db_ind, from_it));
to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts);
CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it));
to_it = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts);
}
to_it->first.SetSticky(sticky);
if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key);
es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, to_key);
}
return OpStatus::OK;
}
@ -957,7 +970,7 @@ OpResult<uint32_t> GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys)
uint32_t res = 0;
for (uint32_t i = 0; i < keys.size(); ++i) {
auto [it, _] = db_slice.FindExt(op_args.db_ind, keys[i]);
auto [it, _] = db_slice.FindExt(op_args.db_cntx, keys[i]);
if (IsValid(it) && !it->first.IsSticky()) {
it->first.SetSticky(true);
++res;
@ -974,12 +987,14 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t
auto& db_slice = op_args.shard->db_slice();
// Fetch value at key in current db.
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key);
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_cntx, key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;
// Fetch value at key in target db.
auto [to_it, _] = db_slice.FindExt(target_db, key);
DbContext target_cntx = op_args.db_cntx;
target_cntx.db_index = target_db;
auto [to_it, _] = db_slice.FindExt(target_cntx, key);
if (IsValid(to_it))
return OpStatus::KEY_EXISTS;
@ -993,8 +1008,8 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t
// Restore expire flag after std::move.
from_it->second.SetExpire(IsValid(from_expire));
CHECK(db_slice.Del(op_args.db_ind, from_it));
to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts);
CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it));
to_it = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
to_it->first.SetSticky(sticky);
if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) {
@ -1029,6 +1044,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan)
<< CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl)
<< CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, 0}.HFUNC(Time)
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick)


@ -60,6 +60,7 @@ class GenericFamily {
static void Echo(CmdArgList args, ConnectionContext* cntx);
static void Select(CmdArgList args, ConnectionContext* cntx);
static void Scan(CmdArgList args, ConnectionContext* cntx);
static void Time(CmdArgList args, ConnectionContext* cntx);
static void Type(CmdArgList args, ConnectionContext* cntx);
static OpResult<void> RenameGeneric(CmdArgList args, bool skip_exist_dest,


@ -30,23 +30,23 @@ TEST_F(GenericFamilyTest, Expire) {
auto resp = Run({"expire", "key", "1"});
EXPECT_THAT(resp, IntArg(1));
UpdateTime(expire_now_ + 1000);
AdvanceTime(1000);
resp = Run({"get", "key"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
Run({"set", "key", "val"});
resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 2000)});
resp = Run({"pexpireat", "key", absl::StrCat(TEST_current_time_ms + 2000)});
EXPECT_THAT(resp, IntArg(1));
// override
resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 3000)});
resp = Run({"pexpireat", "key", absl::StrCat(TEST_current_time_ms + 3000)});
EXPECT_THAT(resp, IntArg(1));
UpdateTime(expire_now_ + 2999);
AdvanceTime(2999);
resp = Run({"get", "key"});
EXPECT_THAT(resp, "val");
UpdateTime(expire_now_ + 3000);
AdvanceTime(1);
resp = Run({"get", "key"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
}
@ -331,4 +331,28 @@ TEST_F(GenericFamilyTest, Sort) {
ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double"));
}
TEST_F(GenericFamilyTest, Time) {
auto resp = Run({"time"});
EXPECT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64));
EXPECT_THAT(resp.GetVec()[1], ArgType(RespExpr::INT64));
// Check that time is the same inside a transaction.
Run({"multi"});
Run({"time"});
usleep(2000);
Run({"time"});
resp = Run({"exec"});
EXPECT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec()[0], ArrLen(2));
ASSERT_THAT(resp.GetVec()[1], ArrLen(2));
for (int i = 0; i < 2; ++i) {
int64_t val0 = get<int64_t>(resp.GetVec()[0].GetVec()[i].u);
int64_t val1 = get<int64_t>(resp.GetVec()[1].GetVec()[i].u);
EXPECT_EQ(val0, val1);
}
}
} // namespace dfly


@ -131,7 +131,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<int> {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH);
auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH);
if (it_res) {
robj* hset = (*it_res)->second.AsRObj();
@ -387,7 +387,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH);
auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH);
if (!it_res)
return it_res.status();
@ -437,12 +437,12 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
auto& db_slice = op_args.shard->db_slice();
pair<PrimeIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind(op_args.db_ind, key);
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch(bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
DbTableStats* stats = db_slice.MutableStats(op_args.db_ind);
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
robj* hset = nullptr;
uint8_t* lp = nullptr;
@ -459,7 +459,7 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
if (it->second.ObjType() != OBJ_HASH)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
hset = it->second.AsRObj();
@ -509,7 +509,7 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
}
}
it->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
return created;
}
@ -518,17 +518,17 @@ OpResult<uint32_t> HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd
DCHECK(!values.empty());
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res)
return it_res.status();
db_slice.PreUpdate(op_args.db_ind, *it_res);
db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res);
CompactObj& co = (*it_res)->second;
robj* hset = co.AsRObj();
unsigned deleted = 0;
bool key_remove = false;
DbTableStats* stats = db_slice.MutableStats(op_args.db_ind);
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr);
@ -548,12 +548,12 @@ OpResult<uint32_t> HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd
co.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, *it_res, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key);
if (key_remove) {
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
stats->listpack_blob_cnt--;
}
db_slice.Del(op_args.db_ind, *it_res);
db_slice.Del(op_args.db_cntx.db_index, *it_res);
} else if (hset->encoding == OBJ_ENCODING_LISTPACK) {
stats->listpack_bytes += lpBytes((uint8_t*)hset->ptr);
}
@ -566,7 +566,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList
DCHECK(!fields.empty());
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res)
return it_res.status();
@ -631,7 +631,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList
OpResult<uint32_t> HSetFamily::OpLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (it_res) {
robj* hset = (*it_res)->second.AsRObj();
@ -644,7 +644,7 @@ OpResult<uint32_t> HSetFamily::OpLen(const OpArgs& op_args, string_view key) {
OpResult<string> HSetFamily::OpGet(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res)
return it_res.status();
@ -682,7 +682,7 @@ OpResult<string> HSetFamily::OpGet(const OpArgs& op_args, string_view key, strin
OpResult<vector<string>> HSetFamily::OpGetAll(const OpArgs& op_args, string_view key,
uint8_t mask) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return vector<string>{};
@ -729,7 +729,7 @@ OpResult<vector<string>> HSetFamily::OpGetAll(const OpArgs& op_args, string_view
OpResult<size_t> HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
@ -761,9 +761,9 @@ OpResult<size_t> HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, st
OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_view field,
IncrByParam* param) {
auto& db_slice = op_args.shard->db_slice();
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
DbTableStats* stats = db_slice.MutableStats(op_args.db_ind);
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
robj* hset = nullptr;
size_t lpb = 0;
@ -777,7 +777,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
if (it->second.ObjType() != OBJ_HASH)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
hset = it->second.AsRObj();
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
@ -874,14 +874,14 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
}
it->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
return OpStatus::OK;
}
OpResult<StringVec> HSetFamily::OpScan(const OpArgs& op_args, std::string_view key,
uint64_t* cursor) {
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_HASH);
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_HASH);
if (!find_res)
return find_res.status();


@ -53,17 +53,18 @@ string GetString(EngineShard* shard, const PrimeValue& pv) {
inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) {
if (op_args.shard->journal()) {
journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue};
journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue};
op_args.shard->journal()->RecordEntry(entry);
}
}
void SetString(const OpArgs& op_args, string_view key, const string& value) {
auto& db_slice = op_args.shard->db_slice();
auto [it_output, added] = db_slice.AddOrFind(op_args.db_ind, key);
db_slice.PreUpdate(op_args.db_ind, it_output);
DbIndex db_index = op_args.db_cntx.db_index;
auto [it_output, added] = db_slice.AddOrFind(op_args.db_cntx, key);
db_slice.PreUpdate(db_index, it_output);
it_output->second.SetString(value);
db_slice.PostUpdate(op_args.db_ind, it_output, key);
db_slice.PostUpdate(db_index, it_output, key);
RecordJournal(op_args, key, it_output->second);
}
@ -140,7 +141,7 @@ bool JsonErrorHandler(json_errc ec, const ser_context&) {
}
OpResult<json> GetJson(const OpArgs& op_args, string_view key) {
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING);
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
@ -447,8 +448,8 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path) {
long total_deletions = 0;
if (path.empty()) {
auto& db_slice = op_args.shard->db_slice();
auto [it, _] = db_slice.FindExt(op_args.db_ind, key);
total_deletions += long(db_slice.Del(op_args.db_ind, it));
auto [it, _] = db_slice.FindExt(op_args.db_cntx, key);
total_deletions += long(db_slice.Del(op_args.db_cntx.db_index, it));
return total_deletions;
}


@ -126,7 +126,7 @@ OpResult<ShardFFResult> FindFirst(Transaction* trans) {
auto args = t->ShardArgsInShard(shard->shard_id());
OpResult<pair<PrimeIterator, unsigned>> ff_res =
shard->db_slice().FindFirst(t->db_index(), args);
shard->db_slice().FindFirst(t->db_context(), args);
if (ff_res) {
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
@ -260,7 +260,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_index(), key_, OBJ_LIST);
auto it_res = db_slice.Find(t->db_context(), key_, OBJ_LIST);
CHECK(it_res); // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
@ -278,7 +278,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) {
auto& db_slice = op_args.shard->db_slice();
auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST);
auto src_res = db_slice.Find(op_args.db_cntx, src, OBJ_LIST);
if (!src_res)
return src_res.status();
@ -286,11 +286,11 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
quicklist* src_ql = GetQL(src_it->second);
if (src == dest) { // simple case.
db_slice.PreUpdate(op_args.db_ind, src_it);
db_slice.PreUpdate(op_args.db_cntx.db_index, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(src_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it, src);
db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src);
return val;
}
@ -299,7 +299,7 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
PrimeIterator dest_it;
bool new_key = false;
try {
tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_ind, dest);
tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_cntx, dest);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
@ -312,25 +312,25 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
dest_it->second.ImportRObj(obj);
// Insertion of dest could invalidate src_it. Find it again.
src_it = db_slice.GetTables(op_args.db_ind).first->Find(src);
src_it = db_slice.GetTables(op_args.db_cntx.db_index).first->Find(src);
} else {
if (dest_it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
dest_ql = GetQL(dest_it->second);
db_slice.PreUpdate(op_args.db_ind, dest_it);
db_slice.PreUpdate(op_args.db_cntx.db_index, dest_it);
}
db_slice.PreUpdate(op_args.db_ind, src_it);
db_slice.PreUpdate(op_args.db_cntx.db_index, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(dest_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it, src);
db_slice.PostUpdate(op_args.db_ind, dest_it, dest, !new_key);
db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src);
db_slice.PostUpdate(op_args.db_cntx.db_index, dest_it, dest, !new_key);
if (quicklistCount(src_ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, src_it));
CHECK(db_slice.Del(op_args.db_cntx.db_index, src_it));
}
return val;
@ -339,7 +339,7 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
// Read-only peek operation that determines wether the list exists and optionally
// returns the first from right value without popping it from the list.
OpResult<string> RPeek(const OpArgs& op_args, string_view key, bool fetch) {
auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
auto it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res) {
return it_res.status();
}
@ -366,13 +366,13 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
bool new_key = false;
if (skip_notexist) {
auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
auto it_res = es->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
return it_res.status();
it = *it_res;
} else {
try {
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key);
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
@ -389,7 +389,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
} else {
if (it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
es->db_slice().PreUpdate(op_args.db_ind, it);
es->db_slice().PreUpdate(op_args.db_cntx.db_index, it);
ql = GetQL(it->second);
}
@ -405,10 +405,10 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
if (es->blocking_controller()) {
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->blocking_controller()->AwakeWatched(op_args.db_ind, key);
es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key);
}
} else {
es->db_slice().PostUpdate(op_args.db_ind, it, key, true);
es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, true);
}
return quicklistCount(ql);
@ -417,13 +417,13 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count,
bool return_results) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
StringVec res;
if (quicklistCount(ql) < count) {
@ -441,10 +441,10 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
}
}
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
}
return res;
@ -821,7 +821,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
}
OpResult<uint32_t> ListFamily::OpLen(const OpArgs& op_args, std::string_view key) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!res)
return res.status();
@ -831,7 +831,7 @@ OpResult<uint32_t> ListFamily::OpLen(const OpArgs& op_args, std::string_view key
}
OpResult<string> ListFamily::OpIndex(const OpArgs& op_args, std::string_view key, long index) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
@ -856,7 +856,7 @@ OpResult<string> ListFamily::OpIndex(const OpArgs& op_args, std::string_view key
OpResult<int> ListFamily::OpInsert(const OpArgs& op_args, string_view key, string_view pivot,
string_view elem, int insert_param) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
return it_res.status();
@ -874,14 +874,14 @@ OpResult<int> ListFamily::OpInsert(const OpArgs& op_args, string_view key, strin
int res = -1;
if (found) {
db_slice.PreUpdate(op_args.db_ind, *it_res);
db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res);
if (insert_param == LIST_TAIL) {
quicklistInsertAfter(qiter, &entry, elem.data(), elem.size());
} else {
DCHECK_EQ(LIST_HEAD, insert_param);
quicklistInsertBefore(qiter, &entry, elem.data(), elem.size());
}
db_slice.PostUpdate(op_args.db_ind, *it_res, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key);
res = quicklistCount(ql);
}
quicklistReleaseIterator(qiter);
@ -892,7 +892,7 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
long count) {
DCHECK(!elem.empty());
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
return it_res.status();
@ -912,7 +912,7 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
unsigned removed = 0;
const uint8_t* elem_ptr = reinterpret_cast<const uint8_t*>(elem.data());
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
while (quicklistNext(qiter, &entry)) {
if (quicklistCompare(&entry, elem_ptr, elem.size())) {
quicklistDelEntry(qiter, &entry);
@ -921,12 +921,12 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
break;
}
}
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
quicklistReleaseIterator(qiter);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
}
return removed;
@ -935,16 +935,16 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, str
OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) {
DCHECK(!elem.empty());
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
return it_res.status();
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size());
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
if (!replaced) {
return OpStatus::OUT_OF_RANGE;
@ -954,7 +954,7 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view e
OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
return it_res.status();
@ -985,20 +985,20 @@ OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start,
rtrim = llen - end - 1;
}
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
quicklistDelRange(ql, 0, ltrim);
quicklistDelRange(ql, -rtrim, rtrim);
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
}
return OpStatus::OK;
}
OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view key, long start,
long end) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!res)
return res.status();
@ -1025,7 +1025,7 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
str_vec.emplace_back(ce.ToString());
return true;
}, start, end);
return str_vec;
}

View File

@ -52,7 +52,8 @@ TEST_F(ListFamilyTest, Expire) {
resp = Run({"expire", kKey1, "1"});
EXPECT_THAT(resp, IntArg(1));
UpdateTime(expire_now_ + 1000);
AdvanceTime(1000);
resp = Run({"lpush", kKey1, "1"});
EXPECT_THAT(resp, IntArg(1));
}

View File

@ -1232,6 +1232,8 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
DbSlice& db_slice = EngineShard::tlocal()->db_slice();
DbContext db_cntx{.db_index = db_ind, .time_now_ms = GetCurrentTimeMs()};
for (const auto& item : ib) {
std::string_view key{item.key};
PrimeValue pv;
@ -1245,7 +1247,10 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
break;
}
auto [it, added] = db_slice.AddEntry(db_ind, key, std::move(pv), item.expire_ms);
if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
continue;
auto [it, added] = db_slice.AddEntry(db_cntx, key, std::move(pv), item.expire_ms);
if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << key << "' in DB " << db_ind;

View File

@ -484,8 +484,8 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
// to overwrite the key. However, if the set is empty it means we should delete the
// key if it exists.
if (overwrite && vals.empty()) {
auto it = db_slice.FindExt(op_args.db_ind, key).first;
db_slice.Del(op_args.db_ind, it);
auto it = db_slice.FindExt(op_args.db_cntx, key).first;
db_slice.Del(op_args.db_cntx.db_index, it);
return 0;
}
@ -494,7 +494,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
bool new_key = false;
try {
tie(it, new_key) = db_slice.AddOrFind(op_args.db_ind, key);
tie(it, new_key) = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc& e) {
return OpStatus::OUT_OF_MEMORY;
}
@ -507,7 +507,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
return OpStatus::WRONG_TYPE;
// Update stats and handle the old value if needed.
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
if (new_key || overwrite) {
@ -555,7 +555,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
res = AddStrSet(std::move(vals), &co);
}
db_slice.PostUpdate(op_args.db_ind, it, key, !new_key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
return res;
}
@ -563,20 +563,20 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key, const ArgSlice& vals) {
auto* es = op_args.shard;
auto& db_slice = es->db_slice();
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET);
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
if (!find_res) {
return find_res.status();
}
db_slice.PreUpdate(op_args.db_ind, *find_res);
db_slice.PreUpdate(op_args.db_cntx.db_index, *find_res);
CompactObj& co = find_res.value()->second;
auto [removed, isempty] = RemoveSet(vals, &co);
db_slice.PostUpdate(op_args.db_ind, *find_res, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, *find_res, key);
if (isempty) {
CHECK(db_slice.Del(op_args.db_ind, find_res.value()));
CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res.value()));
}
return removed;
@ -610,7 +610,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
for (auto k : largs) {
unsigned index = (k == src_) ? 0 : 1;
OpResult<PrimeIterator> res = es->db_slice().Find(t->db_index(), k, OBJ_SET);
OpResult<PrimeIterator> res = es->db_slice().Find(t->db_context(), k, OBJ_SET);
if (res && index == 0) { // successful src find.
DCHECK(!res->is_done());
const CompactObj& val = res.value()->second;
@ -676,7 +676,7 @@ OpResult<StringVec> OpUnion(const OpArgs& op_args, ArgSlice keys) {
absl::flat_hash_set<string> uniques;
for (std::string_view key : keys) {
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET);
if (find_res) {
container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce){
uniques.emplace(ce.ToString());
@ -698,7 +698,7 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
DCHECK(!keys.empty());
DVLOG(1) << "OpDiff from " << keys.front();
EngineShard* es = op_args.shard;
OpResult<PrimeIterator> find_res = es->db_slice().Find(op_args.db_ind, keys.front(), OBJ_SET);
OpResult<PrimeIterator> find_res = es->db_slice().Find(op_args.db_cntx, keys.front(), OBJ_SET);
if (!find_res) {
return find_res.status();
@ -713,7 +713,7 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
DCHECK(!uniques.empty()); // otherwise the key would not exist.
for (size_t i = 1; i < keys.size(); ++i) {
OpResult<PrimeIterator> diff_res = es->db_slice().Find(op_args.db_ind, keys[i], OBJ_SET);
OpResult<PrimeIterator> diff_res = es->db_slice().Find(op_args.db_cntx, keys[i], OBJ_SET);
if (!diff_res) {
if (diff_res.status() == OpStatus::WRONG_TYPE) {
return OpStatus::WRONG_TYPE;
@ -750,7 +750,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
StringVec result;
if (keys.size() == 1) {
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys.front(), OBJ_SET);
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_context(), keys.front(), OBJ_SET);
if (!find_res)
return find_res.status();
@ -767,7 +767,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
OpStatus status = OpStatus::OK;
for (size_t i = 0; i < keys.size(); ++i) {
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET);
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_context(), keys[i], OBJ_SET);
if (!find_res) {
if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND ||
find_res.status() != OpStatus::KEY_NOTFOUND) {
@ -841,7 +841,7 @@ void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) {
std::string_view val = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
if (find_res) {
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
@ -906,7 +906,7 @@ void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
if (!find_res) {
return find_res.status();
}
@ -1220,7 +1220,7 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) {
OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned count) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET);
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
if (!find_res)
return find_res.status();
@ -1241,10 +1241,10 @@ OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key
});
/* Delete the set as it is now empty */
CHECK(db_slice.Del(op_args.db_ind, it));
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
} else {
SetType st{it->second.RObjPtr(), it->second.Encoding()};
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
if (st.second == kEncodingIntSet) {
intset* is = (intset*)st.first;
int64_t val = 0;
@ -1260,14 +1260,14 @@ OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key
} else {
result = PopStrSet(count, st);
}
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
}
return result;
}
OpResult<StringVec> SetFamily::OpScan(const OpArgs& op_args, std::string_view key,
uint64_t* cursor) {
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET);
if (!find_res)
return find_res.status();

View File

@ -145,7 +145,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
pair<PrimeIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind(op_args.db_ind, key);
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
@ -161,7 +161,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
if (it->second.ObjType() != OBJ_STREAM)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
stream* stream_inst = (stream*)it->second.RObjPtr();
@ -201,7 +201,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -247,7 +247,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
CompactObj& cobj = (*res_it)->second;
@ -255,9 +255,10 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
return s->length;
}
OpResult<vector<GroupInfo>> OpListGroups(DbIndex db_index, string_view key, EngineShard* shard) {
OpResult<vector<GroupInfo>> OpListGroups(const DbContext& db_cntx, string_view key,
EngineShard* shard) {
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(db_index, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -294,7 +295,7 @@ struct CreateOpts {
OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -323,7 +324,7 @@ OpResult<pair<stream*, streamCG*>> FindGroup(const OpArgs& op_args, string_view
string_view gname) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -406,7 +407,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri
OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -445,7 +446,7 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, absl::Span<streamID> ids) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -719,7 +720,8 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
// We do not use transactional semantics for xinfo since it is an informational command.
auto cb = [&]() {
EngineShard* shard = EngineShard::tlocal();
return OpListGroups(cntx->db_index(), key, shard);
DbContext db_context{.db_index = cntx->db_index(), .time_now_ms = GetCurrentTimeMs()};
return OpListGroups(db_context, key, shard);
};
OpResult<vector<GroupInfo>> result = shard_set->Await(sid, std::move(cb));
@ -744,9 +746,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpLen(t->GetOpArgs(shard), key);
};
auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); };
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) {

View File

@ -66,7 +66,7 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) {
inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) {
if (op_args.shard->journal()) {
journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue};
journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue};
op_args.shard->journal()->RecordEntry(entry);
}
}
@ -77,7 +77,7 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
size_t range_len = start + value.size();
if (range_len == 0) {
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING);
if (it_res) {
return it_res.value()->second.Size();
} else {
@ -85,7 +85,7 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
}
}
auto [it, added] = db_slice.AddOrFind(op_args.db_ind, key);
auto [it, added] = db_slice.AddOrFind(op_args.db_cntx, key);
string s;
@ -99,12 +99,12 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
if (s.size() < range_len)
s.resize(range_len);
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
memcpy(s.data() + start, value.data(), value.size());
it->second.SetString(s);
db_slice.PostUpdate(op_args.db_ind, it, key, !added);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !added);
RecordJournal(op_args, key, it->second);
return it->second.Size();
@ -112,7 +112,7 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
@ -142,8 +142,8 @@ OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t star
return string(slice.substr(start, end - start + 1));
};
size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key,
string_view val, bool prepend) {
size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val,
bool prepend) {
string tmp, new_val;
auto* shard = op_args.shard;
string_view slice = GetSlice(shard, it->second, &tmp);
@ -153,9 +153,9 @@ size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key,
new_val = absl::StrCat(slice, val);
auto& db_slice = shard->db_slice();
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
it->second.SetString(new_val);
db_slice.PostUpdate(op_args.db_ind, it, key, true);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true);
RecordJournal(op_args, key, it->second);
return new_val.size();
@ -166,10 +166,10 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi
bool prepend) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
if (inserted) {
it->second.SetString(val);
db_slice.PostUpdate(op_args.db_ind, it, key, false);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false);
RecordJournal(op_args, key, it->second);
return val.size();
@ -181,10 +181,9 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi
return ExtendExisting(op_args, it, key, val, prepend);
}
OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val,
bool prepend) {
OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING);
if (!it_res) {
return false;
}
@ -193,7 +192,7 @@ OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view
}
OpResult<string> OpGet(const OpArgs& op_args, string_view key) {
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING);
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
@ -204,14 +203,14 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key) {
OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val) {
auto& db_slice = op_args.shard->db_slice();
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
char buf[128];
if (inserted) {
char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf));
it->second.SetString(str);
db_slice.PostUpdate(op_args.db_ind, it, key, false);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false);
RecordJournal(op_args, key, it->second);
return val;
@ -241,9 +240,9 @@ OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val)
char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf));
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
it->second.SetString(str);
db_slice.PostUpdate(op_args.db_ind, it, key, true);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true);
RecordJournal(op_args, key, it->second);
return base;
@ -255,7 +254,7 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
auto& db_slice = op_args.shard->db_slice();
// we avoid using AddOrFind because of the skip_on_missing option for memcache.
auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key);
auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key);
if (!IsValid(it)) {
if (skip_on_missing)
@ -266,7 +265,7 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
// AddNew calls PostUpdate inside.
try {
it = db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0);
it = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
@ -293,9 +292,9 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
int64_t new_val = prev + incr;
DCHECK(!it->second.IsExternal());
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
it->second.SetInt(new_val);
db_slice.PostUpdate(op_args.db_ind, it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
RecordJournal(op_args, key, it->second);
return new_val;
@ -318,7 +317,7 @@ int64_t CalculateAbsTime(int64_t unix_time, bool as_milli) {
OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
SetCmd::SetParams params{op_args.db_ind};
SetCmd::SetParams params;
SetCmd sg(op_args);
for (size_t i = 0; i < args.size(); i += 2) {
@ -332,19 +331,29 @@ OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) {
return OpStatus::OK;
}
OpResult<void> SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams,
string_view key, string_view value) {
DCHECK(cntx->transaction);
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(t->GetOpArgs(shard));
return sg.Set(sparams, key, value);
};
return cntx->transaction->ScheduleSingleHop(std::move(cb));
}
} // namespace
OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) {
EngineShard* shard = op_args_.shard;
auto& db_slice = shard->db_slice();
DCHECK_LT(params.db_index, db_slice.db_array_size());
DCHECK(db_slice.IsDbValid(params.db_index));
DCHECK(db_slice.IsDbValid(op_args_.db_cntx.db_index));
VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") ";
if (params.IsConditionalSet()) {
const auto [it, expire_it] = db_slice.FindExt(params.db_index, key);
const auto [it, expire_it] = db_slice.FindExt(op_args_.db_cntx, key);
// Make sure that we have this key, and only add it if it does exist
if (params.how == SET_IF_EXISTS) {
if (IsValid(it)) {
@ -363,7 +372,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
// Trying to add a new entry.
tuple<PrimeIterator, ExpireIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind2(params.db_index, key);
add_res = db_slice.AddOrFind2(op_args_.db_cntx, key);
} catch (bad_alloc& e) {
return OpStatus::OUT_OF_MEMORY;
}
@ -377,20 +386,21 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
PrimeValue tvalue{value};
tvalue.SetFlag(params.memcache_flags != 0);
it->second = std::move(tvalue);
db_slice.PostUpdate(params.db_index, it, key, false);
db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key, false);
if (params.expire_after_ms) {
db_slice.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice.Now());
db_slice.UpdateExpire(op_args_.db_cntx.db_index, it,
params.expire_after_ms + op_args_.db_cntx.time_now_ms);
}
if (params.memcache_flags)
db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);
if (shard->tiered_storage()) { // external storage enabled.
// TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
// afterwards.
if (value.size() >= kMinTieredLen) {
shard->tiered_storage()->UnloadItem(params.db_index, it);
shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it);
}
}
@ -415,23 +425,24 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
}
DbSlice& db_slice = shard->db_slice();
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice.Now() : 0;
uint64_t at_ms =
params.expire_after_ms ? params.expire_after_ms + op_args_.db_cntx.time_now_ms : 0;
if (IsValid(e_it) && at_ms) {
e_it->second = db_slice.FromAbsoluteTime(at_ms);
} else {
// We need to update expiry, or maybe erase the object if it was expired.
bool changed = db_slice.UpdateExpire(params.db_index, it, at_ms);
bool changed = db_slice.UpdateExpire(op_args_.db_cntx.db_index, it, at_ms);
if (changed && at_ms == 0) // erased.
return OpStatus::OK; // TODO: to update journal with deletion.
}
db_slice.PreUpdate(params.db_index, it);
db_slice.PreUpdate(op_args_.db_cntx.db_index, it);
// Check whether we need to update flags table.
bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag();
if (req_flag_update) {
prime_value.SetFlag(params.memcache_flags != 0);
db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);
}
// overwrite existing entry.
@ -443,11 +454,11 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
// can be invalid after the function returns and the functions that follow may access an invalid
// entry.
if (shard->tiered_storage()) {
shard->tiered_storage()->UnloadItem(params.db_index, it);
shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it);
}
}
db_slice.PostUpdate(params.db_index, it, key);
db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key);
RecordJournal(op_args_, key, it->second);
return OpStatus::OK;
@ -459,7 +470,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view value = ArgS(args, 2);
SetCmd::SetParams sparams{cntx->db_index()};
SetCmd::SetParams sparams;
sparams.memcache_flags = cntx->conn_state.memcache_flag;
int64_t int_arg;
@ -518,7 +529,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
}
}
const auto result{SetGeneric(cntx, std::move(sparams), key, value)};
const auto result{SetGeneric(cntx, sparams, key, value)};
if (result == OpStatus::OK) {
return builder->SendStored();
}
@ -532,17 +544,6 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
return builder->SendSetSkipped();
}
OpResult<void> StringFamily::SetGeneric(ConnectionContext* cntx, SetCmd::SetParams sparams,
std::string_view key, std::string_view value) {
DCHECK(cntx->transaction);
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(t->GetOpArgs(shard));
return sg.Set(sparams, key, value);
};
return cntx->transaction->ScheduleSingleHop(std::move(cb));
}
void StringFamily::SetEx(CmdArgList args, ConnectionContext* cntx) {
SetExGeneric(true, std::move(args), cntx);
}
@ -555,7 +556,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view value = ArgS(args, 2);
SetCmd::SetParams sparams{cntx->db_index()};
SetCmd::SetParams sparams;
sparams.how = SetCmd::SET_IF_NOTEXIST;
sparams.memcache_flags = cntx->conn_state.memcache_flag;
const auto results{SetGeneric(cntx, std::move(sparams), key, value)};
@ -601,7 +602,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
string_view value = ArgS(args, 2);
std::optional<string> prev_val;
SetCmd::SetParams sparams{cntx->db_index()};
SetCmd::SetParams sparams;
sparams.prev_val = &prev_val;
auto cb = [&](Transaction* t, EngineShard* shard) {
@ -771,7 +772,7 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
return (*cntx)->SendError(InvalidExpireTime(ArgS(args, 0)));
}
SetCmd::SetParams sparams{cntx->db_index()};
SetCmd::SetParams sparams;
if (seconds)
sparams.expire_after_ms = uint64_t(unit_vals) * 1000;
else
@ -876,7 +877,7 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* es) {
auto args = t->ShardArgsInShard(es->shard_id());
for (size_t i = 0; i < args.size(); i += 2) {
auto it = es->db_slice().FindExt(t->db_index(), args[i]).first;
auto it = es->db_slice().FindExt(t->db_context(), args[i]).first;
if (IsValid(it)) {
exists.store(true, memory_order_relaxed);
break;
@ -906,7 +907,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<size_t> {
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_context(), key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
@ -993,7 +994,7 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction
auto& db_slice = shard->db_slice();
for (size_t i = 0; i < args.size(); ++i) {
OpResult<PrimeIterator> it_res = db_slice.Find(t->db_index(), args[i], OBJ_STRING);
OpResult<PrimeIterator> it_res = db_slice.Find(t->db_context(), args[i], OBJ_STRING);
if (!it_res)
continue;

View File

@ -26,7 +26,6 @@ class SetCmd {
struct SetParams {
SetHow how = SET_ALWAYS;
DbIndex db_index = 0;
uint32_t memcache_flags = 0;
// Relative value based on now. 0 means no expiration.
@ -34,9 +33,6 @@ class SetCmd {
mutable std::optional<std::string>* prev_val = nullptr; // GETSET option
bool keep_expire = false; // KEEPTTL - TODO: implement it.
explicit SetParams(DbIndex dib) : db_index(dib) {
}
constexpr bool IsConditionalSet() const {
return how == SET_IF_NOTEXIST || how == SET_IF_EXISTS;
}
@ -82,8 +78,6 @@ class StringFamily {
static void IncrByGeneric(std::string_view key, int64_t val, ConnectionContext* cntx);
static void ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx);
static void SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx);
static OpResult<void> SetGeneric(ConnectionContext* cntx, SetCmd::SetParams sparams,
std::string_view key, std::string_view value);
struct GetResp {
std::string value;

View File

@ -70,6 +70,8 @@ TEST_F(StringFamilyTest, Incr) {
TEST_F(StringFamilyTest, Append) {
Run({"setex", "key", "100", "val"});
EXPECT_THAT(Run({"ttl", "key"}), IntArg(100));
EXPECT_THAT(Run({"append", "key", "bar"}), IntArg(6));
EXPECT_THAT(Run({"ttl", "key"}), IntArg(100));
}
@ -77,17 +79,17 @@ TEST_F(StringFamilyTest, Append) {
TEST_F(StringFamilyTest, Expire) {
ASSERT_EQ(Run({"set", "key", "val", "PX", "20"}), "OK");
UpdateTime(expire_now_ + 10);
AdvanceTime(10);
EXPECT_EQ(Run({"get", "key"}), "val");
UpdateTime(expire_now_ + 20);
AdvanceTime(10);
EXPECT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL));
ASSERT_THAT(Run({"set", "i", "1", "PX", "10"}), "OK");
ASSERT_THAT(Run({"incr", "i"}), IntArg(2));
UpdateTime(expire_now_ + 30);
AdvanceTime(10);
ASSERT_THAT(Run({"incr", "i"}), IntArg(1));
}

View File

@ -131,10 +131,9 @@ void BaseFamilyTest::SetUp() {
opts.disable_time_update = true;
service_->Init(nullptr, nullptr, opts);
expire_now_ = absl::GetCurrentTimeNanos() / 1000000;
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
auto cb = [&](EngineShard* s) {
s->db_slice().UpdateExpireBase(expire_now_ - 1000, 0);
s->db_slice().UpdateExpireClock(expire_now_);
s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0);
};
shard_set->RunBriefInParallel(cb);
@ -151,12 +150,6 @@ void BaseFamilyTest::TearDown() {
LOG(INFO) << "Finishing " << test_info->name();
}
// ts is ms
void BaseFamilyTest::UpdateTime(uint64_t ms) {
auto cb = [ms](EngineShard* s) { s->db_slice().UpdateExpireClock(ms); };
shard_set->RunBriefInParallel(cb);
}
void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) {
auto step = 50us;
auto timeout_micro = chrono::duration_cast<chrono::microseconds> (1000ms * timeout);

View File

@ -68,8 +68,9 @@ class BaseFamilyTest : public ::testing::Test {
TestConnWrapper* AddFindConn(Protocol proto, std::string_view id);
static std::vector<std::string> StrArray(const RespExpr& expr);
// ts is ms
void UpdateTime(uint64_t ms);
void AdvanceTime(int64_t ms) {
TEST_current_time_ms += ms;
}
// Wait for a locked key to unlock. Aborts after timeout seconds passed.
void WaitUntilLocked(DbIndex db_index, std::string_view key, double timeout = 3);
@ -88,7 +89,7 @@ class BaseFamilyTest : public ::testing::Test {
absl::flat_hash_map<std::string, std::unique_ptr<TestConnWrapper>> connections_;
::boost::fibers::mutex mu_;
ConnectionContext::DebugInfo last_cmd_dbg_info_;
uint64_t expire_now_;
std::vector<RespVec*> resp_vec_;
bool single_response_ = true;
};
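Usage note (a sketch, not part of this patch): with the shared TEST_current_time_ms clock, expiry tests advance time relatively via AdvanceTime instead of setting absolute values. The test name below is illustrative; it assumes the existing BaseFamilyTest/StringFamilyTest fixture and helpers shown in this commit.
  TEST_F(StringFamilyTest, ExpireSketch) {
    Run({"set", "key", "val", "PX", "20"});
    AdvanceTime(30);  // bump the frozen test clock 30ms forward, past the 20ms TTL
    EXPECT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL));
  }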

View File

@ -24,7 +24,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space;
namespace {
std::atomic_uint64_t op_seq{1};
atomic_uint64_t op_seq{1};
[[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction);
@ -449,10 +449,12 @@ void Transaction::ScheduleInternal() {
}
while (true) {
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
std::atomic_uint32_t lock_granted_cnt{0};
std::atomic_uint32_t success{0};
atomic_uint32_t lock_granted_cnt{0};
atomic_uint32_t success{0};
time_now_ms_ = GetCurrentTimeMs();
auto cb = [&](EngineShard* shard) {
pair<bool, bool> res = ScheduleInShard(shard);
@ -551,6 +553,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// above.
// IsArmedInShard() first checks run_count_ before accessing shard_data.
run_count_.fetch_add(1, memory_order_release);
time_now_ms_ = GetCurrentTimeMs();
// Please note that schedule_cb can not update any data on ScheduleSingleHop stack
// since the latter can exit before ScheduleUniqueShard returns.
@ -805,7 +808,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
}
// we can do it because only a single thread writes into txid_ and sd.
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
sd.pq_pos = shard->txq()->Insert(this);
DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
@ -1111,7 +1114,7 @@ inline uint32_t Transaction::DecreaseRunCnt() {
::boost::intrusive_ptr guard(this);
// We use release so that no stores will be reordered after.
uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
uint32_t res = run_count_.fetch_sub(1, memory_order_release);
if (res == 1) {
run_ec_.notify();
}

View File

@ -135,10 +135,6 @@ class Transaction {
const char* Name() const;
DbIndex db_index() const {
return db_index_; // TODO: support multiple db indexes.
}
uint32_t unique_shard_cnt() const {
return unique_shard_cnt_;
}
@ -180,7 +176,15 @@ class Transaction {
KeyLockArgs GetLockArgs(ShardId sid) const;
OpArgs GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, txid_, db_index_};
return OpArgs{shard, txid_, db_context()};
}
DbContext db_context() const {
return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_};
}
DbIndex db_index() const {
return db_index_;
}
private:
@ -287,15 +291,15 @@ class Transaction {
const CommandId* cid_;
TxId txid_{0};
uint64_t time_now_ms_{0};
std::atomic<TxId> notify_txid_{kuint64max};
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
// unique_shard_cnt_ and unique_shard_id_ are accessed only by the coordinator thread.
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
ShardId unique_shard_id_{kInvalidSid};
DbIndex db_index_ = 0;
DbIndex db_index_;
// Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
OpStatus local_result_ = OpStatus::OK;
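Note (a minimal sketch, not part of the patch): this is how an operation consumes the transaction clock through GetOpArgs(). The function name OpExampleLen is hypothetical; the other identifiers appear elsewhere in this commit.
  OpResult<size_t> OpExampleLen(const OpArgs& op_args, std::string_view key) {
    auto& db_slice = op_args.shard->db_slice();
    // db_cntx carries db_index and time_now_ms captured during scheduling,
    // so the expiry check inside Find() sees the same "now" on every shard.
    OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING);
    if (!it_res)
      return it_res.status();
    return it_res.value()->second.Size();
  }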

View File

@ -87,13 +87,13 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
size_t member_len) {
auto& db_slice = op_args.shard->db_slice();
if (zparams.flags & ZADD_IN_XX) {
return db_slice.Find(op_args.db_ind, key, OBJ_ZSET);
return db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
}
pair<PrimeIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind(op_args.db_ind, key);
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
@ -110,13 +110,13 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
DVLOG(2) << "Created zset " << zobj->ptr;
if (!add_res.second) {
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
it->second.ImportRObj(zobj);
} else {
if (it->second.ObjType() != OBJ_ZSET)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
return it;
@ -615,7 +615,7 @@ OpResult<ScoredMap> OpUnion(EngineShard* shard, Transaction* t, string_view dest
return OpStatus::OK; // return empty map
for (unsigned j = start; j < keys.size(); ++j) {
auto it_res = db_slice.Find(t->db_index(), keys[j], OBJ_ZSET);
auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET);
if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1.
return it_res.status();
if (!it_res)
@ -661,7 +661,7 @@ OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest
return OpStatus::SKIPPED; // return noop
for (unsigned j = start; j < keys.size(); ++j) {
auto it_res = db_slice.Find(t->db_index(), keys[j], OBJ_ZSET);
auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET);
if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1.
return it_res.status();
@ -710,8 +710,8 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
auto& db_slice = op_args.shard->db_slice();
if (zparams.override && members.empty()) {
auto it = db_slice.FindExt(op_args.db_ind, key).first;
db_slice.Del(op_args.db_ind, it);
auto it = db_slice.FindExt(op_args.db_cntx, key).first;
db_slice.Del(op_args.db_cntx.db_index, it);
return OpStatus::OK;
}
@ -763,7 +763,7 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
DVLOG(2) << "ZAdd " << zobj->ptr;
res_it.value()->second.SyncRObj();
op_args.shard->db_slice().PostUpdate(op_args.db_ind, *res_it, key);
op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key);
if (zparams.flags & ZADD_IN_INCR) {
aresult.new_score = new_score;
@ -934,7 +934,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_ZSET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_ZSET);
if (!find_res) {
return find_res.status();
}
@ -1505,7 +1505,7 @@ bool ZSetFamily::ParseRangeByScoreParams(CmdArgList args, RangeParams* params) {
OpResult<StringVec> ZSetFamily::OpScan(const OpArgs& op_args, std::string_view key,
uint64_t* cursor) {
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!find_res)
return find_res.status();
@ -1564,11 +1564,11 @@ OpResult<StringVec> ZSetFamily::OpScan(const OpArgs& op_args, std::string_view k
OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
db_slice.PreUpdate(op_args.db_ind, *res_it);
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
sds& tmp_str = op_args.shard->tmp_str1;
unsigned deleted = 0;
@ -1578,17 +1578,17 @@ OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, Arg
}
auto zlen = zsetLength(zobj);
res_it.value()->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, *res_it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value()));
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
}
return deleted;
}
OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, string_view member) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
@ -1605,7 +1605,7 @@ OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, str
auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
-> OpResult<ScoredArray> {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
@ -1620,11 +1620,11 @@ auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, st
OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key,
const ZRangeSpec& range_spec) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
db_slice.PreUpdate(op_args.db_ind, *res_it);
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
@ -1632,11 +1632,11 @@ OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key
std::visit(iv, range_spec.interval);
res_it.value()->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, *res_it, key);
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
auto zlen = zsetLength(zobj);
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value()));
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
}
return iv.removed();
@ -1644,7 +1644,7 @@ OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key
OpResult<unsigned> ZSetFamily::OpRank(const OpArgs& op_args, string_view key, string_view member,
bool reverse) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
@ -1659,7 +1659,7 @@ OpResult<unsigned> ZSetFamily::OpRank(const OpArgs& op_args, string_view key, st
OpResult<unsigned> ZSetFamily::OpCount(const OpArgs& op_args, std::string_view key,
const ScoreInterval& interval) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
@ -1730,7 +1730,7 @@ OpResult<unsigned> ZSetFamily::OpCount(const OpArgs& op_args, std::string_view k
OpResult<unsigned> ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key,
const ZSetFamily::LexInterval& interval) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();