Add some hash set commands

This commit is contained in:
Roman Gershman 2022-03-03 09:34:53 +02:00
parent 3f0fcbf99f
commit b3e5730377
8 changed files with 215 additions and 9 deletions

View File

@ -2,12 +2,16 @@
[![ci-tests](https://github.com/romange/dragonfly/actions/workflows/ci.yml/badge.svg)](https://github.com/romange/dragonfly/actions/workflows/ci.yml)
A toy memory store that supports basic commands like `SET` and `GET` for both memcached and redis protocols. In addition, it supports redis `PING` command.
A novel memory store that supports Redis and Memcached commands.
For more detailed status of what's implemented - see below.
Demo features include:
Features include:
1. High throughput reaching millions of QPS on a single node.
2. TLS support.
3. Pipelining mode.
4. A novel cache design, which does not require specifying eviction policies.
5. Memory efficiency that can save 20-40% for regular workloads and even more for cache like
workloads
## Building from source
I've tested the build on Ubuntu 21.04+.
@ -142,6 +146,15 @@ API 2.0
- [X] BLPOP
- [ ] BRPOP
- [ ] BRPOPLPUSH
- [X] HashSet Family
- [X] HSET
- [X] HDEL
- [X] HEXISTS
- [X] HGET
- [X] HLEN
- [ ] HINCRBY
- [ ] HINCRBYFLOAT
- [ ] HGETALL
- [ ] PubSub family
- [ ] PUBLISH
- [ ] PUBSUB

View File

@ -72,6 +72,17 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
}
}
size_t MallocUsedHSet(unsigned encoding, void* ptr) {
switch (encoding) {
case OBJ_ENCODING_LISTPACK:
return lpBytes(reinterpret_cast<uint8_t*>(ptr));
case OBJ_ENCODING_HT:
return DictMallocSize((dict*)ptr);
default:
LOG(FATAL) << "Unknown set encoding type " << encoding;
}
}
inline void FreeObjHash(unsigned encoding, void* ptr) {
switch (encoding) {
case OBJ_ENCODING_HT:
@ -236,7 +247,8 @@ size_t RobjWrapper::MallocUsed() const {
return QlMAllocSize((quicklist*)ptr);
case OBJ_SET:
return MallocUsedSet(encoding, ptr);
break;
case OBJ_HASH:
return MallocUsedHSet(encoding, ptr);
default:
LOG(FATAL) << "Not supported " << type;
}

View File

@ -17,6 +17,10 @@ class ConnectionContext {
public:
ConnectionContext(::io::Sink* stream, Connection* owner);
// We won't have any virtual methods, probably. However, since we allocate derived class,
// we need to declare a virtual d-tor so we could delete them inside Connection.
virtual ~ConnectionContext() {}
Connection* owner() {
return owner_;
}

View File

@ -198,6 +198,7 @@ void Connection::HandleRequests() {
if (should_disarm_poller) {
us->CancelPoll(poll_id);
}
cc_.reset();
}
}

View File

@ -100,9 +100,6 @@ robj *createZsetListpackObject(void);
unsigned long long estimateObjectIdleTime(const robj *o);
uint8_t LFUDecrAndReturn(time_t epoch_sec, const robj *o);
void listTypeConvert(robj *subject, int enc);
void hashTypeConvert(robj *o, int enc);
unsigned long hashTypeLength(const robj *o);
int hashZiplistValidateIntegrity(unsigned char *zl, size_t size, int deep);
int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock, int lru_multiplier);
@ -190,6 +187,10 @@ sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
robj *hashTypeGetValueObject(robj *o, sds field);
int hashTypeSet(robj *o, sds field, sds value, int flags);
robj *hashTypeDup(robj *o);
int hashTypeGetFromListpack(robj *o, sds field,
unsigned char **vstr,
unsigned int *vlen,
long long *vll);
/* Macro used to initialize a Redis object allocated on the stack.

View File

@ -36,15 +36,81 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
} // namespace
void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHDel(OpArgs{shard, t->db_index()}, key, args);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->SendLong(*result);
} else {
(*cntx)->SendError(result.status());
}
}
void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHLen(OpArgs{shard, t->db_index()}, key);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->SendLong(*result);
} else {
(*cntx)->SendError(result.status());
}
}
void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view field = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<int> {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH);
if (it_res) {
robj* hset = (*it_res)->second.AsRObj();
shard->tmp_str1 = sdscpylen(shard->tmp_str1, field.data(), field.size());
return hashTypeExists(hset, shard->tmp_str1);
}
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return 0;
return it_res.status();
};
OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->SendLong(*result);
} else {
(*cntx)->SendError(result.status());
}
}
void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view field = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHGet(OpArgs{shard, t->db_index()}, key, field);
};
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->SendBulkString(*result);
} else {
if (result.status() == OpStatus::KEY_NOTFOUND) {
(*cntx)->SendNull();
} else {
(*cntx)->SendError(result.status());
}
}
}
void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
@ -72,8 +138,8 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
}
OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, std::string_view key,
CmdArgList values, bool skip_if_exists) {
OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, string_view key, CmdArgList values,
bool skip_if_exists) {
DCHECK(!values.empty() && 0 == values.size() % 2);
auto& db_slice = op_args.shard->db_slice();
@ -109,6 +175,93 @@ OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, std::string_view ke
return created;
}
OpResult<uint32_t> HSetFamily::OpHDel(const OpArgs& op_args, string_view key, CmdArgList values) {
DCHECK(!values.empty());
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (!it_res)
return it_res.status();
CompactObj& co = (*it_res)->second;
robj* hset = co.AsRObj();
unsigned deleted = 0;
bool key_remove = false;
for (auto s : values) {
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, s.data(), s.size());
if (hashTypeDelete(hset, op_args.shard->tmp_str1)) {
++deleted;
if (hashTypeLength(hset) == 0) {
key_remove = true;
break;
}
}
}
co.SyncRObj();
if (key_remove) {
db_slice.Del(op_args.db_ind, *it_res);
}
return deleted;
}
OpResult<uint32_t> HSetFamily::OpHLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (it_res) {
robj* hset = (*it_res)->second.AsRObj();
return hashTypeLength(hset);
}
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return 0;
return it_res.status();
}
OpResult<string> HSetFamily::OpHGet(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (!it_res)
return it_res.status();
robj* hset = (*it_res)->second.AsRObj();
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, field.data(), field.size());
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char* vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
int ret = hashTypeGetFromListpack(hset, op_args.shard->tmp_str1, &vstr, &vlen, &vll);
if (ret < 0) {
return OpStatus::KEY_NOTFOUND;
}
if (vstr) {
const char* src = reinterpret_cast<const char*>(vstr);
return string{src, vlen};
}
return absl::StrCat(vll);
}
if (hset->encoding == OBJ_ENCODING_HT) {
dictEntry* de = dictFind((dict*)hset->ptr, op_args.shard->tmp_str1);
if (!de)
return OpStatus::KEY_NOTFOUND;
sds val = (sds)dictGetVal(de);
return string(val, sdslen(val));
}
LOG(FATAL) << "Unknown hash encoding " << hset->encoding;
}
using CI = CommandId;
#define HFUNC(x) SetHandler(&HSetFamily::x)

View File

@ -31,6 +31,10 @@ class HSetFamily {
static OpResult<uint32_t> OpHSet(const OpArgs& op_args, std::string_view key, CmdArgList values,
bool skip_if_exists);
static OpResult<uint32_t> OpHDel(const OpArgs& op_args, std::string_view key, CmdArgList values);
static OpResult<uint32_t> OpHLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpHGet(const OpArgs& op_args, std::string_view key,
std::string_view field);
};
} // namespace dfly

View File

@ -21,18 +21,36 @@ class HSetFamilyTest : public BaseFamilyTest {
protected:
};
TEST_F(HSetFamilyTest, HSet) {
TEST_F(HSetFamilyTest, Basic) {
auto resp = Run({"hset", "x", "a"});
EXPECT_THAT(resp[0], ErrArg("wrong number"));
resp = Run({"hset", "x", "a", "b"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"hlen", "x"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"hexists", "x", "a"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"hexists", "x", "b"});
EXPECT_THAT(resp[0], IntArg(0));
resp = Run({"hexists", "y", "a"});
EXPECT_THAT(resp[0], IntArg(0));
resp = Run({"hset", "x", "a", "b"});
EXPECT_THAT(resp[0], IntArg(0));
resp = Run({"hset", "x", "a", "c"});
EXPECT_THAT(resp[0], IntArg(0));
resp = Run({"hset", "y", "a", "c", "d", "e"});
EXPECT_THAT(resp[0], IntArg(2));
resp = Run({"hdel", "y", "a", "d"});
EXPECT_THAT(resp[0], IntArg(2));
}
} // namespace dfly