Allow cache mode - intelligent eviction of items that are less likely to be used.

This is done by shifting slots to the right in a stash bucket of the full segment.
In addition, I added eviction-related stats to the memory and stats sections.
I also updated the README with the changes. Finally, I added a flag that allows
disabling the http-admin console in Dragonfly.
This commit is contained in:
Roman Gershman 2022-05-16 19:39:13 +03:00
parent 797c8121b1
commit cfaf173236
9 changed files with 205 additions and 83 deletions

View File

@ -311,6 +311,12 @@ instance.
## Design decisions along the way
### Novel cache design
Redis allows choosing 8 different eviction policies using the `maxmemory-policy` flag.
Dragonfly has one unified adaptive caching algorithm that is more memory efficient
than that of Redis. You can enable caching mode by passing the `--cache_mode=true` flag. Once this mode is on, Dragonfly will evict items when it reaches the maxmemory limit.
### Expiration deadlines with relative accuracy
Expiration ranges are limited to ~4 years. Moreover, expiration deadlines
with millisecond precision (PEXPIRE/PSETEX etc.) will be rounded to the closest second
@ -318,4 +324,14 @@ with millisecond precision (PEXPIRE/PSETEX etc) will be rounded to closest secon
Such rounding has less than 0.001% error, which I hope is acceptable for large ranges.
If it breaks your use cases - talk to me or open an issue and explain your case.
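As a rough back-of-the-envelope illustration (the exact range at which second-level rounding kicks in is outside this hunk, so take the numbers as an assumption): rounding to the nearest second shifts a deadline by at most 500 ms, so for a deadline at least $10^8$ ms (about 28 hours) away the relative error is bounded by

$$\frac{500\,\text{ms}}{10^{8}\,\text{ms}} = 5\times10^{-6} = 0.0005\% < 0.001\%.$$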
For more detailed differences between this and Redis implementations [see here](doc/differences.md).
### Native HTTP console and Prometheus-compatible metrics
By default, Dragonfly also allows HTTP access on its main TCP port (6379). That's right, you
can use it for both the Redis protocol and the HTTP protocol - the type of protocol is determined automatically
during connection initiation. Go ahead and try it with your browser.
Right now it does not show much info, but in the future we plan to add useful
debugging and management info there. If you go to the `:6379/metrics` URL you will see some Prometheus-compatible
metrics.
Important! The HTTP console is meant to be accessed within a safe network. If you expose Dragonfly's TCP port externally, it is advised to disable the console with `--http_admin_console=false` or `--nohttp_admin_console`.

View File

@ -55,7 +55,7 @@ class DashTable : public detail::DashTableBase {
//! Total number of buckets in a segment (including stash).
static constexpr unsigned kPhysicalBucketNum = SegmentType::kTotalBuckets;
static constexpr unsigned kBucketSize = Policy::kSlotNum;
static constexpr unsigned kBucketWidth = Policy::kSlotNum;
static constexpr double kTaxAmount = SegmentType::kTaxSize;
static constexpr size_t kSegBytes = sizeof(SegmentType);
static constexpr size_t kSegCapacity = SegmentType::capacity();
@ -71,9 +71,20 @@ class DashTable : public detail::DashTableBase {
using bucket_iterator = Iterator<false, true>;
using cursor = detail::DashCursor;
struct EvictionBuckets {
bucket_iterator iter[2 + Policy::kStashBucketNum];
// uint64_t key_hash; // key_hash of a key that we try to insert.
struct HotspotBuckets {
static constexpr size_t kNumBuckets = 2 + Policy::kStashBucketNum;
bucket_iterator regular_buckets[2];
bucket_iterator stash_buckets[Policy::kStashBucketNum];
// id must be in the range [0, kNumBuckets).
bucket_iterator at(unsigned id) const {
return id < 2 ? regular_buckets[id] : stash_buckets[id - 2];
}
// key_hash of a key that we try to insert.
// I use it as a pseudo-random number in my gc/eviction heuristics.
uint64_t key_hash;
};
struct DefaultEvictionPolicy {
@ -214,14 +225,7 @@ class DashTable : public detail::DashTableBase {
void Clear();
// Returns true if an element was deleted, i.e. the rightmost slot was busy.
bool ShiftRight(bucket_iterator it) {
auto cb = [this](const auto& k) { return policy_.HashFn(k); };
auto* seg = segment_[it.seg_id_];
bool deleted = seg->ShiftRight(it.bucket_id_, std::move(cb));
size_ -= unsigned(deleted);
return deleted;
}
bool ShiftRight(bucket_iterator it);
iterator BumpUp(iterator it) {
SegmentIterator seg_it =
@ -234,10 +238,6 @@ class DashTable : public detail::DashTableBase {
return garbage_collected_;
}
uint64_t evicted() const {
return evicted_;
}
uint64_t stash_unloaded() const {
return stash_unloaded_;
}
@ -264,7 +264,6 @@ class DashTable : public detail::DashTableBase {
std::pmr::vector<SegmentType*> segment_;
uint64_t garbage_collected_ = 0;
uint64_t evicted_ = 0;
uint64_t stash_unloaded_ = 0;
}; // DashTable
@ -561,6 +560,26 @@ void DashTable<_Key, _Value, Policy>::Clear() {
}
}
template <typename _Key, typename _Value, typename Policy>
bool DashTable<_Key, _Value, Policy>::ShiftRight(bucket_iterator it) {
auto* seg = segment_[it.seg_id_];
typename Segment_t::Hash_t hash_val;
auto& bucket = seg->GetBucket(it.bucket_id_);
if (bucket.GetBusy() & (1 << (kBucketWidth - 1))) {
it.slot_id_ = kBucketWidth - 1;
hash_val = DoHash(it->first);
policy_.DestroyKey(it->first);
policy_.DestroyValue(it->second);
}
bool deleted = seg->ShiftRight(it.bucket_id_, hash_val);
size_ -= unsigned(deleted);
return deleted;
}
template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
void DashTable<_Key, _Value, Policy>::IterateUnique(Cb&& cb) {
@ -683,13 +702,13 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
// Try gc.
uint8_t bid[2];
SegmentType::FillProbeArray(key_hash, bid);
EvictionBuckets buckets;
buckets.iter[0] = bucket_iterator{this, seg_id, bid[0], 0};
buckets.iter[1] = bucket_iterator{this, seg_id, bid[1], 0};
HotspotBuckets buckets;
buckets.key_hash = key_hash;
buckets.regular_buckets[0] = bucket_iterator{this, seg_id, bid[0], 0};
buckets.regular_buckets[1] = bucket_iterator{this, seg_id, bid[1], 0};
for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) {
buckets.iter[2 + i] = bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0};
buckets.stash_buckets[i] = bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0};
}
// The difference between gc and eviction is that gc can be applied even if
@ -714,7 +733,6 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
bool can_grow = ev.CanGrow(*this);
if (!can_grow) {
unsigned res = ev.Evict(buckets, this);
evicted_ += res;
if (res)
continue;
}
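
To see how the pieces above fit together, here is a minimal eviction-policy sketch (illustrative only, not part of this commit) that plugs into `DashTable::Insert` and evicts by shifting a stash bucket right - the same mechanism `ShiftRightPolicy` exercises in the tests further down. It assumes `U64Dash` is a `DashTable<uint64_t, uint64_t, ...>` alias as used in those tests.

```cpp
// Illustrative sketch; assumes U64Dash is a DashTable<uint64_t, uint64_t, ...> alias
// and that dash.h plus absl/base/macros.h (for ABSL_ARRAYSIZE) are available.
struct StashShiftEvictionPolicy {
  static constexpr bool can_gc = false;
  static constexpr bool can_evict = true;

  // Returning false forces InsertInternal to call Evict() instead of growing the table.
  bool CanGrow(const U64Dash&) const { return false; }
  void RecordSplit(U64Dash::Segment_t*) {}

  // Called with the two regular buckets the key hashes to plus the segment's stash
  // buckets. Returns the number of entries evicted (0 means nothing was freed).
  unsigned Evict(const U64Dash::HotspotBuckets& hotb, U64Dash* me) {
    constexpr unsigned kNumStashBuckets = ABSL_ARRAYSIZE(hotb.stash_buckets);

    // The hash of the key being inserted doubles as a pseudo-random source.
    auto stash_it = hotb.stash_buckets[hotb.key_hash % kNumStashBuckets];
    stash_it += (U64Dash::kBucketWidth - 1);  // jump to the last slot of the bucket.

    // ShiftRight destroys the last entry (if the slot is occupied) and shifts the
    // remaining slots one position to the right.
    return me->ShiftRight(stash_it) ? 1 : 0;
  }
};

// Usage, mirroring the tests: the policy object is passed per insertion, e.g.
//   StashShiftEvictionPolicy ev;
//   dt.Insert(key, value, ev);
```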

View File

@ -505,9 +505,11 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
// Shifts all slots in the bucket right.
// Returns true if the last slot was busy and the entry has been deleted.
template <typename HashFn> bool ShiftRight(unsigned bid, HashFn&& hfn) {
bool ShiftRight(unsigned bid, Hash_t right_hashval) {
if (bid >= kNumBuckets) { // Stash
RemoveStashReference(bid - kNumBuckets, hfn(Key(bid, kNumSlots - 1)));
constexpr auto kLastSlotMask = 1u << (kNumSlots - 1);
if (bucket_[bid].GetBusy() & kLastSlotMask)
RemoveStashReference(bid - kNumBuckets, right_hashval);
}
return bucket_[bid].ShiftRight();
@ -976,8 +978,8 @@ template <typename Key, typename Value, typename Policy>
bool Segment<Key, Value, Policy>::Bucket::ShiftRight() {
bool res = BucketType::ShiftRight();
for (int i = NUM_SLOTS - 1; i > 0; i--) {
key[i] = key[i - 1];
value[i] = value[i - 1];
std::swap(key[i], key[i - 1]);
std::swap(value[i], value[i - 1]);
}
return res;
}
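
For intuition about the loop above, here is a tiny standalone model (illustrative only, not Dragonfly's actual bucket type) of the shift-right semantics: every slot moves one position to the right, slot 0 is left free for an incoming entry, and the caller learns whether an occupied last slot was pushed out. The real code swaps slots rather than moving them, presumably to avoid copying non-trivial key/value objects, but the resulting slot order is the same.

```cpp
#include <array>
#include <optional>
#include <utility>

// Toy stand-in for a fixed-width dash bucket; kWidth plays the role of kBucketWidth.
template <typename T, size_t kWidth> struct ToyBucket {
  std::array<std::optional<T>, kWidth> slots;

  // Shift every slot one position to the right. Slot 0 ends up empty so a new
  // entry can be placed there. Returns true if the last slot was occupied,
  // i.e. an existing entry was pushed out (evicted).
  bool ShiftRight() {
    bool evicted = slots[kWidth - 1].has_value();
    for (size_t i = kWidth - 1; i > 0; --i)
      slots[i] = std::move(slots[i - 1]);
    slots[0].reset();
    return evicted;
  }
};
```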

View File

@ -527,11 +527,11 @@ struct TestEvictionPolicy {
void RecordSplit(Dash64::Segment_t*) {
}
unsigned Evict(const Dash64::EvictionBuckets& eb, Dash64* me) const {
unsigned Evict(const Dash64::HotspotBuckets& hotb, Dash64* me) const {
if (!evict_enabled)
return 0;
auto it = eb.iter[0];
auto it = hotb.regular_buckets[0];
unsigned res = 0;
for (; !it.is_done(); ++it) {
LOG(INFO) << "Deleting " << it->first;
@ -549,21 +549,69 @@ struct TestEvictionPolicy {
TEST_F(DashTest, Eviction) {
TestEvictionPolicy ev(1500);
size_t i = 0;
size_t num = 0;
auto loop = [&] {
for (; i < 5000; ++i) {
dt_.Insert(i, 0, ev);
for (; num < 5000; ++num) {
dt_.Insert(num, 0, ev);
}
};
ASSERT_THROW(loop(), bad_alloc);
ASSERT_LT(i, 5000);
ASSERT_LT(num, 5000);
EXPECT_LT(dt_.size(), ev.max_capacity);
LOG(INFO) << "size is " << dt_.size();
set<uint64_t> keys;
Dash64::bucket_iterator bit = dt_.begin();
unsigned last_slot = 0;
while (!bit.is_done()) {
keys.insert(bit->first);
last_slot = bit.slot_id();
++bit;
}
ASSERT_LT(last_slot, Dash64::kBucketWidth);
bit = dt_.begin();
dt_.ShiftRight(bit);
bit = dt_.begin();
size_t sz = 0;
while (!bit.is_done()) {
EXPECT_EQ(1, keys.count(bit->first));
++sz;
++bit;
}
EXPECT_EQ(sz, keys.size());
while (!dt_.GetSegment(0)->GetBucket(0).IsFull()) {
try {
dt_.Insert(num++, 0, ev);
} catch(bad_alloc&) {
}
}
// Now the bucket is full.
keys.clear();
uint64_t last_key = dt_.GetSegment(0)->Key(0, Dash64::kBucketWidth - 1);
for (Dash64::bucket_iterator bit = dt_.begin(); !bit.is_done(); ++bit) {
keys.insert(bit->first);
}
bit = dt_.begin();
dt_.ShiftRight(bit);
bit = dt_.begin();
sz = 0;
while (!bit.is_done()) {
EXPECT_NE(last_key, bit->first);
EXPECT_EQ(1, keys.count(bit->first));
++sz;
++bit;
}
EXPECT_EQ(sz + 1, keys.size());
ev.evict_enabled = true;
unsigned bucket_cnt = dt_.bucket_count();
auto [it, res] = dt_.Insert(i, 0, ev);
auto [it, res] = dt_.Insert(num, 0, ev);
EXPECT_TRUE(res);
EXPECT_EQ(bucket_cnt, dt_.bucket_count());
}
@ -741,28 +789,30 @@ struct SimpleEvictPolicy {
// Required interface in case can_gc is true
// returns number of items evicted from the table.
// 0 means - nothing has been evicted.
unsigned Evict(const U64Dash::EvictionBuckets& eb, U64Dash* me) {
// kBucketSize - number of slots in the bucket.
constexpr size_t kNumSlots = ABSL_ARRAYSIZE(eb.iter) * U64Dash::kBucketSize;
unsigned Evict(const U64Dash::HotspotBuckets& hotb, U64Dash* me) {
constexpr unsigned kNumBuckets = U64Dash::HotspotBuckets::kNumBuckets;
unsigned slot_index = std::uniform_int_distribution<uint32_t>{0, kNumSlots - 1}(rand_eng_);
auto it = eb.iter[slot_index / U64Dash::kBucketSize];
it += (slot_index % Dash64::kBucketSize);
uint32_t bid = hotb.key_hash % kNumBuckets;
auto it = hotb.at(bid);
unsigned slot_index = (hotb.key_hash >> 32) % U64Dash::kBucketWidth;
it += slot_index;
DCHECK(!it.is_done());
me->Erase(it);
++evicted;
return 1;
}
size_t max_capacity = SIZE_MAX;
default_random_engine rand_eng_{42};
unsigned evicted = 0;
// default_random_engine rand_eng_{42};
};
struct ShiftRightPolicy {
absl::flat_hash_map<uint64_t, unsigned> evicted;
unsigned index = 0;
size_t max_capacity = SIZE_MAX;
unsigned evicted_sum = 0;
static constexpr bool can_gc = false;
static constexpr bool can_evict = true;
@ -774,20 +824,22 @@ struct ShiftRightPolicy {
void RecordSplit(U64Dash::Segment_t* segment) {
}
unsigned Evict(const U64Dash::EvictionBuckets& eb, U64Dash* me) {
constexpr unsigned kBucketNum = ABSL_ARRAYSIZE(eb.iter);
constexpr unsigned kNumStashBuckets = kBucketNum - 2;
unsigned Evict(const U64Dash::HotspotBuckets& hotb, U64Dash* me) {
constexpr unsigned kNumStashBuckets = ABSL_ARRAYSIZE(hotb.stash_buckets);
unsigned stash_pos = index++ % kNumStashBuckets;
auto stash_it = eb.iter[2 + stash_pos];
stash_it += (U64Dash::kBucketSize - 1); // go to the last slot.
unsigned stash_pos = hotb.key_hash % kNumStashBuckets;
auto stash_it = hotb.stash_buckets[stash_pos];
stash_it += (U64Dash::kBucketWidth - 1); // go to the last slot.
uint64_t k = stash_it->first;
DVLOG(1) << "Deleting key " << k << " from " << stash_it.bucket_id() << "/"
<< stash_it.slot_id();
evicted[k]++;
return me->ShiftRight(stash_it);
CHECK(me->ShiftRight(stash_it));
++evicted_sum;
return 1;
};
};
@ -840,7 +892,7 @@ TEST_P(EvictionPolicyTest, HitRate) {
}
}
LOG(INFO) << "Zipf: " << GetParam().zipf_param << ", hits " << hits << " evictions "
<< dt_.evicted();
<< ev_policy.evicted;
}
TEST_P(EvictionPolicyTest, HitRateZipf) {
@ -867,7 +919,7 @@ TEST_P(EvictionPolicyTest, HitRateZipf) {
}
}
LOG(INFO) << "Zipf: " << GetParam().PrintTo() << " hits " << hits << " evictions "
<< dt_.evicted();
<< ev_policy.evicted;
}
TEST_P(EvictionPolicyTest, HitRateZipfShr) {
@ -909,7 +961,7 @@ TEST_P(EvictionPolicyTest, HitRateZipfShr) {
sort(freq_evicted.rbegin(), freq_evicted.rend());
LOG(INFO) << "Params " << GetParam().PrintTo() << " hits " << hits << " evictions "
<< dt_.evicted() << " "
<< ev_policy.evicted_sum << " "
<< "reinserted " << inserted_evicted;
unsigned num_outs = 0;
for (const auto& k_v : freq_evicted) {

View File

@ -20,6 +20,7 @@
#include "util/uring/uring_socket.h"
DEFINE_bool(tcp_nodelay, false, "Configures dragonfly connections with socket option TCP_NODELAY");
DEFINE_bool(http_admin_console, true, "If true allows accessing http console on main TCP port");
using namespace util;
using namespace std;
@ -199,7 +200,9 @@ void Connection::HandleRequests() {
}
FiberSocketBase* peer = tls_sock ? (FiberSocketBase*)tls_sock.get() : socket_.get();
io::Result<bool> http_res = CheckForHttpProto(peer);
io::Result<bool> http_res{false};
if (FLAGS_http_admin_console)
http_res = CheckForHttpProto(peer);
if (http_res) {
if (*http_res) {

View File

@ -35,14 +35,21 @@ static_assert(kPrimeSegmentSize == 32288);
// 24576
static_assert(kExpireSegmentSize == 23528);
void UpdateStatsOnDeletion(PrimeIterator it, InternalDbStats* stats) {
size_t value_heap_size = it->second.MallocUsed();
stats->inline_keys -= it->first.IsInline();
stats->obj_memory_usage -= (it->first.MallocUsed() + value_heap_size);
if (it->second.ObjType() == OBJ_STRING)
stats->strval_memory_usage -= value_heap_size;
}
class PrimeEvictionPolicy {
public:
static constexpr bool can_evict = false;
static constexpr bool can_evict = true; // we implement eviction functionality.
static constexpr bool can_gc = true;
PrimeEvictionPolicy(DbIndex db_indx, DbSlice* db_slice, int64_t mem_budget)
: db_slice_(db_slice), mem_budget_(mem_budget) {
db_indx_ = db_indx;
PrimeEvictionPolicy(DbIndex db_indx, bool can_evict, DbSlice* db_slice, int64_t mem_budget)
: db_slice_(db_slice), mem_budget_(mem_budget), db_indx_(db_indx), can_evict_(can_evict) {
}
void RecordSplit(PrimeTable::Segment_t* segment) {
@ -54,26 +61,35 @@ class PrimeEvictionPolicy {
return mem_budget_ > int64_t(PrimeTable::kSegBytes);
}
unsigned GarbageCollect(const PrimeTable::EvictionBuckets& eb, PrimeTable* me);
unsigned GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me);
unsigned Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me);
int64_t mem_budget() const {
return mem_budget_;
}
unsigned evicted() const {
return evicted_;
}
private:
DbSlice* db_slice_;
int64_t mem_budget_;
DbIndex db_indx_;
unsigned evicted_ = 0;
const DbIndex db_indx_;
// Unlike the static constexpr can_evict, this parameter tells whether we can evict
// items at runtime.
const bool can_evict_;
};
unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::EvictionBuckets& eb,
PrimeTable* me) {
unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
unsigned res = 0;
for (unsigned i = 0; i < ABSL_ARRAYSIZE(eb.iter); ++i) {
auto it = eb.iter[i];
for (; !it.is_done(); ++it) {
if (it->second.HasExpire()) {
auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, it);
for (unsigned i = 0; i < PrimeTable::HotspotBuckets::kNumBuckets; ++i) {
auto bucket_it = eb.at(i);
for (; !bucket_it.is_done(); ++bucket_it) {
if (bucket_it->second.HasExpire()) {
auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, bucket_it);
if (prime_it.is_done())
++res;
}
@ -83,6 +99,25 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::EvictionBuckets&
return res;
}
unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
if (!can_evict_)
return 0;
constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.stash_buckets);
// Choose a stash bucket "randomly" from which to evict an item.
auto bucket_it = eb.stash_buckets[eb.key_hash % kNumStashBuckets];
auto last_slot_it = bucket_it;
last_slot_it += (PrimeTable::kBucketWidth - 1);
if (!last_slot_it.is_done()) {
UpdateStatsOnDeletion(last_slot_it, db_slice_->MutableStats(db_indx_));
}
CHECK(me->ShiftRight(bucket_it));
++evicted_;
return 1;
}
} // namespace
#define ADD(x) (x) += o.x
@ -266,7 +301,8 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
}
}
PrimeEvictionPolicy evp{db_index, this, int64_t(memory_budget_ - key.size())};
PrimeEvictionPolicy evp{db_index, bool(caching_mode_), this,
int64_t(memory_budget_ - key.size())};
// Fast-path if change_cb_ is empty so we Find or Add using
// the insert operation: twice more efficient.
@ -279,6 +315,7 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
events_.garbage_collected += db->prime_table.garbage_collected();
events_.stash_unloaded = db->prime_table.stash_unloaded();
events_.evicted_keys += evp.evicted();
it.SetVersion(NextVersion());
memory_budget_ = evp.mem_budget();
@ -348,12 +385,7 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
CHECK_EQ(1u, db->mcflag_table.Erase(it->first));
}
size_t value_heap_size = it->second.MallocUsed();
db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + value_heap_size);
if (it->second.ObjType() == OBJ_STRING)
db->stats.obj_memory_usage -= value_heap_size;
UpdateStatsOnDeletion(it, &db->stats);
db->prime_table.Erase(it);
return true;
@ -593,12 +625,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
return make_pair(it, expire_it);
db->expire_table.Erase(expire_it);
db->stats.inline_keys -= it->first.IsInline();
size_t value_heap_size = it->second.MallocUsed();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + value_heap_size);
if (it->second.ObjType() == OBJ_STRING)
db->stats.strval_memory_usage -= value_heap_size;
UpdateStatsOnDeletion(it, &db->stats);
db->prime_table.Erase(it);
++events_.expired_keys;

View File

@ -58,13 +58,12 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
}
tmp_str1 = sdsempty();
tmp_str2 = sdsempty();
db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
}
EngineShard::~EngineShard() {
sdsfree(tmp_str1);
sdsfree(tmp_str2);
}
void EngineShard::Shutdown() {

View File

@ -119,7 +119,7 @@ class EngineShard {
}
// for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1, tmp_str2;
sds tmp_str1;
#if 0
size_t TEST_WatchedDbsLen() const {

View File

@ -41,6 +41,7 @@ DEFINE_string(dbfilename, "", "the filename to save/load the DB");
DEFINE_string(requirepass, "", "password for AUTH authentication");
DECLARE_uint32(port);
DECLARE_bool(cache_mode);
extern "C" mi_stats_t _mi_stats_main;
@ -561,6 +562,9 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("listpack_blobs", total.listpack_blob_cnt);
append("listpack_bytes", total.listpack_bytes);
append("small_string_bytes", m.small_string_bytes);
append("maxmemory", max_memory_limit);
append("maxmemory_human", HumanReadableNumBytes(max_memory_limit));
append("cache_mode", FLAGS_cache_mode ? "cache" : "store");
}
if (should_enter("STATS")) {
@ -575,6 +579,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("instantaneous_output_kbps", -1);
append("rejected_connections", -1);
append("expired_keys", m.events.expired_keys);
append("evicted_keys", m.events.evicted_keys);
append("garbage_collected", m.events.garbage_collected);
append("bump_ups", m.events.bumpups);
append("stash_unloaded", m.events.stash_unloaded);