Add HKEYS command. Account for listpack blobs
This commit is contained in:
parent
b3e5730377
commit
2213c1b38b
|
@ -155,6 +155,7 @@ API 2.0
|
|||
- [ ] HINCRBY
|
||||
- [ ] HINCRBYFLOAT
|
||||
- [ ] HGETALL
|
||||
- [X] HKEYS
|
||||
- [ ] PubSub family
|
||||
- [ ] PUBLISH
|
||||
- [ ] PUBSUB
|
||||
|
|
|
@ -15,9 +15,12 @@ void InitRedisTables() {
|
|||
server.zset_max_listpack_entries = 128;
|
||||
server.zset_max_listpack_value = 64;
|
||||
server.set_max_intset_entries = 512;
|
||||
|
||||
// Present so that redis code compiles. However, we ignore this field and instead check against
|
||||
// listpack total size in hset_family.cc
|
||||
server.hash_max_listpack_entries = 512;
|
||||
server.hash_max_listpack_value = 64;
|
||||
}
|
||||
server.hash_max_listpack_value = 32; // decreased from redis default 64.
|
||||
}
|
||||
|
||||
// These functions are moved here from server.c
|
||||
int htNeedsResize(dict* dict) {
|
||||
|
|
|
@ -26,7 +26,7 @@ using facade::OpStatus;
|
|||
#define ADD(x) (x) += o.x
|
||||
|
||||
DbStats& DbStats::operator+=(const DbStats& o) {
|
||||
static_assert(sizeof(DbStats) == 56);
|
||||
static_assert(sizeof(DbStats) == 72);
|
||||
|
||||
ADD(key_count);
|
||||
ADD(expire_count);
|
||||
|
@ -36,6 +36,8 @@ DbStats& DbStats::operator+=(const DbStats& o) {
|
|||
ADD(obj_memory_usage);
|
||||
ADD(table_mem_usage);
|
||||
ADD(small_string_bytes);
|
||||
ADD(listpack_blob_cnt);
|
||||
ADD(listpack_bytes);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
@ -87,6 +89,8 @@ auto DbSlice::GetStats() const -> Stats {
|
|||
s.db.obj_memory_usage += db->stats.obj_memory_usage;
|
||||
s.db.inline_keys += db->stats.inline_keys;
|
||||
s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage());
|
||||
s.db.listpack_blob_cnt += db->stats.listpack_blob_cnt;
|
||||
s.db.listpack_bytes += db->stats.listpack_bytes;
|
||||
}
|
||||
s.db.small_string_bytes = CompactObj::GetStats().small_string_bytes;
|
||||
|
||||
|
@ -249,8 +253,7 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
|
|||
db->prime_table.Clear();
|
||||
db->expire_table.Clear();
|
||||
db->mcflag_table.Clear();
|
||||
db->stats.inline_keys = 0;
|
||||
db->stats.obj_memory_usage = 0;
|
||||
db->stats = InternalDbStats{};
|
||||
|
||||
return removed;
|
||||
};
|
||||
|
|
|
@ -42,6 +42,8 @@ struct DbStats {
|
|||
|
||||
size_t small_string_bytes = 0;
|
||||
|
||||
size_t listpack_blob_cnt = 0;
|
||||
size_t listpack_bytes = 0;
|
||||
DbStats& operator+=(const DbStats& o);
|
||||
};
|
||||
|
||||
|
@ -54,14 +56,6 @@ struct SliceEvents {
|
|||
};
|
||||
|
||||
class DbSlice {
|
||||
struct InternalDbStats {
|
||||
// Number of inline keys.
|
||||
uint64_t inline_keys = 0;
|
||||
|
||||
// Object memory usage besides hash-table capacity.
|
||||
// Applies for any non-inline objects.
|
||||
size_t obj_memory_usage = 0;
|
||||
};
|
||||
|
||||
DbSlice(const DbSlice&) = delete;
|
||||
void operator=(const DbSlice&) = delete;
|
||||
|
@ -72,6 +66,18 @@ class DbSlice {
|
|||
SliceEvents events;
|
||||
};
|
||||
|
||||
struct InternalDbStats {
|
||||
// Number of inline keys.
|
||||
uint64_t inline_keys = 0;
|
||||
|
||||
// Object memory usage besides hash-table capacity.
|
||||
// Applies for any non-inline objects.
|
||||
size_t obj_memory_usage = 0;
|
||||
size_t listpack_blob_cnt = 0;
|
||||
size_t listpack_bytes = 0;
|
||||
};
|
||||
|
||||
|
||||
DbSlice(uint32_t index, EngineShard* owner);
|
||||
~DbSlice();
|
||||
|
||||
|
@ -165,6 +171,10 @@ class DbSlice {
|
|||
void PreUpdate(DbIndex db_ind, MainIterator it);
|
||||
void PostUpdate(DbIndex db_ind, MainIterator it);
|
||||
|
||||
InternalDbStats* MutableStats(DbIndex db_ind) {
|
||||
return &db_arr_[db_ind]->stats;
|
||||
}
|
||||
|
||||
// Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it
|
||||
// from both tables and return MainIterator{}.
|
||||
std::pair<MainIterator, ExpireIterator> ExpireIfNeeded(DbIndex db_ind, MainIterator it) const;
|
||||
|
|
|
@ -22,6 +22,8 @@ namespace dfly {
|
|||
|
||||
namespace {
|
||||
|
||||
constexpr size_t kMaxListPackLen = 1024;
|
||||
|
||||
bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
|
||||
size_t sum = 0;
|
||||
for (auto s : args) {
|
||||
|
@ -30,7 +32,39 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
|
|||
sum += s.size();
|
||||
}
|
||||
|
||||
return lpSafeToAdd(const_cast<uint8_t*>(lp), sum);
|
||||
return lpBytes(const_cast<uint8_t*>(lp)) + sum < kMaxListPackLen;
|
||||
}
|
||||
|
||||
// returns a new pointer to lp. Returns true if field was inserted or false it it already existed.
|
||||
pair<uint8_t*, bool> lpInsertElem(uint8_t* lp, string_view field, string_view val) {
|
||||
uint8_t* vptr;
|
||||
|
||||
uint8_t* fptr = lpFirst(lp);
|
||||
uint8_t* fsrc = (uint8_t*)field.data();
|
||||
uint8_t* vsrc = (uint8_t*)val.data();
|
||||
|
||||
bool updated = false;
|
||||
|
||||
if (fptr) {
|
||||
fptr = lpFind(lp, fptr, fsrc, field.size(), 1);
|
||||
if (fptr) {
|
||||
/* Grab pointer to the value (fptr points to the field) */
|
||||
vptr = lpNext(lp, fptr);
|
||||
updated = true;
|
||||
|
||||
/* Replace value */
|
||||
lp = lpReplace(lp, &vptr, vsrc, val.size());
|
||||
}
|
||||
}
|
||||
|
||||
if (!updated) {
|
||||
/* Push new field/value pair onto the tail of the listpack */
|
||||
// TODO: we should at least allocate once for both elements.
|
||||
lp = lpAppend(lp, fsrc, field.size());
|
||||
lp = lpAppend(lp, vsrc, val.size());
|
||||
}
|
||||
|
||||
return make_pair(lp, !updated);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -95,7 +129,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
string_view field = ArgS(args, 1);
|
||||
string_view field = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpHGet(OpArgs{shard, t->db_index()}, key, field);
|
||||
|
@ -116,6 +150,24 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
|
|||
void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
||||
}
|
||||
|
||||
void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpHKeys(OpArgs{shard, t->db_index()}, key);
|
||||
};
|
||||
|
||||
OpResult<vector<string>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result) {
|
||||
(*cntx)->StartArray(result->size());
|
||||
for (const auto& s : *result) {
|
||||
(*cntx)->SendBulkString(s);
|
||||
}
|
||||
} else {
|
||||
(*cntx)->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
|
||||
|
@ -145,9 +197,13 @@ OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, string_view key, Cm
|
|||
auto& db_slice = op_args.shard->db_slice();
|
||||
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
DbSlice::InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
if (inserted) {
|
||||
robj* ro = createHashObject();
|
||||
it->second.ImportRObj(ro);
|
||||
stats->listpack_blob_cnt++;
|
||||
stats->listpack_bytes += lpBytes((uint8_t*)ro->ptr);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_HASH)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
@ -156,19 +212,37 @@ OpResult<uint32_t> HSetFamily::OpHSet(const OpArgs& op_args, string_view key, Cm
|
|||
robj* hset = it->second.AsRObj();
|
||||
uint8_t* lp = (uint8_t*)hset->ptr;
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK && !IsGoodForListpack(values, lp)) {
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
stats->listpack_bytes -= lpBytes(lp);
|
||||
|
||||
if (!IsGoodForListpack(values, lp)) {
|
||||
stats->listpack_blob_cnt--;
|
||||
hashTypeConvert(hset, OBJ_ENCODING_HT);
|
||||
}
|
||||
}
|
||||
|
||||
unsigned created = 0;
|
||||
|
||||
// TODO: we could avoid double copying by reimplementing hashTypeSet with better interface.
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
bool inserted;
|
||||
for (size_t i = 0; i < values.size(); i += 2) {
|
||||
op_args.shard->tmp_str1 =
|
||||
sdscpylen(op_args.shard->tmp_str1, values[i].data(), values[i].size());
|
||||
op_args.shard->tmp_str2 =
|
||||
sdscpylen(op_args.shard->tmp_str2, values[i + 1].data(), values[i + 1].size());
|
||||
tie(lp, inserted) = lpInsertElem(lp, ArgS(values, i), ArgS(values, i + 1));
|
||||
created += inserted;
|
||||
}
|
||||
hset->ptr = lp;
|
||||
stats->listpack_bytes += lpBytes(lp);
|
||||
} else {
|
||||
DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding);
|
||||
|
||||
created += !hashTypeSet(hset, op_args.shard->tmp_str1, op_args.shard->tmp_str2, HASH_SET_COPY);
|
||||
// Dictionary
|
||||
for (size_t i = 0; i < values.size(); i += 2) {
|
||||
sds fs = sdsnewlen(values[i].data(), values[i].size());
|
||||
sds vs = sdsnewlen(values[i + 1].data(), values[i + 1].size());
|
||||
|
||||
// hashTypeSet checks for hash_max_listpack_entries and converts into dictionary
|
||||
// if it goes beyond.
|
||||
created += !hashTypeSet(hset, fs, vs, HASH_SET_TAKE_FIELD | HASH_SET_TAKE_VALUE);
|
||||
}
|
||||
}
|
||||
it->second.SyncRObj();
|
||||
|
||||
|
@ -188,6 +262,11 @@ OpResult<uint32_t> HSetFamily::OpHDel(const OpArgs& op_args, string_view key, Cm
|
|||
robj* hset = co.AsRObj();
|
||||
unsigned deleted = 0;
|
||||
bool key_remove = false;
|
||||
DbSlice::InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr);
|
||||
}
|
||||
|
||||
for (auto s : values) {
|
||||
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, s.data(), s.size());
|
||||
|
@ -204,7 +283,12 @@ OpResult<uint32_t> HSetFamily::OpHDel(const OpArgs& op_args, string_view key, Cm
|
|||
co.SyncRObj();
|
||||
|
||||
if (key_remove) {
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
stats->listpack_blob_cnt--;
|
||||
}
|
||||
db_slice.Del(op_args.db_ind, *it_res);
|
||||
} else if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
stats->listpack_bytes += lpBytes((uint8_t*)hset->ptr);
|
||||
}
|
||||
|
||||
return deleted;
|
||||
|
@ -262,6 +346,41 @@ OpResult<string> HSetFamily::OpHGet(const OpArgs& op_args, string_view key, stri
|
|||
LOG(FATAL) << "Unknown hash encoding " << hset->encoding;
|
||||
}
|
||||
|
||||
OpResult<vector<string>> HSetFamily::OpHKeys(const OpArgs& op_args, string_view key) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
|
||||
if (!it_res) {
|
||||
if (it_res.status() == OpStatus::KEY_NOTFOUND)
|
||||
return vector<string>{};
|
||||
return it_res.status();
|
||||
}
|
||||
|
||||
robj* hset = (*it_res)->second.AsRObj();
|
||||
auto* hi = hashTypeInitIterator(hset);
|
||||
|
||||
vector<string> res;
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
unsigned slen;
|
||||
long long vll;
|
||||
while (hashTypeNext(hi) != C_ERR) {
|
||||
uint8_t* ptr = lpGetValue(hi->fptr, &slen, &vll);
|
||||
if (ptr) {
|
||||
res.emplace_back(reinterpret_cast<char*>(ptr), slen);
|
||||
} else {
|
||||
res.emplace_back(absl::StrCat(vll));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while (hashTypeNext(hi) != C_ERR) {
|
||||
sds key = (sds)dictGetKey(hi->de);
|
||||
res.emplace_back(key, sdslen(key));
|
||||
}
|
||||
}
|
||||
|
||||
hashTypeReleaseIterator(hi);
|
||||
return res;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&HSetFamily::x)
|
||||
|
@ -272,6 +391,7 @@ void HSetFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"HEXISTS", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(HExists)
|
||||
<< CI{"HGET", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(HGet)
|
||||
<< CI{"HINCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HIncrBy)
|
||||
<< CI{"HKEYS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HKeys)
|
||||
<< CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
|
||||
<< CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx)
|
||||
<< CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen);
|
||||
|
|
|
@ -23,6 +23,7 @@ class HSetFamily {
|
|||
static void HExists(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HGet(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HIncrBy(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HKeys(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
// hmset is deprecated, we should not implement it unless we have to.
|
||||
static void HSet(CmdArgList args, ConnectionContext* cntx);
|
||||
|
@ -35,6 +36,8 @@ class HSetFamily {
|
|||
static OpResult<uint32_t> OpHLen(const OpArgs& op_args, std::string_view key);
|
||||
static OpResult<std::string> OpHGet(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view field);
|
||||
|
||||
static OpResult<std::vector<std::string>> OpHKeys(const OpArgs& op_args, std::string_view key);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -4,10 +4,17 @@
|
|||
|
||||
#include "server/hset_family.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/listpack.h"
|
||||
#include "redis/object.h"
|
||||
#include "redis/sds.h"
|
||||
#include "redis/zmalloc.h"
|
||||
}
|
||||
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "server/test_utils.h"
|
||||
#include "facade/facade_test.h"
|
||||
#include "server/test_utils.h"
|
||||
|
||||
using namespace testing;
|
||||
using namespace std;
|
||||
|
@ -19,8 +26,21 @@ namespace dfly {
|
|||
|
||||
class HSetFamilyTest : public BaseFamilyTest {
|
||||
protected:
|
||||
static void SetUpTestSuite() {
|
||||
init_zmalloc_threadlocal();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(HSetFamilyTest, Hash) {
|
||||
robj* obj = createHashObject();
|
||||
sds field = sdsnew("field");
|
||||
sds val = sdsnew("value");
|
||||
hashTypeSet(obj, field, val, 0);
|
||||
sdsfree(field);
|
||||
sdsfree(val);
|
||||
decrRefCount(obj);
|
||||
}
|
||||
|
||||
TEST_F(HSetFamilyTest, Basic) {
|
||||
auto resp = Run({"hset", "x", "a"});
|
||||
EXPECT_THAT(resp[0], ErrArg("wrong number"));
|
||||
|
@ -50,7 +70,16 @@ TEST_F(HSetFamilyTest, Basic) {
|
|||
|
||||
resp = Run({"hdel", "y", "a", "d"});
|
||||
EXPECT_THAT(resp[0], IntArg(2));
|
||||
|
||||
}
|
||||
|
||||
TEST_F(HSetFamilyTest, HSetLarge) {
|
||||
string val(1024, 'b');
|
||||
|
||||
auto resp = Run({"hset", "x", "a", val});
|
||||
EXPECT_THAT(resp[0], IntArg(1));
|
||||
resp = Run({"hlen", "x"});
|
||||
EXPECT_THAT(resp[0], IntArg(1));
|
||||
}
|
||||
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -407,6 +407,8 @@ tcp_port:)";
|
|||
absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n");
|
||||
absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n");
|
||||
absl::StrAppend(&info, "small_string_bytes:", m.db.small_string_bytes, "\n");
|
||||
absl::StrAppend(&info, "listpack_blobs:", m.db.listpack_blob_cnt, "\n");
|
||||
absl::StrAppend(&info, "listpack_bytes:", m.db.listpack_bytes, "\n");
|
||||
}
|
||||
|
||||
if (should_enter("STATS")) {
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import random
|
||||
import string
|
||||
import redis as rclient
|
||||
import uuid
|
||||
import time
|
||||
|
||||
|
||||
def fill_set(args, redis: rclient.Redis):
|
||||
for j in range(args.num):
|
||||
token = uuid.uuid1().hex
|
||||
key = f'USER_OTP:{token}'
|
||||
otp = ''.join(random.choices(
|
||||
string.ascii_uppercase + string.digits, k=7))
|
||||
redis.execute_command('sadd', key, otp)
|
||||
|
||||
def fill_hset(args, redis):
|
||||
for j in range(args.num):
|
||||
token = uuid.uuid1().hex
|
||||
key = f'USER_INFO:{token}'
|
||||
phone = f'555-999-{j}'
|
||||
user_id = 'user' * 5 + f'-{j}'
|
||||
redis.hset(key, 'phone', phone)
|
||||
redis.hset(key, 'user_id', user_id)
|
||||
redis.hset(key, 'login_time', time.time())
|
||||
|
||||
|
||||
def main():
|
||||
# Check processor architecture
|
||||
parser = argparse.ArgumentParser(description='fill hset entities')
|
||||
parser.add_argument(
|
||||
'-p', type=int, help='redis port', dest='port', default=6380)
|
||||
parser.add_argument(
|
||||
'-n', type=int, help='number of keys', dest='num', default=10000)
|
||||
parser.add_argument(
|
||||
'--type', type=str, choices=['hset', 'set'], help='set type', default='hset')
|
||||
|
||||
args = parser.parse_args()
|
||||
redis = rclient.Redis(host='localhost', port=args.port, db=0)
|
||||
if args.type == 'hset':
|
||||
fill_hset(args, redis)
|
||||
elif args.type == 'set':
|
||||
fill_set(args, redis)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue