From 5568205b346d86d6a6ec44f674a651126d30d5c9 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 5 May 2022 12:05:05 +0300 Subject: [PATCH] More work on tiered storage. 1. Reads from external storage support now o_direct mode. 2. Simplify write unloading logic. Make pending buffer a ring buffer with a predefined capacity. 3. Add more tiered stats to info command --- .github/workflows/docker-release.yml | 2 + CMakeLists.txt | 1 - helio | 2 +- src/server/common.cc | 5 +- src/server/common.h | 4 + src/server/db_slice.cc | 66 +++++-- src/server/db_slice.h | 46 ++--- src/server/dfly_main.cc | 4 + src/server/io_mgr.cc | 31 +++ src/server/io_mgr.h | 3 +- src/server/server_family.cc | 5 +- src/server/string_family.cc | 3 +- src/server/tiered_storage.cc | 280 ++++++++++++++------------- src/server/tiered_storage.h | 24 ++- tools/docker/Dockerfile.ubuntu-prod | 2 +- 15 files changed, 287 insertions(+), 191 deletions(-) diff --git a/.github/workflows/docker-release.yml b/.github/workflows/docker-release.yml index eeca2b4..7b28d4f 100644 --- a/.github/workflows/docker-release.yml +++ b/.github/workflows/docker-release.yml @@ -47,6 +47,8 @@ jobs: with: context: . platforms: linux/amd64,linux/arm64 + build-args: | + QEMU_CPU='help' push: ${{ github.event_name != 'pull_request' }} tags: | ghcr.io/${{ github.actor }}/dragonfly-ubuntu:latest diff --git a/CMakeLists.txt b/CMakeLists.txt index 62aa38d..5d7dae4 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,6 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/helio/cmake" ${CMAKE_MODULE_PATH}) option(BUILD_SHARED_LIBS "Build shared libraries" OFF) -set(Boost_USE_STATIC_LIBS ON) include(third_party) include(internal) diff --git a/helio b/helio index a024151..cd13f66 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit a024151f24180d493b51909b6853cfd16ae6367d +Subproject commit cd13f66ea2d6b98cfa3f5441fd151f41f9dc6966 diff --git a/src/server/common.cc b/src/server/common.cc index 403bf35..c37bf6e 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -19,6 +19,7 @@ thread_local ServerState ServerState::state_; atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_current(0); unsigned kernel_version = 0; +size_t max_memory_limit = 0; ServerState::ServerState() { } @@ -114,12 +115,12 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) { #define ADD(x) (x) += o.x TieredStats& TieredStats::operator+=(const TieredStats& o) { - static_assert(sizeof(TieredStats) == 24); + static_assert(sizeof(TieredStats) == 32); ADD(external_reads); ADD(external_writes); ADD(storage_capacity); - + ADD(storage_reserved); return *this; } diff --git a/src/server/common.h b/src/server/common.h index f7777b4..4100127 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -75,6 +75,9 @@ struct TieredStats { size_t external_writes = 0; size_t storage_capacity = 0; + // how much was reserved by actively stored items. + size_t storage_reserved = 0; + TieredStats& operator+=(const TieredStats&); }; @@ -96,6 +99,7 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes); // Cached values, updated frequently to represent the correct state of the system. extern std::atomic_uint64_t used_mem_peak; extern std::atomic_uint64_t used_mem_current; +extern size_t max_memory_limit; // version 5.11 maps to 511 etc. // set upon server start. diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 31f3692..79457c7 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -86,20 +86,32 @@ class PrimeEvictionPolicy { #define ADD(x) (x) += o.x +PerDbStats& PerDbStats::operator+=(const PerDbStats& o) { + constexpr size_t kDbSz = sizeof(PerDbStats); + static_assert(kDbSz == 56); + + ADD(inline_keys); + ADD(obj_memory_usage); + ADD(strval_memory_usage); + ADD(listpack_blob_cnt); + ADD(listpack_bytes); + ADD(external_entries); + ADD(external_size); + + return *this; +} + DbStats& DbStats::operator+=(const DbStats& o) { - static_assert(sizeof(DbStats) == 80); + constexpr size_t kDbSz = sizeof(DbStats); + static_assert(kDbSz == 96); + + PerDbStats::operator+=(o); ADD(key_count); ADD(expire_count); ADD(bucket_count); - ADD(inline_keys); - - ADD(obj_memory_usage); ADD(table_mem_usage); ADD(small_string_bytes); - ADD(listpack_blob_cnt); - ADD(listpack_bytes); - ADD(external_entries); return *this; } @@ -150,12 +162,8 @@ auto DbSlice::GetStats() const -> Stats { s.db.key_count += db->prime_table.size(); s.db.bucket_count += db->prime_table.bucket_count(); s.db.expire_count += db->expire_table.size(); - s.db.obj_memory_usage += db->stats.obj_memory_usage; - s.db.inline_keys += db->stats.inline_keys; s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage()); - s.db.listpack_blob_cnt += db->stats.listpack_blob_cnt; - s.db.listpack_bytes += db->stats.listpack_bytes; - s.db.external_entries += db->stats.external_entries; + s.db += db->stats; } s.db.small_string_bytes = CompactObj::GetStats().small_string_bytes; @@ -275,7 +283,11 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairstats.obj_memory_usage -= existing->second.MallocUsed(); + size_t value_heap_size = existing->second.MallocUsed(); + db->stats.obj_memory_usage -= value_heap_size; + if (existing->second.ObjType() == OBJ_STRING) + db->stats.obj_memory_usage -= value_heap_size; + existing->second.Reset(); events_.expired_keys++; @@ -313,8 +325,12 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) { CHECK_EQ(1u, db->mcflag_table.Erase(it->first)); } + size_t value_heap_size = it->second.MallocUsed(); db->stats.inline_keys -= it->first.IsInline(); - db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); + db->stats.obj_memory_usage -= (it->first.MallocUsed() + value_heap_size); + if (it->second.ObjType() == OBJ_STRING) + db->stats.obj_memory_usage -= value_heap_size; + db->prime_table.Erase(it); return true; @@ -413,7 +429,11 @@ pair DbSlice::AddOrFind(DbIndex db_ind, string_view key, Pr auto& db = *db_arr_[db_ind]; auto& new_it = res.first; - db.stats.obj_memory_usage += obj.MallocUsed(); + size_t value_heap_size = obj.MallocUsed(); + db.stats.obj_memory_usage += value_heap_size; + if (obj.ObjType() == OBJ_STRING) + db.stats.strval_memory_usage += value_heap_size; + new_it->second = std::move(obj); if (expire_at_ms) { @@ -514,13 +534,20 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) { for (const auto& ccb : change_cb_) { ccb.second(db_ind, ChangeReq{it}); } - db->stats.obj_memory_usage -= it->second.MallocUsed(); + size_t value_heap_size = it->second.MallocUsed(); + db->stats.obj_memory_usage -= value_heap_size; + if (it->second.ObjType() == OBJ_STRING) + db->stats.strval_memory_usage -= value_heap_size; + it.SetVersion(NextVersion()); } void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it) { auto& db = db_arr_[db_ind]; - db->stats.obj_memory_usage += it->second.MallocUsed(); + size_t value_heap_size = it->second.MallocUsed(); + db->stats.obj_memory_usage += value_heap_size; + if (it->second.ObjType() == OBJ_STRING) + db->stats.strval_memory_usage += value_heap_size; } pair DbSlice::ExpireIfNeeded(DbIndex db_ind, @@ -541,7 +568,10 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, db->expire_table.Erase(expire_it); db->stats.inline_keys -= it->first.IsInline(); - db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); + size_t value_heap_size = it->second.MallocUsed(); + db->stats.obj_memory_usage -= (it->first.MallocUsed() + value_heap_size); + if (it->second.ObjType() == OBJ_STRING) + db->stats.strval_memory_usage -= value_heap_size; db->prime_table.Erase(it); ++events_.expired_keys; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 0121057..56c37f3 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -20,7 +20,23 @@ namespace dfly { using facade::OpResult; -struct DbStats { +struct PerDbStats { + // Number of inline keys. + uint64_t inline_keys = 0; + + // Object memory usage besides hash-table capacity. + // Applies for any non-inline objects. + size_t obj_memory_usage = 0; + size_t strval_memory_usage = 0; + size_t listpack_blob_cnt = 0; + size_t listpack_bytes = 0; + size_t external_entries = 0; + size_t external_size = 0; + + PerDbStats& operator+=(const PerDbStats& o); +}; + +struct DbStats : public PerDbStats { // number of active keys. size_t key_count = 0; @@ -30,23 +46,12 @@ struct DbStats { // number of buckets in dictionary (key capacity) size_t bucket_count = 0; - // Number of inline keys. - size_t inline_keys = 0; - - // Object memory usage besides hash-table capacity. - // Applies for any non-inline objects. - size_t obj_memory_usage = 0; - // Memory used by dictionaries. size_t table_mem_usage = 0; size_t small_string_bytes = 0; - size_t listpack_blob_cnt = 0; - size_t listpack_bytes = 0; - - size_t external_entries = 0; - + using PerDbStats::operator+=; DbStats& operator+=(const DbStats& o); }; @@ -65,24 +70,13 @@ class DbSlice { void operator=(const DbSlice&) = delete; public: + using PerDbStats = ::dfly::PerDbStats; + struct Stats { DbStats db; SliceEvents events; }; - struct PerDbStats { - // Number of inline keys. - uint64_t inline_keys = 0; - - // Object memory usage besides hash-table capacity. - // Applies for any non-inline objects. - size_t obj_memory_usage = 0; - size_t listpack_blob_cnt = 0; - size_t listpack_bytes = 0; - size_t external_entries = 0; - }; - - DbSlice(uint32_t index, EngineShard* owner); ~DbSlice(); diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 09c320d..51abcd2 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -89,8 +89,12 @@ int main(int argc, char* argv[]) { LOG(INFO) << "Found " << HumanReadableNumBytes(available) << " available memory. Setting maxmemory to " << HumanReadableNumBytes(maxmemory); FLAGS_maxmemory = maxmemory; + } else { + LOG(INFO) << "Max memory limit is: " << HumanReadableNumBytes(FLAGS_maxmemory); } + dfly::max_memory_limit = FLAGS_maxmemory; + if (FLAGS_use_large_pages) { mi_option_enable(mi_option_large_os_pages); } diff --git a/src/server/io_mgr.cc b/src/server/io_mgr.cc index c753567..07283ef 100644 --- a/src/server/io_mgr.cc +++ b/src/server/io_mgr.cc @@ -5,6 +5,7 @@ #include "server/io_mgr.h" #include +#include #include "base/logging.h" #include "facade/facade_types.h" @@ -21,6 +22,15 @@ using uring::FiberCall; using uring::Proactor; namespace this_fiber = ::boost::this_fiber; +namespace { + +constexpr inline size_t alignup(size_t num, size_t align) { + size_t amask = align - 1; + return (num + amask) & (~amask); +} + +} // namespace + IoMgr::IoMgr() { flags_val = 0; } @@ -99,6 +109,27 @@ error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) { } error_code IoMgr::Read(size_t offset, io::MutableBytes dest) { + DCHECK(!dest.empty()); + + if (FLAGS_backing_file_direct) { + size_t read_offs = offset & ~4095ULL; + size_t end_range = alignup(offset + dest.size(), 4096); + size_t space_needed = end_range - read_offs; + DCHECK_EQ(0u, space_needed % 4096); + + uint8_t* space = (uint8_t*)mi_malloc_aligned(space_needed, 4096); + iovec v{.iov_base = space, .iov_len = space_needed}; + error_code ec = backing_file_->Read(&v, 1, read_offs, 0); + if (ec) { + mi_free(space); + return ec; + } + + memcpy(dest.data(), space + offset - read_offs, dest.size()); + mi_free_size_aligned(space, space_needed, 4096); + return ec; + } + iovec v{.iov_base = dest.data(), .iov_len = dest.size()}; return backing_file_->Read(&v, 1, offset, 0); } diff --git a/src/server/io_mgr.h b/src/server/io_mgr.h index c8f05ae..d09df6f 100644 --- a/src/server/io_mgr.h +++ b/src/server/io_mgr.h @@ -36,7 +36,8 @@ class IoMgr { std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb); std::error_code Read(size_t offset, io::MutableBytes dest); - size_t Size() const { + // Total file span + size_t Span() const { return sz_; } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 7b0a284..005d64f 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -515,6 +515,7 @@ tcp_port:)"; append("num_entries:", m.db.key_count); append("inline_keys:", m.db.inline_keys); append("small_string_bytes:", m.db.small_string_bytes); + append("strval_bytes:", m.db.strval_memory_usage); append("listpack_blobs:", m.db.listpack_blob_cnt); append("listpack_bytes:", m.db.listpack_bytes); } @@ -540,10 +541,12 @@ tcp_port:)"; } if (should_enter("TIERED", true)) { - ADD_HEADER("# TIERED_STORAGE"); + ADD_HEADER("# TIERED"); append("external_entries:", m.db.external_entries); + append("external_bytes:", m.db.external_size); append("external_reads:", m.tiered_stats.external_reads); append("external_writes:", m.tiered_stats.external_writes); + append("external_reserved:", m.tiered_stats.storage_reserved); append("external_capacity:", m.tiered_stats.storage_capacity); } diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 8cd6022..e7e54c7 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -136,7 +136,7 @@ OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::s EngineShard* shard = db_slice_.shard_owner(); if (shard->tiered_storage()) { // external storage enabled. - if (value.size() >= 64 && value.size() < 2_MB) { + if (value.size() >= 64) { shard->tiered_storage()->UnloadItem(params.db_index, it); } } @@ -869,7 +869,6 @@ OpResult StringFamily::OpGet(const OpArgs& op_args, string_view key) { auto [offset, size] = pv.GetExternalPtr(); val.resize(size); - // TODO: can not work with O_DIRECT error_code ec = tiered->Read(offset, size, val.data()); CHECK(!ec) << "TBD: " << ec; } else { diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index b9bfbd4..18a4659 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -52,6 +52,7 @@ struct EntryHash { }; struct TieredStorage::ActiveIoRequest { + size_t file_offset; char* block_ptr; // entry -> offset @@ -59,7 +60,7 @@ struct TieredStorage::ActiveIoRequest { mi_stl_allocator>>*/ absl::flat_hash_map> entries; - ActiveIoRequest(size_t sz) { + ActiveIoRequest(size_t file_offs, size_t sz) : file_offset(file_offs) { DCHECK_EQ(0u, sz % 4096); block_ptr = (char*)mi_malloc_aligned(sz, 4096); DCHECK_EQ(0, intptr_t(block_ptr) % 4096); @@ -70,7 +71,7 @@ struct TieredStorage::ActiveIoRequest { } }; -void TieredStorage::SendIoRequest(size_t offset, size_t req_size, ActiveIoRequest* req) { +void TieredStorage::SendIoRequest(size_t req_size, ActiveIoRequest* req) { #if 1 // static string tmp(4096, 'x'); // string_view sv{tmp}; @@ -80,7 +81,7 @@ void TieredStorage::SendIoRequest(size_t offset, size_t req_size, ActiveIoReques [this] { return num_active_requests_ <= FLAGS_tiered_storage_max_pending_writes; }); auto cb = [this, req](int res) { FinishIoRequest(res, req); }; - io_mgr_.WriteAsync(offset, sv, move(cb)); + io_mgr_.WriteAsync(req->file_offset, sv, move(cb)); ++stats_.external_writes; #else @@ -104,9 +105,18 @@ void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) { it->second.SetIoPending(false); if (success) { + auto* stats = db_slice_.MutableStats(ikey.db_indx); + + size_t heap_size = it->second.MallocUsed(); size_t item_size = it->second.Size(); + + stats->obj_memory_usage -= heap_size; + stats->strval_memory_usage -= heap_size; + it->second.SetExternal(k_v.second, item_size); - ++db_slice_.MutableStats(ikey.db_indx)->external_entries; + + stats->external_entries += 1; + stats->external_size += item_size; } } delete req; @@ -118,7 +128,7 @@ void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) { VLOG_IF(1, num_active_requests_ == 0) << "Finished active requests"; } -TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) { +TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice), pending_req_(256) { } TieredStorage::~TieredStorage() { @@ -129,8 +139,9 @@ TieredStorage::~TieredStorage() { error_code TieredStorage::Open(const string& path) { error_code ec = io_mgr_.Open(path); if (!ec) { - if (io_mgr_.Size()) { // Add initial storage. - alloc_.AddStorage(0, io_mgr_.Size()); + if (io_mgr_.Span()) { // Add initial storage. + alloc_.AddStorage(0, io_mgr_.Span()); + stats_.storage_capacity = io_mgr_.Span(); } } return ec; @@ -146,6 +157,13 @@ void TieredStorage::Shutdown() { io_mgr_.Shutdown(); } +bool TieredStorage::ShouldFlush() { + if (num_active_requests_ >= FLAGS_tiered_storage_max_pending_writes) + return false; + + return pending_req_.size() > pending_req_.capacity() / 2; +} + error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) { CHECK_EQ(OBJ_STRING, it->second.ObjType()); @@ -153,148 +171,146 @@ error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) { error_code ec; pending_unload_bytes_ += blob_len; - if (db_index >= db_arr_.size()) { + /*if (db_index >= db_arr_.size()) { db_arr_.resize(db_index + 1); } if (db_arr_[db_index] == nullptr) { db_arr_[db_index] = new PerDb; + }*/ + + // PerDb* db = db_arr_[db_index]; + pending_req_.EmplaceOrOverride(PendingReq{it.bucket_cursor().value(), db_index}); + // db->pending_upload[it.bucket_cursor().value()] += blob_len; + + // size_t grow_size = 0; + + if (ShouldFlush()) { + FlushPending(); } - PerDb* db = db_arr_[db_index]; - db->pending_upload[it.bucket_cursor().value()] += blob_len; - - size_t grow_size = 0; - if (!io_mgr_.grow_pending() && pending_unload_bytes_ >= ExternalAllocator::kMinBlockSize) { - grow_size = SerializePendingItems(); - } - - if (grow_size == 0 && alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) { - grow_size = 1ULL << 28; - } - - if (grow_size && !io_mgr_.grow_pending()) { - size_t start = io_mgr_.Size(); - - auto cb = [start, grow_size, this](int io_res) { - if (io_res == 0) { - alloc_.AddStorage(start, grow_size); - stats_.storage_capacity += grow_size; - } else { - LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res; - } - }; - - ec = io_mgr_.GrowAsync(grow_size, move(cb)); + // if we reached high utilization of the file range - try to grow the file. + if (alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) { + InitiateGrow(1ULL << 28); } return ec; } -size_t TieredStorage::SerializePendingItems() { - DCHECK(!io_mgr_.grow_pending()); +bool IsObjFitToUnload(const PrimeValue& pv) { + return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending(); +}; - vector> sorted_cursors; - constexpr size_t kArrLen = 64; +void TieredStorage::FlushPending() { + DCHECK(!io_mgr_.grow_pending() && !pending_req_.empty()); - PrimeTable::iterator iters[kArrLen]; - unsigned iter_count = 0; - bool break_early = false; + vector> canonic_req; + canonic_req.reserve(pending_req_.size()); - auto is_good = [](const PrimeValue& pv) { - return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending(); - }; - - auto tr_cb = [&](PrimeTable::iterator it) { - if (is_good(it->second)) { - CHECK_LT(iter_count, kArrLen); - iters[iter_count++] = it; - } - }; - - size_t open_block_size = 0; - size_t file_offset = 0; - size_t block_offset = 0; - ActiveIoRequest* active_req = nullptr; - - for (size_t i = 0; i < db_arr_.size(); ++i) { - PerDb* db = db_arr_[i]; - if (db == nullptr || db->pending_upload.empty()) - continue; - - sorted_cursors.resize(db->pending_upload.size()); - size_t index = 0; - for (const auto& k_v : db->pending_upload) { - sorted_cursors[index++] = {k_v.second, k_v.first}; - } - sort(sorted_cursors.begin(), sorted_cursors.end(), std::greater<>()); - DbIndex db_ind = i; - - for (const auto& pair : sorted_cursors) { - uint64_t cursor_val = pair.second; - PrimeTable::cursor curs(cursor_val); - db_slice_.GetTables(db_ind).first->Traverse(curs, tr_cb); - - for (unsigned j = 0; j < iter_count; ++j) { - PrimeIterator it = iters[j]; - size_t item_size = it->second.Size(); - DCHECK_GT(item_size, 0u); - - if (item_size + block_offset > open_block_size) { - if (open_block_size > 0) { // need to close - // save the block asynchronously. - ++submitted_io_writes_; - submitted_io_write_size_ += open_block_size; - - SendIoRequest(file_offset, open_block_size, active_req); - open_block_size = 0; - } - - if (pending_unload_bytes_ < unsigned(0.8 * ExternalAllocator::kMinBlockSize)) { - break_early = true; - break; - } - - DCHECK_EQ(0u, open_block_size); - int64_t res = alloc_.Malloc(item_size); - if (res < 0) { - return -res; - } - - file_offset = res; - open_block_size = ExternalAllocator::GoodSize(item_size); - block_offset = 0; - ++num_active_requests_; - active_req = new ActiveIoRequest(open_block_size); - } - - DCHECK_LE(item_size + block_offset, open_block_size); - - it->second.GetString(active_req->block_ptr + block_offset); - - DCHECK(!it->second.HasIoPending()); - it->second.SetIoPending(true); - - IndexKey key(db_ind, it->first.AsRef()); - bool added = active_req->entries.emplace(move(key), file_offset + block_offset).second; - CHECK(added); - block_offset += item_size; // saved into opened block. - pending_unload_bytes_ -= item_size; - } - iter_count = 0; - db->pending_upload.erase(cursor_val); - } // sorted_cursors - - if (break_early) - break; - DCHECK(db->pending_upload.empty()); - } // db_arr - - if (open_block_size > 0) { - SendIoRequest(file_offset, open_block_size, active_req); + for (size_t i = 0; i < pending_req_.size(); ++i) { + const PendingReq* req = pending_req_.GetItem(i); + canonic_req.emplace_back(req->db_indx, req->cursor); + } + pending_req_.ConsumeHead(pending_req_.size()); + // remove duplicates and sort. + { + sort(canonic_req.begin(), canonic_req.end()); + auto it = unique(canonic_req.begin(), canonic_req.end()); + canonic_req.resize(it - canonic_req.begin()); } - return 0; + // TODO: we could add item size and sort from largest to smallest before + // the aggregation. + constexpr size_t kMaxBatchLen = 64; + PrimeTable::iterator single_batch[kMaxBatchLen]; + unsigned batch_len = 0; + + auto tr_cb = [&](PrimeTable::iterator it) { + if (IsObjFitToUnload(it->second)) { + CHECK_LT(batch_len, kMaxBatchLen); + single_batch[batch_len++] = it; + } + }; + + size_t active_batch_size = 0; + size_t batch_offset = 0; + ActiveIoRequest* active_req = nullptr; + + for (size_t i = 0; i < canonic_req.size(); ++i) { + DbIndex db_ind = canonic_req[i].first; + uint64_t cursor_val = canonic_req[i].second; + PrimeTable::cursor curs(cursor_val); + db_slice_.GetTables(db_ind).first->Traverse(curs, tr_cb); + + for (unsigned j = 0; j < batch_len; ++j) { + PrimeIterator it = single_batch[j]; + size_t item_size = it->second.Size(); + DCHECK_GT(item_size, 0u); + + if (item_size + batch_offset > active_batch_size) { + if (active_batch_size > 0) { // need to close + // save the block asynchronously. + ++submitted_io_writes_; + submitted_io_write_size_ += active_batch_size; + + SendIoRequest(active_batch_size, active_req); + active_batch_size = 0; + } + + DCHECK_EQ(0u, active_batch_size); + int64_t res = alloc_.Malloc(item_size); + if (res < 0) { + InitiateGrow(-res); + return; + } + + active_batch_size = ExternalAllocator::GoodSize(item_size); + active_req = new ActiveIoRequest(res, active_batch_size); + stats_.storage_reserved += active_batch_size; + + batch_offset = 0; + ++num_active_requests_; + } + + DCHECK_LE(item_size + batch_offset, active_batch_size); + + it->second.GetString(active_req->block_ptr + batch_offset); + + DCHECK(!it->second.HasIoPending()); + it->second.SetIoPending(true); + + IndexKey key(db_ind, it->first.AsRef()); + bool added = + active_req->entries.emplace(move(key), active_req->file_offset + batch_offset).second; + CHECK(added); + batch_offset += item_size; // saved into opened block. + } + batch_len = 0; + } + + if (active_batch_size > 0) { + SendIoRequest(active_batch_size, active_req); + } +} + +void TieredStorage::InitiateGrow(size_t grow_size) { + if (io_mgr_.grow_pending()) + return; + DCHECK_GT(grow_size, 0u); + + size_t start = io_mgr_.Span(); + + auto cb = [start, grow_size, this](int io_res) { + if (io_res == 0) { + alloc_.AddStorage(start, grow_size); + stats_.storage_capacity += grow_size; + } else { + LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res; + } + }; + + error_code ec = io_mgr_.GrowAsync(grow_size, move(cb)); + CHECK(!ec) << "TBD"; // TODO } } // namespace dfly diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index b24e48f..3357772 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -5,6 +5,8 @@ #include +#include "base/ring_buffer.h" + #include "core/external_alloc.h" #include "server/common.h" #include "server/io_mgr.h" @@ -36,10 +38,11 @@ class TieredStorage { private: struct ActiveIoRequest; - // return 0 if everything was sent. - // if more storage is needed returns requested size in bytes. - size_t SerializePendingItems(); - void SendIoRequest(size_t offset, size_t req_size, ActiveIoRequest* req); + bool ShouldFlush(); + + void FlushPending(); + void InitiateGrow(size_t size); + void SendIoRequest(size_t req_size, ActiveIoRequest* req); void FinishIoRequest(int io_res, ActiveIoRequest* req); DbSlice& db_slice_; @@ -59,12 +62,21 @@ class TieredStorage { }; struct PerDb { - // map of cursor -> pending size - absl::flat_hash_map pending_upload; absl::flat_hash_map active_requests; }; std::vector db_arr_; + + struct PendingReq { + uint64_t cursor; + DbIndex db_indx = kInvalidDbId; + }; + + base::RingBuffer pending_req_; + + // map of cursor -> pending size + // absl::flat_hash_map pending_upload; + TieredStats stats_; }; diff --git a/tools/docker/Dockerfile.ubuntu-prod b/tools/docker/Dockerfile.ubuntu-prod index b2d07ca..d4fddfe 100644 --- a/tools/docker/Dockerfile.ubuntu-prod +++ b/tools/docker/Dockerfile.ubuntu-prod @@ -6,7 +6,7 @@ COPY src/ ./src/ COPY helio/ ./helio/ COPY patches/ ./patches/ COPY CMakeLists.txt ./ -RUN ./helio/blaze.sh -release +RUN ./helio/blaze.sh -release -DBoost_USE_STATIC_LIBS=ON WORKDIR build-opt RUN ninja dragonfly