Compare commits

...

3 Commits
main ... ExpSet

Author SHA1 Message Date
Roman Gershman b61bf1ea1c Introduce experimental set 2022-08-27 10:19:15 +03:00
Roman Gershman a7a3c89ffa Refactor all set-over-dict functions into a central place.
Stop leaking dict interface into rdb_load.
2022-08-27 10:19:15 +03:00
Roman Gershman 4fdde2f62b Add initial code for StringSet. It should eventually replace dict for set types 2022-08-27 10:19:15 +03:00
7 changed files with 1227 additions and 139 deletions

View File

@ -1,6 +1,6 @@
add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc
external_alloc.cc interpreter.cc mi_memory_resource.cc
segment_allocator.cc small_string.cc tx_queue.cc)
segment_allocator.cc small_string.cc string_set.cc tx_queue.cc)
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua
Boost::fiber crypto)
@ -15,3 +15,4 @@ cxx_test(external_alloc_test dfly_core LABELS DFLY)
cxx_test(dash_test dfly_core LABELS DFLY)
cxx_test(interpreter_test dfly_core LABELS DFLY)
cxx_test(json_test dfly_core TRDP::jsoncons LABELS DFLY)
cxx_test(string_set_test dfly_core LABELS DFLY)

View File

@ -21,6 +21,7 @@ extern "C" {
#include "base/logging.h"
#include "base/pod_array.h"
#include "core/string_set.h"
#if defined(__aarch64__)
#include "base/sse2neon.h"
@ -55,6 +56,11 @@ inline void FreeObjSet(unsigned encoding, void* ptr, pmr::memory_resource* mr) {
dictRelease((dict*)ptr);
break;
}
case kEncodingStrMap2: {
delete (StringSet*)ptr;
break;
}
case kEncodingIntSet:
zfree((void*)ptr);
break;
@ -67,6 +73,10 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap /*OBJ_ENCODING_HT*/:
return 0; // TODO
case kEncodingStrMap2: {
StringSet* ss = (StringSet*)ptr;
return ss->obj_malloc_used() + ss->set_malloc_used();
}
case kEncodingIntSet:
return intsetBlobLen((intset*)ptr);
}
@ -258,6 +268,10 @@ size_t RobjWrapper::Size() const {
dict* d = (dict*)inner_obj_;
return dictSize(d);
}
case kEncodingStrMap2: {
StringSet* ss = (StringSet*)inner_obj_;
return ss->size();
}
default:
LOG(FATAL) << "Unexpected encoding " << encoding_;
}
@ -608,6 +622,7 @@ void CompactObj::ImportRObj(robj* o) {
if (o->encoding == OBJ_ENCODING_INTSET) {
enc = kEncodingIntSet;
} else {
LOG(DFATAL) << "This can not be done via ImportRObj for sets";
enc = kEncodingStrMap;
}
}

View File

