Support loading of zset, hset entries

This commit is contained in:
Roman Gershman 2022-04-12 21:21:03 +03:00
parent bfcefd932d
commit cafabce161
4 changed files with 303 additions and 23 deletions

View File

@ -72,7 +72,6 @@ API 1.0
- [X] DBSIZE
- [ ] BGSAVE
- [X] SAVE
- [X] DBSIZE
- [X] DEBUG
- [X] EXEC
- [X] FLUSHALL

View File

@ -5,6 +5,7 @@
#include "server/rdb_load.h"
extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/lzfP.h" /* LZF compression library */
@ -12,6 +13,7 @@ extern "C" {
#include "redis/util.h"
#include "redis/ziplist.h"
#include "redis/zmalloc.h"
#include "redis/zset.h"
}
#include <absl/cleanup/cleanup.h>
@ -73,11 +75,11 @@ error_condition error_category::default_error_condition(int ev) const noexcept {
error_category rdb_category;
inline error_code RdbError(int ev) {
inline error_code RdbError(errc ev) {
return error_code{ev, rdb_category};
}
inline auto Unexpected(int ev) {
inline auto Unexpected(errc ev) {
return make_unexpected(RdbError(ev));
}
@ -253,10 +255,7 @@ error_code RdbLoader::Load(io::Source* src) {
while (1) {
/* Read type. */
auto expected_type = FetchType();
if (!expected_type)
return expected_type.error();
type = expected_type.value();
SET_OR_RETURN(FetchType(), type);
/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
@ -643,6 +642,48 @@ auto RdbLoader::FetchIntegerObject(int enctype, int flags, size_t* lenptr)
return make_pair(o, 16);
}
io::Result<double> RdbLoader::FetchBinaryDouble() {
union {
uint64_t val;
double d;
} u;
static_assert(sizeof(u) == sizeof(uint64_t));
auto ec = EnsureRead(8);
if (ec)
return make_unexpected(ec);
uint8_t buf[8];
mem_buf_.ReadAndConsume(8, buf);
u.val = base::LE::LoadT<uint64_t>(buf);
return u.d;
}
io::Result<double> RdbLoader::FetchDouble() {
uint8_t len;
SET_OR_UNEXPECT(FetchInt<uint8_t>(), len);
constexpr double kInf = std::numeric_limits<double>::infinity();
switch (len) {
case 255:
return -kInf;
case 254:
return kInf;
case 253:
return std::numeric_limits<double>::quiet_NaN();
default:;
}
char buf[256];
error_code ec = FetchBuf(len, buf);
if (ec)
return make_unexpected(ec);
buf[len] = '\0';
double val;
if (sscanf(buf, "%lg", &val) != 1)
return Unexpected(errc::rdb_file_corrupted);
return val;
}
auto RdbLoader::ReadKey() -> io::Result<sds> {
auto res = FetchGenericString(RDB_LOAD_SDS);
if (res) {
@ -668,11 +709,21 @@ io::Result<robj*> RdbLoader::ReadObj(int rdbtype) {
case RDB_TYPE_SET:
res_obj = ReadSet();
break;
case RDB_TYPE_SET_INTSET:
res_obj = ReadIntSet();
break;
case RDB_TYPE_HASH_ZIPLIST:
res_obj = ReadHZiplist();
break;
case RDB_TYPE_HASH:
res_obj = ReadHSet();
break;
case RDB_TYPE_ZSET:
res_obj = ReadZSet();
case RDB_TYPE_ZSET_2:
res_obj = ReadZSet(rdbtype);
break;
case RDB_TYPE_ZSET_ZIPLIST:
res_obj = ReadZSetZL();
break;
case RDB_TYPE_LIST_QUICKLIST:
res_obj = ReadListQuicklist(rdbtype);
@ -686,13 +737,9 @@ io::Result<robj*> RdbLoader::ReadObj(int rdbtype) {
}
io::Result<robj*> RdbLoader::ReadSet() {
/* Read Set value */
io::Result<uint64_t> io_len = LoadLen(NULL);
size_t len;
SET_OR_UNEXPECT(LoadLen(NULL), len);
if (!io_len)
return make_unexpected(io_len.error());
size_t len = *io_len;
if (len == 0)
return Unexpected(errc::empty_key);
@ -715,6 +762,7 @@ io::Result<robj*> RdbLoader::ReadSet() {
return Unexpected(errc::out_of_memory);
}
} else {
// TODO: why do we bother creating intset if it was recorded as non intset?
res = createIntsetObject();
}
@ -767,6 +815,40 @@ io::Result<robj*> RdbLoader::ReadSet() {
return res;
}
::io::Result<robj*> RdbLoader::ReadIntSet() {
OpaqueBuf fetch;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch);
if (fetch.second == 0) {
return Unexpected(errc::rdb_file_corrupted);
}
DCHECK(fetch.first);
if (!intsetValidateIntegrity((uint8_t*)fetch.first, fetch.second, 0)) {
LOG(ERROR) << "Intset integrity check failed.";
zfree(fetch.first);
return Unexpected(errc::rdb_file_corrupted);
}
intset* is = (intset*)fetch.first;
robj* res;
unsigned len = intsetLen(is);
if (len > SetFamily::MaxIntsetEntries()) {
res = createSetObject();
if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) {
LOG(ERROR) << "OOM in dictTryExpand " << len;
decrRefCount(res);
return Unexpected(errc::out_of_memory);
}
SetFamily::ConvertTo(is, (dict*)res->ptr);
zfree(is);
} else {
res = createObject(OBJ_SET, is);
res->encoding = OBJ_ENCODING_INTSET;
}
return res;
}
io::Result<robj*> RdbLoader::ReadHZiplist() {
OpaqueBuf fetch;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch);
@ -803,9 +885,200 @@ io::Result<robj*> RdbLoader::ReadHZiplist() {
return res;
}
io::Result<robj*> RdbLoader::ReadZSet() {
LOG(FATAL) << "TBD";
return NULL;
io::Result<robj*> RdbLoader::ReadHSet() {
uint64_t len;
SET_OR_UNEXPECT(LoadLen(nullptr), len);
if (len == 0)
return Unexpected(errc::empty_key);
sds field = nullptr;
sds value = nullptr;
robj* res = createHashObject();
/* Too many entries? Use a hash table right from the start. */
if (len > server.hash_max_listpack_entries)
hashTypeConvert(res, OBJ_ENCODING_HT);
auto cleanup = absl::Cleanup([&] {
decrRefCount(res);
if (field)
sdsfree(field);
if (value)
sdsfree(value);
});
/* Load every field and value into the ziplist */
while (res->encoding == OBJ_ENCODING_LISTPACK && len > 0) {
len--;
OpaqueBuf ofield, ovalue;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ofield);
field = (sds)ofield.first;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ovalue);
value = (sds)ovalue.first;
/* Convert to hash table if size threshold is exceeded */
if (sdslen(field) > server.hash_max_listpack_value ||
sdslen(value) > server.hash_max_listpack_value ||
!lpSafeToAdd((uint8_t*)res->ptr, sdslen(field) + sdslen(value))) {
hashTypeConvert(res, OBJ_ENCODING_HT);
int ret = dictAdd((dict*)res->ptr, field, value);
if (ret == DICT_ERR) {
return Unexpected(errc::rdb_file_corrupted);
}
break;
}
/* Add pair to listpack */
res->ptr = lpAppend((uint8_t*)res->ptr, (uint8_t*)field, sdslen(field));
res->ptr = lpAppend((uint8_t*)res->ptr, (uint8_t*)value, sdslen(value));
sdsfree(field);
sdsfree(value);
field = value = nullptr;
}
if (res->encoding == OBJ_ENCODING_HT) {
if (len > DICT_HT_INITIAL_SIZE) {
if (dictTryExpand((dict*)res->ptr, len) != DICT_OK) {
LOG(ERROR) << "OOM in dictTryExpand " << len;
return Unexpected(errc::out_of_memory);
}
}
/* Load remaining fields and values into the hash table */
while (len > 0) {
len--;
OpaqueBuf ofield, ovalue;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ofield);
field = (sds)ofield.first;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ovalue);
value = (sds)ovalue.first;
/* Add pair to hash table */
int ret = dictAdd((dict*)res->ptr, field, value);
if (ret == DICT_ERR) {
LOG(ERROR) << "Duplicate hash fields detected";
return Unexpected(errc::rdb_file_corrupted);
}
}
}
DCHECK_EQ(0u, len);
std::move(cleanup).Cancel();
return res;
}
io::Result<robj*> RdbLoader::ReadZSet(int rdbtype) {
/* Read sorted set value. */
uint64_t zsetlen;
SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
if (zsetlen == 0)
return Unexpected(errc::empty_key);
robj* res = createZsetObject();
zset* zs = (zset*)res->ptr;
sds sdsele = nullptr;
auto cleanup = absl::Cleanup([&] {
decrRefCount(res);
if (sdsele)
sdsfree(sdsele);
});
if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) {
LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
return Unexpected(errc::out_of_memory);
}
size_t maxelelen = 0, totelelen = 0;
/* Load every single element of the sorted set. */
while (zsetlen--) {
double score;
zskiplistNode* znode;
OpaqueBuf fetch;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), fetch);
sdsele = (sds)fetch.first;
if (rdbtype == RDB_TYPE_ZSET_2) {
SET_OR_UNEXPECT(FetchBinaryDouble(), score);
} else {
SET_OR_UNEXPECT(FetchDouble(), score);
}
if (isnan(score)) {
LOG(ERROR) << "Zset with NAN score detected";
return Unexpected(errc::rdb_file_corrupted);
}
/* Don't care about integer-encoded strings. */
if (sdslen(sdsele) > maxelelen)
maxelelen = sdslen(sdsele);
totelelen += sdslen(sdsele);
znode = zslInsert(zs->zsl, score, sdsele);
int ret = dictAdd(zs->dict, sdsele, &znode->score);
sdsele = nullptr;
if (ret != DICT_OK) {
LOG(ERROR) << "Duplicate zset fields detected";
return Unexpected(errc::rdb_file_corrupted);
}
}
/* Convert *after* loading, since sorted sets are not stored ordered. */
if (zsetLength(res) <= server.zset_max_listpack_entries &&
maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) {
zsetConvert(res, OBJ_ENCODING_LISTPACK);
}
std::move(cleanup).Cancel();
return res;
}
io::Result<robj*> RdbLoader::ReadZSetZL() {
OpaqueBuf fetch;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch);
if (fetch.second == 0) {
return Unexpected(errc::rdb_file_corrupted);
}
DCHECK(fetch.first);
unsigned char* lp = lpNew(fetch.second);
if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)fetch.first, fetch.second, &lp)) {
LOG(ERROR) << "Zset ziplist integrity check failed.";
zfree(lp);
zfree(fetch.first);
return Unexpected(errc::rdb_file_corrupted);
}
zfree(fetch.first);
if (lpLength(lp) == 0) {
lpFree(lp);
return Unexpected(errc::empty_key);
}
robj* res = createObject(OBJ_ZSET, lp);
res->encoding = OBJ_ENCODING_LISTPACK;
if (lpBytes(lp) > server.zset_max_listpack_entries)
zsetConvert(res, OBJ_ENCODING_SKIPLIST);
else
res->ptr = lpShrinkToFit(lp);
return res;
}
io::Result<robj*> RdbLoader::ReadListQuicklist(int rdbtype) {

View File

@ -53,12 +53,17 @@ class RdbLoader {
using OpaqueBuf = std::pair<void*, size_t>;
io::Result<OpaqueBuf> FetchGenericString(int flags);
io::Result<OpaqueBuf> FetchIntegerObject(int enctype, int flags, size_t* lenptr);
io::Result<double> FetchBinaryDouble();
io::Result<double> FetchDouble();
::io::Result<sds> ReadKey();
::io::Result<robj*> ReadObj(int rdbtype);
::io::Result<robj*> ReadSet();
::io::Result<robj*> ReadIntSet();
::io::Result<robj*> ReadHZiplist();
::io::Result<robj*> ReadZSet();
::io::Result<robj*> ReadHSet();
::io::Result<robj*> ReadZSet(int rdbtype);
::io::Result<robj*> ReadZSetZL();
::io::Result<robj*> ReadListQuicklist(int rdbtype);
std::error_code EnsureRead(size_t min_sz) {

View File

@ -92,22 +92,25 @@ TEST_F(RdbTest, Reload) {
Run({"set", "huge_key", string((1 << 17) - 10, 'H')});
Run({"sadd", "set_key1", "val1", "val2"});
// Run({"sadd", "intset_key", "1", "2", "3"});
Run({"sadd", "intset_key", "1", "2", "3"});
Run({"hset", "small_hset", "field1", "val1", "field2", "val2"});
// Run({"hset", "large_hset", "field1", string(510, 'V'), string(120, 'F'), "val2"});
Run({"hset", "large_hset", "field1", string(510, 'V'), string(120, 'F'), "val2"});
Run({"rpush", "list_key1", "val", "val2"});
Run({"rpush", "list_key2", "head", string(511, 'a'), string(500, 'b'), "tail"});
// Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"});
// Run({"zadd", "zs2", "1.1", string(510, 'a'), "-1.1", string(502, 'b')});
Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"});
Run({"zadd", "zs2", "1.1", string(510, 'a'), "-1.1", string(502, 'b')});
Run({"debug", "reload"});
EXPECT_EQ(2, CheckedInt({"scard", "set_key1"}));
// EXPECT_EQ(3, CheckedInt({"scard", "intset_key"}));
EXPECT_EQ(3, CheckedInt({"scard", "intset_key"}));
EXPECT_EQ(2, CheckedInt({"hlen", "small_hset"}));
EXPECT_EQ(2, CheckedInt({"hlen", "large_hset"}));
EXPECT_EQ(4, CheckedInt({"LLEN", "list_key2"}));
EXPECT_EQ(2, CheckedInt({"ZCARD", "zs1"}));
EXPECT_EQ(2, CheckedInt({"ZCARD", "zs2"}));
}