Add safe cursor API to dash table

This commit is contained in:
Roman Gershman 2022-04-14 21:31:31 +03:00
parent ad3bdbf499
commit bc92ace19c
11 changed files with 109 additions and 38 deletions

View File

@ -45,7 +45,7 @@ size_t DictMallocSize(dict* d) {
size_t res = zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) +
znallocx(sizeof(dict));
return res = dictSize(d) * 16; // approximation.
return res + dictSize(d) * 16; // approximation.
}
inline void FreeObjSet(unsigned encoding, void* ptr, pmr::memory_resource* mr) {
@ -68,9 +68,9 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
return 0; // TODO
case kEncodingIntSet:
return intsetBlobLen((intset*)ptr);
default:
LOG(FATAL) << "Unknown set encoding type " << encoding;
}
LOG(DFATAL) << "Unknown set encoding type " << encoding;
return 0;
}
@ -80,9 +80,8 @@ size_t MallocUsedHSet(unsigned encoding, void* ptr) {
return lpBytes(reinterpret_cast<uint8_t*>(ptr));
case OBJ_ENCODING_HT:
return DictMallocSize((dict*)ptr);
default:
LOG(FATAL) << "Unknown set encoding type " << encoding;
}
LOG(DFATAL) << "Unknown set encoding type " << encoding;
return 0;
}
@ -93,10 +92,9 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
case OBJ_ENCODING_SKIPLIST: {
zset* zs = (zset*)ptr;
return DictMallocSize(zs->dict);
} break;
default:
LOG(FATAL) << "Unknown set encoding type " << encoding;
}
}
LOG(DFATAL) << "Unknown set encoding type " << encoding;
return 0;
}
@ -217,7 +215,7 @@ size_t RobjWrapper::MallocUsed() const {
CHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return InnerObjMallocUsed();
case OBJ_LIST:
CHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
DCHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
return QlMAllocSize((quicklist*)inner_obj_);
case OBJ_SET:
return MallocUsedSet(encoding_, inner_obj_);
@ -371,7 +369,6 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
inner_obj_ = newp;
}
#pragma GCC push_options
#pragma GCC optimize("Ofast")
@ -694,8 +691,7 @@ void CompactObj::SetString(std::string_view str) {
if (rev_len == str.size()) {
mask |= ASCII2_ENC_BIT; // str hits its highest bound.
} else {
CHECK_EQ(str.size(), rev_len - 1)
<< "Bad ascii encoding for len " << str.size();
CHECK_EQ(str.size(), rev_len - 1) << "Bad ascii encoding for len " << str.size();
mask |= ASCII1_ENC_BIT;
}
@ -845,7 +841,7 @@ size_t CompactObj::MallocUsed() const {
return u_.small_str.MallocUsed();
}
LOG(FATAL) << "TBD";
LOG(DFATAL) << "should not reach";
return 0;
}

View File

