Implement SPOP action

This commit is contained in:
Roman Gershman 2022-03-06 18:18:29 +02:00
parent f09f516636
commit 71272c4ee1
6 changed files with 113 additions and 67 deletions

View File

@ -12,44 +12,6 @@
namespace dfly {
class FlatSet {
public:
FlatSet(std::pmr::memory_resource* mr) : set_(mr) {
}
void Reserve(size_t sz) {
set_.reserve(sz);
}
bool Add(std::string_view str) {
return set_.emplace(str).second;
}
bool Remove(std::string_view str) {
size_t res = set_.erase(str);
return res > 0;
}
size_t Size() const {
return set_.size();
}
bool Empty() const {
return set_.empty();
}
bool Contains(std::string_view val) const {
return set_.contains(val);
}
auto begin() const {
return set_.begin();
}
auto end() const {
return set_.end();
}
private:
struct Hasher {
using is_transparent = void; // to allow heteregenous lookups.
@ -76,6 +38,59 @@ class FlatSet {
using FlatSetType =
absl::flat_hash_set<CompactObj, Hasher, Eq, std::pmr::polymorphic_allocator<CompactObj>>;
public:
using iterator = FlatSetType::iterator;
FlatSet(std::pmr::memory_resource* mr) : set_(mr) {
}
void Reserve(size_t sz) {
set_.reserve(sz);
}
bool Add(std::string_view str) {
return set_.emplace(str).second;
}
bool Remove(std::string_view str) {
size_t res = set_.erase(str);
return res > 0;
}
void Erase(iterator it) {
set_.erase(it);
}
size_t Size() const {
return set_.size();
}
bool Empty() const {
return set_.empty();
}
bool Contains(std::string_view val) const {
return set_.contains(val);
}
auto begin() const {
return set_.begin();
}
auto begin() {
return set_.begin();
}
auto end() const {
return set_.end();
}
auto end() {
return set_.end();
}
private:
FlatSetType set_;
};

View File

@ -252,6 +252,13 @@ intset *intsetRemove(intset *is, int64_t value, int *success) {
return is;
}
intset *intsetTrimTail(intset *is, uint32_t tail_len) {
uint32_t len = intrev32ifbe(is->length);
uint32_t new_len = tail_len >= len ? 0 : len - tail_len;
is->length = intrev32ifbe(new_len);
return intsetResize(is, new_len);
}
/* Determine whether a value belongs to this set */
uint8_t intsetFind(intset *is, int64_t value) {
uint8_t valenc = _intsetValueEncoding(value);

View File

@ -41,11 +41,13 @@ typedef struct intset {
intset *intsetNew(void);
intset *intsetAdd(intset *is, int64_t value, uint8_t *success);
intset *intsetRemove(intset *is, int64_t value, int *success);
intset *intsetTrimTail(intset *is, uint32_t trim_len); // Removes last trim_len elements.
uint8_t intsetFind(intset *is, int64_t value);
int64_t intsetRandom(intset *is);
uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value);
uint32_t intsetLen(const intset *is);
size_t intsetBlobLen(intset *is);
int intsetValidateIntegrity(const unsigned char *is, size_t size, int deep);
#ifdef REDIS_TEST

View File

@ -893,47 +893,45 @@ auto SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned coun
if (count == 0)
return result;
#if 0
MainIterator it = find_res.value();
robj* sobj = it->second.AsRObj();
auto slen = setTypeSize(sobj);
size_t slen = it->second.Size();
/* CASE 1:
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= slen) {
/* We just return the entire set */
auto* si = setTypeInitIterator(sobj);
sds ele;
while ((ele = setTypeNextObject(si)) != NULL) {
std::string_view sv{ele, sdslen(ele)};
result.emplace_back(sv);
sdsfree(ele);
}
setTypeReleaseIterator(si);
FillSet(it->second, [&result](string s) {
result.push_back(move(s));
});
/* Delete the set as it is now empty */
CHECK(es->db_slice().Del(op_args.db_ind, it));
} else {
sds sdsele;
int64_t llele;
if (it->second.Encoding() == kEncodingIntSet) {
intset* is = (intset*)it->second.RObjPtr();
int64_t val = 0;
while (count--) {
/* Emit and remove. */
int encoding = setTypeRandomElement(sobj, &sdsele, &llele);
if (encoding == OBJ_ENCODING_INTSET) {
result.emplace_back(absl::StrCat(llele));
sobj->ptr = intsetRemove((intset*)sobj->ptr, llele, NULL);
} else {
result.emplace_back(std::string_view{sdsele, sdslen(sdsele)});
setTypeRemove(sobj, sdsele);
// copy last count values.
for (uint32_t i = slen - count; i < slen; ++i) {
intsetGet(is, i, &val);
result.push_back(absl::StrCat(val));
}
is = intsetTrimTail(is, count); // now remove last count items
it->second.SetRObjPtr(is);
} else {
FlatSet* fs = (FlatSet*)it->second.RObjPtr();
string str;
for (uint32_t i = 0; i < count; ++i) {
auto it = fs->begin();
it->GetString(&str);
fs->Erase(it);
result.push_back(move(str));
}
it->second.SetRObjPtr(fs);
}
it->second.SyncRObj();
}
#endif
return result;
}

View File

@ -84,5 +84,29 @@ TEST_F(SetFamilyTest, SMove) {
EXPECT_THAT(resp[0], IntArg(1));
}
TEST_F(SetFamilyTest, SPop) {
auto resp = Run({"sadd", "x", "1", "2", "3"});
resp = Run({"spop", "x", "3"});
EXPECT_THAT(resp, UnorderedElementsAre("1", "2", "3"));
resp = Run({"type", "x"});
EXPECT_THAT(resp, RespEq("none"));
Run({"sadd", "x", "1", "2", "3"});
resp = Run({"spop", "x", "2"});
EXPECT_THAT(resp, IsSubsetOf({"1", "2", "3"}));
EXPECT_EQ(2, resp.size());
resp = Run({"scard", "x"});
EXPECT_THAT(resp[0], IntArg(1));
Run({"sadd", "y", "a", "b", "c"});
resp = Run({"spop", "y", "1"});
EXPECT_THAT(resp, IsSubsetOf({"a", "b", "c"}));
EXPECT_EQ(1, resp.size());
resp = Run({"smembers", "y"});
EXPECT_THAT(resp, IsSubsetOf({"a", "b", "c"}));
EXPECT_EQ(2, resp.size());
}
} // namespace dfly

View File

@ -89,7 +89,7 @@ TEST_F(StringFamilyTest, Set) {
ASSERT_THAT(resp, ElementsAre(ErrArg(kInvalidIntErr)));
resp = Run({"set", "foo", "bar", "ex", "-1"});
ASSERT_THAT(resp, ElementsAre(ErrArg("invalid expire time in set")));
ASSERT_THAT(resp, ElementsAre(ErrArg("out of range")));
resp = Run({"set", "foo", "bar", "ex", "1"});
ASSERT_THAT(resp, RespEq("OK"));