chore(rdb): move object creation during loading to shard threads. (#188)

Related to #159. Before this change, rdb loading thread has been creating all the redis objects as well.
Now we separate rdb file parsing and objects creation. File parsing phase produces a load trace of one or more binary blobs.
Those blobs are then passed to the threads that are responsible to manage the objects.
The second phase is object creation based on the trace that was passed. Finally those binary blobs are destroyed.
As a result, each thread creates objects using the memory allocator it owns and memory stats become consistent.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-07-04 19:08:13 +03:00 committed by GitHub
parent 8cb486a690
commit 55389d9be5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 760 additions and 395 deletions

View File

@ -22,8 +22,10 @@ using facade::kDbIndOutOfRangeErr;
#define RETURN_ON_ERR(x) \
do { \
auto __ec = (x); \
if (__ec) \
if (__ec) { \
VLOG(1) << "Error " << __ec << " while calling " #x; \
return __ec; \
} \
} while (0)
#endif // RETURN_ON_ERR

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,9 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <boost/fiber/mutex.hpp>
#include <system_error>
extern "C" {
@ -24,18 +27,53 @@ class RdbLoader {
~RdbLoader();
std::error_code Load(::io::Source* src);
void set_source_limit(size_t n) { source_limit_ = n;}
void set_source_limit(size_t n) {
source_limit_ = n;
}
::io::Bytes Leftover() const { return mem_buf_.InputBuffer(); }
size_t bytes_read() const { return bytes_read_; }
::io::Bytes Leftover() const {
return mem_buf_.InputBuffer();
}
size_t bytes_read() const {
return bytes_read_;
}
private:
using MutableBytes = ::io::MutableBytes;
struct ObjSettings;
using OpaqueBuf = std::pair<void*, size_t>;
struct LzfString {
base::PODArray<uint8_t> compressed_blob;
uint64_t uncompressed_len;
};
struct LoadTrace;
using RdbVariant = std::variant<long long, robj*, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>>;
struct OpaqueObj {
RdbVariant obj;
int rdb_type;
};
struct LoadBlob {
RdbVariant rdb_var;
union {
unsigned encoding;
double score;
};
};
struct LoadTrace {
std::vector<LoadBlob> arr;
};
class OpaqueObjLoader;
struct Item {
sds key;
robj* val;
OpaqueObj val;
uint64_t expire_ms;
};
using ItemsBuf = std::vector<Item>;
@ -55,23 +93,27 @@ class RdbLoader {
// FetchGenericString may return various types. I basically copied the code
// from rdb.c and tried not to shoot myself on the way.
// flags are RDB_LOAD_XXX masks.
using OpaqueBuf = std::pair<void*, size_t>;
io::Result<OpaqueBuf> FetchGenericString(int flags);
io::Result<OpaqueBuf> FetchLzfStringObject(int flags);
io::Result<OpaqueBuf> FetchIntegerObject(int enctype, int flags, size_t* lenptr);
io::Result<OpaqueBuf> FetchIntegerObject(int enctype, int flags);
io::Result<double> FetchBinaryDouble();
io::Result<double> FetchDouble();
::io::Result<sds> ReadKey();
::io::Result<robj*> ReadObj(int rdbtype);
::io::Result<robj*> ReadSet();
::io::Result<robj*> ReadIntSet();
::io::Result<robj*> ReadHZiplist();
::io::Result<robj*> ReadHSet();
::io::Result<robj*> ReadZSet(int rdbtype);
::io::Result<robj*> ReadZSetZL();
::io::Result<robj*> ReadListQuicklist(int rdbtype);
::io::Result<OpaqueObj> ReadObj(int rdbtype);
::io::Result<RdbVariant> ReadStringObj();
::io::Result<long long> ReadIntObj(int encoding);
::io::Result<LzfString> ReadLzf();
::io::Result<OpaqueObj> ReadSet();
::io::Result<OpaqueObj> ReadIntSet();
::io::Result<OpaqueObj> ReadHZiplist();
::io::Result<OpaqueObj> ReadHMap();
::io::Result<OpaqueObj> ReadZSet(int rdbtype);
::io::Result<OpaqueObj> ReadZSetZL();
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
::io::Result<robj*> ReadStreams();
std::error_code EnsureRead(size_t min_sz) {
@ -86,7 +128,8 @@ class RdbLoader {
std::error_code VerifyChecksum();
void FlushShardAsync(ShardId sid);
static void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);
static size_t StrLen(const RdbVariant& tset);
ScriptMgr* script_mgr_;
base::IoBuf mem_buf_;
@ -97,6 +140,10 @@ class RdbLoader {
size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;
DbIndex cur_db_index_ = 0;
::boost::fibers::mutex mu_;
std::error_code ec_; // guarded by mu_
std::atomic_bool stop_early_{false};
};
} // namespace dfly

View File

@ -97,6 +97,9 @@ TEST_F(RdbTest, LoadSmall6) {
EXPECT_THAT(StrArray(resp.GetVec()[1]),
UnorderedElementsAre("list1", "hset_zl", "list2", "zset_sl", "intset", "set1",
"zset_zl", "hset_ht", "intkey", "strkey"));
EXPECT_THAT(Run({"get", "intkey"}), "1234567");
EXPECT_THAT(Run({"get", "strkey"}), "abcdefghjjjjjjjjjj");
resp = Run({"smembers", "intset"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
EXPECT_THAT(resp.GetVec(),

View File

@ -1182,13 +1182,13 @@ uint32_t SetFamily::MaxIntsetEntries() {
return kMaxIntSetEntries;
}
void SetFamily::ConvertTo(intset* src, dict* dest) {
void SetFamily::ConvertTo(const intset* src, dict* dest) {
int64_t intele;
char buf[32];
/* To add the elements we extract integers and create redis objects */
int ii = 0;
while (intsetGet(src, ii++, &intele)) {
while (intsetGet(const_cast<intset*>(src), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
sds s = sdsnewlen(buf, next - buf);
CHECK(dictAddRaw(dest, s, NULL));

View File

@ -25,7 +25,7 @@ class SetFamily {
static uint32_t MaxIntsetEntries();
static void ConvertTo(intset* src, dict* dest);
static void ConvertTo(const intset* src, dict* dest);
private:
static void SAdd(CmdArgList args, ConnectionContext* cntx);