Implement hkeys,hvals, hmget and hgetall commands

This commit is contained in:
Roman Gershman 2022-03-07 23:00:18 +02:00
parent 71272c4ee1
commit 3c1b600e79
4 changed files with 212 additions and 38 deletions

View File

@ -148,13 +148,15 @@ API 2.0
- [ ] BRPOPLPUSH
- [X] HashSet Family
- [X] HSET
- [X] HMSET
- [X] HDEL
- [X] HEXISTS
- [X] HGET
- [X] HMGET
- [X] HLEN
- [ ] HINCRBY
- [ ] HINCRBYFLOAT
- [ ] HGETALL
- [X] HGETALL
- [X] HKEYS
- [ ] PubSub family
- [ ] PUBLISH

View File

@ -35,6 +35,16 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
return lpBytes(const_cast<uint8_t*>(lp)) + sum < kMaxListPackLen;
}
string LpGetVal(uint8_t* lp_it) {
int64_t ele_len;
uint8_t* ptr = lpGet(lp_it, &ele_len, NULL);
if (ptr) {
return string(reinterpret_cast<char*>(ptr), ele_len);
}
return absl::StrCat(ele_len);
}
// returns a new pointer to lp. Returns true if field was inserted or false it it already existed.
pair<uint8_t*, bool> lpInsertElem(uint8_t* lp, string_view field, string_view val) {
uint8_t* vptr;
@ -74,7 +84,7 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) {
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHDel(OpArgs{shard, t->db_index()}, key, args);
return OpDel(OpArgs{shard, t->db_index()}, key, args);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -89,7 +99,7 @@ void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHLen(OpArgs{shard, t->db_index()}, key);
return OpLen(OpArgs{shard, t->db_index()}, key);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -127,12 +137,36 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
}
}
void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMGet(OpArgs{shard, t->db_index()}, key, args);
};
OpResult<vector<OptStr>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->StartArray(result->size());
for (const auto& val : *result) {
if (val) {
(*cntx)->SendBulkString(*val);
} else {
(*cntx)->SendNull();
}
}
} else {
(*cntx)->SendError(result.status());
}
}
void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view field = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHGet(OpArgs{shard, t->db_index()}, key, field);
return OpGet(OpArgs{shard, t->db_index()}, key, field);
};
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -148,16 +182,30 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
}
void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
LOG(DFATAL) << "TBD";
}
void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) {
HGetGeneric(args, cntx, FIELDS);
}
void HSetFamily::HVals(CmdArgList args, ConnectionContext* cntx) {
HGetGeneric(args, cntx, VALUES);
}
void HSetFamily::HGetAll(CmdArgList args, ConnectionContext* cntx) {
HGetGeneric(args, cntx, GetAllMode::FIELDS | GetAllMode::VALUES);
}
void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHKeys(OpArgs{shard, t->db_index()}, key);
return OpGetAll(OpArgs{shard, t->db_index()}, key, getall_mask);
};
OpResult<vector<string>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->StartArray(result->size());
for (const auto& s : *result) {
@ -173,7 +221,7 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHSet(OpArgs{shard, t->db_index()}, key, args, false);
return OpSet(OpArgs{shard, t->db_index()}, key, args, false);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -184,14 +232,30 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
}
}
void HSetFamily::HMSet(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(OpArgs{shard, t->db_index()}, key, args, false);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
// Returns "OK", that's all the difference from HSet.
(*cntx)->SendError(result.status());
}
void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
LOG(DFATAL) << "TBD";
}
void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
LOG(DFATAL) << "TBD";
}
OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, string_view key, CmdArgList values,
bool skip_if_exists) {
OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
bool skip_if_exists) {
DCHECK(!values.empty() && 0 == values.size() % 2);
auto& db_slice = op_args.shard->db_slice();
@ -249,7 +313,7 @@ OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, string_view key, Cm
return created;
}
OpResult<uint32_t> HSetFamily::OpHDel(const OpArgs& op_args, string_view key, CmdArgList values) {
OpResult<uint32_t> HSetFamily::OpDel(const OpArgs& op_args, string_view key, CmdArgList values) {
DCHECK(!values.empty());
auto& db_slice = op_args.shard->db_slice();
@ -294,7 +358,73 @@ OpResult<uint32_t> HSetFamily::OpHDel(const OpArgs& op_args, string_view key, Cm
return deleted;
}
OpResult<uint32_t> HSetFamily::OpHLen(const OpArgs& op_args, string_view key) {
auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList fields)
-> OpResult<vector<OptStr>> {
DCHECK(!fields.empty());
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (!it_res)
return it_res.status();
CompactObj& co = (*it_res)->second;
robj* hset = co.AsRObj();
std::vector<OptStr> result(fields.size());
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* lp = (uint8_t*)hset->ptr;
absl::flat_hash_map<string_view, unsigned> reverse;
reverse.reserve(fields.size() + 1);
for (size_t i = 0; i < fields.size(); ++i) {
reverse.emplace(ArgS(fields, i), i); // map fields to their index.
}
char ibuf[32];
uint8_t* lp_elem = lpFirst(lp);
int64_t ele_len;
string_view key;
DCHECK(lp_elem); // empty containers are not allowed.
// We do single pass on listpack for this operation.
do {
uint8_t* elem = lpGet(lp_elem, &ele_len, NULL);
if (elem) {
key = string_view{reinterpret_cast<char*>(elem), size_t(ele_len)};
} else {
char* next = absl::numbers_internal::FastIntToBuffer(ele_len, ibuf);
key = string_view{ibuf, size_t(next - ibuf)};
}
lp_elem = lpNext(lp, lp_elem); // switch to value
DCHECK(lp_elem);
auto it = reverse.find(key);
if (it != reverse.end()) {
DCHECK_LT(it->second, result.size());
result[it->second].emplace(LpGetVal(lp_elem)); // populate found items.
}
lp_elem = lpNext(lp, lp_elem); // switch to the next key
} while(lp_elem);
} else {
DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding);
dict* d = (dict*)hset->ptr;
for (size_t i = 0; i < fields.size(); ++i) {
op_args.shard->tmp_str1 =
sdscpylen(op_args.shard->tmp_str1, fields[i].data(), fields[i].size());
dictEntry* de = dictFind(d, op_args.shard->tmp_str1);
if (de) {
sds val = (sds)dictGetVal(de);
result[i]->assign(val, sdslen(val));
}
}
}
return result;
}
OpResult<uint32_t> HSetFamily::OpLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
@ -307,7 +437,7 @@ OpResult<uint32_t> HSetFamily::OpHLen(const OpArgs& op_args, string_view key) {
return it_res.status();
}
OpResult<string> HSetFamily::OpHGet(const OpArgs& op_args, string_view key, string_view field) {
OpResult<string> HSetFamily::OpGet(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (!it_res)
@ -333,20 +463,19 @@ OpResult<string> HSetFamily::OpHGet(const OpArgs& op_args, string_view key, stri
return absl::StrCat(vll);
}
DCHECK_EQ(hset->encoding, OBJ_ENCODING_HT);
if (hset->encoding == OBJ_ENCODING_HT) {
dictEntry* de = dictFind((dict*)hset->ptr, op_args.shard->tmp_str1);
if (!de)
return OpStatus::KEY_NOTFOUND;
dictEntry* de = dictFind((dict*)hset->ptr, op_args.shard->tmp_str1);
if (!de)
return OpStatus::KEY_NOTFOUND;
sds val = (sds)dictGetVal(de);
return string(val, sdslen(val));
}
sds val = (sds)dictGetVal(de);
LOG(FATAL) << "Unknown hash encoding " << hset->encoding;
return string(val, sdslen(val));
}
OpResult<vector<string>> HSetFamily::OpHKeys(const OpArgs& op_args, string_view key) {
OpResult<vector<string>> HSetFamily::OpGetAll(const OpArgs& op_args, string_view key,
uint8_t mask) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (!it_res) {
@ -356,28 +485,35 @@ OpResult<vector<string>> HSetFamily::OpHKeys(const OpArgs& op_args, string_view
}
robj* hset = (*it_res)->second.AsRObj();
auto* hi = hashTypeInitIterator(hset);
hashTypeIterator* hi = hashTypeInitIterator(hset);
vector<string> res;
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
unsigned slen;
long long vll;
while (hashTypeNext(hi) != C_ERR) {
uint8_t* ptr = lpGetValue(hi->fptr, &slen, &vll);
if (ptr) {
res.emplace_back(reinterpret_cast<char*>(ptr), slen);
} else {
res.emplace_back(absl::StrCat(vll));
if (mask & FIELDS) {
res.push_back(LpGetVal(hi->fptr));
}
if (mask & VALUES) {
res.push_back(LpGetVal(hi->vptr));
}
}
} else {
while (hashTypeNext(hi) != C_ERR) {
sds key = (sds)dictGetKey(hi->de);
res.emplace_back(key, sdslen(key));
if (mask & FIELDS) {
sds key = (sds)dictGetKey(hi->de);
res.emplace_back(key, sdslen(key));
}
if (mask & VALUES) {
sds val = (sds)dictGetVal(hi->de);
res.emplace_back(val, sdslen(val));
}
}
}
hashTypeReleaseIterator(hi);
return res;
}
@ -390,8 +526,11 @@ void HSetFamily::Register(CommandRegistry* registry) {
<< CI{"HLEN", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(HLen)
<< CI{"HEXISTS", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(HExists)
<< CI{"HGET", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(HGet)
<< CI{"HGETALL", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(HGetAll)
<< CI{"HMGET", CO::FAST | CO::READONLY, -3, 1, 1, 1}.HFUNC(HMGet)
<< CI{"HINCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HIncrBy)
<< CI{"HKEYS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HKeys)
<< CI{"HVALS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HVals)
<< CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
<< CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx)
<< CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen);

View File

@ -4,6 +4,8 @@
#pragma once
#include <optional>
#include "facade/op_status.h"
#include "server/common_types.h"
@ -18,26 +20,41 @@ class HSetFamily {
static void Register(CommandRegistry* registry);
private:
enum GetAllMode : uint8_t { FIELDS = 1, VALUES = 2 };
static void HDel(CmdArgList args, ConnectionContext* cntx);
static void HLen(CmdArgList args, ConnectionContext* cntx);
static void HExists(CmdArgList args, ConnectionContext* cntx);
static void HGet(CmdArgList args, ConnectionContext* cntx);
static void HMGet(CmdArgList args, ConnectionContext* cntx);
static void HIncrBy(CmdArgList args, ConnectionContext* cntx);
static void HKeys(CmdArgList args, ConnectionContext* cntx);
static void HVals(CmdArgList args, ConnectionContext* cntx);
static void HGetAll(CmdArgList args, ConnectionContext* cntx);
// hmset is deprecated, we should not implement it unless we have to.
static void HSet(CmdArgList args, ConnectionContext* cntx);
static void HMSet(CmdArgList args, ConnectionContext* cntx);
static void HSetNx(CmdArgList args, ConnectionContext* cntx);
static void HStrLen(CmdArgList args, ConnectionContext* cntx);
static OpResult<uint32_t> OpHSet(const OpArgs& op_args, std::string_view key, CmdArgList values,
bool skip_if_exists);
static OpResult<uint32_t> OpHDel(const OpArgs& op_args, std::string_view key, CmdArgList values);
static OpResult<uint32_t> OpHLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpHGet(const OpArgs& op_args, std::string_view key,
std::string_view field);
static void HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask);
static OpResult<std::vector<std::string>> OpHKeys(const OpArgs& op_args, std::string_view key);
static OpResult<uint32_t> OpSet(const OpArgs& op_args, std::string_view key, CmdArgList values,
bool skip_if_exists);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, std::string_view key, CmdArgList values);
using OptStr = std::optional<std::string>;
static OpResult<std::vector<OptStr>> OpMGet(const OpArgs& op_args, std::string_view key,
CmdArgList fields);
static OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpGet(const OpArgs& op_args, std::string_view key,
std::string_view field);
static OpResult<std::vector<std::string>> OpGetAll(const OpArgs& op_args, std::string_view key,
uint8_t getall_mask);
};
} // namespace dfly

View File

@ -81,5 +81,21 @@ TEST_F(HSetFamilyTest, HSetLarge) {
EXPECT_THAT(resp[0], IntArg(1));
}
TEST_F(HSetFamilyTest, Get) {
auto resp = Run({"hset", "x", "a", "1", "b", "2", "c", "3"});
EXPECT_THAT(resp[0], IntArg(3));
resp = Run({"hkeys", "x"});
EXPECT_THAT(resp, UnorderedElementsAre("a", "b", "c"));
resp = Run({"hvals", "x"});
EXPECT_THAT(resp, UnorderedElementsAre("1", "2", "3"));
resp = Run({"hmget", "x", "a", "c", "d"});
EXPECT_THAT(resp, ElementsAre("1", "3", ArgType(RespExpr::NIL)));
resp = Run({"hgetall", "x"});
EXPECT_THAT(resp, ElementsAre("a", "1", "b", "2", "c", "3"));
}
} // namespace dfly