Track active io requests in tiered storage

This commit is contained in:
Roman Gershman 2022-04-20 19:05:56 +03:00
parent 7c29ea445f
commit 0f19e60a81
6 changed files with 108 additions and 30 deletions

View File

@ -325,7 +325,7 @@ void ExternalAllocator::AddStorage(size_t offset, size_t size) {
capacity_ += size;
}
size_t ExternalAllocator::GoogSize(size_t sz) {
size_t ExternalAllocator::GoodSize(size_t sz) {
uint8_t bin_idx = ToBinIdx(sz);
return ToBlockSize(bin_idx);
}

View File

@ -67,7 +67,7 @@ class ExternalAllocator {
// Similar to mi_good_size, returns the size of the underlying block as if
// it were returned by Malloc. It is guaranteed that the result is not less
// than sz. No allocation is done.
static size_t GoogSize(size_t sz);
static size_t GoodSize(size_t sz);
size_t capacity() const {
return capacity_;

View File

@ -71,19 +71,14 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) {
DCHECK(!blob.empty());
uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me();
Proactor* proactor = (Proactor*)ProactorBase::me();
uint8_t* ptr = new uint8_t[blob.size()];
memcpy(ptr, blob.data(), blob.size());
auto ring_cb = [ptr, cb = move(cb)](uring::Proactor::IoResult res, uint32_t flags,
int64_t payload) {
auto ring_cb = [cb = move(cb)](Proactor::IoResult res, uint32_t flags, int64_t payload) {
cb(res);
delete[] ptr;
};
uring::SubmitEntry se = proactor->GetSubmitEntry(move(ring_cb), 0);
se.PrepWrite(backing_file_->fd(), ptr, blob.size(), offset);
se.PrepWrite(backing_file_->fd(), blob.data(), blob.size(), offset);
return error_code{};
}

View File

@ -30,12 +30,8 @@ class IoMgr {
// passing other values will check-fail.
std::error_code GrowAsync(size_t len, GrowCb cb);
std::error_code Write(size_t offset, std::string_view blob) {
return backing_file_->Write(io::Buffer(blob), offset, 0);
}
// Returns error if submission failed. Otherwise - returns the io result
// via cb.
// via cb. A caller must make sure that the blob exists until cb is called.
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
size_t Size() const { return sz_; }

View File

@ -10,10 +10,81 @@ extern "C" {
#include "base/logging.h"
#include "server/db_slice.h"
#include "util/proactor_base.h"
namespace dfly {
using namespace std;
// Composite lookup key (database index + primary key) used to re-locate a
// dictionary entry after its asynchronous write to the backing file completes.
struct IndexKey {
DbIndex db_indx;
PrimeKey key;
IndexKey() {}
// We define here a weird copy constructor because the hash map stores
// pair<const IndexKey, ..> and "const" prevents moving IndexKey.
// The copy holds a non-owning reference to the original key (AsRef).
IndexKey(const IndexKey& o) : db_indx(o.db_indx), key(o.key.AsRef()) {
}
IndexKey(IndexKey&&) = default;
IndexKey(DbIndex i, PrimeKey k) : db_indx(i), key(std::move(k)) {}
// Equality over both components; paired with EntryHash for hash-map use.
bool operator==(const IndexKey& ik) const {
return ik.db_indx == db_indx && ik.key == key;
}
// IndexKey& operator=(IndexKey&&) {}
// IndexKey& operator=(const IndexKey&) =delete;
};
// Hash functor for IndexKey: mixes the key's own hash with the db index so
// that identical keys in different databases hash differently.
struct EntryHash {
size_t operator()(const IndexKey& ik) const {
return ik.key.HashCode() ^ (size_t(ik.db_indx) << 16);
}
};
// An in-flight asynchronous write request: owns the page-aligned buffer being
// flushed to the backing file, plus the set of entries serialized into it.
// Deleted by FinishIoRequest once the write completes.
struct TieredStorage::ActiveIoRequest {
// 4KB-aligned buffer; alignment is presumably required for direct I/O on
// the backing file -- TODO confirm against IoMgr's open flags.
char* block_ptr;
// entry -> offset of the serialized value within the backing file.
absl::flat_hash_map<IndexKey, size_t, EntryHash> entries;
// sz must be a multiple of the 4KB page size (check-failed otherwise).
ActiveIoRequest(size_t sz) {
DCHECK_EQ(0u, sz % 4096);
block_ptr = (char*)aligned_malloc(sz, 4096);
DCHECK_EQ(0, intptr_t(block_ptr) % 4096);
}
// NOTE(review): the type is implicitly copyable; a copy would double-free
// block_ptr. Consider deleting the copy constructor/assignment.
~ActiveIoRequest() {
free(block_ptr);
}
};
// Completion handler for an ActiveIoRequest write. io_res is the raw io
// result: negative (-errno) on failure, per the SafeErrorMessage(-io_res)
// usage below. On success every serialized entry is switched to its external
// (offloaded) representation; in all cases the entries' IoPending flag is
// cleared and the request object is deleted.
void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
bool success = true;
if (io_res < 0) {
LOG(ERROR) << "Error writing into ssd file: " << util::detail::SafeErrorMessage(-io_res);
success = false;
}
for (const auto& k_v : req->entries) {
const IndexKey& ikey = k_v.first;
PrimeTable* pt = db_slice_.GetTables(ikey.db_indx).first;
PrimeIterator it = pt->Find(ikey.key);
// TBD: the entry could have been deleted or evicted while the write was
// in flight; currently that case check-fails.
CHECK(!it.is_done()) << "TBD";
CHECK(it->second.HasIoPending());
it->second.SetIoPending(false);
if (success) {
size_t item_size = it->second.Size();
// k_v.second is the file offset the value was written to.
it->second.SetExternal(k_v.second, item_size);
}
}
delete req;
}
// TieredStorage holds a reference to the owning, non-null DbSlice.
TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) {
}
@ -50,7 +121,7 @@ void TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
}
PerDb* db = db_arr_[db_index];
db->pending_upload_[it.bucket_cursor().value()] += blob_len;
db->pending_upload[it.bucket_cursor().value()] += blob_len;
size_t grow_size = 0;
if (!io_mgr_.grow_pending() && pending_unload_bytes_ > 4080) {
@ -99,16 +170,16 @@ size_t TieredStorage::SerializePendingItems() {
size_t open_block_size = 0;
size_t file_offset = 0;
size_t block_offset = 0;
char* block_ptr = nullptr;
ActiveIoRequest* active_req = nullptr;
for (size_t i = 0; i < db_arr_.size(); ++i) {
PerDb* db = db_arr_[i];
if (db == nullptr || db->pending_upload_.empty())
if (db == nullptr || db->pending_upload.empty())
continue;
sorted_cursors.resize(db->pending_upload_.size());
sorted_cursors.resize(db->pending_upload.size());
size_t index = 0;
for (const auto& k_v : db->pending_upload_) {
for (const auto& k_v : db->pending_upload) {
sorted_cursors[index++] = {k_v.second, k_v.first};
}
sort(sorted_cursors.begin(), sorted_cursors.end(), std::greater<>());
@ -130,8 +201,8 @@ size_t TieredStorage::SerializePendingItems() {
++submitted_io_writes_;
submitted_io_write_size_ += open_block_size;
string_view sv{block_ptr, open_block_size};
auto cb = [block_ptr](int res) { delete[] block_ptr; };
string_view sv{active_req->block_ptr, open_block_size};
auto cb = [this, active_req](int res) { FinishIoRequest(res, active_req); };
io_mgr_.WriteAsync(file_offset, sv, move(cb));
open_block_size = 0;
@ -144,26 +215,32 @@ size_t TieredStorage::SerializePendingItems() {
}
file_offset = res;
open_block_size = ExternalAllocator::GoogSize(item_size);
open_block_size = ExternalAllocator::GoodSize(item_size);
block_offset = 0;
block_ptr = new char[open_block_size];
active_req = new ActiveIoRequest(open_block_size);
}
DCHECK_LE(item_size + block_offset, open_block_size);
it->second.GetString(block_ptr + block_offset);
it->second.GetString(active_req->block_ptr + block_offset);
DCHECK(!it->second.HasIoPending());
it->second.SetIoPending(true);
IndexKey key(db_ind, it->first.AsRef());
active_req->entries.try_emplace(move(key), file_offset + block_offset);
block_offset += item_size; // saved into opened block.
pending_unload_bytes_ -= item_size;
it->second.SetIoPending(true);
}
count = 0;
db->pending_upload_.erase(cursor_val);
db->pending_upload.erase(cursor_val);
} // sorted_cursors
} // db_arr
if (open_block_size > 0) {
auto cb = [block_ptr](int res) { delete[] block_ptr; };
string_view sv{block_ptr, open_block_size};
auto cb = [this, active_req](int res) { FinishIoRequest(res, active_req); };
string_view sv{active_req->block_ptr, open_block_size};
io_mgr_.WriteAsync(file_offset, sv, move(cb));
}

View File

@ -25,10 +25,12 @@ class TieredStorage {
void UnloadItem(DbIndex db_index, PrimeIterator it);
private:
struct ActiveIoRequest;
// return 0 if everything was sent.
// if more storage is needed returns requested size in bytes.
size_t SerializePendingItems();
void FinishIoRequest(int io_res, ActiveIoRequest* req);
DbSlice& db_slice_;
IoMgr io_mgr_;
@ -38,10 +40,18 @@ class TieredStorage {
size_t submitted_io_writes_ = 0;
size_t submitted_io_write_size_ = 0;
struct Hasher {
size_t operator()(const PrimeKey& o) const {
return o.HashCode();
}
};
struct PerDb {
// map of cursor -> pending size
absl::flat_hash_map<uint64_t, size_t> pending_upload_;
absl::flat_hash_map<uint64_t, size_t> pending_upload;
absl::flat_hash_map<PrimeKey, ActiveIoRequest*, Hasher> active_requests;
};
std::vector<PerDb*> db_arr_;
};