Implement ongoing garbage collection of expired items

This commit is contained in:
Roman Gershman 2022-03-14 02:16:51 +02:00
parent 4fcb74930e
commit affabbaee7
10 changed files with 114 additions and 34 deletions

View File

@ -214,6 +214,7 @@ class DashTable : public detail::DashTableBase {
return true; return true;
} }
void RecordSplit() {}
/* /*
/// Required interface in case can_gc is true /// Required interface in case can_gc is true
// Returns number of garbage collected items deleted. 0 - means nothing has been // Returns number of garbage collected items deleted. 0 - means nothing has been
@ -625,6 +626,7 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
} }
Split(seg_id); Split(seg_id);
ev.RecordSplit();
} }
return std::make_pair(iterator{}, false); return std::make_pair(iterator{}, false);

View File

@ -473,6 +473,8 @@ struct TestEvictionPolicy {
return tbl.bucket_count() < max_capacity; return tbl.bucket_count() < max_capacity;
} }
void RecordSplit() {}
unsigned Evict(const Dash64::EvictionBuckets& eb, Dash64* me) const { unsigned Evict(const Dash64::EvictionBuckets& eb, Dash64* me) const {
if (!evict_enabled) if (!evict_enabled)
return 0; return 0;

View File

@ -73,17 +73,23 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) {
// NB: an int64 can only go up to <8 EB. // NB: an int64 can only go up to <8 EB.
case 'E': case 'E':
scale <<= 10; // Fall through... scale <<= 10; // Fall through...
ABSL_FALLTHROUGH_INTENDED;
case 'P': case 'P':
scale <<= 10; scale <<= 10;
ABSL_FALLTHROUGH_INTENDED;
case 'T': case 'T':
scale <<= 10; scale <<= 10;
ABSL_FALLTHROUGH_INTENDED;
case 'G': case 'G':
scale <<= 10; scale <<= 10;
ABSL_FALLTHROUGH_INTENDED;
case 'M': case 'M':
scale <<= 10; scale <<= 10;
ABSL_FALLTHROUGH_INTENDED;
case 'K': case 'K':
case 'k': case 'k':
scale <<= 10; scale <<= 10;
ABSL_FALLTHROUGH_INTENDED;
case 'B': case 'B':
case '\0': case '\0':
break; // To here. break; // To here.

View File

@ -23,6 +23,8 @@ using namespace std;
using namespace util; using namespace util;
using facade::OpStatus; using facade::OpStatus;
namespace {
constexpr auto kPrimeSegmentSize = PrimeTable::kSegBytes; constexpr auto kPrimeSegmentSize = PrimeTable::kSegBytes;
constexpr auto kExpireSegmentSize = ExpireTable::kSegBytes; constexpr auto kExpireSegmentSize = ExpireTable::kSegBytes;
@ -32,6 +34,57 @@ static_assert(kPrimeSegmentSize == 32720);
// 20480 is the next goodsize so we are loosing ~300 bytes or 1.5%. // 20480 is the next goodsize so we are loosing ~300 bytes or 1.5%.
static_assert(kExpireSegmentSize == 20168); static_assert(kExpireSegmentSize == 20168);
class PrimeEvictionPolicy {
public:
static constexpr bool can_evict = false;
static constexpr bool can_gc = true;
PrimeEvictionPolicy(DbIndex db_indx, DbSlice* db_slice, int64_t mem_budget)
: db_slice_(db_slice), mem_budget_(mem_budget) {
db_indx_ = db_indx;
}
void RecordSplit() {
mem_budget_ -= PrimeTable::kSegBytes;
}
bool CanGrow(const PrimeTable& tbl) const {
return mem_budget_ > int64_t(PrimeTable::kSegBytes);
}
unsigned GarbageCollect(const PrimeTable::EvictionBuckets& eb, PrimeTable* me) {
unsigned res = 0;
for (unsigned i = 0; i < ABSL_ARRAYSIZE(eb.iter); ++i) {
auto it = eb.iter[i];
for (; !it.is_done(); ++it) {
if (it->second.HasExpire()) {
auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, it);
if (prime_it.is_done())
++res;
}
}
}
gc_count_ += res;
return res;
}
int64_t mem_budget() const {
return mem_budget_;
}
unsigned gc_count() const {
return gc_count_;
}
private:
DbSlice* db_slice_;
int64_t mem_budget_;
DbIndex db_indx_;
unsigned gc_count_ = 0;
};
} // namespace
#define ADD(x) (x) += o.x #define ADD(x) (x) += o.x
DbStats& DbStats::operator+=(const DbStats& o) { DbStats& DbStats::operator+=(const DbStats& o) {
@ -52,10 +105,11 @@ DbStats& DbStats::operator+=(const DbStats& o) {
} }
SliceEvents& SliceEvents::operator+=(const SliceEvents& o) { SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
static_assert(sizeof(SliceEvents) == 16, "You should update this function with new fields"); static_assert(sizeof(SliceEvents) == 24, "You should update this function with new fields");
ADD(evicted_keys); ADD(evicted_keys);
ADD(expired_keys); ADD(expired_keys);
ADD(garbage_collected);
return *this; return *this;
} }
@ -181,14 +235,18 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
} }
} }
PrimeEvictionPolicy evp{db_index, this, int64_t(memory_budget_ - key.size())};
// Fast-path - change_cb_ is empty so we Find or Add using // Fast-path - change_cb_ is empty so we Find or Add using
// the insert operation: twice more efficient. // the insert operation: twice more efficient.
CompactObj co_key{key}; CompactObj co_key{key};
auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{}); auto [it, inserted] = db->prime_table.Insert(std::move(co_key), PrimeValue{});
if (inserted) { // new entry if (inserted) { // new entry
db->stats.inline_keys += it->first.IsInline(); db->stats.inline_keys += it->first.IsInline();
db->stats.obj_memory_usage += it->first.MallocUsed(); db->stats.obj_memory_usage += it->first.MallocUsed();
it.SetVersion(NextVersion()); it.SetVersion(NextVersion());
memory_budget_ = evp.mem_budget();
return make_pair(it, true); return make_pair(it, true);
} }
@ -289,9 +347,9 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
// Returns true if a state has changed, false otherwise. // Returns true if a state has changed, false otherwise.
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) { bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
auto& db = db_arr_[db_ind]; auto& db = *db_arr_[db_ind];
if (at == 0 && it->second.HasExpire()) { if (at == 0 && it->second.HasExpire()) {
CHECK_EQ(1u, db->expire_table.Erase(it->first)); CHECK_EQ(1u, db.expire_table.Erase(it->first));
it->second.SetExpire(false); it->second.SetExpire(false);
return true; return true;
@ -300,7 +358,7 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
if (!it->second.HasExpire() && at) { if (!it->second.HasExpire() && at) {
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
CHECK(db->expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second); CHECK(db.expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
it->second.SetExpire(true); it->second.SetExpire(true);
return true; return true;
@ -344,13 +402,17 @@ pair<MainIterator, bool> DbSlice::AddIfNotExist(DbIndex db_ind, string_view key,
auto& db = *db_arr_[db_ind]; auto& db = *db_arr_[db_ind];
CompactObj co_key{key}; CompactObj co_key{key};
memory_budget_ -= key.size();
auto [new_entry, inserted] = db.prime_table.Insert(std::move(co_key), std::move(obj)); PrimeEvictionPolicy evp{db_ind, this, memory_budget_};
auto [new_entry, inserted] = db.prime_table.Insert(std::move(co_key), std::move(obj), evp);
// in this case obj won't be moved and will be destroyed during unwinding. // in this case obj won't be moved and will be destroyed during unwinding.
if (!inserted) if (!inserted)
return make_pair(new_entry, false); return make_pair(new_entry, false);
events_.garbage_collected += evp.gc_count();
memory_budget_ = evp.mem_budget();
new_entry.SetVersion(NextVersion()); new_entry.SetVersion(NextVersion());
db.stats.inline_keys += new_entry->first.IsInline(); db.stats.inline_keys += new_entry->first.IsInline();

View File

@ -51,6 +51,7 @@ struct SliceEvents {
// Number of eviction events. // Number of eviction events.
size_t evicted_keys = 0; size_t evicted_keys = 0;
size_t expired_keys = 0; size_t expired_keys = 0;
size_t garbage_collected = 0;
SliceEvents& operator+=(const SliceEvents& o); SliceEvents& operator+=(const SliceEvents& o);
}; };
@ -96,8 +97,8 @@ class DbSlice {
expire_base_[generation & 1] = now; expire_base_[generation & 1] = now;
} }
void SetMaxMemory(size_t max_memory) { void SetMemoryBudget(int64_t budget) {
max_memory_ = max_memory; memory_budget_ = budget;
} }
uint64_t expire_base() const { uint64_t expire_base() const {
@ -130,10 +131,12 @@ class DbSlice {
// Adds a new entry. Requires: key does not exist in this slice. // Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry. // Returns the iterator to the newly added entry.
// throws: bad_alloc is insertion could not happen due to out of memory.
MainIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms); MainIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms);
// Adds a new entry if a key does not exists. Returns true if insertion took place, // Adds a new entry if a key does not exists. Returns true if insertion took place,
// false otherwise. expire_at_ms equal to 0 - means no expiry. // false otherwise. expire_at_ms equal to 0 - means no expiry.
// throws: bad_alloc is insertion could not happen due to out of memory.
std::pair<MainIterator, bool> AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj, std::pair<MainIterator, bool> AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms); uint64_t expire_at_ms);
@ -227,7 +230,7 @@ class DbSlice {
uint64_t expire_base_[2]; // Used for expire logic, represents a real clock. uint64_t expire_base_[2]; // Used for expire logic, represents a real clock.
uint64_t version_ = 1; // Used to version entries in the PrimeTable. uint64_t version_ = 1; // Used to version entries in the PrimeTable.
uint64_t max_memory_ = -1; int64_t memory_budget_ = INT64_MAX;
mutable SliceEvents events_; // we may change this even for const operations. mutable SliceEvents events_; // we may change this even for const operations.
using LockTable = absl::flat_hash_map<std::string, IntentLock>; using LockTable = absl::flat_hash_map<std::string, IntentLock>;

View File

@ -508,12 +508,14 @@ void EngineShard::CacheStats() {
#endif #endif
// mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum); // mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum);
mi_stats_merge(); mi_stats_merge();
// stats_.heap_used_bytes = sum.used;
stats_.heap_used_bytes = size_t used_mem = UsedMemory();
mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); cached_stats[db_slice_.shard_id()].used_memory.store(used_mem,
cached_stats[db_slice_.shard_id()].used_memory.store(stats_.heap_used_bytes,
memory_order_relaxed); memory_order_relaxed);
// stats_.heap_comitted_bytes = sum.comitted; }
size_t EngineShard::UsedMemory() const {
return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal();
} }
void EngineShardSet::Init(uint32_t sz) { void EngineShardSet::Init(uint32_t sz) {

View File

@ -27,15 +27,6 @@ class EngineShard {
struct Stats { struct Stats {
uint64_t ooo_runs = 0; // how many times transactions run as OOO. uint64_t ooo_runs = 0; // how many times transactions run as OOO.
uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run. uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run.
// number of bytes that were allocated by the application (with mi_mallocxxx methods).
// should be less than or equal to heap_comitted_bytes.
// Cached every few millis.
size_t heap_used_bytes = 0;
// number of bytes comitted by the allocator library (i.e. mmapped into physical memory).
//
// size_t heap_comitted_bytes = 0;
}; };
// EngineShard() is private down below. // EngineShard() is private down below.
@ -127,6 +118,9 @@ class EngineShard {
return stats_; return stats_;
} }
// Returns used memory for this shard.
size_t UsedMemory() const;
// for everyone to use for string transformations during atomic cpu sequences. // for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1, tmp_str2; sds tmp_str1, tmp_str2;

View File

@ -344,9 +344,7 @@ Metrics ServerFamily::GetMetrics() const {
result.db += db_stats.db; result.db += db_stats.db;
result.events += db_stats.events; result.events += db_stats.events;
EngineShard::Stats shard_stats = shard->stats(); result.heap_used_bytes += shard->UsedMemory();
// result.heap_comitted_bytes += shard_stats.heap_comitted_bytes;
result.heap_used_bytes += shard_stats.heap_used_bytes;
} }
}; };
@ -446,6 +444,7 @@ tcp_port:)";
absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n"); absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n");
absl::StrAppend(&info, "rejected_connections:", -1, "\n"); absl::StrAppend(&info, "rejected_connections:", -1, "\n");
absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n"); absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n");
absl::StrAppend(&info, "gc_keys:", m.events.garbage_collected, "\n");
absl::StrAppend(&info, "keyspace_hits:", -1, "\n"); absl::StrAppend(&info, "keyspace_hits:", -1, "\n");
absl::StrAppend(&info, "keyspace_misses:", -1, "\n"); absl::StrAppend(&info, "keyspace_misses:", -1, "\n");
absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n"); absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n");

View File

@ -42,7 +42,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
DCHECK_LT(params.db_index, db_slice_->db_array_size()); DCHECK_LT(params.db_index, db_slice_->db_array_size());
DCHECK(db_slice_->IsDbValid(params.db_index)); DCHECK(db_slice_->IsDbValid(params.db_index));
VLOG(2) << "Set (" << db_slice_->shard_id() << ") "; VLOG(2) << "Set " << key << "(" << db_slice_->shard_id() << ") ";
auto [it, expire_it] = db_slice_->FindExt(params.db_index, key); auto [it, expire_it] = db_slice_->FindExt(params.db_index, key);
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_->Now() : 0; uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_->Now() : 0;
@ -51,30 +51,35 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
if (params.how == SET_IF_NOTEXIST) if (params.how == SET_IF_NOTEXIST)
return OpStatus::SKIPPED; return OpStatus::SKIPPED;
PrimeValue& prime_value = it->second;
if (params.prev_val) { if (params.prev_val) {
if (it->second.ObjType() != OBJ_STRING) if (prime_value.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE; return OpStatus::WRONG_TYPE;
string val; string val;
it->second.GetString(&val); prime_value.GetString(&val);
params.prev_val->emplace(move(val)); params.prev_val->emplace(move(val));
} }
if (IsValid(expire_it) && at_ms) { if (IsValid(expire_it) && at_ms) {
expire_it->second.Set(at_ms - db_slice_->expire_base()); expire_it->second.Set(at_ms - db_slice_->expire_base());
} else { } else {
db_slice_->Expire(params.db_index, it, at_ms); bool changed = db_slice_->Expire(params.db_index, it, at_ms);
if (changed && at_ms == 0) // erased.
return OpStatus::OK;
} }
db_slice_->PreUpdate(params.db_index, it); db_slice_->PreUpdate(params.db_index, it);
// Check whether we need to update flags table. // Check whether we need to update flags table.
bool req_flag_update = (params.memcache_flags != 0) != it->second.HasFlag(); bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag();
if (req_flag_update) { if (req_flag_update) {
it->second.SetFlag(params.memcache_flags != 0); prime_value.SetFlag(params.memcache_flags != 0);
db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
} }
it->second.SetString(value); prime_value.SetString(value); // erases all masks.
prime_value.SetExpire(at_ms != 0); // set expire mask.
db_slice_->PostUpdate(params.db_index, it); db_slice_->PostUpdate(params.db_index, it);
return OpStatus::OK; return OpStatus::OK;
@ -99,7 +104,6 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
string_view value = ArgS(args, 2); string_view value = ArgS(args, 2);
VLOG(2) << "Set " << key << " " << value;
SetCmd::SetParams sparams{cntx->db_index()}; SetCmd::SetParams sparams{cntx->db_index()};
sparams.memcache_flags = cntx->conn_state.memcache_flag; sparams.memcache_flags = cntx->conn_state.memcache_flag;
@ -140,7 +144,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
} else if (cur_arg == "XX") { } else if (cur_arg == "XX") {
sparams.how = SetCmd::SET_IF_EXISTS; sparams.how = SetCmd::SET_IF_EXISTS;
} else if (cur_arg == "KEEPTTL") { } else if (cur_arg == "KEEPTTL") {
sparams.keep_expire = true; sparams.keep_expire = true; // TODO
} else { } else {
return builder->SendError(kSyntaxErr); return builder->SendError(kSyntaxErr);
} }

View File

@ -252,4 +252,10 @@ TEST_F(StringFamilyTest, MSetIncr) {
get_fb.join(); get_fb.join();
} }
TEST_F(StringFamilyTest, SetEx) {
ASSERT_THAT(Run({"setex", "key", "10", "val"}), RespEq("OK"));
ASSERT_THAT(Run({"setex", "key", "10", "val"}), RespEq("OK"));
ASSERT_THAT(Run({"setex", "key", "10", "val"}), RespEq("OK"));
}
} // namespace dfly } // namespace dfly