Compare commits
3 Commits
Author | SHA1 | Date |
---|---|---|
Roman Gershman | b61bf1ea1c | |
Roman Gershman | a7a3c89ffa | |
Roman Gershman | 4fdde2f62b |
|
@ -1,6 +1,6 @@
|
|||
add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc
|
||||
external_alloc.cc interpreter.cc mi_memory_resource.cc
|
||||
segment_allocator.cc small_string.cc tx_queue.cc)
|
||||
segment_allocator.cc small_string.cc string_set.cc tx_queue.cc)
|
||||
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua
|
||||
Boost::fiber crypto)
|
||||
|
||||
|
@ -15,3 +15,4 @@ cxx_test(external_alloc_test dfly_core LABELS DFLY)
|
|||
cxx_test(dash_test dfly_core LABELS DFLY)
|
||||
cxx_test(interpreter_test dfly_core LABELS DFLY)
|
||||
cxx_test(json_test dfly_core TRDP::jsoncons LABELS DFLY)
|
||||
cxx_test(string_set_test dfly_core LABELS DFLY)
|
||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
|||
|
||||
#include "base/logging.h"
|
||||
#include "base/pod_array.h"
|
||||
#include "core/string_set.h"
|
||||
|
||||
#if defined(__aarch64__)
|
||||
#include "base/sse2neon.h"
|
||||
|
@ -55,6 +56,11 @@ inline void FreeObjSet(unsigned encoding, void* ptr, pmr::memory_resource* mr) {
|
|||
dictRelease((dict*)ptr);
|
||||
break;
|
||||
}
|
||||
case kEncodingStrMap2: {
|
||||
delete (StringSet*)ptr;
|
||||
break;
|
||||
}
|
||||
|
||||
case kEncodingIntSet:
|
||||
zfree((void*)ptr);
|
||||
break;
|
||||
|
@ -67,6 +73,10 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
|
|||
switch (encoding) {
|
||||
case kEncodingStrMap /*OBJ_ENCODING_HT*/:
|
||||
return 0; // TODO
|
||||
case kEncodingStrMap2: {
|
||||
StringSet* ss = (StringSet*)ptr;
|
||||
return ss->obj_malloc_used() + ss->set_malloc_used();
|
||||
}
|
||||
case kEncodingIntSet:
|
||||
return intsetBlobLen((intset*)ptr);
|
||||
}
|
||||
|
@ -258,6 +268,10 @@ size_t RobjWrapper::Size() const {
|
|||
dict* d = (dict*)inner_obj_;
|
||||
return dictSize(d);
|
||||
}
|
||||
case kEncodingStrMap2: {
|
||||
StringSet* ss = (StringSet*)inner_obj_;
|
||||
return ss->size();
|
||||
}
|
||||
default:
|
||||
LOG(FATAL) << "Unexpected encoding " << encoding_;
|
||||
}
|
||||
|
@ -608,6 +622,7 @@ void CompactObj::ImportRObj(robj* o) {
|
|||
if (o->encoding == OBJ_ENCODING_INTSET) {
|
||||
enc = kEncodingIntSet;
|
||||
} else {
|
||||
LOG(DFATAL) << "This can not be done via ImportRObj for sets";
|
||||
enc = kEncodingStrMap;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ namespace dfly {
|
|||
|
||||
constexpr unsigned kEncodingIntSet = 0;
|
||||
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
|
||||
constexpr unsigned kEncodingStrMap2 = 2;
|
||||
|
||||
namespace detail {
|
||||
|
||||
|
|
|
@ -0,0 +1,509 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "core/string_set.h"
|
||||
|
||||
#include <absl/numeric/bits.h>
|
||||
#include <absl/strings/escaping.h>
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "core/compact_object.h" // for hashcode
|
||||
|
||||
extern "C" {
|
||||
#include "redis/zmalloc.h"
|
||||
}
|
||||
|
||||
namespace dfly {
|
||||
using namespace std;
|
||||
|
||||
constexpr size_t kMinSizeShift = 2;
|
||||
constexpr size_t kMinSize = 1 << kMinSizeShift;
|
||||
constexpr size_t kAllowDisplacements = true;
|
||||
|
||||
inline bool CanSetFlat(int offs) {
|
||||
if (kAllowDisplacements)
|
||||
return offs < 2;
|
||||
return offs == 0;
|
||||
}
|
||||
|
||||
StringSet::StringSet(pmr::memory_resource* mr) : entries_(mr) {
|
||||
}
|
||||
|
||||
StringSet::~StringSet() {
|
||||
for (auto& entry : entries_) {
|
||||
if (entry.IsLink()) {
|
||||
LinkKey* lk = (LinkKey*)entry.get();
|
||||
while (lk) {
|
||||
sdsfree((sds)lk->ptr);
|
||||
SuperPtr next = lk->next;
|
||||
Free(lk);
|
||||
if (next.IsSds()) {
|
||||
sdsfree((sds)next.get());
|
||||
lk = nullptr;
|
||||
} else {
|
||||
DCHECK(next.IsLink());
|
||||
lk = (LinkKey*)next.get();
|
||||
}
|
||||
}
|
||||
} else if (!entry.IsEmpty()) {
|
||||
sdsfree((sds)entry.get());
|
||||
}
|
||||
}
|
||||
DCHECK_EQ(0u, num_chain_entries_);
|
||||
}
|
||||
|
||||
void StringSet::Reserve(size_t sz) {
|
||||
sz = std::min<size_t>(sz, kMinSize);
|
||||
|
||||
sz = absl::bit_ceil(sz);
|
||||
capacity_log_ = absl::bit_width(sz);
|
||||
entries_.reserve(sz);
|
||||
}
|
||||
|
||||
size_t StringSet::SuperPtr::SetString(std::string_view str) {
|
||||
sds sdsptr = sdsnewlen(str.data(), str.size());
|
||||
ptr = sdsptr;
|
||||
return zmalloc_usable_size(sdsAllocPtr(sdsptr));
|
||||
}
|
||||
|
||||
bool StringSet::SuperPtr::Compare(std::string_view str) const {
|
||||
if (IsEmpty())
|
||||
return false;
|
||||
|
||||
sds sp = GetSds();
|
||||
return str == string_view{sp, sdslen(sp)};
|
||||
}
|
||||
|
||||
bool StringSet::Add(std::string_view str) {
|
||||
DVLOG(1) << "Add " << absl::CHexEscape(str);
|
||||
|
||||
uint64_t hc = CompactObj::HashCode(str);
|
||||
|
||||
if (entries_.empty()) {
|
||||
capacity_log_ = kMinSizeShift;
|
||||
entries_.resize(kMinSize);
|
||||
auto& e = entries_[BucketId(hc)];
|
||||
obj_malloc_used_ += e.SetString(str);
|
||||
++size_;
|
||||
++num_used_buckets_;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
uint32_t bucket_id = BucketId(hc);
|
||||
if (FindAround(str, bucket_id) < 2)
|
||||
return false;
|
||||
|
||||
DCHECK_LT(bucket_id, entries_.size());
|
||||
++size_;
|
||||
|
||||
// Try insert into flat surface first. Also handle the grow case
|
||||
// if utilization is too high.
|
||||
for (unsigned j = 0; j < 2; ++j) {
|
||||
int offs = FindEmptyAround(bucket_id);
|
||||
if (CanSetFlat(offs)) {
|
||||
auto& entry = entries_[bucket_id + offs];
|
||||
obj_malloc_used_ += entry.SetString(str);
|
||||
if (offs != 0) {
|
||||
entry.SetDisplaced();
|
||||
}
|
||||
++num_used_buckets_;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (size_ < entries_.size())
|
||||
break;
|
||||
|
||||
Grow();
|
||||
bucket_id = BucketId(hc);
|
||||
}
|
||||
|
||||
auto& dest = entries_[bucket_id];
|
||||
DCHECK(!dest.IsEmpty());
|
||||
if (dest.IsDisplaced()) {
|
||||
sds sptr = dest.GetSds();
|
||||
uint32_t nbid = BucketId(sptr);
|
||||
Link(SuperPtr{sptr}, nbid);
|
||||
|
||||
if (dest.IsSds()) {
|
||||
obj_malloc_used_ += dest.SetString(str);
|
||||
} else {
|
||||
LinkKey* lk = (LinkKey*)dest.get();
|
||||
obj_malloc_used_ += lk->SetString(str);
|
||||
dest.ClearDisplaced();
|
||||
}
|
||||
} else {
|
||||
LinkKey* lk = NewLink(str, dest);
|
||||
dest.SetLink(lk);
|
||||
}
|
||||
DCHECK(!dest.IsDisplaced());
|
||||
return true;
|
||||
}
|
||||
|
||||
unsigned StringSet::BucketDepth(uint32_t bid) const {
|
||||
SuperPtr ptr = entries_[bid];
|
||||
if (ptr.IsEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned res = 1;
|
||||
while (ptr.IsLink()) {
|
||||
LinkKey* lk = (LinkKey*)ptr.get();
|
||||
++res;
|
||||
ptr = lk->next;
|
||||
DCHECK(!ptr.IsEmpty());
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
auto StringSet::NewLink(std::string_view str, SuperPtr ptr) -> LinkKey* {
|
||||
LinkAllocator ea(mr());
|
||||
LinkKey* lk = ea.allocate(1);
|
||||
ea.construct(lk);
|
||||
obj_malloc_used_ += lk->SetString(str);
|
||||
lk->next = ptr;
|
||||
++num_chain_entries_;
|
||||
|
||||
return lk;
|
||||
}
|
||||
|
||||
#if 0
|
||||
void StringSet::IterateOverBucket(uint32_t bid, const ItemCb& cb) {
|
||||
const Entry& e = entries_[bid];
|
||||
if (e.IsEmpty()) {
|
||||
DCHECK(!e.next);
|
||||
return;
|
||||
}
|
||||
cb(e.value);
|
||||
|
||||
const Entry* next = e.next;
|
||||
while (next) {
|
||||
cb(next->value);
|
||||
next = next->next;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
inline bool cmpsds(sds sp, string_view str) {
|
||||
if (sdslen(sp) != str.size())
|
||||
return false;
|
||||
return str.empty() || memcmp(sp, str.data(), str.size()) == 0;
|
||||
}
|
||||
|
||||
int StringSet::FindAround(string_view str, uint32_t bid) const {
|
||||
SuperPtr ptr = entries_[bid];
|
||||
|
||||
while (ptr.IsLink()) {
|
||||
LinkKey* lk = (LinkKey*)ptr.get();
|
||||
sds sp = (sds)lk->get();
|
||||
if (cmpsds(sp, str))
|
||||
return 0;
|
||||
ptr = lk->next;
|
||||
DCHECK(!ptr.IsEmpty());
|
||||
}
|
||||
|
||||
if (!ptr.IsEmpty()) {
|
||||
DCHECK(ptr.IsSds());
|
||||
sds sp = (sds)ptr.get();
|
||||
if (cmpsds(sp, str))
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (bid && entries_[bid - 1].Compare(str)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (bid + 1 < entries_.size() && entries_[bid + 1].Compare(str)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 2;
|
||||
}
|
||||
|
||||
void StringSet::Grow() {
|
||||
size_t prev_sz = entries_.size();
|
||||
entries_.resize(prev_sz * 2);
|
||||
++capacity_log_;
|
||||
|
||||
for (int i = prev_sz - 1; i >= 0; --i) {
|
||||
SuperPtr* current = &entries_[i];
|
||||
if (current->IsEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SuperPtr* prev = nullptr;
|
||||
while (true) {
|
||||
SuperPtr next;
|
||||
LinkKey* lk = nullptr;
|
||||
sds sp;
|
||||
|
||||
if (current->IsLink()) {
|
||||
lk = (LinkKey*)current->get();
|
||||
sp = (sds)lk->get();
|
||||
next = lk->next;
|
||||
} else {
|
||||
sp = (sds)current->get();
|
||||
}
|
||||
|
||||
uint32_t bid = BucketId(sp);
|
||||
if (bid != uint32_t(i)) {
|
||||
int offs = FindEmptyAround(bid);
|
||||
if (CanSetFlat(offs)) {
|
||||
auto& dest = entries_[bid + offs];
|
||||
DCHECK(!dest.IsLink());
|
||||
|
||||
dest.ptr = sp;
|
||||
if (offs != 0)
|
||||
dest.SetDisplaced();
|
||||
if (lk) {
|
||||
Free(lk);
|
||||
}
|
||||
++num_used_buckets_;
|
||||
} else {
|
||||
Link(*current, bid);
|
||||
}
|
||||
*current = next;
|
||||
} else {
|
||||
current->ClearDisplaced();
|
||||
if (lk) {
|
||||
prev = current;
|
||||
current = &lk->next;
|
||||
}
|
||||
}
|
||||
if (next.IsEmpty())
|
||||
break;
|
||||
}
|
||||
|
||||
if (prev) {
|
||||
DCHECK(prev->IsLink());
|
||||
LinkKey* lk = (LinkKey*)prev->get();
|
||||
if (lk->next.IsEmpty()) {
|
||||
bool is_displaced = prev->IsDisplaced();
|
||||
prev->ptr = lk->get();
|
||||
if (is_displaced) {
|
||||
prev->SetDisplaced();
|
||||
}
|
||||
Free(lk);
|
||||
}
|
||||
}
|
||||
|
||||
if (entries_[i].IsEmpty()) {
|
||||
--num_used_buckets_;
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
unsigned cnt = 0;
|
||||
for (auto ptr : entries_) {
|
||||
cnt += (!ptr.IsEmpty());
|
||||
}
|
||||
DCHECK_EQ(num_used_buckets_, cnt);
|
||||
#endif
|
||||
}
|
||||
|
||||
void StringSet::Link(SuperPtr ptr, uint32_t bid) {
|
||||
SuperPtr& root = entries_[bid];
|
||||
DCHECK(!root.IsEmpty());
|
||||
|
||||
bool is_root_displaced = root.IsDisplaced();
|
||||
|
||||
if (is_root_displaced) {
|
||||
DCHECK_NE(bid, BucketId(root.GetSds()));
|
||||
}
|
||||
LinkKey* head;
|
||||
void* val;
|
||||
|
||||
if (ptr.IsSds()) {
|
||||
if (is_root_displaced) {
|
||||
// in that case it's better to put ptr into root and move root data into its correct place.
|
||||
sds val;
|
||||
if (root.IsSds()) {
|
||||
val = (sds)root.get();
|
||||
root.ptr = ptr.get();
|
||||
} else {
|
||||
LinkKey* lk = (LinkKey*)root.get();
|
||||
val = (sds)lk->get();
|
||||
lk->ptr = ptr.get();
|
||||
root.ClearDisplaced();
|
||||
}
|
||||
uint32_t nbid = BucketId(val);
|
||||
DCHECK_NE(nbid, bid);
|
||||
|
||||
Link(SuperPtr{val}, nbid); // Potentially unbounded wave of updates.
|
||||
return;
|
||||
}
|
||||
|
||||
LinkAllocator ea(mr());
|
||||
head = ea.allocate(1);
|
||||
ea.construct(head);
|
||||
val = ptr.get();
|
||||
++num_chain_entries_;
|
||||
} else {
|
||||
head = (LinkKey*)ptr.get();
|
||||
val = head->get();
|
||||
}
|
||||
|
||||
if (root.IsSds()) {
|
||||
head->ptr = root.get();
|
||||
head->next = SuperPtr{val};
|
||||
root.SetLink(head);
|
||||
if (is_root_displaced) {
|
||||
DCHECK_NE(bid, BucketId((sds)head->ptr));
|
||||
root.SetDisplaced();
|
||||
}
|
||||
} else {
|
||||
DCHECK(root.IsLink());
|
||||
LinkKey* chain = (LinkKey*)root.get();
|
||||
head->next = chain->next;
|
||||
head->ptr = val;
|
||||
chain->next.SetLink(head);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
void StringSet::MoveEntry(Entry* e, uint32_t bid) {
|
||||
auto& dest = entries_[bid];
|
||||
if (IsEmpty(dest)) {
|
||||
dest.value = std::move(e->value);
|
||||
Free(e);
|
||||
return;
|
||||
}
|
||||
e->next = dest.next;
|
||||
dest.next = e;
|
||||
}
|
||||
#endif
|
||||
|
||||
int StringSet::FindEmptyAround(uint32_t bid) const {
|
||||
if (entries_[bid].IsEmpty())
|
||||
return 0;
|
||||
|
||||
if (bid + 1 < entries_.size() && entries_[bid + 1].IsEmpty())
|
||||
return 1;
|
||||
|
||||
if (bid && entries_[bid - 1].IsEmpty())
|
||||
return -1;
|
||||
|
||||
return 2;
|
||||
}
|
||||
|
||||
uint32_t StringSet::BucketId(sds ptr) const {
|
||||
string_view sv{ptr, sdslen(ptr)};
|
||||
return BucketId(CompactObj::HashCode(sv));
|
||||
}
|
||||
|
||||
#if 0
|
||||
uint32_t StringSet::Scan(uint32_t cursor, const ItemCb& cb) const {
|
||||
if (capacity_log_ == 0)
|
||||
return 0;
|
||||
|
||||
uint32_t bucket_id = cursor >> (32 - capacity_log_);
|
||||
const_iterator it(this, bucket_id);
|
||||
|
||||
if (it.entry_ == nullptr)
|
||||
return 0;
|
||||
|
||||
bucket_id = it.bucket_id_; // non-empty bucket
|
||||
do {
|
||||
cb(*it);
|
||||
++it;
|
||||
} while (it.bucket_id_ == bucket_id);
|
||||
|
||||
if (it.entry_ == nullptr)
|
||||
return 0;
|
||||
|
||||
if (it.bucket_id_ == bucket_id + 1) { // cover displacement case
|
||||
// TODO: we could avoid checking computing HC if we explicitly mark displacement.
|
||||
// we have plenty-metadata to do so.
|
||||
uint32_t bid = BucketId((*it).HashCode());
|
||||
if (bid == it.bucket_id_) {
|
||||
cb(*it);
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
return it.entry_ ? it.bucket_id_ << (32 - capacity_log_) : 0;
|
||||
}
|
||||
|
||||
bool StringSet::Erase(std::string_view val) {
|
||||
uint64_t hc = CompactObj::HashCode(val);
|
||||
uint32_t bid = BucketId(hc);
|
||||
|
||||
Entry* current = &entries_[bid];
|
||||
|
||||
if (!current->IsEmpty()) {
|
||||
if (current->value == val) {
|
||||
current->Reset();
|
||||
ShiftLeftIfNeeded(current);
|
||||
--size_;
|
||||
return true;
|
||||
}
|
||||
|
||||
Entry* prev = current;
|
||||
current = current->next;
|
||||
while (current) {
|
||||
if (current->value == val) {
|
||||
current->Reset();
|
||||
prev->next = current->next;
|
||||
Free(current);
|
||||
--size_;
|
||||
return true;
|
||||
}
|
||||
prev = current;
|
||||
current = current->next;
|
||||
}
|
||||
}
|
||||
|
||||
auto& prev = entries_[bid - 1];
|
||||
// TODO: to mark displacement.
|
||||
if (bid && !prev.IsEmpty()) {
|
||||
if (prev.value == val) {
|
||||
obj_malloc_used_ -= prev.value.MallocUsed();
|
||||
|
||||
prev.Reset();
|
||||
ShiftLeftIfNeeded(&prev);
|
||||
--size_;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
auto& next = entries_[bid + 1];
|
||||
if (bid + 1 < entries_.size()) {
|
||||
if (next.value == val) {
|
||||
obj_malloc_used_ -= next.value.MallocUsed();
|
||||
next.Reset();
|
||||
ShiftLeftIfNeeded(&next);
|
||||
--size_;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
void StringSet::iterator::SeekNonEmpty() {
|
||||
while (bucket_id_ < owner_->entries_.size()) {
|
||||
if (!owner_->entries_[bucket_id_].IsEmpty()) {
|
||||
entry_ = &owner_->entries_[bucket_id_];
|
||||
return;
|
||||
}
|
||||
++bucket_id_;
|
||||
}
|
||||
entry_ = nullptr;
|
||||
}
|
||||
|
||||
void StringSet::const_iterator::SeekNonEmpty() {
|
||||
while (bucket_id_ < owner_->entries_.size()) {
|
||||
if (!owner_->entries_[bucket_id_].IsEmpty()) {
|
||||
entry_ = &owner_->entries_[bucket_id_];
|
||||
return;
|
||||
}
|
||||
++bucket_id_;
|
||||
}
|
||||
entry_ = nullptr;
|
||||
}
|
||||
|
||||
} // namespace dfly
|
|
@ -0,0 +1,332 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
#pragma once
|
||||
|
||||
#include <memory_resource>
|
||||
|
||||
extern "C" {
|
||||
#include "redis/object.h"
|
||||
}
|
||||
|
||||
namespace dfly {
|
||||
|
||||
// StringSet is a nice but over-optimized data-structure. Probably is not worth it in the first
|
||||
// place but sometimes the OCD kicks in and one can not resist.
|
||||
// The advantage of it over redis-dict is smaller meta-data waste.
|
||||
// dictEntry is 24 bytes, i.e it uses at least 32N bytes where N is the expected length.
|
||||
// dict requires to allocate dictEntry per each addition in addition to the supplied key.
|
||||
// It also wastes space in case of a set because it stores a value pointer inside dictEntry.
|
||||
// To summarize:
|
||||
// 100% utilized dict uses N*24 + N*8 = 32N bytes not including the key space.
|
||||
// for 75% utilization (1/0.75 buckets): N*1.33*8 + N*24 = 35N
|
||||
//
|
||||
// This class uses 8 bytes per bucket (similarly to dictEntry*) but it used it for both
|
||||
// links and keys. For most cases, we remove the need for another redirection layer
|
||||
// and just store the key, so no "dictEntry" allocations occur.
|
||||
// For those cells that require chaining, the bucket is
|
||||
// changed in run-time to represent a linked chain.
|
||||
// Additional feature - in order to to reduce collisions, we insert items into
|
||||
// neighbour cells but only if they are empty (not chains). This way we reduce the number of
|
||||
// empty (unused) spaces at full utilization from 36% to ~21%.
|
||||
// 100% utilized table requires: N*8 + 0.2N*16 = 11.2N bytes or ~20 bytes savings.
|
||||
// 75% utilization: N*1.33*8 + 0.12N*16 = 13N or ~22 bytes savings per record.
|
||||
// TODO: to separate hash/compare functions from table logic and make it generic
|
||||
// with potential replacements of hset/zset data structures.
|
||||
// static_assert(sizeof(dictEntry) == 24);
|
||||
|
||||
class StringSet {
|
||||
struct LinkKey;
|
||||
// we can assume that high 12 bits of user address space
|
||||
// can be used for tagging. At most 52 bits of address are reserved for
|
||||
// some configurations, and usually it's 48 bits.
|
||||
// https://www.kernel.org/doc/html/latest/arm64/memory.html
|
||||
static constexpr size_t kLinkBit = 1ULL << 52;
|
||||
static constexpr size_t kDisplaceBit = 1ULL << 53;
|
||||
static constexpr size_t kTagMask = 4095ULL << 51; // we reserve 12 high bits.
|
||||
|
||||
struct SuperPtr {
|
||||
void* ptr = nullptr; //
|
||||
|
||||
explicit SuperPtr(void* p = nullptr) : ptr(p) {
|
||||
}
|
||||
|
||||
bool IsSds() const {
|
||||
return (uintptr_t(ptr) & kLinkBit) == 0;
|
||||
}
|
||||
|
||||
bool IsLink() const {
|
||||
return (uintptr_t(ptr) & kLinkBit) == kLinkBit;
|
||||
}
|
||||
|
||||
bool IsEmpty() const {
|
||||
return ptr == nullptr;
|
||||
}
|
||||
|
||||
void* get() const {
|
||||
return (void*)(uintptr_t(ptr) & ~kTagMask);
|
||||
}
|
||||
|
||||
bool IsDisplaced() const {
|
||||
return (uintptr_t(ptr) & kDisplaceBit) == kDisplaceBit;
|
||||
}
|
||||
|
||||
// returns usable size.
|
||||
size_t SetString(std::string_view str);
|
||||
|
||||
void SetLink(LinkKey* lk) {
|
||||
ptr = (void*)(uintptr_t(lk) | kLinkBit);
|
||||
}
|
||||
|
||||
bool Compare(std::string_view str) const;
|
||||
|
||||
void SetDisplaced() {
|
||||
ptr = (void*)(uintptr_t(ptr) | kDisplaceBit);
|
||||
}
|
||||
|
||||
void ClearDisplaced() {
|
||||
ptr = (void*)(uintptr_t(ptr) & ~kDisplaceBit);
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
ptr = nullptr;
|
||||
}
|
||||
|
||||
sds GetSds() const {
|
||||
if (IsSds())
|
||||
return (sds)get();
|
||||
LinkKey* lk = (LinkKey*)get();
|
||||
return (sds)lk->get();
|
||||
}
|
||||
};
|
||||
|
||||
struct LinkKey : public SuperPtr {
|
||||
SuperPtr next; // could be LinkKey* or sds.
|
||||
};
|
||||
|
||||
static_assert(sizeof(SuperPtr) == 8);
|
||||
|
||||
public:
|
||||
class iterator;
|
||||
class const_iterator;
|
||||
// using ItemCb = std::function<void(const CompactObj& co)>;
|
||||
|
||||
StringSet(const StringSet&) = delete;
|
||||
|
||||
explicit StringSet(std::pmr::memory_resource* mr = std::pmr::get_default_resource());
|
||||
~StringSet();
|
||||
|
||||
StringSet& operator=(StringSet&) = delete;
|
||||
|
||||
void Reserve(size_t sz);
|
||||
|
||||
bool Add(std::string_view str);
|
||||
|
||||
bool Remove(std::string_view str);
|
||||
|
||||
void Erase(iterator it);
|
||||
|
||||
size_t size() const {
|
||||
return size_;
|
||||
}
|
||||
|
||||
bool empty() const {
|
||||
return size_ == 0;
|
||||
}
|
||||
|
||||
size_t bucket_count() const {
|
||||
return entries_.size();
|
||||
}
|
||||
|
||||
// those that are chained to the entries stored inline in the bucket array.
|
||||
size_t num_chain_entries() const {
|
||||
return num_chain_entries_;
|
||||
}
|
||||
|
||||
size_t num_used_buckets() const {
|
||||
return num_used_buckets_;
|
||||
}
|
||||
|
||||
bool Contains(std::string_view val) const;
|
||||
|
||||
bool Erase(std::string_view val);
|
||||
|
||||
iterator begin() {
|
||||
return iterator{this, 0};
|
||||
}
|
||||
|
||||
iterator end() {
|
||||
return iterator{};
|
||||
}
|
||||
|
||||
size_t obj_malloc_used() const {
|
||||
return obj_malloc_used_;
|
||||
}
|
||||
|
||||
size_t set_malloc_used() const {
|
||||
return (num_chain_entries_ + entries_.capacity()) * sizeof(SuperPtr);
|
||||
}
|
||||
|
||||
/// stable scanning api. has the same guarantees as redis scan command.
|
||||
/// we avoid doing bit-reverse by using a different function to derive a bucket id
|
||||
/// from hash values. By using msb part of hash we make it "stable" with respect to
|
||||
/// rehashes. For example, with table log size 4 (size 16), entries in bucket id
|
||||
/// 1110 come from hashes 1110XXXXX.... When a table grows to log size 5,
|
||||
/// these entries can move either to 11100 or 11101. So if we traversed with our cursor
|
||||
/// range [0000-1110], it's guaranteed that in grown table we do not need to cover again
|
||||
/// [00000-11100]. Similarly with shrinkage, if a table is shrinked to log size 3,
|
||||
/// keys from 1110 and 1111 will move to bucket 111. Again, it's guaranteed that we
|
||||
/// covered the range [000-111] (all keys in that case).
|
||||
/// Returns: next cursor or 0 if reached the end of scan.
|
||||
/// cursor = 0 - initiates a new scan.
|
||||
// uint32_t Scan(uint32_t cursor, const ItemCb& cb) const;
|
||||
|
||||
unsigned BucketDepth(uint32_t bid) const;
|
||||
|
||||
// void IterateOverBucket(uint32_t bid, const ItemCb& cb);
|
||||
|
||||
class iterator {
|
||||
friend class StringSet;
|
||||
|
||||
public:
|
||||
iterator() : owner_(nullptr), entry_(nullptr), bucket_id_(0) {
|
||||
}
|
||||
|
||||
iterator& operator++();
|
||||
|
||||
bool operator==(const iterator& o) const {
|
||||
return entry_ == o.entry_;
|
||||
}
|
||||
|
||||
bool operator!=(const iterator& o) const {
|
||||
return !(*this == o);
|
||||
}
|
||||
|
||||
private:
|
||||
iterator(StringSet* owner, uint32_t bid) : owner_(owner), bucket_id_(bid) {
|
||||
SeekNonEmpty();
|
||||
}
|
||||
|
||||
void SeekNonEmpty();
|
||||
|
||||
StringSet* owner_ = nullptr;
|
||||
SuperPtr* entry_ = nullptr;
|
||||
uint32_t bucket_id_ = 0;
|
||||
};
|
||||
|
||||
class const_iterator {
|
||||
friend class StringSet;
|
||||
|
||||
public:
|
||||
const_iterator() : owner_(nullptr), entry_(nullptr), bucket_id_(0) {
|
||||
}
|
||||
|
||||
const_iterator& operator++();
|
||||
|
||||
const_iterator& operator=(iterator& it) {
|
||||
owner_ = it.owner_;
|
||||
entry_ = it.entry_;
|
||||
bucket_id_ = it.bucket_id_;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool operator==(const const_iterator& o) const {
|
||||
return entry_ == o.entry_;
|
||||
}
|
||||
|
||||
bool operator!=(const const_iterator& o) const {
|
||||
return !(*this == o);
|
||||
}
|
||||
|
||||
private:
|
||||
const_iterator(const StringSet* owner, uint32_t bid) : owner_(owner), bucket_id_(bid) {
|
||||
SeekNonEmpty();
|
||||
}
|
||||
|
||||
void SeekNonEmpty();
|
||||
|
||||
const StringSet* owner_ = nullptr;
|
||||
const SuperPtr* entry_ = nullptr;
|
||||
uint32_t bucket_id_ = 0;
|
||||
};
|
||||
|
||||
private:
|
||||
friend class iterator;
|
||||
|
||||
using LinkAllocator = std::pmr::polymorphic_allocator<LinkKey>;
|
||||
|
||||
std::pmr::memory_resource* mr() {
|
||||
return entries_.get_allocator().resource();
|
||||
}
|
||||
|
||||
uint32_t BucketId(uint64_t hash) const {
|
||||
return hash >> (64 - capacity_log_);
|
||||
}
|
||||
|
||||
uint32_t BucketId(sds ptr) const;
|
||||
|
||||
// Returns: 2 if no empty spaces found around the bucket. 0, -1, 1 - offset towards
|
||||
// an empty bucket.
|
||||
int FindEmptyAround(uint32_t bid) const;
|
||||
|
||||
// returns 2 if no object was found in the vicinity.
|
||||
// Returns relative offset to bid: 0, -1, 1 if found.
|
||||
int FindAround(std::string_view str, uint32_t bid) const;
|
||||
|
||||
void Grow();
|
||||
|
||||
void Link(SuperPtr ptr, uint32_t bid);
|
||||
/*void MoveEntry(Entry* e, uint32_t bid);
|
||||
|
||||
void ShiftLeftIfNeeded(Entry* root) {
|
||||
if (root->next) {
|
||||
root->value = std::move(root->next->value);
|
||||
Entry* tmp = root->next;
|
||||
root->next = root->next->next;
|
||||
Free(tmp);
|
||||
}
|
||||
}
|
||||
*/
|
||||
void Free(LinkKey* lk) {
|
||||
mr()->deallocate(lk, sizeof(LinkKey), alignof(LinkKey));
|
||||
--num_chain_entries_;
|
||||
}
|
||||
|
||||
LinkKey* NewLink(std::string_view str, SuperPtr ptr);
|
||||
|
||||
// The rule is - entries can be moved to vicinity as long as they are stored
|
||||
// "flat", i.e. not into the linked list. The linked list
|
||||
std::pmr::vector<SuperPtr> entries_;
|
||||
size_t obj_malloc_used_ = 0;
|
||||
uint32_t size_ = 0;
|
||||
uint32_t num_chain_entries_ = 0;
|
||||
uint32_t num_used_buckets_ = 0;
|
||||
unsigned capacity_log_ = 0;
|
||||
};
|
||||
|
||||
#if 0
|
||||
inline StringSet::iterator& StringSet::iterator::operator++() {
|
||||
if (entry_->next) {
|
||||
entry_ = entry_->next;
|
||||
} else {
|
||||
++bucket_id_;
|
||||
SeekNonEmpty();
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
inline StringSet::const_iterator& StringSet::const_iterator::operator++() {
|
||||
if (entry_->next) {
|
||||
entry_ = entry_->next;
|
||||
} else {
|
||||
++bucket_id_;
|
||||
SeekNonEmpty();
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace dfly
|
|
@ -0,0 +1,131 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "core/string_set.h"
|
||||
|
||||
#include <absl/strings/str_cat.h>
|
||||
#include <gmock/gmock.h>
|
||||
#include <mimalloc.h>
|
||||
|
||||
#include <unordered_set>
|
||||
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "core/mi_memory_resource.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/zmalloc.h"
|
||||
}
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
|
||||
class StringSetTest : public ::testing::Test {
|
||||
protected:
|
||||
static void SetUpTestSuite() {
|
||||
auto* tlh = mi_heap_get_backing();
|
||||
init_zmalloc_threadlocal(tlh);
|
||||
// SmallString::InitThreadLocal(tlh);
|
||||
|
||||
// static MiMemoryResource mi_resource(tlh);
|
||||
// needed for MallocUsed
|
||||
// CompactObj::InitThreadLocal(&mi_resource);
|
||||
}
|
||||
|
||||
static void TearDownTestSuite() {
|
||||
}
|
||||
|
||||
StringSet ss_;
|
||||
};
|
||||
|
||||
TEST_F(StringSetTest, Basic) {
|
||||
EXPECT_TRUE(ss_.Add("foo"));
|
||||
EXPECT_TRUE(ss_.Add("bar"));
|
||||
EXPECT_FALSE(ss_.Add("foo"));
|
||||
EXPECT_FALSE(ss_.Add("bar"));
|
||||
EXPECT_EQ(2, ss_.size());
|
||||
}
|
||||
|
||||
TEST_F(StringSetTest, Ex1) {
|
||||
EXPECT_TRUE(ss_.Add("AA@@@@@@@@@@@@@@"));
|
||||
EXPECT_TRUE(ss_.Add("AAA@@@@@@@@@@@@@"));
|
||||
EXPECT_TRUE(ss_.Add("AAAAAAAAA@@@@@@@"));
|
||||
EXPECT_TRUE(ss_.Add("AAAAAAAAAA@@@@@@"));
|
||||
EXPECT_TRUE(ss_.Add("AAAAAAAAAAAAAAA@"));
|
||||
EXPECT_TRUE(ss_.Add("BBBBBAAAAAAAAAAA"));
|
||||
EXPECT_TRUE(ss_.Add("BBBBBBBBAAAAAAAA"));
|
||||
EXPECT_TRUE(ss_.Add("CCCCCBBBBBBBBBBB"));
|
||||
}
|
||||
|
||||
TEST_F(StringSetTest, Many) {
|
||||
double max_chain_factor = 0;
|
||||
for (unsigned i = 0; i < 8192; ++i) {
|
||||
EXPECT_TRUE(ss_.Add(absl::StrCat("xxxxxxxxxxxxxxxxx", i)));
|
||||
size_t sz = ss_.size();
|
||||
bool should_print = (sz == ss_.bucket_count()) || (sz == ss_.bucket_count() * 0.75);
|
||||
if (should_print) {
|
||||
double chain_usage = double(ss_.num_chain_entries()) / ss_.size();
|
||||
unsigned num_empty = ss_.bucket_count() - ss_.num_used_buckets();
|
||||
double empty_factor = double(num_empty) / ss_.bucket_count();
|
||||
|
||||
LOG(INFO) << "chains: " << 100 * chain_usage << ", empty: " << 100 * empty_factor << "% at "
|
||||
<< ss_.size();
|
||||
#if 0
|
||||
if (ss_.size() == 15) {
|
||||
for (unsigned i = 0; i < ss_.bucket_count(); ++i) {
|
||||
LOG(INFO) << "[" << i << "]: " << ss_.BucketDepth(i);
|
||||
}
|
||||
/*ss_.IterateOverBucket(93, [this](const CompactObj& co) {
|
||||
LOG(INFO) << "93->" << (co.HashCode() % ss_.bucket_count());
|
||||
});*/
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
EXPECT_EQ(8192, ss_.size());
|
||||
|
||||
LOG(INFO) << "max chain factor: " << 100 * max_chain_factor << "%";
|
||||
/*size_t iter_len = 0;
|
||||
for (auto it = ss_.begin(); it != ss_.end(); ++it) {
|
||||
++iter_len;
|
||||
}
|
||||
EXPECT_EQ(iter_len, 512);*/
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST_F(StringSetTest, IterScan) {
|
||||
unordered_set<string> actual, expected;
|
||||
auto insert_actual = [&](const CompactObj& val) {
|
||||
string tmp;
|
||||
val.GetString(&tmp);
|
||||
actual.insert(tmp);
|
||||
};
|
||||
|
||||
EXPECT_EQ(0, ss_.Scan(0, insert_actual));
|
||||
EXPECT_TRUE(actual.empty());
|
||||
|
||||
for (unsigned i = 0; i < 512; ++i) {
|
||||
string s = absl::StrCat("x", i);
|
||||
expected.insert(s);
|
||||
EXPECT_TRUE(ss_.Add(s));
|
||||
}
|
||||
|
||||
|
||||
for (CompactObj& val : ss_) {
|
||||
insert_actual(val);
|
||||
}
|
||||
|
||||
EXPECT_EQ(actual, expected);
|
||||
actual.clear();
|
||||
uint32_t cursor = 0;
|
||||
do {
|
||||
cursor = ss_.Scan(cursor, insert_actual);
|
||||
} while (cursor);
|
||||
EXPECT_EQ(actual, expected);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
} // namespace dfly
|
|
@ -11,17 +11,22 @@ extern "C" {
|
|||
#include "redis/util.h"
|
||||
}
|
||||
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "base/stl_util.h"
|
||||
#include "core/string_set.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
ABSL_FLAG(bool, use_set2, false, "If true use better implementation for sets");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
using absl::GetFlag;
|
||||
|
||||
using ResultStringVec = vector<OpResult<vector<string>>>;
|
||||
using ResultSetView = OpResult<absl::flat_hash_set<std::string_view>>;
|
||||
|
@ -52,24 +57,103 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
|
|||
return is;
|
||||
}
|
||||
|
||||
bool dictContains(const dict* d, string_view key) {
|
||||
uint64_t h = dictGenHashFunction(key.data(), key.size());
|
||||
|
||||
for (unsigned table = 0; table <= 1; table++) {
|
||||
uint64_t idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
|
||||
dictEntry* he = d->ht_table[table][idx];
|
||||
while (he) {
|
||||
sds dkey = (sds)he->key;
|
||||
if (sdslen(dkey) == key.size() && (key.empty() || memcmp(dkey, key.data(), key.size()) == 0))
|
||||
return true;
|
||||
he = he->next;
|
||||
}
|
||||
if (!dictIsRehashing(d))
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
pair<unsigned, bool> RemoveStrSet(ArgSlice vals, CompactObj* set) {
|
||||
unsigned removed = 0;
|
||||
bool isempty = false;
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
StringSet* ss = (StringSet*)set->RObjPtr();
|
||||
for (auto member : vals) {
|
||||
// removed += ss->Erase(member);
|
||||
}
|
||||
isempty = ss->empty();
|
||||
} else {
|
||||
dict* d = (dict*)set->RObjPtr();
|
||||
auto* shard = EngineShard::tlocal();
|
||||
|
||||
for (auto member : vals) {
|
||||
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
|
||||
int result = dictDelete(d, shard->tmp_str1);
|
||||
removed += (result == DICT_OK);
|
||||
}
|
||||
isempty = (dictSize(d) == 0);
|
||||
}
|
||||
return make_pair(removed, isempty);
|
||||
}
|
||||
|
||||
unsigned AddStrSet(ArgSlice vals, CompactObj* dest) {
|
||||
unsigned res = 0;
|
||||
|
||||
dict* ds = (dict*)dest->RObjPtr();
|
||||
auto* es = EngineShard::tlocal();
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
StringSet* ss = (StringSet*)dest->RObjPtr();
|
||||
for (auto member : vals) {
|
||||
res += ss->Add(member);
|
||||
}
|
||||
} else {
|
||||
dict* ds = (dict*)dest->RObjPtr();
|
||||
auto* es = EngineShard::tlocal();
|
||||
|
||||
for (auto member : vals) {
|
||||
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
|
||||
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
|
||||
if (de) {
|
||||
de->key = sdsdup(es->tmp_str1);
|
||||
++res;
|
||||
for (auto member : vals) {
|
||||
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
|
||||
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
|
||||
if (de) {
|
||||
de->key = sdsdup(es->tmp_str1);
|
||||
++res;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void InitStrSet(CompactObj* set) {
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
StringSet* ss = new StringSet{CompactObj::memory_resource()};
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap2, ss);
|
||||
} else {
|
||||
dict* ds = dictCreate(&setDictType);
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
|
||||
}
|
||||
}
|
||||
|
||||
// f receives a str object.
|
||||
template <typename F> void FillFromStrSet(F&& f, void* ptr) {
|
||||
string str;
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
/*for (const CompactObj& co : *(StringSet*)ptr) {
|
||||
co.GetString(&str);
|
||||
f(move(str));
|
||||
}*/
|
||||
} else {
|
||||
dict* ds = (dict*)ptr;
|
||||
|
||||
dictIterator* di = dictGetIterator(ds);
|
||||
dictEntry* de = nullptr;
|
||||
while ((de = dictNext(di))) {
|
||||
str.assign((sds)de->key, sdslen((sds)de->key));
|
||||
f(move(str));
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
}
|
||||
|
||||
// returns (removed, isempty)
|
||||
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
|
||||
bool isempty = false;
|
||||
|
@ -91,14 +175,7 @@ pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
|
|||
isempty = (intsetLen(is) == 0);
|
||||
set->SetRObjPtr(is);
|
||||
} else {
|
||||
dict* d = (dict*)set->RObjPtr();
|
||||
auto* shard = EngineShard::tlocal();
|
||||
for (auto member : vals) {
|
||||
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
|
||||
int result = dictDelete(d, shard->tmp_str1);
|
||||
removed += (result == DICT_OK);
|
||||
}
|
||||
isempty = (dictSize(d) == 0);
|
||||
return RemoveStrSet(vals, set);
|
||||
}
|
||||
return make_pair(removed, isempty);
|
||||
}
|
||||
|
@ -118,8 +195,7 @@ void InitSet(ArgSlice vals, CompactObj* set) {
|
|||
intset* is = intsetNew();
|
||||
set->InitRobj(OBJ_SET, kEncodingIntSet, is);
|
||||
} else {
|
||||
dict* ds = dictCreate(&setDictType);
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
|
||||
InitStrSet(set);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,43 +208,126 @@ void ScanCallback(void* privdata, const dictEntry* de) {
|
|||
uint64_t ScanStrSet(const CompactObj& co, uint64_t curs, unsigned count, StringVec* res) {
|
||||
long maxiterations = count * 10;
|
||||
|
||||
DCHECK_EQ(kEncodingStrMap, co.Encoding());
|
||||
dict* ds = (dict*)co.RObjPtr();
|
||||
do {
|
||||
curs = dictScan(ds, curs, ScanCallback, NULL, res);
|
||||
} while (curs && maxiterations-- && res->size() < count);
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(kEncodingStrMap2, co.Encoding());
|
||||
LOG(DFATAL) << "TBD: Not implemented";
|
||||
} else {
|
||||
DCHECK_EQ(kEncodingStrMap, co.Encoding());
|
||||
dict* ds = (dict*)co.RObjPtr();
|
||||
do {
|
||||
curs = dictScan(ds, curs, ScanCallback, NULL, res);
|
||||
} while (curs && maxiterations-- && res->size() < count);
|
||||
}
|
||||
|
||||
return curs;
|
||||
}
|
||||
|
||||
using SetType = pair<void*, unsigned>;
|
||||
|
||||
uint32_t SetTypeLen(const SetType& set) {
|
||||
if (set.second == kEncodingIntSet) {
|
||||
return intsetLen((const intset*)set.first);
|
||||
}
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(set.second, kEncodingStrMap2);
|
||||
return ((StringSet*)set.first)->size();
|
||||
} else {
|
||||
DCHECK_EQ(set.second, kEncodingStrMap);
|
||||
return dictSize((const dict*)set.first);
|
||||
}
|
||||
};
|
||||
|
||||
bool IsInSet(const SetType& st, int64_t val) {
|
||||
if (st.second == kEncodingIntSet)
|
||||
return intsetFind((intset*)st.first, val);
|
||||
|
||||
char buf[32];
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
|
||||
string_view str{buf, size_t(next - buf)};
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
LOG(DFATAL) << "TBD: Not implemented";
|
||||
return false;
|
||||
} else {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
return dictContains((dict*)st.first, str);
|
||||
}
|
||||
}
|
||||
|
||||
bool IsInSet(const SetType& st, string_view member) {
|
||||
if (st.second == kEncodingIntSet) {
|
||||
long long llval;
|
||||
if (!string2ll(member.data(), member.size(), &llval))
|
||||
return false;
|
||||
|
||||
return intsetFind((intset*)st.first, llval);
|
||||
}
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
LOG(DFATAL) << "TBD: Not implemented";
|
||||
return false;
|
||||
} else {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
return dictContains((dict*)st.first, member);
|
||||
}
|
||||
}
|
||||
|
||||
// Removes arg from result.
|
||||
void DiffStrSet(const SetType& st, absl::flat_hash_set<string>* result) {
|
||||
DCHECK_EQ(kEncodingStrMap, st.second);
|
||||
dict* ds = (dict*)st.first;
|
||||
dictIterator* di = dictGetIterator(ds);
|
||||
dictEntry* de = nullptr;
|
||||
while ((de = dictNext(di))) {
|
||||
sds key = (sds)de->key;
|
||||
result->erase(string_view{key, sdslen(key)});
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
} else {
|
||||
DCHECK_EQ(kEncodingStrMap, st.second);
|
||||
dict* ds = (dict*)st.first;
|
||||
dictIterator* di = dictGetIterator(ds);
|
||||
dictEntry* de = nullptr;
|
||||
while ((de = dictNext(di))) {
|
||||
sds key = (sds)de->key;
|
||||
result->erase(string_view{key, sdslen(key)});
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
}
|
||||
|
||||
void InterStrSet(const vector<SetType>& vec, StringVec* result) {
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
} else {
|
||||
dict* ds = (dict*)vec.front().first;
|
||||
dictIterator* di = dictGetIterator(ds);
|
||||
dictEntry* de = nullptr;
|
||||
while ((de = dictNext(di))) {
|
||||
size_t j = 1;
|
||||
sds key = (sds)de->key;
|
||||
string_view member{key, sdslen(key)};
|
||||
|
||||
for (j = 1; j < vec.size(); j++) {
|
||||
if (vec[j].first != ds && !IsInSet(vec[j], member))
|
||||
break;
|
||||
}
|
||||
|
||||
/* Only take action when all vec contain the member */
|
||||
if (j == vec.size()) {
|
||||
result->push_back(string(member));
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
|
||||
StringVec PopStrSet(unsigned count, const SetType& st) {
|
||||
StringVec result;
|
||||
dict* ds = (dict*)st.first;
|
||||
string str;
|
||||
dictIterator* di = dictGetSafeIterator(ds);
|
||||
for (uint32_t i = 0; i < count; ++i) {
|
||||
dictEntry* de = dictNext(di);
|
||||
DCHECK(de);
|
||||
result.emplace_back((sds)de->key, sdslen((sds)de->key));
|
||||
dictDelete(ds, de->key);
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
} else {
|
||||
dict* ds = (dict*)st.first;
|
||||
string str;
|
||||
dictIterator* di = dictGetSafeIterator(ds);
|
||||
for (uint32_t i = 0; i < count; ++i) {
|
||||
dictEntry* de = dictNext(di);
|
||||
DCHECK(de);
|
||||
result.emplace_back((sds)de->key, sdslen((sds)de->key));
|
||||
dictDelete(ds, de->key);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -290,58 +449,6 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
using SetType = pair<void*, unsigned>;
|
||||
|
||||
uint32_t SetTypeLen(const SetType& set) {
|
||||
if (set.second == kEncodingStrMap) {
|
||||
return dictSize((const dict*)set.first);
|
||||
}
|
||||
DCHECK_EQ(set.second, kEncodingIntSet);
|
||||
return intsetLen((const intset*)set.first);
|
||||
};
|
||||
|
||||
bool dictContains(const dict* d, string_view key) {
|
||||
uint64_t h = dictGenHashFunction(key.data(), key.size());
|
||||
|
||||
for (unsigned table = 0; table <= 1; table++) {
|
||||
uint64_t idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
|
||||
dictEntry* he = d->ht_table[table][idx];
|
||||
while (he) {
|
||||
sds dkey = (sds)he->key;
|
||||
if (sdslen(dkey) == key.size() && (key.empty() || memcmp(dkey, key.data(), key.size()) == 0))
|
||||
return true;
|
||||
he = he->next;
|
||||
}
|
||||
if (!dictIsRehashing(d))
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool IsInSet(const SetType& st, int64_t val) {
|
||||
if (st.second == kEncodingIntSet)
|
||||
return intsetFind((intset*)st.first, val);
|
||||
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
char buf[32];
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
|
||||
|
||||
return dictContains((dict*)st.first, string_view{buf, size_t(next - buf)});
|
||||
}
|
||||
|
||||
bool IsInSet(const SetType& st, string_view member) {
|
||||
if (st.second == kEncodingIntSet) {
|
||||
long long llval;
|
||||
if (!string2ll(member.data(), member.size(), &llval))
|
||||
return false;
|
||||
|
||||
return intsetFind((intset*)st.first, llval);
|
||||
}
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
|
||||
return dictContains((dict*)st.first, member);
|
||||
}
|
||||
|
||||
template <typename F> void FillSet(const SetType& set, F&& f) {
|
||||
if (set.second == kEncodingIntSet) {
|
||||
intset* is = (intset*)set.first;
|
||||
|
@ -354,15 +461,7 @@ template <typename F> void FillSet(const SetType& set, F&& f) {
|
|||
f(string{buf, size_t(next - buf)});
|
||||
}
|
||||
} else {
|
||||
dict* ds = (dict*)set.first;
|
||||
string str;
|
||||
dictIterator* di = dictGetIterator(ds);
|
||||
dictEntry* de = nullptr;
|
||||
while ((de = dictNext(di))) {
|
||||
str.assign((sds)de->key, sdslen((sds)de->key));
|
||||
f(move(str));
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
FillFromStrSet(move(f), set.first);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,7 +527,11 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
// frees 'is' on a way.
|
||||
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
co.InitRobj(OBJ_SET, kEncodingStrMap2, tmp.ptr);
|
||||
} else {
|
||||
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
|
||||
}
|
||||
inner_obj = co.RObjPtr();
|
||||
break;
|
||||
}
|
||||
|
@ -691,25 +794,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
|
|||
}
|
||||
}
|
||||
} else {
|
||||
dict* ds = (dict*)sets.front().first;
|
||||
dictIterator* di = dictGetIterator(ds);
|
||||
dictEntry* de = nullptr;
|
||||
while ((de = dictNext(di))) {
|
||||
size_t j = 1;
|
||||
sds key = (sds)de->key;
|
||||
string_view member{key, sdslen(key)};
|
||||
|
||||
for (j = 1; j < sets.size(); j++) {
|
||||
if (sets[j].first != ds && !IsInSet(sets[j], member))
|
||||
break;
|
||||
}
|
||||
|
||||
/* Only take action when all sets contain the member */
|
||||
if (j == sets.size()) {
|
||||
result.push_back(string(member));
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
InterStrSet(sets, &result);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -1193,25 +1278,39 @@ bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* des
|
|||
char buf[32];
|
||||
int ii = 0;
|
||||
|
||||
dict* ds = dictCreate(&setDictType);
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
StringSet* ss = new StringSet{CompactObj::memory_resource()};
|
||||
if (expected_len)
|
||||
ss->Reserve(expected_len);
|
||||
|
||||
if (expected_len) {
|
||||
if (dictTryExpand(ds, expected_len) != DICT_OK) {
|
||||
dictRelease(ds);
|
||||
return false;
|
||||
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
|
||||
string_view str{buf, size_t(next - buf)};
|
||||
CHECK(ss->Add(str));
|
||||
}
|
||||
|
||||
dest->ptr = ss;
|
||||
dest->encoding = OBJ_ENCODING_HT;
|
||||
} else {
|
||||
dict* ds = dictCreate(&setDictType);
|
||||
|
||||
if (expected_len) {
|
||||
if (dictTryExpand(ds, expected_len) != DICT_OK) {
|
||||
dictRelease(ds);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/* To add the elements we extract integers and create redis objects */
|
||||
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
|
||||
sds s = sdsnewlen(buf, next - buf);
|
||||
CHECK(dictAddRaw(ds, s, NULL));
|
||||
}
|
||||
|
||||
dest->ptr = ds;
|
||||
dest->encoding = OBJ_ENCODING_HT;
|
||||
}
|
||||
|
||||
/* To add the elements we extract integers and create redis objects */
|
||||
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
|
||||
sds s = sdsnewlen(buf, next - buf);
|
||||
CHECK(dictAddRaw(ds, s, NULL));
|
||||
}
|
||||
|
||||
dest->ptr = ds;
|
||||
dest->encoding = OBJ_ENCODING_HT;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue