From 997d2dcb69f21207b60ebfe908dc385567096962 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 13 Apr 2022 10:50:19 +0300 Subject: [PATCH] more work on rdb load. 1. Added support of loading of compressed strings. 2. Verified we load expiry info. 3. Extended supported expiry period to 4 years (previously I set 1 year). --- README.md | 10 ++--- doc/differences.md | 4 +- src/core/expire_period.h | 10 ++--- src/server/CMakeLists.txt | 2 +- src/server/common.h | 4 +- src/server/db_slice.cc | 8 ++-- src/server/db_slice.h | 2 +- src/server/generic_family.cc | 2 +- src/server/rdb_load.cc | 57 ++++++++++++++++++++++++++- src/server/rdb_load.h | 3 ++ src/server/rdb_test.cc | 17 ++++++-- src/server/string_family.cc | 4 +- src/server/test_utils.cc | 9 +++++ src/server/test_utils.h | 1 + src/server/testdata/redis6_small.rdb | Bin 0 -> 638 bytes src/server/testdata/small.rdb | Bin 3873 -> 0 bytes 16 files changed, 104 insertions(+), 29 deletions(-) create mode 100644 src/server/testdata/redis6_small.rdb delete mode 100644 src/server/testdata/small.rdb diff --git a/README.md b/README.md index bd197d6..3869e02 100644 --- a/README.md +++ b/README.md @@ -270,14 +270,10 @@ instance. ## Design decisions along the way ### Expiration deadlines with relative accuracy -I decided to limit the expiration range to 365 days. Moreover, expiration deadlines +Expiration ranges are limited to ~4 years. Moreover, expiration deadlines with millisecond precision (PEXPIRE/PSETEX etc) will be rounded to closest second -**for deadlines greater than 33554431ms (approximately 560 minutes). In other words, -expiries of `PEXPIRE key 10010` will expire exactly after 10 seconds and 10ms. However, -`PEXPIRE key 34000300` will expire after 34000 seconds (i.e. 300ms earlier). Similarly, -`PEXPIRE key 34000800` will expire after 34001 seconds, i.e. 200ms later. - -Such rounding has at most 0.002% error which I hope is acceptable for large ranges. 
+**for deadlines greater than 134217727ms (approximately 37 hours)**. +Such rounding has less than 0.001% error which I hope is acceptable for large ranges. If it breaks your use-cases - talk to me or open an issue and explain your case. For more detailed differences between this and Redis implementations [see here](doc/differences.md). \ No newline at end of file diff --git a/doc/differences.md b/doc/differences.md index 6372663..71468e1 100644 --- a/doc/differences.md +++ b/doc/differences.md @@ -7,6 +7,6 @@ Indices (say in GETRANGE and SETRANGE commands) should be signed 32 bit integers [-2147483647, 2147483648]. ## Expiry ranges. -Expirations are limited to 365 days. For commands with millisecond precision like PEXPIRE or PSETEX, -expirations greater than 2^25ms are quietly rounded to the nearest second loosing precision of ~0.002%. +Expirations are limited to 4 years. For commands with millisecond precision like PEXPIRE or PSETEX, +expirations greater than 2^27ms are quietly rounded to the nearest second losing precision of less than 0.001%. diff --git a/src/core/expire_period.h b/src/core/expire_period.h index 9e1978f..9dbc89c 100644 --- a/src/core/expire_period.h +++ b/src/core/expire_period.h @@ -19,8 +19,8 @@ class ExpirePeriod { } // in milliseconds - uint64_t duration() const { - return precision_ ? val_ * 1000 : val_; + uint64_t duration_ms() const { + return precision_ ? uint64_t(val_) * 1000 : val_; } unsigned generation() const { @@ -30,13 +30,13 @@ class ExpirePeriod { void Set(uint64_t ms); private: - uint32_t val_ : 25; - uint32_t gen_ : 6; + uint32_t val_ : 27; + uint32_t gen_ : 4; uint32_t precision_ : 1; // 0 - ms, 1 - sec.
}; inline void ExpirePeriod::Set(uint64_t ms) { - constexpr uint64_t kBarrier = (1ULL << 25); + constexpr uint64_t kBarrier = (1ULL << 27); if (ms < kBarrier) { val_ = ms; diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 752d6fb..a9d0e03 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -20,7 +20,7 @@ cxx_test(hset_family_test dfly_test_lib LABELS DFLY) cxx_test(list_family_test dfly_test_lib LABELS DFLY) cxx_test(set_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY) -cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/small.rdb LABELS DFLY) +cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb LABELS DFLY) cxx_test(zset_family_test dfly_test_lib LABELS DFLY) diff --git a/src/server/common.h b/src/server/common.h index 3422558..b812526 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -16,8 +16,8 @@ namespace dfly { enum class ListDir : uint8_t { LEFT, RIGHT }; - -constexpr uint64_t kMaxExpireDeadlineSec = (1u << 25) - 1; +// Dependent on ExpirePeriod representation of the value. +constexpr int64_t kMaxExpireDeadlineSec = (1u << 27) - 1; using DbIndex = uint16_t; using ShardId = uint16_t; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index e9737ed..a865ae1 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -259,9 +259,9 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairsecond.duration() <= delta_ms) { + if (expire_it->second.duration_ms() <= delta_ms) { db->expire_table.Erase(expire_it); if (existing->second.HasFlag()) { @@ -535,9 +535,9 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, CHECK(IsValid(expire_it)); // TODO: to employ multi-generation update of expire-base and the underlying values. 
- uint32_t delta_ms = now_ms_ - expire_base_[0]; + uint64_t delta_ms = now_ms_ - expire_base_[0]; - if (expire_it->second.duration() > delta_ms) + if (expire_it->second.duration_ms() > delta_ms) return make_pair(it, expire_it); db->expire_table.Erase(expire_it); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 098c1b7..b9988ad 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -103,7 +103,7 @@ class DbSlice { // returns absolute time of the expiration. uint64_t ExpireTime(ExpireIterator it) const { - return it.is_done() ? 0 : expire_base_[0] + it->second.duration(); + return it.is_done() ? 0 : expire_base_[0] + it->second.duration_ms(); } ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index ad13c5e..94d71fe 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -528,7 +528,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key, int64_t now_msec = db_slice.Now(); int64_t rel_msec = params.absolute ? 
msec - now_msec : msec; - if (rel_msec > int64_t(kMaxExpireDeadlineSec * 1000)) { + if (rel_msec > kMaxExpireDeadlineSec * 1000) { return OpStatus::OUT_OF_RANGE; } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 334543f..0bb0820 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -573,7 +573,7 @@ auto RdbLoader::FetchGenericString(int flags) -> io::Result { case RDB_ENC_INT32: return FetchIntegerObject(len, flags, NULL); case RDB_ENC_LZF: - LOG(FATAL) << "TBD"; + return FetchLzfStringObject(flags); default: LOG(FATAL) << "Unknown RDB string encoding type " << len; } @@ -610,6 +610,61 @@ auto RdbLoader::FetchGenericString(int flags) -> io::Result { return make_pair(o, len); } +auto RdbLoader::FetchLzfStringObject(int flags) -> io::Result { + bool zerocopy_decompress = true; + + const uint8_t* cbuf = NULL; + char* val = NULL; + + uint64_t clen, len; + + SET_OR_UNEXPECT(LoadLen(NULL), clen); + SET_OR_UNEXPECT(LoadLen(NULL), len); + + CHECK_LE(len, 1ULL << 29); + + if (mem_buf_.InputLen() >= clen) { + cbuf = mem_buf_.InputBuffer().data(); + } else { + compr_buf_.resize(clen); + zerocopy_decompress = false; + + /* Load the compressed representation and uncompress it to target. */ + error_code ec = FetchBuf(clen, compr_buf_.data()); + if (ec) { + return make_unexpected(ec); + } + cbuf = compr_buf_.data(); + } + + bool plain = (flags & RDB_LOAD_PLAIN) != 0; + bool sds = (flags & RDB_LOAD_SDS) != 0; + DCHECK_EQ(false, plain && sds); + + /* Allocate our target according to the uncompressed size. */ + if (plain) { + val = (char*)zmalloc(len); + } else { + val = sdsnewlen(SDS_NOINIT, len); + } + + if (lzf_decompress(cbuf, clen, val, len) == 0) { + LOG(ERROR) << "Invalid LZF compressed string"; + return Unexpected(errc::rdb_file_corrupted); + } + + // FetchBuf consumes the input but if we have not went through that path + // we need to consume now. 
+ if (zerocopy_decompress) + mem_buf_.ConsumeInput(clen); + + if (plain || sds) { + return make_pair(val, len); + } + + return make_pair(createObject(OBJ_STRING, val), len); +} + auto RdbLoader::FetchIntegerObject(int enctype, int flags, size_t* lenptr) -> io::Result { bool plain = (flags & RDB_LOAD_PLAIN) != 0; diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index cfa1d3d..a24bedb 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -52,7 +52,9 @@ class RdbLoader { // flags are RDB_LOAD_XXX masks. using OpaqueBuf = std::pair; io::Result FetchGenericString(int flags); + io::Result FetchLzfStringObject(int flags); io::Result FetchIntegerObject(int enctype, int flags, size_t* lenptr); + io::Result FetchBinaryDouble(); io::Result FetchDouble(); @@ -82,6 +84,7 @@ class RdbLoader { EngineShardSet& ess_; base::IoBuf mem_buf_; + base::PODArray compr_buf_; std::unique_ptr shard_buf_; ::io::Source* src_ = nullptr; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index f6bc0d7..4164b8c 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -12,6 +12,7 @@ extern "C" { #include "base/gtest.h" #include "base/logging.h" +#include "facade/facade_test.h" // needed to find operator== for RespExpr. 
#include "io/file.h" #include "server/engine_shard_set.h" #include "server/rdb_load.h" @@ -21,6 +22,7 @@ extern "C" { using namespace testing; using namespace std; using namespace util; +using namespace facade; DECLARE_int32(list_compress_depth); DECLARE_int32(list_max_listpack_size); @@ -75,11 +77,21 @@ TEST_F(RdbTest, LoadEmpty) { CHECK(!ec); } -TEST_F(RdbTest, LoadSmall) { - io::FileSource fs = GetSource("small.rdb"); +TEST_F(RdbTest, LoadSmall6) { + io::FileSource fs = GetSource("redis6_small.rdb"); RdbLoader loader(ess_); auto ec = loader.Load(&fs); CHECK(!ec); + auto resp = Run({"scan", "0"}); + EXPECT_THAT(Array(resp[1]), + UnorderedElementsAre("list1", "hset_zl", "list2", "zset_sl", "intset", "set1", + "zset_zl", "hset_ht", "intkey", "strkey")); + resp = Run({"smembers", "intset"}); + EXPECT_THAT(resp, UnorderedElementsAre("111", "222", "1234", "3333", "4444", "67899", "76554")); + + // TODO: when we implement PEXPIRETIME we will be able to do it directly. + int ttl = CheckedInt({"ttl", "set1"}); // should expire at 1747008000. + EXPECT_GT(ttl + time(NULL), 1747007000); // left 1000 seconds margin in case the clock is off. 
} TEST_F(RdbTest, Reload) { @@ -113,5 +125,4 @@ TEST_F(RdbTest, Reload) { EXPECT_EQ(2, CheckedInt({"ZCARD", "zs2"})); } - } // namespace dfly diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 39b4512..71e8e0b 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -127,14 +127,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { return builder->SendError(kInvalidIntErr); } - if (int_arg <= 0 || (!is_ms && int_arg >= int64_t(kMaxExpireDeadlineSec))) { + if (int_arg <= 0 || (!is_ms && int_arg >= kMaxExpireDeadlineSec)) { return builder->SendError(kExpiryOutOfRange); } if (!is_ms) { int_arg *= 1000; } - if (int_arg >= int64_t(kMaxExpireDeadlineSec * 1000)) { + if (int_arg >= kMaxExpireDeadlineSec * 1000) { return builder->SendError(kExpiryOutOfRange); } sparams.expire_after_ms = int_arg; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index ad4ed10..5866f6e 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -287,4 +287,13 @@ auto BaseFamilyTest::AddFindConn(Protocol proto, std::string_view id) -> TestCon return it->second.get(); } +RespVec BaseFamilyTest::Array(const RespExpr& expr) { + CHECK(expr.type == RespExpr::ARRAY || expr.type == RespExpr::NIL_ARRAY); + if (expr.type == RespExpr::NIL_ARRAY) + return RespVec{}; + + const RespVec* src = get(expr.u); + return *src; +} + } // namespace dfly diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 5306104..5fa0e28 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -66,6 +66,7 @@ class BaseFamilyTest : public ::testing::Test { } TestConnWrapper* AddFindConn(Protocol proto, std::string_view id); + static RespVec Array(const RespExpr& expr); // ts is ms void UpdateTime(uint64_t ms); diff --git a/src/server/testdata/redis6_small.rdb b/src/server/testdata/redis6_small.rdb new file mode 100644 index 0000000000000000000000000000000000000000..eb4835d1022df1f91971bdaa88f14be67fd8a596 GIT 
binary patch literal 638 zcma)3K}+L66n<|eX-zePMO|SRRP1S`+E$Q?hb`Cz_a+LS3NlHjHoJ*R69xMRJPDq> z`d75M?QsuY1SzcRS+riVOXEwcR>i{(^9}R9mpAWyUsk^szpZ65nT0^B$+G39_he0- zpUKV42Z=6WSe|=Y2vpOv9N9Rq){UTV*Ole8Bb~<0p$@@-*si2^YEn3cEl;5he-liJ zS6%6CR6WKjIX_thG>m2lkNYADStZ*^2=OW*#Gq_qf6el87amL_CoYbG;f3kQ9bl zmX*!w_vfW|t*P7o=e&}qg2@T~G4ZYrhLR79CMCmvaP;%+7|D>1J2~bToKjxTlL?uz zj;tcQF!B;un=4mg?9tobfbDoMyn^2S`akY}NL%R7E|2IVvCUIj@7ZQl Y@;o(D=m%su8Ys{VPv%NHtzglA0JZSlWB>pF literal 0 HcmV?d00001 diff --git a/src/server/testdata/small.rdb b/src/server/testdata/small.rdb deleted file mode 100644 index 397664afb4ffb0325b48fdfec53ab984507cc3da..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3873 zcmY*cJ8o1#5PSkzHV7gT&VXcV=Rf1f0R*IwK*kzBixswv{6XRp9EOkxIY1nUXe}9b z)LpgHX}#*6>Y7jYd-Lk$>$i)=qOBiY&L^v@{jc-oPCoDldHq;`4H{sfc$P&o!b~ zPV@+KNg7$FClu7Xt{p-@5%8dzC-!g`4kY5|L0 zS9o3Nb%ocJJ|u`pTMV_1W#MpY5oBegq7aFkGS{42dm(4Mv{;)FV$&<0i`t6;^# z6*F{Y>MF=pm@5DZggk2WtT97<<~}F2hRG||tAsp~dnV5~jG2cqW4+AvJSz08upzK9 zFk$l(H{XNh$#P&hvI=&(@RB4*O_bUIH9`r2iGd!w=(&px1vM5FQBY(-9=SYoIY5q3 zgsYLR78F!iP(xs2V21ik^%<;8R)t@ceial{SWv?0N<3W=A|ge2usm4~xg2u^umV{L ztVC9Vg=uRcq(({&FehxyfzgFV2c!>7p8;h;5q`yR|KRTEWSaIqudc7Z9h&ub9MZ#&%|7i4Yk2+z zFx^ujg!QAM$;T}b$n-^J!J172fOZ_teuRZ(-U@-t!yu43oC2BmS0FPEC=2TQkRp(2 zaRQkhE0F0!8i=xca~7yz7dMSwJssv2<)G;P5NUFR_3=X>b7cfF?L-4{Kl-d8keORF z5JLMHCXi{+0!8HNcTIEQ==Om}SvY}_C(z6q!ZOd5K;w9c^w^Bv3^fpkq_=2+%)}_r zI36l%>}l%!FrjKDB=2vuKxT9j$h`LgnNF)L_|SWYKn~ERYpaB9!&G3HlqAUXb%Dk~ zO;eA(SRwOTssKE*eS8wgv`b}SQ#Z4csv1Tf;g~ik(D)0W$xZhc*7&0V?XaacC6$0i zvT4sMVVDJlV@4T)O!rg>hqm{AflS*~79{kUL?F`_1sW$tRW37N*hG^jo z|7e;RB74IX$jq<;ncp^L;ezxoCXksHm4yxMzkmXnp+W