@ -17,6 +17,7 @@ namespace dfly {
constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2;
namespace detail {

509
src/core/string_set.cc Normal file
View File

@ -0,0 +1,509 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/string_set.h"
#include <absl/numeric/bits.h>
#include <absl/strings/escaping.h>
#include "base/logging.h"
#include "core/compact_object.h" // for hashcode
extern "C" {
#include "redis/zmalloc.h"
}
namespace dfly {
using namespace std;
// The bucket array is created with 2^kMinSizeShift cells on the first insertion.
constexpr size_t kMinSizeShift = 2;
constexpr size_t kMinSize = size_t{1} << kMinSizeShift;

// Compile-time switch. When true, a key may be stored "flat" in a cell directly adjacent
// to its home bucket (offset -1 or +1) instead of being chained.
// This is a flag, so it is typed as bool rather than size_t.
constexpr bool kAllowDisplacements = true;

// Decides whether an offset returned by FindEmptyAround() denotes a cell that can take
// a flat key.
//   offs: -1, 0 or 1 for a free cell relative to the home bucket, 2 for "nothing free".
// Returns true for any offset below 2 when displacements are enabled, and only for
// offset 0 (the home bucket itself) when they are disabled.
inline bool CanSetFlat(int offs) {
  if (kAllowDisplacements)
    return offs < 2;
  return offs == 0;
}
// Constructs an empty set. The bucket array (`entries_`) is bound to the memory resource
// `mr`; no buckets are created here - Add() sizes the table on the first insertion.
StringSet::StringSet(pmr::memory_resource* mr) : entries_(mr) {
}
// Releases every stored string and every chain node.
// A bucket is either empty, a flat sds, or the head of a chain of LinkKey nodes whose
// last `next` pointer is a flat sds.
StringSet::~StringSet() {
  for (SuperPtr& cell : entries_) {
    if (!cell.IsLink()) {
      // Flat cell: at most one string to release.
      if (!cell.IsEmpty())
        sdsfree((sds)cell.get());
      continue;
    }

    LinkKey* node = (LinkKey*)cell.get();
    while (node != nullptr) {
      sdsfree((sds)node->ptr);

      // Read the successor before the node is handed back to the allocator.
      const SuperPtr next = node->next;
      Free(node);
      node = nullptr;

      if (next.IsSds()) {
        // The tail of a chain is a plain string; the walk ends here.
        sdsfree((sds)next.get());
      } else {
        DCHECK(next.IsLink());
        node = (LinkKey*)next.get();
      }
    }
  }

  // Free() decrements the counter, so all chain nodes must be accounted for by now.
  DCHECK_EQ(0u, num_chain_entries_);
}
void StringSet::Reserve(size_t sz) {
sz = std::min<size_t>(sz, kMinSize);
sz = absl::bit_ceil(sz);
capacity_log_ = absl::bit_width(sz);
entries_.reserve(sz);
}
size_t StringSet::SuperPtr::SetString(std::string_view str) {
sds sdsptr = sdsnewlen(str.data(), str.size());
ptr = sdsptr;
return zmalloc_usable_size(sdsAllocPtr(sdsptr));
}
bool StringSet::SuperPtr::Compare(std::string_view str) const {
if (IsEmpty())
return false;
sds sp = GetSds();
return str == string_view{sp, sdslen(sp)};
}
// Inserts a copy of `str` into the set.
// Returns true if the string was added, false if it was already present (in its home
// bucket's chain or as the head entry of a directly adjacent bucket).
bool StringSet::Add(std::string_view str) {
DVLOG(1) << "Add " << absl::CHexEscape(str);
uint64_t hc = CompactObj::HashCode(str);
// First insertion: create the minimal table and store the key flat in its home bucket.
if (entries_.empty()) {
capacity_log_ = kMinSizeShift;
entries_.resize(kMinSize);
auto& e = entries_[BucketId(hc)];
obj_malloc_used_ += e.SetString(str);
++size_;
++num_used_buckets_;
return true;
}
uint32_t bucket_id = BucketId(hc);
// FindAround() returns 2 only when the key was not found; anything else is a duplicate.
if (FindAround(str, bucket_id) < 2)
return false;
DCHECK_LT(bucket_id, entries_.size());
// size_ is bumped before the grow check below, so the check sees the post-insert size.
++size_;
// Try insert into flat surface first. Also handle the grow case
// if utilization is too high.
// At most two passes: one on the current table and one more after a single Grow().
for (unsigned j = 0; j < 2; ++j) {
int offs = FindEmptyAround(bucket_id);
if (CanSetFlat(offs)) {
auto& entry = entries_[bucket_id + offs];
obj_malloc_used_ += entry.SetString(str);
// A key stored next to (not in) its home bucket is tagged as displaced.
if (offs != 0) {
entry.SetDisplaced();
}
++num_used_buckets_;
return true;
}
// No free cell nearby. Only grow once the element count has reached the bucket count;
// otherwise fall through to chaining.
if (size_ < entries_.size())
break;
Grow();
// The table size changed, so the home bucket has to be recomputed.
bucket_id = BucketId(hc);
}
// Chaining path: the home bucket is occupied and no adjacent cell could take the key.
auto& dest = entries_[bucket_id];
DCHECK(!dest.IsEmpty());
if (dest.IsDisplaced()) {
// The head key of this bucket belongs to another bucket. Re-link that key into its own
// bucket and reuse the head slot for the new key.
// NOTE(review): Link() can recurse into other buckets; confirm that it can never rewrite
// `dest` itself before `dest` is overwritten below.
sds sptr = dest.GetSds();
uint32_t nbid = BucketId(sptr);
Link(SuperPtr{sptr}, nbid);
if (dest.IsSds()) {
// SetString() replaces the whole pointer word, which also drops the displaced tag.
obj_malloc_used_ += dest.SetString(str);
} else {
// Head is a chain node: replace the key stored in the node and clear the tag on the cell.
LinkKey* lk = (LinkKey*)dest.get();
obj_malloc_used_ += lk->SetString(str);
dest.ClearDisplaced();
}
} else {
// Prepend a new chain node that holds the new key and points at the previous content.
LinkKey* lk = NewLink(str, dest);
dest.SetLink(lk);
}
DCHECK(!dest.IsDisplaced());
return true;
}
unsigned StringSet::BucketDepth(uint32_t bid) const {
SuperPtr ptr = entries_[bid];
if (ptr.IsEmpty()) {
return 0;
}
unsigned res = 1;
while (ptr.IsLink()) {
LinkKey* lk = (LinkKey*)ptr.get();
++res;
ptr = lk->next;
DCHECK(!ptr.IsEmpty());
}
return res;
}
// Allocates a chain node from the set's memory resource, stores a copy of `str` in it and
// makes `ptr` its successor. Updates the string-memory and chain-node counters.
// The caller is responsible for hooking the returned node into a bucket.
auto StringSet::NewLink(std::string_view str, SuperPtr ptr) -> LinkKey* {
  LinkAllocator alloc(mr());
  LinkKey* node = alloc.allocate(1);
  alloc.construct(node);

  node->next = ptr;
  ++num_chain_entries_;
  obj_malloc_used_ += node->SetString(str);
  return node;
}
// Disabled code (compiled out). It is written against an `Entry` type with `value`/`next`
// members and an `ItemCb` callback, neither of which is declared in the current header.
#if 0
void StringSet::IterateOverBucket(uint32_t bid, const ItemCb& cb) {
const Entry& e = entries_[bid];
if (e.IsEmpty()) {
DCHECK(!e.next);
return;
}
cb(e.value);
const Entry* next = e.next;
while (next) {
cb(next->value);
next = next->next;
}
}
#endif
// Byte-wise equality between an sds string and a string_view.
// Lengths are compared first; memcmp is skipped for the empty string.
inline bool cmpsds(sds sp, string_view str) {
  const size_t len = str.size();
  if (len != sdslen(sp))
    return false;
  if (len == 0)
    return true;
  return memcmp(sp, str.data(), len) == 0;
}
// Looks for `str` in bucket `bid` and in the head entries of its two neighbours.
// Returns 0 if found in the bucket's own chain, -1 / 1 if the head of the left / right
// neighbour matches, and 2 if the string was not found.
int StringSet::FindAround(string_view str, uint32_t bid) const {
  // Walk the chain nodes of the home bucket.
  SuperPtr ptr = entries_[bid];
  while (ptr.IsLink()) {
    const LinkKey* node = (const LinkKey*)ptr.get();
    if (cmpsds((sds)node->get(), str))
      return 0;
    ptr = node->next;
    DCHECK(!ptr.IsEmpty());
  }

  // What remains is either an empty cell or the flat key that terminates the chain.
  if (!ptr.IsEmpty()) {
    DCHECK(ptr.IsSds());
    if (cmpsds((sds)ptr.get(), str))
      return 0;
  }

  // Neighbours: Compare() only inspects the head entry of each cell.
  const bool has_left = bid != 0;
  if (has_left && entries_[bid - 1].Compare(str))
    return -1;

  const bool has_right = bid + 1 < entries_.size();
  if (has_right && entries_[bid + 1].Compare(str))
    return 1;

  return 2;
}
// Doubles the bucket array and re-distributes the stored keys.
// Old buckets are visited from the highest index down to 0. Every key whose recomputed
// bucket differs from the bucket it currently sits in is moved: flat into a free cell at
// or next to its new bucket when possible, otherwise linked into the new bucket via Link().
void StringSet::Grow() {
size_t prev_sz = entries_.size();
entries_.resize(prev_sz * 2);
++capacity_log_;
for (int i = prev_sz - 1; i >= 0; --i) {
SuperPtr* current = &entries_[i];
if (current->IsEmpty()) {
continue;
}
// `prev` remembers the last cell whose chain node stayed in bucket i. It is used after
// the walk to collapse a node that lost its successor.
SuperPtr* prev = nullptr;
while (true) {
// Decode the current position: either a chain node (key + successor) or a flat key.
// `next` stays empty for a flat key, which terminates the walk.
SuperPtr next;
LinkKey* lk = nullptr;
sds sp;
if (current->IsLink()) {
lk = (LinkKey*)current->get();
sp = (sds)lk->get();
next = lk->next;
} else {
sp = (sds)current->get();
}
uint32_t bid = BucketId(sp);
if (bid != uint32_t(i)) {
// The key no longer belongs to bucket i.
int offs = FindEmptyAround(bid);
if (CanSetFlat(offs)) {
// Store it flat in the free cell; a chain node that carried it is released.
auto& dest = entries_[bid + offs];
DCHECK(!dest.IsLink());
dest.ptr = sp;
if (offs != 0)
dest.SetDisplaced();
if (lk) {
Free(lk);
}
++num_used_buckets_;
} else {
// No free cell around the new bucket: attach the key (and its node, if any) there.
Link(*current, bid);
}
// Unhook the moved key from this position by replacing it with its successor.
*current = next;
} else {
// The key stays in bucket i, so it is by definition not displaced here.
current->ClearDisplaced();
if (lk) {
prev = current;
current = &lk->next;
}
}
if (next.IsEmpty())
break;
}
if (prev) {
// If the last surviving node lost its successor, fold it back into a flat key.
DCHECK(prev->IsLink());
LinkKey* lk = (LinkKey*)prev->get();
if (lk->next.IsEmpty()) {
bool is_displaced = prev->IsDisplaced();
prev->ptr = lk->get();
if (is_displaced) {
prev->SetDisplaced();
}
Free(lk);
}
}
// Bucket i was non-empty on entry; if everything moved out it no longer counts as used.
if (entries_[i].IsEmpty()) {
--num_used_buckets_;
}
}
// Disabled consistency check of num_used_buckets_ against the actual cell contents.
#if 0
unsigned cnt = 0;
for (auto ptr : entries_) {
cnt += (!ptr.IsEmpty());
}
DCHECK_EQ(num_used_buckets_, cnt);
#endif
}
// Attaches the key referenced by `ptr` to the non-empty bucket `bid`.
// `ptr` is either a flat sds, or a chain node (LinkKey) whose key is re-used together
// with the node itself.
void StringSet::Link(SuperPtr ptr, uint32_t bid) {
SuperPtr& root = entries_[bid];
DCHECK(!root.IsEmpty());
bool is_root_displaced = root.IsDisplaced();
if (is_root_displaced) {
DCHECK_NE(bid, BucketId(root.GetSds()));
}
LinkKey* head;
void* val;
if (ptr.IsSds()) {
if (is_root_displaced) {
// in that case it's better to put ptr into root and move root data into its correct place.
// Note: this local `val` shadows the outer `void* val` declared above.
sds val;
if (root.IsSds()) {
val = (sds)root.get();
// Assigning the untagged pointer also clears the displaced tag on root.
root.ptr = ptr.get();
} else {
LinkKey* lk = (LinkKey*)root.get();
val = (sds)lk->get();
lk->ptr = ptr.get();
root.ClearDisplaced();
}
// The evicted key is re-linked into its own home bucket (recursive call).
uint32_t nbid = BucketId(val);
DCHECK_NE(nbid, bid);
Link(SuperPtr{val}, nbid); // Potentially unbounded wave of updates.
return;
}
// Flat key going into a non-displaced bucket: a new chain node is needed.
LinkAllocator ea(mr());
head = ea.allocate(1);
ea.construct(head);
val = ptr.get();
++num_chain_entries_;
} else {
// `ptr` already is a chain node; recycle it.
head = (LinkKey*)ptr.get();
val = head->get();
}
if (root.IsSds()) {
// root holds a flat key: the node takes over root's key and the incoming key becomes
// the flat tail of the new two-element chain.
head->ptr = root.get();
head->next = SuperPtr{val};
root.SetLink(head);
// SetLink() rewrote the pointer word, so the displaced tag has to be restored.
if (is_root_displaced) {
DCHECK_NE(bid, BucketId((sds)head->ptr));
root.SetDisplaced();
}
} else {
// root is already a chain: insert the node right after the first chain node.
DCHECK(root.IsLink());
LinkKey* chain = (LinkKey*)root.get();
head->next = chain->next;
head->ptr = val;
chain->next.SetLink(head);
}
}
// Disabled code (compiled out). It is written against an `Entry` type with `value`/`next`
// members that the current header does not declare.
#if 0
void StringSet::MoveEntry(Entry* e, uint32_t bid) {
auto& dest = entries_[bid];
if (IsEmpty(dest)) {
dest.value = std::move(e->value);
Free(e);
return;
}
e->next = dest.next;
dest.next = e;
}
#endif
int StringSet::FindEmptyAround(uint32_t bid) const {
if (entries_[bid].IsEmpty())
return 0;
if (bid + 1 < entries_.size() && entries_[bid + 1].IsEmpty())
return 1;
if (bid && entries_[bid - 1].IsEmpty())
return -1;
return 2;
}
// Convenience overload: hashes the sds string with CompactObj::HashCode and maps the
// hash to a bucket index for the current table size.
uint32_t StringSet::BucketId(sds ptr) const {
  const uint64_t hash = CompactObj::HashCode(string_view{ptr, sdslen(ptr)});
  return BucketId(hash);
}
// Disabled code (compiled out): cursor-based Scan() and Erase(string_view).
// Both are written against an `Entry` type (`value`, `next`, `Reset()`) and helpers
// (`ShiftLeftIfNeeded`, `ItemCb`) that are not part of the currently active interface.
#if 0
// Visits the bucket addressed by the top `capacity_log_` bits of `cursor` and returns
// the cursor of the next non-empty bucket, or 0 when the scan is complete.
uint32_t StringSet::Scan(uint32_t cursor, const ItemCb& cb) const {
if (capacity_log_ == 0)
return 0;
uint32_t bucket_id = cursor >> (32 - capacity_log_);
const_iterator it(this, bucket_id);
if (it.entry_ == nullptr)
return 0;
bucket_id = it.bucket_id_; // non-empty bucket
do {
cb(*it);
++it;
} while (it.bucket_id_ == bucket_id);
if (it.entry_ == nullptr)
return 0;
if (it.bucket_id_ == bucket_id + 1) { // cover displacement case
// TODO: we could avoid checking computing HC if we explicitly mark displacement.
// we have plenty-metadata to do so.
uint32_t bid = BucketId((*it).HashCode());
if (bid == it.bucket_id_) {
cb(*it);
++it;
}
}
return it.entry_ ? it.bucket_id_ << (32 - capacity_log_) : 0;
}
// Removes `val` from its home bucket chain or from one of the two adjacent cells.
// NOTE(review): if this is ever re-enabled, `entries_[bid - 1]` and `entries_[bid + 1]`
// are indexed before the corresponding bounds checks (`bid` and `bid + 1 < size`), and
// `obj_malloc_used_` is only adjusted on the neighbour paths.
bool StringSet::Erase(std::string_view val) {
uint64_t hc = CompactObj::HashCode(val);
uint32_t bid = BucketId(hc);
Entry* current = &entries_[bid];
if (!current->IsEmpty()) {
if (current->value == val) {
current->Reset();
ShiftLeftIfNeeded(current);
--size_;
return true;
}
Entry* prev = current;
current = current->next;
while (current) {
if (current->value == val) {
current->Reset();
prev->next = current->next;
Free(current);
--size_;
return true;
}
prev = current;
current = current->next;
}
}
auto& prev = entries_[bid - 1];
// TODO: to mark displacement.
if (bid && !prev.IsEmpty()) {
if (prev.value == val) {
obj_malloc_used_ -= prev.value.MallocUsed();
prev.Reset();
ShiftLeftIfNeeded(&prev);
--size_;
return true;
}
}
auto& next = entries_[bid + 1];
if (bid + 1 < entries_.size()) {
if (next.value == val) {
obj_malloc_used_ -= next.value.MallocUsed();
next.Reset();
ShiftLeftIfNeeded(&next);
--size_;
return true;
}
}
return false;
}
#endif
// Advances bucket_id_ to the first non-empty bucket at or after its current value and
// points entry_ at it. Sets entry_ to nullptr when the end of the table is reached.
void StringSet::iterator::SeekNonEmpty() {
  auto& cells = owner_->entries_;
  for (; bucket_id_ < cells.size(); ++bucket_id_) {
    if (cells[bucket_id_].IsEmpty())
      continue;
    entry_ = &cells[bucket_id_];
    return;
  }
  entry_ = nullptr;
}
// Const counterpart of iterator::SeekNonEmpty(): moves to the first non-empty bucket at
// or after bucket_id_, or sets entry_ to nullptr if there is none.
void StringSet::const_iterator::SeekNonEmpty() {
  const auto& cells = owner_->entries_;
  for (; bucket_id_ < cells.size(); ++bucket_id_) {
    if (cells[bucket_id_].IsEmpty())
      continue;
    entry_ = &cells[bucket_id_];
    return;
  }
  entry_ = nullptr;
}
} // namespace dfly

332
src/core/string_set.h Normal file
View File

@ -0,0 +1,332 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <memory_resource>
extern "C" {
#include "redis/object.h"
}
namespace dfly {
// StringSet is a nice but over-optimized data-structure. Probably is not worth it in the first
// place but sometimes the OCD kicks in and one can not resist.
// The advantage of it over redis-dict is smaller meta-data waste.
// dictEntry is 24 bytes, i.e it uses at least 32N bytes where N is the expected length.
// dict requires to allocate dictEntry per each addition in addition to the supplied key.
// It also wastes space in case of a set because it stores a value pointer inside dictEntry.
// To summarize:
// 100% utilized dict uses N*24 + N*8 = 32N bytes not including the key space.
// for 75% utilization (1/0.75 buckets): N*1.33*8 + N*24 = 35N
//
// This class uses 8 bytes per bucket (similarly to dictEntry*) but it used it for both
// links and keys. For most cases, we remove the need for another redirection layer
// and just store the key, so no "dictEntry" allocations occur.
// For those cells that require chaining, the bucket is
// changed in run-time to represent a linked chain.
// Additional feature - in order to reduce collisions, we insert items into
// neighbour cells but only if they are empty (not chains). This way we reduce the number of
// empty (unused) spaces at full utilization from 36% to ~21%.
// 100% utilized table requires: N*8 + 0.2N*16 = 11.2N bytes or ~20 bytes savings.
// 75% utilization: N*1.33*8 + 0.12N*16 = 13N or ~22 bytes savings per record.
// TODO: to separate hash/compare functions from table logic and make it generic
// with potential replacements of hset/zset data structures.
// static_assert(sizeof(dictEntry) == 24);
// Hash set of strings that stores one tagged 8-byte pointer per bucket.
// A bucket is either empty, a "flat" sds key, or the head of a chain of LinkKey nodes.
// Flat keys may also live in a cell adjacent to their home bucket ("displaced").
class StringSet {
  struct LinkKey;

  // we can assume that high 12 bits of user address space
  // can be used for tagging. At most 52 bits of address are reserved for
  // some configurations, and usually it's 48 bits.
  // https://www.kernel.org/doc/html/latest/arm64/memory.html
  static constexpr size_t kLinkBit = 1ULL << 52;      // cell points to a LinkKey node.
  static constexpr size_t kDisplaceBit = 1ULL << 53;  // head key belongs to a neighbour bucket.

  // Bits 52..63, i.e. exactly the 12 high bits reserved for tags. The previous shift of 51
  // covered bits 51..62: it stripped address bit 51 and left bit 63 unmasked.
  static constexpr size_t kTagMask = 4095ULL << 52;

  // Tagged pointer: the low 52 bits hold the address (sds or LinkKey*), the high bits the tags.
  struct SuperPtr {
    void* ptr = nullptr;

    explicit SuperPtr(void* p = nullptr) : ptr(p) {
    }

    // True when the link tag is not set. Note that this also holds for an empty cell.
    bool IsSds() const {
      return (uintptr_t(ptr) & kLinkBit) == 0;
    }

    bool IsLink() const {
      return (uintptr_t(ptr) & kLinkBit) == kLinkBit;
    }

    bool IsEmpty() const {
      return ptr == nullptr;
    }

    // Returns the address with all tag bits removed.
    void* get() const {
      return (void*)(uintptr_t(ptr) & ~kTagMask);
    }

    bool IsDisplaced() const {
      return (uintptr_t(ptr) & kDisplaceBit) == kDisplaceBit;
    }

    // returns usable size.
    size_t SetString(std::string_view str);

    // Stores `lk` with the link tag set. Any other tag on this cell is overwritten.
    void SetLink(LinkKey* lk) {
      ptr = (void*)(uintptr_t(lk) | kLinkBit);
    }

    bool Compare(std::string_view str) const;

    void SetDisplaced() {
      ptr = (void*)(uintptr_t(ptr) | kDisplaceBit);
    }

    void ClearDisplaced() {
      ptr = (void*)(uintptr_t(ptr) & ~kDisplaceBit);
    }

    void Reset() {
      ptr = nullptr;
    }

    // Returns the key this cell exposes: the flat sds, or the key held by the chain head.
    sds GetSds() const {
      if (IsSds())
        return (sds)get();
      LinkKey* lk = (LinkKey*)get();
      return (sds)lk->get();
    }
  };

  // Chain node: its own (inherited) pointer holds the key, `next` the rest of the chain.
  struct LinkKey : public SuperPtr {
    SuperPtr next;  // could be LinkKey* or sds.
  };

  static_assert(sizeof(SuperPtr) == 8);

 public:
  class iterator;
  class const_iterator;
  // using ItemCb = std::function<void(const CompactObj& co)>;

  StringSet(const StringSet&) = delete;

  explicit StringSet(std::pmr::memory_resource* mr = std::pmr::get_default_resource());
  ~StringSet();

  // Non-copyable. Takes a const reference so that it matches the canonical copy-assignment
  // signature.
  StringSet& operator=(const StringSet&) = delete;

  void Reserve(size_t sz);

  // Returns true if `str` was inserted, false if it was already present.
  bool Add(std::string_view str);

  // NOTE(review): Remove/Erase/Contains are declared here, but no active definition is
  // visible in the accompanying string_set.cc - confirm before calling them.
  bool Remove(std::string_view str);
  void Erase(iterator it);

  size_t size() const {
    return size_;
  }

  bool empty() const {
    return size_ == 0;
  }

  size_t bucket_count() const {
    return entries_.size();
  }

  // those that are chained to the entries stored inline in the bucket array.
  size_t num_chain_entries() const {
    return num_chain_entries_;
  }

  size_t num_used_buckets() const {
    return num_used_buckets_;
  }

  bool Contains(std::string_view val) const;
  bool Erase(std::string_view val);

  iterator begin() {
    return iterator{this, 0};
  }

  iterator end() {
    return iterator{};
  }

  // Bytes used by the stored strings, accumulated from SetString() return values.
  size_t obj_malloc_used() const {
    return obj_malloc_used_;
  }

  // Bytes used by the table itself: the bucket array plus all chain nodes.
  // A chain node is a LinkKey (two pointers), not a single SuperPtr, so it is accounted
  // with sizeof(LinkKey).
  size_t set_malloc_used() const {
    return num_chain_entries_ * sizeof(LinkKey) + entries_.capacity() * sizeof(SuperPtr);
  }

  /// stable scanning api. has the same guarantees as redis scan command.
  /// we avoid doing bit-reverse by using a different function to derive a bucket id
  /// from hash values. By using msb part of hash we make it "stable" with respect to
  /// rehashes. For example, with table log size 4 (size 16), entries in bucket id
  /// 1110 come from hashes 1110XXXXX.... When a table grows to log size 5,
  /// these entries can move either to 11100 or 11101. So if we traversed with our cursor
  /// range [0000-1110], it's guaranteed that in grown table we do not need to cover again
  /// [00000-11100]. Similarly with shrinkage, if a table is shrinked to log size 3,
  /// keys from 1110 and 1111 will move to bucket 111. Again, it's guaranteed that we
  /// covered the range [000-111] (all keys in that case).
  /// Returns: next cursor or 0 if reached the end of scan.
  /// cursor = 0 - initiates a new scan.
  // uint32_t Scan(uint32_t cursor, const ItemCb& cb) const;

  unsigned BucketDepth(uint32_t bid) const;

  // void IterateOverBucket(uint32_t bid, const ItemCb& cb);

  class iterator {
    friend class StringSet;

   public:
    iterator() : owner_(nullptr), entry_(nullptr), bucket_id_(0) {
    }

    iterator& operator++();

    // Iterators compare equal when they point at the same cell; end() has a null entry.
    bool operator==(const iterator& o) const {
      return entry_ == o.entry_;
    }

    bool operator!=(const iterator& o) const {
      return !(*this == o);
    }

   private:
    iterator(StringSet* owner, uint32_t bid) : owner_(owner), bucket_id_(bid) {
      SeekNonEmpty();
    }

    void SeekNonEmpty();

    StringSet* owner_ = nullptr;
    SuperPtr* entry_ = nullptr;
    uint32_t bucket_id_ = 0;
  };

  class const_iterator {
    friend class StringSet;

   public:
    const_iterator() : owner_(nullptr), entry_(nullptr), bucket_id_(0) {
    }

    const_iterator& operator++();

    // Conversion-assignment from a mutable iterator. The source is only read, so it is
    // taken by const reference (this also accepts temporaries such as `ss.begin()`).
    const_iterator& operator=(const iterator& it) {
      owner_ = it.owner_;
      entry_ = it.entry_;
      bucket_id_ = it.bucket_id_;
      return *this;
    }

    bool operator==(const const_iterator& o) const {
      return entry_ == o.entry_;
    }

    bool operator!=(const const_iterator& o) const {
      return !(*this == o);
    }

   private:
    const_iterator(const StringSet* owner, uint32_t bid) : owner_(owner), bucket_id_(bid) {
      SeekNonEmpty();
    }

    void SeekNonEmpty();

    const StringSet* owner_ = nullptr;
    const SuperPtr* entry_ = nullptr;
    uint32_t bucket_id_ = 0;
  };

 private:
  friend class iterator;

  using LinkAllocator = std::pmr::polymorphic_allocator<LinkKey>;

  std::pmr::memory_resource* mr() {
    return entries_.get_allocator().resource();
  }

  // Bucket index = the top `capacity_log_` bits of the hash.
  // Must only be called once the table exists (capacity_log_ > 0).
  uint32_t BucketId(uint64_t hash) const {
    return hash >> (64 - capacity_log_);
  }

  uint32_t BucketId(sds ptr) const;

  // Returns: 2 if no empty spaces found around the bucket. 0, -1, 1 - offset towards
  // an empty bucket.
  int FindEmptyAround(uint32_t bid) const;

  // returns 2 if no object was found in the vicinity.
  // Returns relative offset to bid: 0, -1, 1 if found.
  int FindAround(std::string_view str, uint32_t bid) const;

  void Grow();

  void Link(SuperPtr ptr, uint32_t bid);
  /*void MoveEntry(Entry* e, uint32_t bid);

  void ShiftLeftIfNeeded(Entry* root) {
    if (root->next) {
      root->value = std::move(root->next->value);
      Entry* tmp = root->next;
      root->next = root->next->next;
      Free(tmp);
    }
  }
  */

  // Returns a chain node to the memory resource and updates the node counter.
  void Free(LinkKey* lk) {
    mr()->deallocate(lk, sizeof(LinkKey), alignof(LinkKey));
    --num_chain_entries_;
  }

  LinkKey* NewLink(std::string_view str, SuperPtr ptr);

  // The rule is - entries can be moved to vicinity as long as they are stored
  // "flat", i.e. not into the linked list.
  std::pmr::vector<SuperPtr> entries_;
  size_t obj_malloc_used_ = 0;
  uint32_t size_ = 0;
  uint32_t num_chain_entries_ = 0;
  uint32_t num_used_buckets_ = 0;
  unsigned capacity_log_ = 0;
};
// Disabled code (compiled out): increment operators written for an entry type with a
// `next` pointer member. SuperPtr has no such member, so the iterators currently have
// no active operator++ definition in this header.
#if 0
inline StringSet::iterator& StringSet::iterator::operator++() {
if (entry_->next) {
entry_ = entry_->next;
} else {
++bucket_id_;
SeekNonEmpty();
}
return *this;
}
inline StringSet::const_iterator& StringSet::const_iterator::operator++() {
if (entry_->next) {
entry_ = entry_->next;
} else {
++bucket_id_;
SeekNonEmpty();
}
return *this;
}
#endif
} // namespace dfly

131
src/core/string_set_test.cc Normal file
View File

@ -0,0 +1,131 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/string_set.h"
#include <absl/strings/str_cat.h>
#include <gmock/gmock.h>
#include <mimalloc.h>
#include <unordered_set>
#include "base/gtest.h"
#include "base/logging.h"
#include "core/mi_memory_resource.h"
extern "C" {
#include "redis/zmalloc.h"
}
namespace dfly {
using namespace std;
// Test fixture: every test gets a fresh, default-constructed StringSet in `ss_`.
class StringSetTest : public ::testing::Test {
protected:
// Runs once for the suite: hands the mimalloc backing heap to zmalloc's
// thread-local initialisation before any test adds strings to the set.
static void SetUpTestSuite() {
auto* tlh = mi_heap_get_backing();
init_zmalloc_threadlocal(tlh);
// SmallString::InitThreadLocal(tlh);
// static MiMemoryResource mi_resource(tlh);
// needed for MallocUsed
// CompactObj::InitThreadLocal(&mi_resource);
}
static void TearDownTestSuite() {
}
StringSet ss_;
};
// Add() reports true for a new key and false for a duplicate; duplicates do not
// change size().
TEST_F(StringSetTest, Basic) {
EXPECT_TRUE(ss_.Add("foo"));
EXPECT_TRUE(ss_.Add("bar"));
EXPECT_FALSE(ss_.Add("foo"));
EXPECT_FALSE(ss_.Add("bar"));
EXPECT_EQ(2, ss_.size());
}
// Inserts a fixed sequence of eight distinct 16-byte keys; each insertion must succeed.
TEST_F(StringSetTest, Ex1) {
  static constexpr const char* kKeys[] = {
      "AA@@@@@@@@@@@@@@", "AAA@@@@@@@@@@@@@", "AAAAAAAAA@@@@@@@", "AAAAAAAAAA@@@@@@",
      "AAAAAAAAAAAAAAA@", "BBBBBAAAAAAAAAAA", "BBBBBBBBAAAAAAAA", "CCCCCBBBBBBBBBBB",
  };

  for (const char* key : kKeys) {
    EXPECT_TRUE(ss_.Add(key));
  }
}
// Inserts 8192 distinct keys and logs chain usage and empty-bucket ratio whenever the
// element count equals the bucket count or 75% of it.
TEST_F(StringSetTest, Many) {
// NOTE(review): max_chain_factor is never assigned after this line, so the final log
// statement always prints 0% - confirm whether it was meant to track chain_usage.
double max_chain_factor = 0;
for (unsigned i = 0; i < 8192; ++i) {
EXPECT_TRUE(ss_.Add(absl::StrCat("xxxxxxxxxxxxxxxxx", i)));
size_t sz = ss_.size();
bool should_print = (sz == ss_.bucket_count()) || (sz == ss_.bucket_count() * 0.75);
if (should_print) {
// Share of elements that needed a chain node, and share of buckets left unused.
double chain_usage = double(ss_.num_chain_entries()) / ss_.size();
unsigned num_empty = ss_.bucket_count() - ss_.num_used_buckets();
double empty_factor = double(num_empty) / ss_.bucket_count();
LOG(INFO) << "chains: " << 100 * chain_usage << ", empty: " << 100 * empty_factor << "% at "
<< ss_.size();
#if 0
if (ss_.size() == 15) {
for (unsigned i = 0; i < ss_.bucket_count(); ++i) {
LOG(INFO) << "[" << i << "]: " << ss_.BucketDepth(i);
}
/*ss_.IterateOverBucket(93, [this](const CompactObj& co) {
LOG(INFO) << "93->" << (co.HashCode() % ss_.bucket_count());
});*/
}
#endif
}
}
EXPECT_EQ(8192, ss_.size());
LOG(INFO) << "max chain factor: " << 100 * max_chain_factor << "%";
/*size_t iter_len = 0;
for (auto it = ss_.begin(); it != ss_.end(); ++it) {
++iter_len;
}
EXPECT_EQ(iter_len, 512);*/
}
// Disabled test (compiled out): it exercises Scan() and range-for iteration over
// CompactObj values, neither of which is available in the active StringSet interface.
#if 0
TEST_F(StringSetTest, IterScan) {
unordered_set<string> actual, expected;
auto insert_actual = [&](const CompactObj& val) {
string tmp;
val.GetString(&tmp);
actual.insert(tmp);
};
EXPECT_EQ(0, ss_.Scan(0, insert_actual));
EXPECT_TRUE(actual.empty());
for (unsigned i = 0; i < 512; ++i) {
string s = absl::StrCat("x", i);
expected.insert(s);
EXPECT_TRUE(ss_.Add(s));
}
for (CompactObj& val : ss_) {
insert_actual(val);
}
EXPECT_EQ(actual, expected);
actual.clear();
uint32_t cursor = 0;
do {
cursor = ss_.Scan(cursor, insert_actual);
} while (cursor);
EXPECT_EQ(actual, expected);
}
#endif
} // namespace dfly

View File

@ -11,17 +11,22 @@ extern "C" {
#include "redis/util.h"
}
#include "base/flags.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "core/string_set.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/transaction.h"
ABSL_FLAG(bool, use_set2, false, "If true use better implementation for sets");
namespace dfly {
using namespace std;
using absl::GetFlag;
using ResultStringVec = vector<OpResult<vector<string>>>;
using ResultSetView = OpResult<absl::flat_hash_set<std::string_view>>;
@ -52,24 +57,103 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
return is;
}
bool dictContains(const dict* d, string_view key) {
uint64_t h = dictGenHashFunction(key.data(), key.size());
for (unsigned table = 0; table <= 1; table++) {
uint64_t idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
dictEntry* he = d->ht_table[table][idx];
while (he) {
sds dkey = (sds)he->key;
if (sdslen(dkey) == key.size() && (key.empty() || memcmp(dkey, key.data(), key.size()) == 0))
return true;
he = he->next;
}
if (!dictIsRehashing(d))
break;
}
return false;
}
// Removes the members listed in `vals` from a string-encoded set.
// Returns (number of members removed, whether the set is empty afterwards).
// The implementation behind `set` is selected by the use_set2 flag.
pair<unsigned, bool> RemoveStrSet(ArgSlice vals, CompactObj* set) {
unsigned removed = 0;
bool isempty = false;
if (GetFlag(FLAGS_use_set2)) {
StringSet* ss = (StringSet*)set->RObjPtr();
// NOTE(review): removal is not implemented for StringSet yet - the loop body is
// commented out, so nothing is erased and `removed` stays 0 on this path.
for (auto member : vals) {
// removed += ss->Erase(member);
}
isempty = ss->empty();
} else {
dict* d = (dict*)set->RObjPtr();
auto* shard = EngineShard::tlocal();
for (auto member : vals) {
// dictDelete needs an sds key, so each member is copied into the shard's scratch sds.
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
int result = dictDelete(d, shard->tmp_str1);
removed += (result == DICT_OK);
}
isempty = (dictSize(d) == 0);
}
return make_pair(removed, isempty);
}
unsigned AddStrSet(ArgSlice vals, CompactObj* dest) {
unsigned res = 0;
dict* ds = (dict*)dest->RObjPtr();
auto* es = EngineShard::tlocal();
if (GetFlag(FLAGS_use_set2)) {
StringSet* ss = (StringSet*)dest->RObjPtr();
for (auto member : vals) {
res += ss->Add(member);
}
} else {
dict* ds = (dict*)dest->RObjPtr();
auto* es = EngineShard::tlocal();
for (auto member : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
if (de) {
de->key = sdsdup(es->tmp_str1);
++res;
for (auto member : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
if (de) {
de->key = sdsdup(es->tmp_str1);
++res;
}
}
}
return res;
}
// Initialises `set` as an empty string-encoded OBJ_SET.
// With the use_set2 flag a StringSet (kEncodingStrMap2) backs the set, otherwise a
// redis dict (kEncodingStrMap).
void InitStrSet(CompactObj* set) {
  if (!GetFlag(FLAGS_use_set2)) {
    dict* ds = dictCreate(&setDictType);
    set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
    return;
  }

  StringSet* ss = new StringSet{CompactObj::memory_resource()};
  set->InitRobj(OBJ_SET, kEncodingStrMap2, ss);
}
// f receives a str object.
// Calls `f` once per member of the string-encoded set behind `ptr`, passing each member
// as an rvalue std::string.
// With the use_set2 flag nothing is visited yet: iteration over StringSet is still
// commented out.
template <typename F> void FillFromStrSet(F&& f, void* ptr) {
  string str;
  if (GetFlag(FLAGS_use_set2)) {
    /*for (const CompactObj& co : *(StringSet*)ptr) {
      co.GetString(&str);
      f(move(str));
    }*/
    return;
  }

  dict* ds = (dict*)ptr;
  dictIterator* di = dictGetIterator(ds);
  for (dictEntry* de = dictNext(di); de != nullptr; de = dictNext(di)) {
    // `str` is re-assigned on every iteration, so handing it over by move is fine.
    str.assign((sds)de->key, sdslen((sds)de->key));
    f(move(str));
  }
  dictReleaseIterator(di);
}
// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
bool isempty = false;
@ -91,14 +175,7 @@ pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
isempty = (intsetLen(is) == 0);
set->SetRObjPtr(is);
} else {
dict* d = (dict*)set->RObjPtr();
auto* shard = EngineShard::tlocal();
for (auto member : vals) {
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
int result = dictDelete(d, shard->tmp_str1);
removed += (result == DICT_OK);
}
isempty = (dictSize(d) == 0);
return RemoveStrSet(vals, set);
}
return make_pair(removed, isempty);
}
@ -118,8 +195,7 @@ void InitSet(ArgSlice vals, CompactObj* set) {
intset* is = intsetNew();
set->InitRobj(OBJ_SET, kEncodingIntSet, is);
} else {
dict* ds = dictCreate(&setDictType);
set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
InitStrSet(set);
}
}
@ -132,43 +208,126 @@ void ScanCallback(void* privdata, const dictEntry* de) {
uint64_t ScanStrSet(const CompactObj& co, uint64_t curs, unsigned count, StringVec* res) {
long maxiterations = count * 10;
DCHECK_EQ(kEncodingStrMap, co.Encoding());
dict* ds = (dict*)co.RObjPtr();
do {
curs = dictScan(ds, curs, ScanCallback, NULL, res);
} while (curs && maxiterations-- && res->size() < count);
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(kEncodingStrMap2, co.Encoding());
LOG(DFATAL) << "TBD: Not implemented";
} else {
DCHECK_EQ(kEncodingStrMap, co.Encoding());
dict* ds = (dict*)co.RObjPtr();
do {
curs = dictScan(ds, curs, ScanCallback, NULL, res);
} while (curs && maxiterations-- && res->size() < count);
}
return curs;
}
using SetType = pair<void*, unsigned>;
// Returns the number of members of `set`, a (pointer, encoding) pair.
// Non-intset encodings are interpreted according to the use_set2 flag.
uint32_t SetTypeLen(const SetType& set) {
  if (set.second == kEncodingIntSet)
    return intsetLen((const intset*)set.first);

  if (!GetFlag(FLAGS_use_set2)) {
    DCHECK_EQ(set.second, kEncodingStrMap);
    return dictSize((const dict*)set.first);
  }

  DCHECK_EQ(set.second, kEncodingStrMap2);
  return ((StringSet*)set.first)->size();
}
// Membership test for an integer value.
// Intsets are queried directly; for dict-backed sets the value is formatted as decimal
// text and looked up as a string. The StringSet path is not implemented yet.
bool IsInSet(const SetType& st, int64_t val) {
  if (st.second == kEncodingIntSet)
    return intsetFind((intset*)st.first, val);

  if (GetFlag(FLAGS_use_set2)) {
    LOG(DFATAL) << "TBD: Not implemented";
    return false;
  }

  DCHECK_EQ(st.second, kEncodingStrMap);
  char digits[32];
  char* const digits_end = absl::numbers_internal::FastIntToBuffer(val, digits);
  return dictContains((dict*)st.first, string_view{digits, size_t(digits_end - digits)});
}
// Membership test for a string member.
// For intsets the member must parse as an integer to be found at all; dict-backed sets
// are searched by bytes. The StringSet path is not implemented yet.
bool IsInSet(const SetType& st, string_view member) {
  if (st.second == kEncodingIntSet) {
    long long llval;
    const bool is_int = string2ll(member.data(), member.size(), &llval);
    return is_int && intsetFind((intset*)st.first, llval);
  }

  if (GetFlag(FLAGS_use_set2)) {
    LOG(DFATAL) << "TBD: Not implemented";
    return false;
  }

  DCHECK_EQ(st.second, kEncodingStrMap);
  return dictContains((dict*)st.first, member);
}
// Removes arg from result.
void DiffStrSet(const SetType& st, absl::flat_hash_set<string>* result) {
DCHECK_EQ(kEncodingStrMap, st.second);
dict* ds = (dict*)st.first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
sds key = (sds)de->key;
result->erase(string_view{key, sdslen(key)});
if (GetFlag(FLAGS_use_set2)) {
} else {
DCHECK_EQ(kEncodingStrMap, st.second);
dict* ds = (dict*)st.first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
sds key = (sds)de->key;
result->erase(string_view{key, sdslen(key)});
}
dictReleaseIterator(di);
}
}
void InterStrSet(const vector<SetType>& vec, StringVec* result) {
if (GetFlag(FLAGS_use_set2)) {
} else {
dict* ds = (dict*)vec.front().first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
size_t j = 1;
sds key = (sds)de->key;
string_view member{key, sdslen(key)};
for (j = 1; j < vec.size(); j++) {
if (vec[j].first != ds && !IsInSet(vec[j], member))
break;
}
/* Only take action when all vec contain the member */
if (j == vec.size()) {
result->push_back(string(member));
}
}
dictReleaseIterator(di);
}
dictReleaseIterator(di);
}
StringVec PopStrSet(unsigned count, const SetType& st) {
StringVec result;
dict* ds = (dict*)st.first;
string str;
dictIterator* di = dictGetSafeIterator(ds);
for (uint32_t i = 0; i < count; ++i) {
dictEntry* de = dictNext(di);
DCHECK(de);
result.emplace_back((sds)de->key, sdslen((sds)de->key));
dictDelete(ds, de->key);
if (GetFlag(FLAGS_use_set2)) {
} else {
dict* ds = (dict*)st.first;
string str;
dictIterator* di = dictGetSafeIterator(ds);
for (uint32_t i = 0; i < count; ++i) {
dictEntry* de = dictNext(di);
DCHECK(de);
result.emplace_back((sds)de->key, sdslen((sds)de->key));
dictDelete(ds, de->key);
}
dictReleaseIterator(di);
}
dictReleaseIterator(di);
return result;
}
@ -290,58 +449,6 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
};
using SetType = pair<void*, unsigned>;
uint32_t SetTypeLen(const SetType& set) {
if (set.second == kEncodingStrMap) {
return dictSize((const dict*)set.first);
}
DCHECK_EQ(set.second, kEncodingIntSet);
return intsetLen((const intset*)set.first);
};
bool dictContains(const dict* d, string_view key) {
uint64_t h = dictGenHashFunction(key.data(), key.size());
for (unsigned table = 0; table <= 1; table++) {
uint64_t idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
dictEntry* he = d->ht_table[table][idx];
while (he) {
sds dkey = (sds)he->key;
if (sdslen(dkey) == key.size() && (key.empty() || memcmp(dkey, key.data(), key.size()) == 0))
return true;
he = he->next;
}
if (!dictIsRehashing(d))
break;
}
return false;
}
bool IsInSet(const SetType& st, int64_t val) {
if (st.second == kEncodingIntSet)
return intsetFind((intset*)st.first, val);
DCHECK_EQ(st.second, kEncodingStrMap);
char buf[32];
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
return dictContains((dict*)st.first, string_view{buf, size_t(next - buf)});
}
bool IsInSet(const SetType& st, string_view member) {
if (st.second == kEncodingIntSet) {
long long llval;
if (!string2ll(member.data(), member.size(), &llval))
return false;
return intsetFind((intset*)st.first, llval);
}
DCHECK_EQ(st.second, kEncodingStrMap);
return dictContains((dict*)st.first, member);
}
template <typename F> void FillSet(const SetType& set, F&& f) {
if (set.second == kEncodingIntSet) {
intset* is = (intset*)set.first;
@ -354,15 +461,7 @@ template <typename F> void FillSet(const SetType& set, F&& f) {
f(string{buf, size_t(next - buf)});
}
} else {
dict* ds = (dict*)set.first;
string str;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
str.assign((sds)de->key, sdslen((sds)de->key));
f(move(str));
}
dictReleaseIterator(di);
FillFromStrSet(move(f), set.first);
}
}
@ -428,7 +527,11 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
return OpStatus::OUT_OF_MEMORY;
}
// frees 'is' on a way.
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
if (GetFlag(FLAGS_use_set2)) {
co.InitRobj(OBJ_SET, kEncodingStrMap2, tmp.ptr);
} else {
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
}
inner_obj = co.RObjPtr();
break;
}
@ -691,25 +794,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
}
}
} else {
dict* ds = (dict*)sets.front().first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
size_t j = 1;
sds key = (sds)de->key;
string_view member{key, sdslen(key)};
for (j = 1; j < sets.size(); j++) {
if (sets[j].first != ds && !IsInSet(sets[j], member))
break;
}
/* Only take action when all sets contain the member */
if (j == sets.size()) {
result.push_back(string(member));
}
}
dictReleaseIterator(di);
InterStrSet(sets, &result);
}
return result;
@ -1193,25 +1278,39 @@ bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* des
char buf[32];
int ii = 0;
dict* ds = dictCreate(&setDictType);
if (GetFlag(FLAGS_use_set2)) {
StringSet* ss = new StringSet{CompactObj::memory_resource()};
if (expected_len)
ss->Reserve(expected_len);
if (expected_len) {
if (dictTryExpand(ds, expected_len) != DICT_OK) {
dictRelease(ds);
return false;
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
string_view str{buf, size_t(next - buf)};
CHECK(ss->Add(str));
}
dest->ptr = ss;
dest->encoding = OBJ_ENCODING_HT;
} else {
dict* ds = dictCreate(&setDictType);
if (expected_len) {
if (dictTryExpand(ds, expected_len) != DICT_OK) {
dictRelease(ds);
return false;
}
}
/* To add the elements we extract integers and create redis objects */
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
sds s = sdsnewlen(buf, next - buf);
CHECK(dictAddRaw(ds, s, NULL));
}
dest->ptr = ds;
dest->encoding = OBJ_ENCODING_HT;
}
/* To add the elements we extract integers and create redis objects */
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
sds s = sdsnewlen(buf, next - buf);
CHECK(dictAddRaw(ds, s, NULL));
}
dest->ptr = ds;
dest->encoding = OBJ_ENCODING_HT;
return true;
}