From cafabce161f08c1f71a09dae3ce16235a0e40457 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 12 Apr 2022 21:21:03 +0300 Subject: [PATCH] Support loading of zset, hset entries --- README.md | 1 - src/server/rdb_load.cc | 305 ++++++++++++++++++++++++++++++++++++++--- src/server/rdb_load.h | 7 +- src/server/rdb_test.cc | 13 +- 4 files changed, 303 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 7dab756..bd197d6 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,6 @@ API 1.0 - [X] DBSIZE - [ ] BGSAVE - [X] SAVE - - [X] DBSIZE - [X] DEBUG - [X] EXEC - [X] FLUSHALL diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index bc716d4..334543f 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -5,6 +5,7 @@ #include "server/rdb_load.h" extern "C" { + #include "redis/intset.h" #include "redis/listpack.h" #include "redis/lzfP.h" /* LZF compression library */ @@ -12,6 +13,7 @@ extern "C" { #include "redis/util.h" #include "redis/ziplist.h" #include "redis/zmalloc.h" +#include "redis/zset.h" } #include @@ -73,11 +75,11 @@ error_condition error_category::default_error_condition(int ev) const noexcept { error_category rdb_category; -inline error_code RdbError(int ev) { +inline error_code RdbError(errc ev) { return error_code{ev, rdb_category}; } -inline auto Unexpected(int ev) { +inline auto Unexpected(errc ev) { return make_unexpected(RdbError(ev)); } @@ -253,10 +255,7 @@ error_code RdbLoader::Load(io::Source* src) { while (1) { /* Read type. */ - auto expected_type = FetchType(); - if (!expected_type) - return expected_type.error(); - type = expected_type.value(); + SET_OR_RETURN(FetchType(), type); /* Handle special types. */ if (type == RDB_OPCODE_EXPIRETIME) { @@ -643,6 +642,48 @@ auto RdbLoader::FetchIntegerObject(int enctype, int flags, size_t* lenptr) return make_pair(o, 16); } +io::Result RdbLoader::FetchBinaryDouble() { + union { + uint64_t val; + double d; + } u; + + static_assert(sizeof(u) == sizeof(uint64_t)); + auto ec = EnsureRead(8); + if (ec) + return make_unexpected(ec); + + uint8_t buf[8]; + mem_buf_.ReadAndConsume(8, buf); + u.val = base::LE::LoadT(buf); + return u.d; +} + +io::Result RdbLoader::FetchDouble() { + uint8_t len; + + SET_OR_UNEXPECT(FetchInt(), len); + constexpr double kInf = std::numeric_limits::infinity(); + switch (len) { + case 255: + return -kInf; + case 254: + return kInf; + case 253: + return std::numeric_limits::quiet_NaN(); + default:; + } + char buf[256]; + error_code ec = FetchBuf(len, buf); + if (ec) + return make_unexpected(ec); + buf[len] = '\0'; + double val; + if (sscanf(buf, "%lg", &val) != 1) + return Unexpected(errc::rdb_file_corrupted); + return val; +} + auto RdbLoader::ReadKey() -> io::Result { auto res = FetchGenericString(RDB_LOAD_SDS); if (res) { @@ -668,11 +709,21 @@ io::Result RdbLoader::ReadObj(int rdbtype) { case RDB_TYPE_SET: res_obj = ReadSet(); break; + case RDB_TYPE_SET_INTSET: + res_obj = ReadIntSet(); + break; case RDB_TYPE_HASH_ZIPLIST: res_obj = ReadHZiplist(); break; + case RDB_TYPE_HASH: + res_obj = ReadHSet(); + break; case RDB_TYPE_ZSET: - res_obj = ReadZSet(); + case RDB_TYPE_ZSET_2: + res_obj = ReadZSet(rdbtype); + break; + case RDB_TYPE_ZSET_ZIPLIST: + res_obj = ReadZSetZL(); break; case RDB_TYPE_LIST_QUICKLIST: res_obj = ReadListQuicklist(rdbtype); @@ -686,13 +737,9 @@ io::Result RdbLoader::ReadObj(int rdbtype) { } io::Result RdbLoader::ReadSet() { - /* Read Set value */ - io::Result io_len = LoadLen(NULL); + size_t len; + SET_OR_UNEXPECT(LoadLen(NULL), len); - if (!io_len) - return make_unexpected(io_len.error()); - - size_t len = *io_len; if (len == 0) return Unexpected(errc::empty_key); @@ -715,6 +762,7 @@ io::Result RdbLoader::ReadSet() { return Unexpected(errc::out_of_memory); } } else { + // TODO: why do we bother creating intset if it was recorded as non intset? res = createIntsetObject(); } @@ -767,6 +815,40 @@ io::Result RdbLoader::ReadSet() { return res; } +::io::Result RdbLoader::ReadIntSet() { + OpaqueBuf fetch; + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); + + if (fetch.second == 0) { + return Unexpected(errc::rdb_file_corrupted); + } + DCHECK(fetch.first); + + if (!intsetValidateIntegrity((uint8_t*)fetch.first, fetch.second, 0)) { + LOG(ERROR) << "Intset integrity check failed."; + zfree(fetch.first); + return Unexpected(errc::rdb_file_corrupted); + } + + intset* is = (intset*)fetch.first; + robj* res; + unsigned len = intsetLen(is); + if (len > SetFamily::MaxIntsetEntries()) { + res = createSetObject(); + if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << len; + decrRefCount(res); + return Unexpected(errc::out_of_memory); + } + SetFamily::ConvertTo(is, (dict*)res->ptr); + zfree(is); + } else { + res = createObject(OBJ_SET, is); + res->encoding = OBJ_ENCODING_INTSET; + } + return res; +} + io::Result RdbLoader::ReadHZiplist() { OpaqueBuf fetch; SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); @@ -803,9 +885,200 @@ io::Result RdbLoader::ReadHZiplist() { return res; } -io::Result RdbLoader::ReadZSet() { - LOG(FATAL) << "TBD"; - return NULL; +io::Result RdbLoader::ReadHSet() { + uint64_t len; + SET_OR_UNEXPECT(LoadLen(nullptr), len); + + if (len == 0) + return Unexpected(errc::empty_key); + + sds field = nullptr; + sds value = nullptr; + robj* res = createHashObject(); + + /* Too many entries? Use a hash table right from the start. */ + if (len > server.hash_max_listpack_entries) + hashTypeConvert(res, OBJ_ENCODING_HT); + + auto cleanup = absl::Cleanup([&] { + decrRefCount(res); + if (field) + sdsfree(field); + if (value) + sdsfree(value); + }); + + /* Load every field and value into the ziplist */ + while (res->encoding == OBJ_ENCODING_LISTPACK && len > 0) { + len--; + + OpaqueBuf ofield, ovalue; + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ofield); + field = (sds)ofield.first; + + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ovalue); + value = (sds)ovalue.first; + + /* Convert to hash table if size threshold is exceeded */ + if (sdslen(field) > server.hash_max_listpack_value || + sdslen(value) > server.hash_max_listpack_value || + !lpSafeToAdd((uint8_t*)res->ptr, sdslen(field) + sdslen(value))) { + hashTypeConvert(res, OBJ_ENCODING_HT); + int ret = dictAdd((dict*)res->ptr, field, value); + if (ret == DICT_ERR) { + return Unexpected(errc::rdb_file_corrupted); + } + break; + } + + /* Add pair to listpack */ + res->ptr = lpAppend((uint8_t*)res->ptr, (uint8_t*)field, sdslen(field)); + res->ptr = lpAppend((uint8_t*)res->ptr, (uint8_t*)value, sdslen(value)); + + sdsfree(field); + sdsfree(value); + field = value = nullptr; + } + + if (res->encoding == OBJ_ENCODING_HT) { + if (len > DICT_HT_INITIAL_SIZE) { + if (dictTryExpand((dict*)res->ptr, len) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << len; + return Unexpected(errc::out_of_memory); + } + } + + /* Load remaining fields and values into the hash table */ + while (len > 0) { + len--; + + OpaqueBuf ofield, ovalue; + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ofield); + field = (sds)ofield.first; + + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ovalue); + value = (sds)ovalue.first; + + /* Add pair to hash table */ + int ret = dictAdd((dict*)res->ptr, field, value); + if (ret == DICT_ERR) { + LOG(ERROR) << "Duplicate hash fields detected"; + return Unexpected(errc::rdb_file_corrupted); + } + } + } + DCHECK_EQ(0u, len); + + std::move(cleanup).Cancel(); + return res; +} + +io::Result RdbLoader::ReadZSet(int rdbtype) { + /* Read sorted set value. */ + uint64_t zsetlen; + SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen); + + if (zsetlen == 0) + return Unexpected(errc::empty_key); + + robj* res = createZsetObject(); + zset* zs = (zset*)res->ptr; + sds sdsele = nullptr; + + auto cleanup = absl::Cleanup([&] { + decrRefCount(res); + if (sdsele) + sdsfree(sdsele); + }); + + if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; + return Unexpected(errc::out_of_memory); + } + + size_t maxelelen = 0, totelelen = 0; + + /* Load every single element of the sorted set. */ + while (zsetlen--) { + double score; + zskiplistNode* znode; + + OpaqueBuf fetch; + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), fetch); + + sdsele = (sds)fetch.first; + + if (rdbtype == RDB_TYPE_ZSET_2) { + SET_OR_UNEXPECT(FetchBinaryDouble(), score); + } else { + SET_OR_UNEXPECT(FetchDouble(), score); + } + + if (isnan(score)) { + LOG(ERROR) << "Zset with NAN score detected"; + return Unexpected(errc::rdb_file_corrupted); + } + + /* Don't care about integer-encoded strings. */ + if (sdslen(sdsele) > maxelelen) + maxelelen = sdslen(sdsele); + totelelen += sdslen(sdsele); + + znode = zslInsert(zs->zsl, score, sdsele); + int ret = dictAdd(zs->dict, sdsele, &znode->score); + sdsele = nullptr; + + if (ret != DICT_OK) { + LOG(ERROR) << "Duplicate zset fields detected"; + return Unexpected(errc::rdb_file_corrupted); + } + } + + /* Convert *after* loading, since sorted sets are not stored ordered. */ + if (zsetLength(res) <= server.zset_max_listpack_entries && + maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) { + zsetConvert(res, OBJ_ENCODING_LISTPACK); + } + + std::move(cleanup).Cancel(); + + return res; +} + +io::Result RdbLoader::ReadZSetZL() { + OpaqueBuf fetch; + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); + + if (fetch.second == 0) { + return Unexpected(errc::rdb_file_corrupted); + } + DCHECK(fetch.first); + + unsigned char* lp = lpNew(fetch.second); + if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)fetch.first, fetch.second, &lp)) { + LOG(ERROR) << "Zset ziplist integrity check failed."; + zfree(lp); + zfree(fetch.first); + return Unexpected(errc::rdb_file_corrupted); + } + + zfree(fetch.first); + + if (lpLength(lp) == 0) { + lpFree(lp); + + return Unexpected(errc::empty_key); + } + + robj* res = createObject(OBJ_ZSET, lp); + res->encoding = OBJ_ENCODING_LISTPACK; + + if (lpBytes(lp) > server.zset_max_listpack_entries) + zsetConvert(res, OBJ_ENCODING_SKIPLIST); + else + res->ptr = lpShrinkToFit(lp); + + return res; } io::Result RdbLoader::ReadListQuicklist(int rdbtype) { diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 10a2560..cfa1d3d 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -53,12 +53,17 @@ class RdbLoader { using OpaqueBuf = std::pair; io::Result FetchGenericString(int flags); io::Result FetchIntegerObject(int enctype, int flags, size_t* lenptr); + io::Result FetchBinaryDouble(); + io::Result FetchDouble(); ::io::Result ReadKey(); ::io::Result ReadObj(int rdbtype); ::io::Result ReadSet(); + ::io::Result ReadIntSet(); ::io::Result ReadHZiplist(); - ::io::Result ReadZSet(); + ::io::Result ReadHSet(); + ::io::Result ReadZSet(int rdbtype); + ::io::Result ReadZSetZL(); ::io::Result ReadListQuicklist(int rdbtype); std::error_code EnsureRead(size_t min_sz) { diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index b6af5bb..f6bc0d7 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -92,22 +92,25 @@ TEST_F(RdbTest, Reload) { Run({"set", "huge_key", string((1 << 17) - 10, 'H')}); Run({"sadd", "set_key1", "val1", "val2"}); - // Run({"sadd", "intset_key", "1", "2", "3"}); + Run({"sadd", "intset_key", "1", "2", "3"}); Run({"hset", "small_hset", "field1", "val1", "field2", "val2"}); - // Run({"hset", "large_hset", "field1", string(510, 'V'), string(120, 'F'), "val2"}); + Run({"hset", "large_hset", "field1", string(510, 'V'), string(120, 'F'), "val2"}); Run({"rpush", "list_key1", "val", "val2"}); Run({"rpush", "list_key2", "head", string(511, 'a'), string(500, 'b'), "tail"}); - // Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"}); - // Run({"zadd", "zs2", "1.1", string(510, 'a'), "-1.1", string(502, 'b')}); + Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"}); + Run({"zadd", "zs2", "1.1", string(510, 'a'), "-1.1", string(502, 'b')}); Run({"debug", "reload"}); EXPECT_EQ(2, CheckedInt({"scard", "set_key1"})); - // EXPECT_EQ(3, CheckedInt({"scard", "intset_key"})); + EXPECT_EQ(3, CheckedInt({"scard", "intset_key"})); EXPECT_EQ(2, CheckedInt({"hlen", "small_hset"})); + EXPECT_EQ(2, CheckedInt({"hlen", "large_hset"})); EXPECT_EQ(4, CheckedInt({"LLEN", "list_key2"})); + EXPECT_EQ(2, CheckedInt({"ZCARD", "zs1"})); + EXPECT_EQ(2, CheckedInt({"ZCARD", "zs2"})); }