Improve eviction policy design for dash table

This commit is contained in:
Roman Gershman 2022-03-13 22:52:21 +02:00
parent cceb0d90ca
commit 4fcb74930e
8 changed files with 201 additions and 58 deletions

View File

@ -201,17 +201,34 @@ class DashTable : public detail::DashTableBase {
using const_bucket_iterator = Iterator<true, true>;
using bucket_iterator = Iterator<false, true>;
struct EvictionCandidates {
struct EvictionBuckets {
bucket_iterator iter[2 + Policy::kStashBucketNum];
uint64_t key_hash; // key_hash of a key that we try to insert.
// uint64_t key_hash; // key_hash of a key that we try to insert.
};
// EvictionCb is called when Insertion needs to evict items in a segment to make room for a new
// item.
using EvictionCb = std::function<unsigned(const EvictionCandidates&)>;
struct EvictionPolicy {
EvictionCb evict_cb;
size_t max_capacity = UINT64_MAX;
// Default eviction policy for DashTable: the table may always grow and never
// garbage-collects or evicts entries. Custom policies opt into GC/eviction by
// setting the corresponding flags and implementing the interface sketched in
// the comment block below.
struct DefaultEvictionPolicy {
// Compile-time capability flags; InsertInternal dispatches on these with
// `if constexpr`, so the corresponding methods are only required when true.
static constexpr bool can_gc = false;
static constexpr bool can_evict = false;
// Called before the table grows (segment split / depth increase).
// Returning true permits the growth; the default policy never restricts it.
bool CanGrow(const DashTable&) {
return true;
}
/*
/// Required interface in case can_gc is true
// Returns number of garbage collected items deleted. 0 - means nothing has been
// deleted.
unsigned GarbageCollect(const EvictionBuckets& eb, DashTable* me) const {
return 0;
}
// Required interface in case can_evict is true
// returns number of items evicted from the table.
// 0 means - nothing has been evicted.
unsigned Evict(const EvictionBuckets& eb, DashTable* me) {
return 0;
}
*/
};
DashTable(size_t capacity_log = 1, const Policy& policy = Policy{},
@ -222,11 +239,11 @@ class DashTable : public detail::DashTableBase {
// false for duplicate, true if inserted.
template <typename U, typename V> std::pair<iterator, bool> Insert(U&& key, V&& value) {
return InsertInternal(std::forward<U>(key), std::forward<V>(value), EvictionPolicy{});
return InsertInternal(std::forward<U>(key), std::forward<V>(value), DefaultEvictionPolicy{});
}
template <typename U, typename V>
std::pair<iterator, bool> Insert(U&& key, V&& value, const EvictionPolicy& ev) {
template <typename U, typename V, typename EvictionPolicy>
std::pair<iterator, bool> Insert(U&& key, V&& value, EvictionPolicy& ev) {
return InsertInternal(std::forward<U>(key), std::forward<V>(value), ev);
}
@ -310,9 +327,12 @@ class DashTable : public detail::DashTableBase {
void Clear();
uint64_t garbage_collected() const { return garbage_collected_;}
uint64_t evicted() const { return evicted_;}
private:
template <typename U, typename V>
std::pair<iterator, bool> InsertInternal(U&& key, V&& value, const EvictionPolicy& policy);
template <typename U, typename V, typename EvictionPolicy>
std::pair<iterator, bool> InsertInternal(U&& key, V&& value, EvictionPolicy&& policy);
void IncreaseDepth(unsigned new_depth);
void Split(uint32_t seg_id);
@ -330,6 +350,9 @@ class DashTable : public detail::DashTableBase {
Policy policy_;
std::pmr::vector<SegmentType*> segment_;
uint64_t garbage_collected_ = 0;
uint64_t evicted_ = 0;
};
template <typename _Key, typename _Value, typename Policy>
@ -529,8 +552,8 @@ void DashTable<_Key, _Value, Policy>::Reserve(size_t size) {
}
template <typename _Key, typename _Value, typename Policy>
template <typename U, typename V>
auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, const EvictionPolicy& ev)
template <typename U, typename V, typename EvictionPolicy>
auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, EvictionPolicy&& ev)
-> std::pair<iterator, bool> {
uint64_t key_hash = DoHash(key);
uint32_t seg_id = SegmentId(key_hash);
@ -553,28 +576,44 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, const E
return std::make_pair(iterator{this, seg_id, it.index, it.slot}, false);
}
// We need to resize the table but first check if we need to trigger
// eviction policy.
if (SegmentType::capacity() + bucket_count() > ev.max_capacity) {
if (ev.evict_cb) {
// Try eviction.
uint8_t bid[2];
SegmentType::FillProbeArray(key_hash, bid);
EvictionCandidates candidates;
// try garbage collect or evict.
if constexpr (ev.can_evict || ev.can_gc) {
// Try eviction.
uint8_t bid[2];
SegmentType::FillProbeArray(key_hash, bid);
EvictionBuckets buckets;
candidates.key_hash = key_hash;
candidates.iter[0] = bucket_iterator{this, seg_id, bid[0], 0};
candidates.iter[1] = bucket_iterator{this, seg_id, bid[1], 0};
buckets.iter[0] = bucket_iterator{this, seg_id, bid[0], 0};
buckets.iter[1] = bucket_iterator{this, seg_id, bid[1], 0};
for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) {
candidates.iter[2 + i] = bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0};
}
unsigned deleted = ev.evict_cb(candidates);
if (deleted) {
continue; // Succeed to evict - retry insertion.
for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) {
buckets.iter[2 + i] = bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0};
}
// The difference between gc and eviction is that gc can be applied even if
// the table can grow since we throw away logically deleted items.
// For eviction to be applied we should reach the growth limit.
if constexpr (ev.can_gc) {
unsigned res = ev.GarbageCollect(buckets, this);
garbage_collected_ += res;
if (res)
continue;
}
// We evict only if our policy says we can not grow
if constexpr (ev.can_evict) {
bool can_grow = ev.CanGrow(*this);
if (!can_grow) {
unsigned res = ev.Evict(buckets, this);
evicted_ += res;
if (res)
continue;
}
}
break; // stop, we can not grow
}
if (!ev.CanGrow(*this)) {
throw std::bad_alloc{};
}
// Split the segment.

View File

@ -339,7 +339,7 @@ TEST_F(DashTest, InsertOOM) {
dt.Insert(i, 0);
}
},
std::bad_alloc);
bad_alloc);
}
struct Item {
@ -462,32 +462,53 @@ TEST_F(DashTest, Bucket) {
EXPECT_EQ(s.size(), num_items);
}
TEST_F(DashTest, Eviction) {
Dash64::EvictionPolicy ev;
ev.max_capacity = 1500;
size_t i = 0;
for (; i < 5000; ++i) {
auto [it, res] = dt_.Insert(i, 0, ev);
if (!res)
break;
}
ASSERT_LT(i, 5000);
EXPECT_LT(dt_.size(), ev.max_capacity);
LOG(INFO) << "size is " << dt_.size();
struct TestEvictionPolicy {
static constexpr bool can_evict = true;
static constexpr bool can_gc = false;
unsigned bucket_cnt = dt_.bucket_count();
Dash64::EvictionCb cb = [this](const Dash64::EvictionCandidates& cand) -> unsigned {
auto it = cand.iter[0];
explicit TestEvictionPolicy(unsigned max_cap) : max_capacity(max_cap) {
}
bool CanGrow(const Dash64& tbl) const {
return tbl.bucket_count() < max_capacity;
}
unsigned Evict(const Dash64::EvictionBuckets& eb, Dash64* me) const {
if (!evict_enabled)
return 0;
auto it = eb.iter[0];
unsigned res = 0;
for (; !it.is_done(); ++it) {
LOG(INFO) << "Deleting " << it->first;
dt_.Erase(it);
me->Erase(it);
++res;
}
return res;
}
bool evict_enabled = false;
unsigned max_capacity;
};
TEST_F(DashTest, Eviction) {
TestEvictionPolicy ev(1500);
size_t i = 0;
auto loop = [&] {
for (; i < 5000; ++i) {
dt_.Insert(i, 0, ev);
}
};
ev.evict_cb = cb;
ASSERT_THROW(loop(), bad_alloc);
ASSERT_LT(i, 5000);
EXPECT_LT(dt_.size(), ev.max_capacity);
LOG(INFO) << "size is " << dt_.size();
ev.evict_enabled = true;
unsigned bucket_cnt = dt_.bucket_count();
auto [it, res] = dt_.Insert(i, 0, ev);
EXPECT_TRUE(res);
EXPECT_EQ(bucket_cnt, dt_.bucket_count());

View File

@ -52,4 +52,52 @@ const char* GlobalState::Name(S s) {
ABSL_INTERNAL_UNREACHABLE;
}
// Parses a human-readable byte-size string ("128", "1k", "2.5M", "1G", ...)
// into a byte count. Suffixes are powers of 1024: k/K, M, G, T, P, E; a lone
// trailing 'B' (as in "1B") denotes plain bytes. A leading '-' negates the
// result. At most one character may follow the number, so "1K" and "1B" are
// accepted while "1KB" is rejected.
// Returns true and stores the value in *num_bytes on success; returns false
// on malformed input or a value that does not fit into int64_t.
bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) {
  if (str.empty())
    return false;

  // strtod requires a NUL-terminated C string, and string_view does not
  // guarantee NUL termination - copy into a local buffer first. Any sane
  // size string fits well within 64 characters.
  char buf[64];
  if (str.size() >= sizeof(buf))
    return false;
  memcpy(buf, str.data(), str.size());
  buf[str.size()] = '\0';

  const char* cstr = buf;
  bool neg = (*cstr == '-');
  if (neg) {
    cstr++;
  }

  char* end;
  double d = strtod(cstr, &end);

  // strtod consumed nothing: there was no leading number at all
  // (e.g. "K" or a bare "-"). Without this check such inputs parse as 0.
  if (end == cstr)
    return false;

  // If this didn't consume the entire string (modulo one suffix char), fail.
  if (end + 1 < buf + str.size())
    return false;

  int64_t scale = 1;
  switch (*end) {
    // NB: an int64 can only go up to <8 EB. Each case deliberately falls
    // through, multiplying the scale by 1024 per step.
    case 'E':
      scale <<= 10;
      [[fallthrough]];
    case 'P':
      scale <<= 10;
      [[fallthrough]];
    case 'T':
      scale <<= 10;
      [[fallthrough]];
    case 'G':
      scale <<= 10;
      [[fallthrough]];
    case 'M':
      scale <<= 10;
      [[fallthrough]];
    case 'K':
    case 'k':
      scale <<= 10;
      [[fallthrough]];
    case 'B':
    case '\0':
      break;  // no extra scaling for plain bytes / a bare number.
    default:
      return false;
  }

  d *= scale;

  // Reject values that cannot be represented by int64_t. The >= comparison
  // also rejects d == 2^63 exactly, which would overflow on the cast below.
  // d is non-negative here because the sign was stripped before parsing.
  if (d >= static_cast<double>(std::numeric_limits<int64_t>::max()) || d < 0)
    return false;

  *num_bytes = static_cast<int64_t>(d + 0.5);  // round to nearest byte.
  if (neg) {
    *num_bytes = -*num_bytes;
  }
  return true;
}
} // namespace dfly

View File

@ -68,4 +68,6 @@ inline void ToLower(const MutableSlice* val) {
}
}
bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes);
} // namespace dfly

