more work on rdb load.
1. Added support for loading compressed (LZF) strings. 2. Verified that expiry info is loaded. 3. Extended the supported expiry period to ~4 years (previously it was limited to 1 year).
This commit is contained in:
parent
cafabce161
commit
997d2dcb69
10
README.md
10
README.md
|
@ -270,14 +270,10 @@ instance.
|
|||
|
||||
## Design decisions along the way
|
||||
### Expiration deadlines with relative accuracy
|
||||
I decided to limit the expiration range to 365 days. Moreover, expiration deadlines
|
||||
Expiration ranges are limited to ~4 years. Moreover, expiration deadlines
|
||||
with millisecond precision (PEXPIRE/PSETEX etc) will be rounded to closest second
|
||||
**for deadlines greater than 33554431ms (approximately 560 minutes)**. In other words,
|
||||
expiries of `PEXPIRE key 10010` will expire exactly after 10 seconds and 10ms. However,
|
||||
`PEXPIRE key 34000300` will expire after 34000 seconds (i.e. 300ms earlier). Similarly,
|
||||
`PEXPIRE key 34000800` will expire after 34001 seconds, i.e. 200ms later.
|
||||
|
||||
Such rounding has at most 0.002% error which I hope is acceptable for large ranges.
|
||||
**for deadlines greater than 134217727ms (approximately 37 hours)**.
|
||||
Such rounding has less than 0.001% error which I hope is acceptable for large ranges.
|
||||
If it breaks your use-cases - talk to me or open an issue and explain your case.
|
||||
|
||||
For more detailed differences between this and Redis implementations [see here](doc/differences.md).
|
|
@ -7,6 +7,6 @@ Indices (say in GETRANGE and SETRANGE commands) should be signed 32 bit integers
|
|||
[-2147483648, 2147483647].
|
||||
|
||||
## Expiry ranges.
|
||||
Expirations are limited to 365 days. For commands with millisecond precision like PEXPIRE or PSETEX,
|
||||
expirations greater than 2^25ms are quietly rounded to the nearest second, losing precision of ~0.002%.
|
||||
Expirations are limited to 4 years. For commands with millisecond precision like PEXPIRE or PSETEX,
|
||||
expirations greater than 2^27ms are quietly rounded to the nearest second, losing precision of less than 0.001%.
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@ class ExpirePeriod {
|
|||
}
|
||||
|
||||
// in milliseconds
|
||||
uint64_t duration() const {
|
||||
return precision_ ? val_ * 1000 : val_;
|
||||
uint64_t duration_ms() const {
|
||||
return precision_ ? uint64_t(val_) * 1000 : val_;
|
||||
}
|
||||
|
||||
unsigned generation() const {
|
||||
|
@ -30,13 +30,13 @@ class ExpirePeriod {
|
|||
void Set(uint64_t ms);
|
||||
|
||||
private:
|
||||
uint32_t val_ : 25;
|
||||
uint32_t gen_ : 6;
|
||||
uint32_t val_ : 27;
|
||||
uint32_t gen_ : 4;
|
||||
uint32_t precision_ : 1; // 0 - ms, 1 - sec.
|
||||
};
|
||||
|
||||
inline void ExpirePeriod::Set(uint64_t ms) {
|
||||
constexpr uint64_t kBarrier = (1ULL << 25);
|
||||
constexpr uint64_t kBarrier = (1ULL << 27);
|
||||
|
||||
if (ms < kBarrier) {
|
||||
val_ = ms;
|
||||
|
|
|
@ -20,7 +20,7 @@ cxx_test(hset_family_test dfly_test_lib LABELS DFLY)
|
|||
cxx_test(list_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(set_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/small.rdb LABELS DFLY)
|
||||
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb LABELS DFLY)
|
||||
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
|
||||
|
||||
|
||||
|
|
|
@ -16,8 +16,8 @@ namespace dfly {
|
|||
|
||||
enum class ListDir : uint8_t { LEFT, RIGHT };
|
||||
|
||||
|
||||
constexpr uint64_t kMaxExpireDeadlineSec = (1u << 25) - 1;
|
||||
// Dependent on ExpirePeriod representation of the value.
|
||||
constexpr int64_t kMaxExpireDeadlineSec = (1u << 27) - 1;
|
||||
|
||||
using DbIndex = uint16_t;
|
||||
using ShardId = uint16_t;
|
||||
|
|
|
@ -259,9 +259,9 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
|
|||
|
||||
// TODO: to implement the incremental update of expiry values using multi-generation
|
||||
// expire_base_ update. Right now we use only index 0.
|
||||
uint32_t delta_ms = now_ms_ - expire_base_[0];
|
||||
uint64_t delta_ms = now_ms_ - expire_base_[0];
|
||||
|
||||
if (expire_it->second.duration() <= delta_ms) {
|
||||
if (expire_it->second.duration_ms() <= delta_ms) {
|
||||
db->expire_table.Erase(expire_it);
|
||||
|
||||
if (existing->second.HasFlag()) {
|
||||
|
@ -535,9 +535,9 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
|
|||
CHECK(IsValid(expire_it));
|
||||
|
||||
// TODO: to employ multi-generation update of expire-base and the underlying values.
|
||||
uint32_t delta_ms = now_ms_ - expire_base_[0];
|
||||
uint64_t delta_ms = now_ms_ - expire_base_[0];
|
||||
|
||||
if (expire_it->second.duration() > delta_ms)
|
||||
if (expire_it->second.duration_ms() > delta_ms)
|
||||
return make_pair(it, expire_it);
|
||||
|
||||
db->expire_table.Erase(expire_it);
|
||||
|
|
|
@ -103,7 +103,7 @@ class DbSlice {
|
|||
|
||||
// returns absolute time of the expiration.
|
||||
uint64_t ExpireTime(ExpireIterator it) const {
|
||||
return it.is_done() ? 0 : expire_base_[0] + it->second.duration();
|
||||
return it.is_done() ? 0 : expire_base_[0] + it->second.duration_ms();
|
||||
}
|
||||
|
||||
ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const {
|
||||
|
|
|
@ -528,7 +528,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
|
|||
int64_t now_msec = db_slice.Now();
|
||||
int64_t rel_msec = params.absolute ? msec - now_msec : msec;
|
||||
|
||||
if (rel_msec > int64_t(kMaxExpireDeadlineSec * 1000)) {
|
||||
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
|
||||
return OpStatus::OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
|
|
|
@ -573,7 +573,7 @@ auto RdbLoader::FetchGenericString(int flags) -> io::Result<OpaqueBuf> {
|
|||
case RDB_ENC_INT32:
|
||||
return FetchIntegerObject(len, flags, NULL);
|
||||
case RDB_ENC_LZF:
|
||||
LOG(FATAL) << "TBD";
|
||||
return FetchLzfStringObject(flags);
|
||||
default:
|
||||
LOG(FATAL) << "Unknown RDB string encoding type " << len;
|
||||
}
|
||||
|
@ -610,6 +610,61 @@ auto RdbLoader::FetchGenericString(int flags) -> io::Result<OpaqueBuf> {
|
|||
return make_pair(o, len);
|
||||
}
|
||||
|
||||
// Loads an LZF-compressed string: reads the compressed length and the uncompressed
// length, decompresses the payload and returns the result together with its length.
//
// flags are RDB_LOAD_XXX masks and select the representation of the returned buffer:
//   RDB_LOAD_PLAIN - a buffer allocated with zmalloc;
//   RDB_LOAD_SDS   - an sds string;
//   otherwise      - an object created with createObject(OBJ_STRING, ...) around an sds string.
//
// Returns an error if reading the input fails, or errc::rdb_file_corrupted if the payload
// can not be decompressed into exactly the declared number of bytes.
auto RdbLoader::FetchLzfStringObject(int flags) -> io::Result<OpaqueBuf> {
  bool zerocopy_decompress = true;

  const uint8_t* cbuf = NULL;
  char* val = NULL;

  uint64_t clen, len;

  SET_OR_UNEXPECT(LoadLen(NULL), clen);
  SET_OR_UNEXPECT(LoadLen(NULL), len);

  CHECK_LE(len, 1ULL << 29);

  if (mem_buf_.InputLen() >= clen) {
    // The whole compressed payload is already buffered - decompress straight from it.
    cbuf = mem_buf_.InputBuffer().data();
  } else {
    compr_buf_.resize(clen);
    zerocopy_decompress = false;

    /* Load the compressed representation and uncompress it to target. */
    error_code ec = FetchBuf(clen, compr_buf_.data());
    if (ec) {
      return make_unexpected(ec);
    }
    cbuf = compr_buf_.data();
  }

  bool plain = (flags & RDB_LOAD_PLAIN) != 0;
  bool sds = (flags & RDB_LOAD_SDS) != 0;
  DCHECK_EQ(false, plain && sds);

  /* Allocate our target according to the uncompressed size. */
  if (plain) {
    val = (char*)zmalloc(len);
  } else {
    val = sdsnewlen(SDS_NOINIT, len);
  }

  // lzf_decompress returns the number of bytes written, or 0 on failure.
  // Anything other than exactly `len` bytes means the file is corrupted; accepting a
  // shorter result would expose the uninitialized tail of the target buffer.
  const uint64_t decoded = lzf_decompress(cbuf, clen, val, len);
  if (decoded == 0 || decoded != len) {
    // Release the target buffer with the deallocator matching its allocator,
    // otherwise it leaks on every corrupted entry.
    if (plain) {
      zfree(val);
    } else {
      sdsfree(val);
    }
    LOG(ERROR) << "Invalid LZF compressed string";
    return Unexpected(errc::rdb_file_corrupted);
  }

  // FetchBuf consumes the input, but if we have not gone through that path
  // we need to consume it now.
  if (zerocopy_decompress)
    mem_buf_.ConsumeInput(clen);

  if (plain || sds) {
    return make_pair(val, len);
  }

  return make_pair(createObject(OBJ_STRING, val), len);
}
|
||||
|
||||
auto RdbLoader::FetchIntegerObject(int enctype, int flags, size_t* lenptr)
|
||||
-> io::Result<OpaqueBuf> {
|
||||
bool plain = (flags & RDB_LOAD_PLAIN) != 0;
|
||||
|
|
|
@ -52,7 +52,9 @@ class RdbLoader {
|
|||
// flags are RDB_LOAD_XXX masks.
|
||||
using OpaqueBuf = std::pair<void*, size_t>;
|
||||
io::Result<OpaqueBuf> FetchGenericString(int flags);
|
||||
io::Result<OpaqueBuf> FetchLzfStringObject(int flags);
|
||||
io::Result<OpaqueBuf> FetchIntegerObject(int enctype, int flags, size_t* lenptr);
|
||||
|
||||
io::Result<double> FetchBinaryDouble();
|
||||
io::Result<double> FetchDouble();
|
||||
|
||||
|
@ -82,6 +84,7 @@ class RdbLoader {
|
|||
|
||||
EngineShardSet& ess_;
|
||||
base::IoBuf mem_buf_;
|
||||
base::PODArray<uint8_t> compr_buf_;
|
||||
std::unique_ptr<ItemsBuf[]> shard_buf_;
|
||||
|
||||
::io::Source* src_ = nullptr;
|
||||
|
|
|
@ -12,6 +12,7 @@ extern "C" {
|
|||
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/facade_test.h" // needed to find operator== for RespExpr.
|
||||
#include "io/file.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/rdb_load.h"
|
||||
|
@ -21,6 +22,7 @@ extern "C" {
|
|||
using namespace testing;
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
using namespace facade;
|
||||
|
||||
DECLARE_int32(list_compress_depth);
|
||||
DECLARE_int32(list_max_listpack_size);
|
||||
|
@ -75,11 +77,21 @@ TEST_F(RdbTest, LoadEmpty) {
|
|||
CHECK(!ec);
|
||||
}
|
||||
|
||||
TEST_F(RdbTest, LoadSmall) {
|
||||
io::FileSource fs = GetSource("small.rdb");
|
||||
// Loads a small snapshot fixture (redis6_small.rdb) and verifies the set of loaded keys,
// the contents of an intset-encoded set and that expiry information was applied.
TEST_F(RdbTest, LoadSmall6) {
  io::FileSource fs = GetSource("redis6_small.rdb");
  RdbLoader loader(ess_);
  auto ec = loader.Load(&fs);
  CHECK(!ec);

  // All keys from the fixture must be present; resp[1] holds the key array of the SCAN reply.
  auto resp = Run({"scan", "0"});
  EXPECT_THAT(Array(resp[1]),
              UnorderedElementsAre("list1", "hset_zl", "list2", "zset_sl", "intset", "set1",
                                   "zset_zl", "hset_ht", "intkey", "strkey"));

  resp = Run({"smembers", "intset"});
  EXPECT_THAT(resp, UnorderedElementsAre("111", "222", "1234", "3333", "4444", "67899", "76554"));

  // TODO: when we implement PEXPIRETIME we will be able to do it directly.
  // NOTE(review): the fixture deadline below is an absolute timestamp (2025-05-12 UTC);
  // presumably this test becomes date-dependent once that moment passes - confirm.
  int ttl = CheckedInt({"ttl", "set1"});  // should expire at 1747008000.

  // left 1000 seconds margin in case the clock is off.
  EXPECT_GT(ttl + time(NULL), 1747007000);
}
|
||||
|
||||
TEST_F(RdbTest, Reload) {
|
||||
|
@ -113,5 +125,4 @@ TEST_F(RdbTest, Reload) {
|
|||
EXPECT_EQ(2, CheckedInt({"ZCARD", "zs2"}));
|
||||
}
|
||||
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -127,14 +127,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
return builder->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
if (int_arg <= 0 || (!is_ms && int_arg >= int64_t(kMaxExpireDeadlineSec))) {
|
||||
if (int_arg <= 0 || (!is_ms && int_arg >= kMaxExpireDeadlineSec)) {
|
||||
return builder->SendError(kExpiryOutOfRange);
|
||||
}
|
||||
|
||||
if (!is_ms) {
|
||||
int_arg *= 1000;
|
||||
}
|
||||
if (int_arg >= int64_t(kMaxExpireDeadlineSec * 1000)) {
|
||||
if (int_arg >= kMaxExpireDeadlineSec * 1000) {
|
||||
return builder->SendError(kExpiryOutOfRange);
|
||||
}
|
||||
sparams.expire_after_ms = int_arg;
|
||||
|
|
|
@ -287,4 +287,13 @@ auto BaseFamilyTest::AddFindConn(Protocol proto, std::string_view id) -> TestCon
|
|||
return it->second.get();
|
||||
}
|
||||
|
||||
// Returns a copy of the elements of an array reply.
// A NIL_ARRAY expression yields an empty vector; any type other than ARRAY or NIL_ARRAY
// fails the CHECK.
RespVec BaseFamilyTest::Array(const RespExpr& expr) {
  CHECK(expr.type == RespExpr::ARRAY || expr.type == RespExpr::NIL_ARRAY);

  if (expr.type != RespExpr::NIL_ARRAY) {
    // expr.u holds a pointer to the underlying vector - copy it out for the caller.
    return *get<RespVec*>(expr.u);
  }

  return RespVec{};
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -66,6 +66,7 @@ class BaseFamilyTest : public ::testing::Test {
|
|||
}
|
||||
|
||||
TestConnWrapper* AddFindConn(Protocol proto, std::string_view id);
|
||||
static RespVec Array(const RespExpr& expr);
|
||||
|
||||
// ts is ms
|
||||
void UpdateTime(uint64_t ms);
|
||||
|
|
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue