diff --git a/src/core/dash.h b/src/core/dash.h index 161876d..3f1eb4c 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -214,6 +214,7 @@ class DashTable : public detail::DashTableBase { return true; } + void RecordSplit() {} /* /// Required interface in case can_gc is true // Returns number of garbage collected items deleted. 0 - means nothing has been @@ -625,6 +626,7 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio } Split(seg_id); + ev.RecordSplit(); } return std::make_pair(iterator{}, false); diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index ad3a7f0..1f7f102 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -473,6 +473,8 @@ struct TestEvictionPolicy { return tbl.bucket_count() < max_capacity; } + void RecordSplit() {} + unsigned Evict(const Dash64::EvictionBuckets& eb, Dash64* me) const { if (!evict_enabled) return 0; diff --git a/src/server/common.cc b/src/server/common.cc index e4697bf..d4ef2b8 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -73,17 +73,23 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) { // NB: an int64 can only go up to <8 EB. case 'E': scale <<= 10; // Fall through... + ABSL_FALLTHROUGH_INTENDED; case 'P': scale <<= 10; + ABSL_FALLTHROUGH_INTENDED; case 'T': scale <<= 10; + ABSL_FALLTHROUGH_INTENDED; case 'G': scale <<= 10; + ABSL_FALLTHROUGH_INTENDED; case 'M': scale <<= 10; + ABSL_FALLTHROUGH_INTENDED; case 'K': case 'k': scale <<= 10; + ABSL_FALLTHROUGH_INTENDED; case 'B': case '\0': break; // To here. diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 72fdb1b..ccc7ebc 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -23,6 +23,8 @@ using namespace std; using namespace util; using facade::OpStatus; +namespace { + constexpr auto kPrimeSegmentSize = PrimeTable::kSegBytes; constexpr auto kExpireSegmentSize = ExpireTable::kSegBytes; @@ -32,6 +34,57 @@ static_assert(kPrimeSegmentSize == 32720); // 20480 is the next goodsize so we are loosing ~300 bytes or 1.5%. static_assert(kExpireSegmentSize == 20168); +class PrimeEvictionPolicy { + public: + static constexpr bool can_evict = false; + static constexpr bool can_gc = true; + + PrimeEvictionPolicy(DbIndex db_indx, DbSlice* db_slice, int64_t mem_budget) + : db_slice_(db_slice), mem_budget_(mem_budget) { + db_indx_ = db_indx; + } + + void RecordSplit() { + mem_budget_ -= PrimeTable::kSegBytes; + } + + bool CanGrow(const PrimeTable& tbl) const { + return mem_budget_ > int64_t(PrimeTable::kSegBytes); + } + + unsigned GarbageCollect(const PrimeTable::EvictionBuckets& eb, PrimeTable* me) { + unsigned res = 0; + for (unsigned i = 0; i < ABSL_ARRAYSIZE(eb.iter); ++i) { + auto it = eb.iter[i]; + for (; !it.is_done(); ++it) { + if (it->second.HasExpire()) { + auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, it); + if (prime_it.is_done()) + ++res; + } + } + } + + gc_count_ += res; + return res; + } + + int64_t mem_budget() const { + return mem_budget_; + } + unsigned gc_count() const { + return gc_count_; + } + + private: + DbSlice* db_slice_; + int64_t mem_budget_; + DbIndex db_indx_; + unsigned gc_count_ = 0; +}; + +} // namespace + #define ADD(x) (x) += o.x DbStats& DbStats::operator+=(const DbStats& o) { @@ -52,10 +105,11 @@ DbStats& DbStats::operator+=(const DbStats& o) { } SliceEvents& SliceEvents::operator+=(const SliceEvents& o) { - static_assert(sizeof(SliceEvents) == 16, "You should update this function with new fields"); + static_assert(sizeof(SliceEvents) == 24, "You should update this function with new fields"); ADD(evicted_keys); ADD(expired_keys); + ADD(garbage_collected); return *this; } @@ -181,14 +235,18 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairprime_table.Insert(std::move(co_key), PrimeValue{}); if (inserted) { // new entry db->stats.inline_keys += it->first.IsInline(); db->stats.obj_memory_usage += it->first.MallocUsed(); it.SetVersion(NextVersion()); + memory_budget_ = evp.mem_budget(); return make_pair(it, true); } @@ -289,9 +347,9 @@ size_t DbSlice::FlushDb(DbIndex db_ind) { // Returns true if a state has changed, false otherwise. bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) { - auto& db = db_arr_[db_ind]; + auto& db = *db_arr_[db_ind]; if (at == 0 && it->second.HasExpire()) { - CHECK_EQ(1u, db->expire_table.Erase(it->first)); + CHECK_EQ(1u, db.expire_table.Erase(it->first)); it->second.SetExpire(false); return true; @@ -300,7 +358,7 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) { if (!it->second.HasExpire() && at) { uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. - CHECK(db->expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second); + CHECK(db.expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second); it->second.SetExpire(true); return true; @@ -344,13 +402,17 @@ pair DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, auto& db = *db_arr_[db_ind]; CompactObj co_key{key}; + memory_budget_ -= key.size(); - auto [new_entry, inserted] = db.prime_table.Insert(std::move(co_key), std::move(obj)); + PrimeEvictionPolicy evp{db_ind, this, memory_budget_}; + auto [new_entry, inserted] = db.prime_table.Insert(std::move(co_key), std::move(obj), evp); // in this case obj won't be moved and will be destroyed during unwinding. if (!inserted) return make_pair(new_entry, false); + events_.garbage_collected += evp.gc_count(); + memory_budget_ = evp.mem_budget(); new_entry.SetVersion(NextVersion()); db.stats.inline_keys += new_entry->first.IsInline(); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 3a020de..9a1d2dd 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -51,6 +51,7 @@ struct SliceEvents { // Number of eviction events. size_t evicted_keys = 0; size_t expired_keys = 0; + size_t garbage_collected = 0; SliceEvents& operator+=(const SliceEvents& o); }; @@ -96,8 +97,8 @@ class DbSlice { expire_base_[generation & 1] = now; } - void SetMaxMemory(size_t max_memory) { - max_memory_ = max_memory; + void SetMemoryBudget(int64_t budget) { + memory_budget_ = budget; } uint64_t expire_base() const { @@ -130,10 +131,12 @@ class DbSlice { // Adds a new entry. Requires: key does not exist in this slice. // Returns the iterator to the newly added entry. + // throws: bad_alloc is insertion could not happen due to out of memory. MainIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms); // Adds a new entry if a key does not exists. Returns true if insertion took place, // false otherwise. expire_at_ms equal to 0 - means no expiry. + // throws: bad_alloc is insertion could not happen due to out of memory. std::pair AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms); @@ -227,7 +230,7 @@ class DbSlice { uint64_t expire_base_[2]; // Used for expire logic, represents a real clock. uint64_t version_ = 1; // Used to version entries in the PrimeTable. - uint64_t max_memory_ = -1; + int64_t memory_budget_ = INT64_MAX; mutable SliceEvents events_; // we may change this even for const operations. using LockTable = absl::flat_hash_map; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 6946acf..7587fdb 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -508,12 +508,14 @@ void EngineShard::CacheStats() { #endif // mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum); mi_stats_merge(); - // stats_.heap_used_bytes = sum.used; - stats_.heap_used_bytes = - mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); - cached_stats[db_slice_.shard_id()].used_memory.store(stats_.heap_used_bytes, + + size_t used_mem = UsedMemory(); + cached_stats[db_slice_.shard_id()].used_memory.store(used_mem, memory_order_relaxed); - // stats_.heap_comitted_bytes = sum.comitted; +} + +size_t EngineShard::UsedMemory() const { + return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); } void EngineShardSet::Init(uint32_t sz) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index cc0f8f7..080a364 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -27,15 +27,6 @@ class EngineShard { struct Stats { uint64_t ooo_runs = 0; // how many times transactions run as OOO. uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run. - - // number of bytes that were allocated by the application (with mi_mallocxxx methods). - // should be less than or equal to heap_comitted_bytes. - // Cached every few millis. - size_t heap_used_bytes = 0; - - // number of bytes comitted by the allocator library (i.e. mmapped into physical memory). - // - // size_t heap_comitted_bytes = 0; }; // EngineShard() is private down below. @@ -127,6 +118,9 @@ class EngineShard { return stats_; } + // Returns used memory for this shard. + size_t UsedMemory() const; + // for everyone to use for string transformations during atomic cpu sequences. sds tmp_str1, tmp_str2; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 9cdf759..079e9ba 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -344,9 +344,7 @@ Metrics ServerFamily::GetMetrics() const { result.db += db_stats.db; result.events += db_stats.events; - EngineShard::Stats shard_stats = shard->stats(); - // result.heap_comitted_bytes += shard_stats.heap_comitted_bytes; - result.heap_used_bytes += shard_stats.heap_used_bytes; + result.heap_used_bytes += shard->UsedMemory(); } }; @@ -446,6 +444,7 @@ tcp_port:)"; absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n"); absl::StrAppend(&info, "rejected_connections:", -1, "\n"); absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n"); + absl::StrAppend(&info, "gc_keys:", m.events.garbage_collected, "\n"); absl::StrAppend(&info, "keyspace_hits:", -1, "\n"); absl::StrAppend(&info, "keyspace_misses:", -1, "\n"); absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n"); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 453c6ed..3c95d13 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -42,7 +42,7 @@ OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::s DCHECK_LT(params.db_index, db_slice_->db_array_size()); DCHECK(db_slice_->IsDbValid(params.db_index)); - VLOG(2) << "Set (" << db_slice_->shard_id() << ") "; + VLOG(2) << "Set " << key << "(" << db_slice_->shard_id() << ") "; auto [it, expire_it] = db_slice_->FindExt(params.db_index, key); uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_->Now() : 0; @@ -51,30 +51,35 @@ OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::s if (params.how == SET_IF_NOTEXIST) return OpStatus::SKIPPED; + PrimeValue& prime_value = it->second; if (params.prev_val) { - if (it->second.ObjType() != OBJ_STRING) + if (prime_value.ObjType() != OBJ_STRING) return OpStatus::WRONG_TYPE; string val; - it->second.GetString(&val); + prime_value.GetString(&val); params.prev_val->emplace(move(val)); } if (IsValid(expire_it) && at_ms) { expire_it->second.Set(at_ms - db_slice_->expire_base()); } else { - db_slice_->Expire(params.db_index, it, at_ms); + bool changed = db_slice_->Expire(params.db_index, it, at_ms); + if (changed && at_ms == 0) // erased. + return OpStatus::OK; } db_slice_->PreUpdate(params.db_index, it); // Check whether we need to update flags table. - bool req_flag_update = (params.memcache_flags != 0) != it->second.HasFlag(); + bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag(); if (req_flag_update) { - it->second.SetFlag(params.memcache_flags != 0); + prime_value.SetFlag(params.memcache_flags != 0); db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); } - it->second.SetString(value); + prime_value.SetString(value); // erases all masks. + prime_value.SetExpire(at_ms != 0); // set expire mask. + db_slice_->PostUpdate(params.db_index, it); return OpStatus::OK; @@ -99,7 +104,6 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view value = ArgS(args, 2); - VLOG(2) << "Set " << key << " " << value; SetCmd::SetParams sparams{cntx->db_index()}; sparams.memcache_flags = cntx->conn_state.memcache_flag; @@ -140,7 +144,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { } else if (cur_arg == "XX") { sparams.how = SetCmd::SET_IF_EXISTS; } else if (cur_arg == "KEEPTTL") { - sparams.keep_expire = true; + sparams.keep_expire = true; // TODO } else { return builder->SendError(kSyntaxErr); } diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index 54caa1d..fcabcb3 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -252,4 +252,10 @@ TEST_F(StringFamilyTest, MSetIncr) { get_fb.join(); } +TEST_F(StringFamilyTest, SetEx) { + ASSERT_THAT(Run({"setex", "key", "10", "val"}), RespEq("OK")); + ASSERT_THAT(Run({"setex", "key", "10", "val"}), RespEq("OK")); + ASSERT_THAT(Run({"setex", "key", "10", "val"}), RespEq("OK")); +} + } // namespace dfly