@ -225,6 +225,8 @@ class CompactObj {
void SetString(std::string_view str);
void GetString(std::string* res) const;
// In case this object is a single blob, returns the number of bytes allocated on the heap
// for that blob. Otherwise returns 0.
size_t MallocUsed() const;
// Resets the object to empty state.

View File

@ -183,13 +183,21 @@ class DashTable : public detail::DashTableBase {
return !(lhs == rhs);
}
// debug accessors.
// Bucket resolution cursor that is safe to use with insertions/removals.
// It really serves only as a hint to the placement of the original item, i.e. the item
// could have moved.
detail::DashCursor bucket_cursor() const {
return detail::DashCursor(owner_->global_depth_, seg_id_, bucket_id_);
}
unsigned bucket_id() const {
return bucket_id_;
}
unsigned slot_id() const {
return slot_id_;
}
unsigned segment_id() const {
return seg_id_;
}
@ -200,6 +208,7 @@ class DashTable : public detail::DashTableBase {
using const_bucket_iterator = Iterator<true, true>;
using bucket_iterator = Iterator<false, true>;
using cursor = detail::DashCursor;
struct EvictionBuckets {
bucket_iterator iter[2 + Policy::kStashBucketNum];
@ -305,7 +314,7 @@ class DashTable : public detail::DashTableBase {
// It guarantees that if a key exists at the beginning of the traversal and stays in the
// table during the traversal, it will eventually be reached even when the table shrinks or grows.
// Returns: cursor that is guaranteed to be less than 2^40.
template <typename Cb> uint64_t Traverse(uint64_t cursor, Cb&& cb);
template <typename Cb> cursor Traverse(cursor curs, Cb&& cb);
// Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by
// calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket.
@ -315,6 +324,10 @@ class DashTable : public detail::DashTableBase {
return const_bucket_iterator{it.owner_, it.seg_id_, it.bucket_id_, 0};
}
const_bucket_iterator CursorToBucketIt(cursor c) const {
return const_bucket_iterator{this, c.segment_id(global_depth_), c.bucket_id(), 0};
}
// Capture Version Change. Runs cb(it) on every bucket! (not entry) in the table whose version
// would potentially change upon insertion of 'k'.
// In practice traversal is limited to a single segment. The operation is read-only and
@ -673,13 +686,13 @@ void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id) {
template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
uint64_t DashTable<_Key, _Value, Policy>::Traverse(uint64_t cursor, Cb&& cb) {
unsigned bid = cursor & 0xFF;
if (bid >= kLogicalBucketNum) // sanity.
auto DashTable<_Key, _Value, Policy>::Traverse(cursor curs, Cb&& cb) -> cursor {
if (curs.bucket_id() >= kLogicalBucketNum) // sanity.
return 0;
uint32_t sid = cursor >> (40 - global_depth_);
uint32_t sid = curs.segment_id(global_depth_);
uint8_t bid = curs.bucket_id();
auto hash_fun = [this](const auto& k) { return policy_.HashFn(k); };
bool fetched = false;
@ -700,7 +713,7 @@ uint64_t DashTable<_Key, _Value, Policy>::Traverse(uint64_t cursor, Cb&& cb) {
}
} while (!fetched);
return (uint64_t(sid) << (40 - global_depth_)) | bid;
return cursor{global_depth_, sid, bid};
}
template <typename _Key, typename _Value, typename Policy>

View File

@ -532,14 +532,14 @@ class DashTableBase {
public:
explicit DashTableBase(uint32_t gd)
: initial_depth_(gd), global_depth_(gd), unique_segments_(1 << gd) {
: unique_segments_(1 << gd), initial_depth_(gd), global_depth_(gd) {
}
uint32_t unique_segments() const {
return unique_segments_;
}
uint32_t depth() const {
uint16_t depth() const {
return global_depth_;
}
@ -556,10 +556,10 @@ class DashTableBase {
return 0;
}
uint32_t initial_depth_;
uint32_t global_depth_;
uint32_t unique_segments_;
size_t size_ = 0;
uint32_t unique_segments_;
uint8_t initial_depth_;
uint8_t global_depth_;
}; // DashTableBase
template <typename _Key, typename _Value> class IteratorPair {
@ -579,6 +579,48 @@ template <typename _Key, typename _Value> class IteratorPair {
_Value& second;
};
// Represents a cursor that points to a bucket in a dash table.
// One major difference from an iterator is that the cursor survives dash table resizes and
// will always point to the most appropriate segment with the same bucket.
// It uses the 40 LSB bits out of 64, assuming that the number of segments does not cross 4B.
// That is a reasonable assumption in a shared-nothing architecture where we usually have no
// more than 32GB per CPU. Each segment spans hundreds of entries, so we can not grow the
// segment table to billions.
class DashCursor {
 public:
  // Implicit construction from the raw 64-bit representation; 0 denotes the "begin" cursor.
  DashCursor(uint64_t val = 0) : val_(val) {
  }

  // Packs (seg_id, bid) relative to the table's global depth at creation time:
  // seg_id occupies the bits above position (40 - depth), bid occupies bits [0, 8).
  DashCursor(uint8_t depth, uint32_t seg_id, uint8_t bid)
      : val_((uint64_t(seg_id) << (40 - depth)) | bid) {
  }

  uint8_t bucket_id() const {
    return val_ & 0xFF;
  }

  // segment_id is padded to the left of the 32-bit region:
  // | segment_id......| bucket_id
  // 40                8          0
  // By using depth we take the most significant bits of segment_id if depth has decreased
  // since the cursor was created, or extend the least significant bits with zeros if
  // depth has increased.
  uint32_t segment_id(uint8_t depth) const {
    return val_ >> (40 - depth);
  }

  // Raw 64-bit representation, suitable for round-tripping through the
  // implicit-conversion constructor above.
  uint64_t value() const {
    return val_;
  }

  // True iff the cursor is not the zero ("begin/end of traversal") cursor.
  explicit operator bool() const {
    return val_ != 0;
  }

 private:
  uint64_t val_;
};
/***********************************************************
* Implementation section.
*/

View File

@ -425,7 +425,8 @@ TEST_F(DashTest, Traverse) {
for (size_t i = 0; i < kNumItems; ++i) {
dt_.Insert(i, i);
}
uint64_t cursor = 0;
Dash64::cursor cursor;
vector<unsigned> nums;
auto tr_cb = [&](Dash64::iterator it) {
nums.push_back(it->first);
@ -434,7 +435,7 @@ TEST_F(DashTest, Traverse) {
do {
cursor = dt_.Traverse(cursor, tr_cb);
} while (cursor != 0);
} while (cursor);
sort(nums.begin(), nums.end());
nums.resize(unique(nums.begin(), nums.end()) - nums.begin());
ASSERT_EQ(kNumItems, nums.size());

View File

@ -265,6 +265,7 @@ int64_t ExternalAllocator::Malloc(size_t sz) {
size_t pos = page->free_blocks._Find_first();
page->free_blocks.flip(pos);
--page->available;
allocated_bytes_ += ToBlockSize(page->block_size_bin);
SegmentDescr* seg = ToSegDescr(page);
return seg->BlockOffset(page, pos);
@ -298,6 +299,7 @@ void ExternalAllocator::Free(size_t offset, size_t sz) {
if (page->available == blocks_num) {
FreePage(page, seg, block_size);
}
allocated_bytes_ -= block_size;
}
void ExternalAllocator::AddStorage(size_t offset, size_t size) {
@ -319,6 +321,8 @@ void ExternalAllocator::AddStorage(size_t offset, size_t size) {
if (next != added_segs_.end()) {
CHECK_LE(offset + size, next->first);
}
capacity_ += size;
}
size_t ExternalAllocator::GoogSize(size_t sz) {

View File

@ -69,6 +69,14 @@ class ExternalAllocator {
// No allocation is done.
static size_t GoogSize(size_t sz);
size_t capacity() const {
return capacity_;
}
size_t allocated_bytes() const {
return allocated_bytes_;
}
private:
class SegmentDescr;
using Page = detail::Page;
@ -88,6 +96,9 @@ class ExternalAllocator {
// weird queue to support AddStorage interface. We can not instantiate segment
// until we know its class and that we know only when a page is demanded.
absl::btree_map<size_t, size_t> added_segs_;
size_t capacity_ = 0; // in bytes.
size_t allocated_bytes_ = 0;
};
} // namespace dfly

View File

@ -46,7 +46,6 @@ TEST_F(ExternalAllocatorTest, Invariants) {
std::map<int64_t, size_t> ranges;
int64_t res = 0;
size_t sum = 0;
while (res >= 0) {
for (unsigned j = 1; j < 5; ++j) {
size_t sz = 4000 * j;
@ -55,10 +54,10 @@ TEST_F(ExternalAllocatorTest, Invariants) {
break;
auto [it, added] = ranges.emplace(res, sz);
ASSERT_TRUE(added);
sum += sz;
}
}
EXPECT_GT(sum, kSegSize / 2);
EXPECT_GT(ext_alloc_.allocated_bytes(), ext_alloc_.capacity() * 0.75);
off_t last = 0;
for (const auto& k_v : ranges) {

View File

@ -124,6 +124,7 @@ void DebugCmd::Reload(CmdArgList args) {
CHECK_NOTNULL(cid);
intrusive_ptr<Transaction> trans(new Transaction{cid, &ess});
trans->InitByArgs(0, {});
VLOG(1) << "Performing save";
ec = sf_.DoSave(trans.get(), &err_details);
if (ec) {
return (*cntx_)->SendError(absl::StrCat(err_details, ec.message()));
@ -133,6 +134,7 @@ void DebugCmd::Reload(CmdArgList args) {
const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
intrusive_ptr<Transaction> flush_trans(new Transaction{cid, &ess});
flush_trans->InitByArgs(0, {});
VLOG(1) << "Performing flush";
ec = sf_.DoFlush(flush_trans.get(), DbSlice::kDbAll);
if (ec) {
LOG(ERROR) << "Error flushing db " << ec.message();
@ -154,6 +156,7 @@ void DebugCmd::Reload(CmdArgList args) {
return;
}
VLOG(1) << "Performing load";
io::FileSource fs(*res);
RdbLoader loader(&ess);

View File

@ -642,15 +642,15 @@ void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_vi
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has "
<< db_slice.DbSize(op_args.db_ind);
uint64_t cur = *cursor;
PrimeTable::cursor cur = *cursor;
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
do {
cur = prime_table->Traverse(
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, pattern, type_filter, vec); });
} while (cur && cnt < limit);
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur;
*cursor = cur;
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
*cursor = cur.value();
}
bool GenericFamily::ScanCb(const OpArgs& op_args, PrimeIterator it, string_view pattern,

View File

@ -71,7 +71,6 @@ void SliceSnapshot::Join() {
static_assert(sizeof(PrimeTable::const_iterator) == 16);
void SliceSnapshot::SerializeSingleEntry(PrimeIterator it) {
uint64_t expire_time = 0;
if (it->second.HasExpire()) {
auto eit = expire_tbl_->Find(it->first);
@ -86,7 +85,7 @@ void SliceSnapshot::SerializeSingleEntry(PrimeIterator it) {
void SliceSnapshot::FiberFunc() {
this_fiber::properties<FiberProps>().set_name(
absl::StrCat("SliceSnapshot", ProactorBase::GetIndex()));
uint64_t cursor = 0;
PrimeTable::cursor cursor;
static_assert(PHYSICAL_LEN > PrimeTable::kPhysicalBucketNum);
uint64_t last_yield = 0;
@ -94,7 +93,8 @@ void SliceSnapshot::FiberFunc() {
// Traverse a single logical bucket but do not update its versions.
// we can not update a version because entries in the same bucket share part of the version.
// Therefore we save first, and then update version in one atomic swipe.
uint64_t next = prime_table_->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
PrimeTable::cursor next =
prime_table_->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
cursor = next;
physical_mask_.reset();
@ -109,7 +109,7 @@ void SliceSnapshot::FiberFunc() {
// flush in case other fibers (writes commands that pushed previous values) filled the file.
FlushSfile(false);
}
} while (cursor > 0);
} while (cursor);
DVLOG(1) << "after loop " << this_fiber::properties<FiberProps>().name();
FlushSfile(true);