View File

@ -96,11 +96,14 @@ class DbSlice {
expire_base_[generation & 1] = now;
}
void SetMaxMemory(size_t max_memory) {
max_memory_ = max_memory;
}
uint64_t expire_base() const {
return expire_base_[0];
}
// returns wall clock in millis as it has been set via UpdateExpireClock.
uint64_t Now() const {
return now_ms_;
@ -224,6 +227,7 @@ class DbSlice {
uint64_t expire_base_[2]; // Used for expire logic, represents a real clock.
uint64_t version_ = 1; // Used to version entries in the PrimeTable.
uint64_t max_memory_ = -1;
mutable SliceEvents events_; // we may change this even for const operations.
using LockTable = absl::flat_hash_map<std::string, IntentLock>;

View File

@ -5,6 +5,8 @@
#include <mimalloc.h>
#include "base/init.h"
#include "base/proc_util.h"
#include "facade/dragonfly_listener.h"
#include "server/main_service.h"
#include "util/accept_server.h"
@ -13,6 +15,8 @@
DECLARE_uint32(port);
DECLARE_uint32(memcache_port);
DECLARE_uint64(maxmemory);
DEFINE_bool(use_large_pages, false, "If true - uses large memory pages for allocations");
using namespace util;
using namespace std;
@ -20,7 +24,13 @@ using namespace facade;
namespace dfly {
void RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
if (FLAGS_maxmemory > 0 && FLAGS_maxmemory < pool->size() * 256_MB ) {
LOG(ERROR) << "Max memory is less than 256MB per thread. Exiting...";
return false;
}
Service service(pool);
service.Init(acceptor);
@ -34,6 +44,7 @@ void RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
acceptor->Wait();
service.Shutdown();
return true;
}
} // namespace dfly
@ -48,7 +59,17 @@ int main(int argc, char* argv[]) {
CHECK_GT(FLAGS_port, 0u);
mi_option_enable(mi_option_large_os_pages);
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);
if (kver.major < 5 || (kver.major == 5 && kver.minor < 11)) {
LOG(ERROR) << "Kernel 5.11 or later is supported. Exiting...";
return 1;
}
if (FLAGS_use_large_pages) {
mi_option_enable(mi_option_large_os_pages);
}
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
_mi_options_init();
@ -58,9 +79,9 @@ int main(int argc, char* argv[]) {
AcceptServer acceptor(&pp);
dfly::RunEngine(&pp, &acceptor);
int res = dfly::RunEngine(&pp, &acceptor) ? 0 : -1;
pp.Stop();
return 0;
return res;
}

View File

@ -33,9 +33,14 @@ extern "C" {
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"
// TODO: to move to absl flags and keep legacy flags only for glog library.
// absl flags allow parsing of custom types and allow specifying which flags appear
// for helpshort.
DEFINE_uint32(port, 6380, "Redis port");
DEFINE_uint32(memcache_port, 0, "Memcached port");
DECLARE_string(requirepass);
DEFINE_uint64(maxmemory, 0, "Limit on maximum-memory that is used by the database");
namespace dfly {

View File

@ -72,10 +72,11 @@ error_code CreateDirs(fs::path dir_path) {
return ec;
}
atomic_uint64_t used_mem_peak(0);
} // namespace
atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0);
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
start_time_ = time(NULL);
last_save_.store(start_time_, memory_order_release);
@ -96,6 +97,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor) {
for (const auto& s : stats)
sum += s.used_memory.load(memory_order_relaxed);
used_mem_current.store(sum, memory_order_relaxed);
// Single writer, so no races.
if (sum > used_mem_peak.load(memory_order_relaxed))
used_mem_peak.store(sum, memory_order_relaxed);