diff --git a/README.md b/README.md index d8e7253..2735117 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,7 @@ API 2.0 - [ ] HINCRBY - [ ] HINCRBYFLOAT - [ ] HGETALL + - [X] HKEYS - [ ] PubSub family - [ ] PUBLISH - [ ] PUBSUB diff --git a/src/redis/redis_aux.c b/src/redis/redis_aux.c index e0cc53d..8a34874 100644 --- a/src/redis/redis_aux.c +++ b/src/redis/redis_aux.c @@ -15,9 +15,12 @@ void InitRedisTables() { server.zset_max_listpack_entries = 128; server.zset_max_listpack_value = 64; server.set_max_intset_entries = 512; + + // Present so that redis code compiles. However, we ignore this field and instead check against + // listpack total size in hset_family.cc server.hash_max_listpack_entries = 512; - server.hash_max_listpack_value = 64; -} + server.hash_max_listpack_value = 32; // decreased from redis default 64. + } // These functions are moved here from server.c int htNeedsResize(dict* dict) { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 8332c71..d07abf9 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -26,7 +26,7 @@ using facade::OpStatus; #define ADD(x) (x) += o.x DbStats& DbStats::operator+=(const DbStats& o) { - static_assert(sizeof(DbStats) == 56); + static_assert(sizeof(DbStats) == 72); ADD(key_count); ADD(expire_count); @@ -36,6 +36,8 @@ DbStats& DbStats::operator+=(const DbStats& o) { ADD(obj_memory_usage); ADD(table_mem_usage); ADD(small_string_bytes); + ADD(listpack_blob_cnt); + ADD(listpack_bytes); return *this; } @@ -87,6 +89,8 @@ auto DbSlice::GetStats() const -> Stats { s.db.obj_memory_usage += db->stats.obj_memory_usage; s.db.inline_keys += db->stats.inline_keys; s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage()); + s.db.listpack_blob_cnt += db->stats.listpack_blob_cnt; + s.db.listpack_bytes += db->stats.listpack_bytes; } s.db.small_string_bytes = CompactObj::GetStats().small_string_bytes; @@ -249,8 +253,7 @@ size_t DbSlice::FlushDb(DbIndex db_ind) { 
db->prime_table.Clear(); db->expire_table.Clear(); db->mcflag_table.Clear(); - db->stats.inline_keys = 0; - db->stats.obj_memory_usage = 0; + db->stats = InternalDbStats{}; return removed; }; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 23fdc88..b326000 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -42,6 +42,8 @@ struct DbStats { size_t small_string_bytes = 0; + size_t listpack_blob_cnt = 0; + size_t listpack_bytes = 0; DbStats& operator+=(const DbStats& o); }; @@ -54,14 +56,6 @@ struct SliceEvents { }; class DbSlice { - struct InternalDbStats { - // Number of inline keys. - uint64_t inline_keys = 0; - - // Object memory usage besides hash-table capacity. - // Applies for any non-inline objects. - size_t obj_memory_usage = 0; - }; DbSlice(const DbSlice&) = delete; void operator=(const DbSlice&) = delete; @@ -72,6 +66,18 @@ class DbSlice { SliceEvents events; }; + struct InternalDbStats { + // Number of inline keys. + uint64_t inline_keys = 0; + + // Object memory usage besides hash-table capacity. + // Applies for any non-inline objects. + size_t obj_memory_usage = 0; + size_t listpack_blob_cnt = 0; + size_t listpack_bytes = 0; + }; + + DbSlice(uint32_t index, EngineShard* owner); ~DbSlice(); @@ -165,6 +171,10 @@ class DbSlice { void PreUpdate(DbIndex db_ind, MainIterator it); void PostUpdate(DbIndex db_ind, MainIterator it); + InternalDbStats* MutableStats(DbIndex db_ind) { + return &db_arr_[db_ind]->stats; + } + // Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it // from both tables and return MainIterator{}. 
std::pair ExpireIfNeeded(DbIndex db_ind, MainIterator it) const; diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 7d0ad8a..64b47e4 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -22,6 +22,8 @@ namespace dfly { namespace { +constexpr size_t kMaxListPackLen = 1024; + bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) { size_t sum = 0; for (auto s : args) { @@ -30,7 +32,39 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) { sum += s.size(); } - return lpSafeToAdd(const_cast(lp), sum); + return lpBytes(const_cast(lp)) + sum < kMaxListPackLen; +} + +// Returns the new pointer to lp, paired with true if the field was inserted or false if it already existed. +pair lpInsertElem(uint8_t* lp, string_view field, string_view val) { + uint8_t* vptr; + + uint8_t* fptr = lpFirst(lp); + uint8_t* fsrc = (uint8_t*)field.data(); + uint8_t* vsrc = (uint8_t*)val.data(); + + bool updated = false; + + if (fptr) { + fptr = lpFind(lp, fptr, fsrc, field.size(), 1); + if (fptr) { + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(lp, fptr); + updated = true; + + /* Replace value */ + lp = lpReplace(lp, &vptr, vsrc, val.size()); + } + } + + if (!updated) { + /* Push new field/value pair onto the tail of the listpack */ + // TODO: we should at least allocate once for both elements. 
+ lp = lpAppend(lp, fsrc, field.size()); + lp = lpAppend(lp, vsrc, val.size()); + } + + return make_pair(lp, !updated); } } // namespace @@ -95,7 +129,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); - string_view field = ArgS(args, 1); + string_view field = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { return OpHGet(OpArgs{shard, t->db_index()}, key, field); @@ -116,6 +150,24 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) { } +void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) { + string_view key = ArgS(args, 1); + + auto cb = [&](Transaction* t, EngineShard* shard) { + return OpHKeys(OpArgs{shard, t->db_index()}, key); + }; + + OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + if (result) { + (*cntx)->StartArray(result->size()); + for (const auto& s : *result) { + (*cntx)->SendBulkString(s); + } + } else { + (*cntx)->SendError(result.status()); + } +} + void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); @@ -145,9 +197,13 @@ OpResult HSetFamily::OpHSet(const OpArgs& op_args, string_view key, Cm auto& db_slice = op_args.shard->db_slice(); const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + DbSlice::InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind); + if (inserted) { robj* ro = createHashObject(); it->second.ImportRObj(ro); + stats->listpack_blob_cnt++; + stats->listpack_bytes += lpBytes((uint8_t*)ro->ptr); } else { if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; @@ -156,19 +212,37 @@ OpResult HSetFamily::OpHSet(const OpArgs& op_args, string_view key, Cm robj* hset = it->second.AsRObj(); uint8_t* lp = (uint8_t*)hset->ptr; - if (hset->encoding == OBJ_ENCODING_LISTPACK && !IsGoodForListpack(values, lp)) 
{ - hashTypeConvert(hset, OBJ_ENCODING_HT); + if (hset->encoding == OBJ_ENCODING_LISTPACK) { + stats->listpack_bytes -= lpBytes(lp); + + if (!IsGoodForListpack(values, lp)) { + stats->listpack_blob_cnt--; + hashTypeConvert(hset, OBJ_ENCODING_HT); + } } + unsigned created = 0; - // TODO: we could avoid double copying by reimplementing hashTypeSet with better interface. - for (size_t i = 0; i < values.size(); i += 2) { - op_args.shard->tmp_str1 = - sdscpylen(op_args.shard->tmp_str1, values[i].data(), values[i].size()); - op_args.shard->tmp_str2 = - sdscpylen(op_args.shard->tmp_str2, values[i + 1].data(), values[i + 1].size()); + if (hset->encoding == OBJ_ENCODING_LISTPACK) { + bool inserted; + for (size_t i = 0; i < values.size(); i += 2) { + tie(lp, inserted) = lpInsertElem(lp, ArgS(values, i), ArgS(values, i + 1)); + created += inserted; + } + hset->ptr = lp; + stats->listpack_bytes += lpBytes(lp); + } else { + DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding); - created += !hashTypeSet(hset, op_args.shard->tmp_str1, op_args.shard->tmp_str2, HASH_SET_COPY); + // Dictionary + for (size_t i = 0; i < values.size(); i += 2) { + sds fs = sdsnewlen(values[i].data(), values[i].size()); + sds vs = sdsnewlen(values[i + 1].data(), values[i + 1].size()); + + // hashTypeSet checks for hash_max_listpack_entries and converts into dictionary + // if it goes beyond. 
+ created += !hashTypeSet(hset, fs, vs, HASH_SET_TAKE_FIELD | HASH_SET_TAKE_VALUE); + } } it->second.SyncRObj(); @@ -188,6 +262,11 @@ OpResult HSetFamily::OpHDel(const OpArgs& op_args, string_view key, Cm robj* hset = co.AsRObj(); unsigned deleted = 0; bool key_remove = false; + DbSlice::InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind); + + if (hset->encoding == OBJ_ENCODING_LISTPACK) { + stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr); + } for (auto s : values) { op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, s.data(), s.size()); @@ -204,7 +283,12 @@ OpResult HSetFamily::OpHDel(const OpArgs& op_args, string_view key, Cm co.SyncRObj(); if (key_remove) { + if (hset->encoding == OBJ_ENCODING_LISTPACK) { + stats->listpack_blob_cnt--; + } db_slice.Del(op_args.db_ind, *it_res); + } else if (hset->encoding == OBJ_ENCODING_LISTPACK) { + stats->listpack_bytes += lpBytes((uint8_t*)hset->ptr); } return deleted; @@ -262,6 +346,41 @@ OpResult HSetFamily::OpHGet(const OpArgs& op_args, string_view key, stri LOG(FATAL) << "Unknown hash encoding " << hset->encoding; } +OpResult> HSetFamily::OpHKeys(const OpArgs& op_args, string_view key) { + auto& db_slice = op_args.shard->db_slice(); + auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + if (!it_res) { + if (it_res.status() == OpStatus::KEY_NOTFOUND) + return vector{}; + return it_res.status(); + } + + robj* hset = (*it_res)->second.AsRObj(); + auto* hi = hashTypeInitIterator(hset); + + vector res; + if (hset->encoding == OBJ_ENCODING_LISTPACK) { + unsigned slen; + long long vll; + while (hashTypeNext(hi) != C_ERR) { + uint8_t* ptr = lpGetValue(hi->fptr, &slen, &vll); + if (ptr) { + res.emplace_back(reinterpret_cast(ptr), slen); + } else { + res.emplace_back(absl::StrCat(vll)); + } + } + } else { + while (hashTypeNext(hi) != C_ERR) { + sds key = (sds)dictGetKey(hi->de); + res.emplace_back(key, sdslen(key)); + } + } + + hashTypeReleaseIterator(hi); + return res; +} + using CI = CommandId; 
#define HFUNC(x) SetHandler(&HSetFamily::x) @@ -272,6 +391,7 @@ void HSetFamily::Register(CommandRegistry* registry) { << CI{"HEXISTS", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(HExists) << CI{"HGET", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(HGet) << CI{"HINCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HIncrBy) + << CI{"HKEYS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HKeys) << CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet) << CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx) << CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen); diff --git a/src/server/hset_family.h b/src/server/hset_family.h index 9479646..7d2c469 100644 --- a/src/server/hset_family.h +++ b/src/server/hset_family.h @@ -23,6 +23,7 @@ class HSetFamily { static void HExists(CmdArgList args, ConnectionContext* cntx); static void HGet(CmdArgList args, ConnectionContext* cntx); static void HIncrBy(CmdArgList args, ConnectionContext* cntx); + static void HKeys(CmdArgList args, ConnectionContext* cntx); // hmset is deprecated, we should not implement it unless we have to. 
static void HSet(CmdArgList args, ConnectionContext* cntx); @@ -35,6 +36,8 @@ class HSetFamily { static OpResult OpHLen(const OpArgs& op_args, std::string_view key); static OpResult OpHGet(const OpArgs& op_args, std::string_view key, std::string_view field); + + static OpResult> OpHKeys(const OpArgs& op_args, std::string_view key); }; } // namespace dfly diff --git a/src/server/hset_family_test.cc b/src/server/hset_family_test.cc index 7d59295..0d5952f 100644 --- a/src/server/hset_family_test.cc +++ b/src/server/hset_family_test.cc @@ -4,10 +4,17 @@ #include "server/hset_family.h" +extern "C" { +#include "redis/listpack.h" +#include "redis/object.h" +#include "redis/sds.h" +#include "redis/zmalloc.h" +} + #include "base/gtest.h" #include "base/logging.h" -#include "server/test_utils.h" #include "facade/facade_test.h" +#include "server/test_utils.h" using namespace testing; using namespace std; @@ -19,38 +26,60 @@ namespace dfly { class HSetFamilyTest : public BaseFamilyTest { protected: + static void SetUpTestSuite() { + init_zmalloc_threadlocal(); + } }; +TEST_F(HSetFamilyTest, Hash) { + robj* obj = createHashObject(); + sds field = sdsnew("field"); + sds val = sdsnew("value"); + hashTypeSet(obj, field, val, 0); + sdsfree(field); + sdsfree(val); + decrRefCount(obj); +} + TEST_F(HSetFamilyTest, Basic) { auto resp = Run({"hset", "x", "a"}); - EXPECT_THAT(resp[0], ErrArg("wrong number")); + EXPECT_THAT(resp[0], ErrArg("wrong number")); resp = Run({"hset", "x", "a", "b"}); - EXPECT_THAT(resp[0], IntArg(1)); + EXPECT_THAT(resp[0], IntArg(1)); resp = Run({"hlen", "x"}); - EXPECT_THAT(resp[0], IntArg(1)); + EXPECT_THAT(resp[0], IntArg(1)); resp = Run({"hexists", "x", "a"}); - EXPECT_THAT(resp[0], IntArg(1)); + EXPECT_THAT(resp[0], IntArg(1)); resp = Run({"hexists", "x", "b"}); - EXPECT_THAT(resp[0], IntArg(0)); + EXPECT_THAT(resp[0], IntArg(0)); resp = Run({"hexists", "y", "a"}); - EXPECT_THAT(resp[0], IntArg(0)); + EXPECT_THAT(resp[0], IntArg(0)); resp = Run({"hset", 
"x", "a", "b"}); - EXPECT_THAT(resp[0], IntArg(0)); + EXPECT_THAT(resp[0], IntArg(0)); resp = Run({"hset", "x", "a", "c"}); - EXPECT_THAT(resp[0], IntArg(0)); + EXPECT_THAT(resp[0], IntArg(0)); resp = Run({"hset", "y", "a", "c", "d", "e"}); - EXPECT_THAT(resp[0], IntArg(2)); + EXPECT_THAT(resp[0], IntArg(2)); resp = Run({"hdel", "y", "a", "d"}); - EXPECT_THAT(resp[0], IntArg(2)); - + EXPECT_THAT(resp[0], IntArg(2)); } +TEST_F(HSetFamilyTest, HSetLarge) { + string val(1024, 'b'); + + auto resp = Run({"hset", "x", "a", val}); + EXPECT_THAT(resp[0], IntArg(1)); + resp = Run({"hlen", "x"}); + EXPECT_THAT(resp[0], IntArg(1)); +} + + } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 97e9fb1..0015935 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -407,6 +407,8 @@ tcp_port:)"; absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); absl::StrAppend(&info, "small_string_bytes:", m.db.small_string_bytes, "\n"); + absl::StrAppend(&info, "listpack_blobs:", m.db.listpack_blob_cnt, "\n"); + absl::StrAppend(&info, "listpack_bytes:", m.db.listpack_bytes, "\n"); } if (should_enter("STATS")) { diff --git a/tests/generate_sets.py b/tests/generate_sets.py new file mode 100755 index 0000000..72b39f9 --- /dev/null +++ b/tests/generate_sets.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 + +import argparse +import random +import string +import redis as rclient +import uuid +import time + + +def fill_set(args, redis: rclient.Redis): + for j in range(args.num): + token = uuid.uuid1().hex + key = f'USER_OTP:{token}' + otp = ''.join(random.choices( + string.ascii_uppercase + string.digits, k=7)) + redis.execute_command('sadd', key, otp) + +def fill_hset(args, redis): + for j in range(args.num): + token = uuid.uuid1().hex + key = f'USER_INFO:{token}' + phone = f'555-999-{j}' + user_id = 'user' * 5 + f'-{j}' + redis.hset(key, 'phone', phone) + 
redis.hset(key, 'user_id', user_id) + redis.hset(key, 'login_time', time.time()) + + +def main(): + # Parse command-line options + parser = argparse.ArgumentParser(description='fill hset entities') + parser.add_argument( + '-p', type=int, help='redis port', dest='port', default=6380) + parser.add_argument( + '-n', type=int, help='number of keys', dest='num', default=10000) + parser.add_argument( + '--type', type=str, choices=['hset', 'set'], help='set type', default='hset') + + args = parser.parse_args() + redis = rclient.Redis(host='localhost', port=args.port, db=0) + if args.type == 'hset': + fill_hset(args, redis) + elif args.type == 'set': + fill_set(args, redis) + +if __name__ == "__main__": + main()