chore(rdb): Move object creation during load operation to shard threads. (#193)

This commit also fixes stream loading behavior and resolves #159.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2022-07-07 13:39:59 +03:00 committed by GitHub
parent 567af06d2c
commit 049f34c89f
2 changed files with 271 additions and 229 deletions
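For context, the change follows a two-phase pattern: the loader fiber now parses RDB payloads into plain, thread-neutral data (std::string keys and values, LoadTrace/StreamTrace records), and only the owning shard thread materializes the actual objects from those traces. Below is a minimal standalone sketch of that idea, separate from the diff; the names StreamTraceSketch, ParsedItem, BufferItem, and MaterializeOnShard are illustrative only and do not exist in the codebase.

#include <cstdint>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Plain data produced on the loader fiber; nothing here touches Redis objects.
struct StreamTraceSketch {
  uint64_t stream_len = 0, last_ms = 0, last_seq = 0;
  std::vector<std::pair<std::string, std::string>> listpacks;  // (node key, raw listpack blob)
};

using OpaqueValue = std::variant<long long, std::string, StreamTraceSketch>;

struct ParsedItem {
  std::string key;       // std::string instead of sds: freed automatically, no sdsfree
  OpaqueValue value;     // no robj* is created off the shard thread
  uint64_t expire_ms = 0;
};

// Phase 1 (loader fiber): parse the RDB payload and buffer plain data per destination shard.
void BufferItem(std::vector<ParsedItem>* shard_buf, ParsedItem item) {
  shard_buf->push_back(std::move(item));
}

// Phase 2 (shard thread): materialize the real in-memory object from the trace --
// the role OpaqueObjLoader::CreateStream/CreateHMap/... play in the diff below.
void MaterializeOnShard(const ParsedItem& item) {
  std::visit([](const auto& value) { (void)value; /* build stream/hash/list here */ }, item.value);
}

Because the buffered values are plain strings and POD traces, they can be handed to any shard thread without manipulating ref-counted Redis objects off-thread, which is what the stream-loading fix relies on.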


@@ -202,6 +202,7 @@ class RdbLoader::OpaqueObjLoader {
void CreateHMap(const LoadTrace* ltrace);
void CreateList(const LoadTrace* ltrace);
void CreateZSet(const LoadTrace* ltrace);
void CreateStream(const LoadTrace* ltrace);
void HandleBlob(string_view blob);
@@ -246,7 +247,9 @@ void RdbLoader::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr) {
case RDB_TYPE_ZSET_2:
CreateZSet(ptr.get());
break;
case RDB_TYPE_STREAM_LISTPACKS:
CreateStream(ptr.get());
break;
default:
LOG(FATAL) << "Unsupported rdb type " << rdb_type_;
}
@@ -363,9 +366,7 @@ void RdbLoader::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
} else {
dict* hmap = dictCreate(&hashDictType);
auto cleanup = absl::MakeCleanup([&] {
dictRelease(hmap);
});
auto cleanup = absl::MakeCleanup([&] { dictRelease(hmap); });
if (len > DICT_HT_INITIAL_SIZE) {
if (dictTryExpand(hmap, len) != DICT_OK) {
@@ -515,6 +516,122 @@ void RdbLoader::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
pv_->ImportRObj(res);
}
void RdbLoader::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
CHECK(ltrace->stream_trace);
robj* res = createStreamObject();
stream* s = (stream*)res->ptr;
auto cleanup = absl::Cleanup([&] { decrRefCount(res); });
size_t lpcnt = ltrace->arr.size() / 2;
for (size_t i = 0; i < lpcnt; ++i) {
string_view nodekey = ToSV(ltrace->arr[i * 2].rdb_var);
string_view data = ToSV(ltrace->arr[i * 2 + 1].rdb_var);
uint8_t* lp = (uint8_t*)data.data();
if (!streamValidateListpackIntegrity(lp, data.size(), 0)) {
LOG(ERROR) << "Stream listpack integrity check failed.";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
unsigned char* first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
* deletion we should remove the radix tree key if the
* resulting listpack is empty. */
LOG(ERROR) << "Empty listpack inside stream";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
uint8_t* copy_lp = (uint8_t*)zmalloc(data.size());
memcpy(copy_lp, lp, data.size());
/* Insert the key in the radix tree. */
int retval =
raxTryInsert(s->rax_tree, (unsigned char*)nodekey.data(), nodekey.size(), copy_lp, NULL);
if (!retval) {
zfree(copy_lp);
LOG(ERROR) << "Listpack re-added with existing key";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
}
s->length = ltrace->stream_trace->stream_len;
s->last_id.ms = ltrace->stream_trace->ms;
s->last_id.seq = ltrace->stream_trace->seq;
for (const auto& cg : ltrace->stream_trace->cgroup) {
string_view cgname = ToSV(cg.name);
streamID cg_id;
cg_id.ms = cg.ms;
cg_id.seq = cg.seq;
streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, 0);
if (cgroup == NULL) {
LOG(ERROR) << "Duplicated consumer group name " << cgname;
ec_ = RdbError(errc::duplicate_key);
return;
}
for (const auto& pel : cg.pel_arr) {
streamNACK* nack = streamCreateNACK(NULL);
nack->delivery_time = pel.delivery_time;
nack->delivery_count = pel.delivery_count;
if (!raxTryInsert(cgroup->pel, const_cast<uint8_t*>(pel.rawid.data()), pel.rawid.size(), nack,
NULL)) {
LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group";
ec_ = RdbError(errc::duplicate_key);
streamFreeNACK(nack);
return;
}
}
for (const auto& cons : cg.cons_arr) {
sds cname = ToSds(cons.name);
streamConsumer* consumer =
streamCreateConsumer(cgroup, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
sdsfree(cname);
if (!consumer) {
LOG(ERROR) << "Duplicate stream consumer detected.";
ec_ = RdbError(errc::duplicate_key);
return;
}
consumer->seen_time = cons.seen_time;
/* Create the PEL (pending entries list) about entries owned by this specific
* consumer. */
for (const auto& rawid : cons.nack_arr) {
uint8_t* ptr = const_cast<uint8_t*>(rawid.data());
streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, ptr, rawid.size());
if (nack == raxNotFound) {
LOG(ERROR) << "Consumer entry not found in group global PEL";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
/* Set the NACK consumer, that was left to NULL when
* loading the global PEL. Then set the same shared
* NACK structure also in the consumer-specific PEL. */
nack->consumer = consumer;
if (!raxTryInsert(consumer->pel, ptr, rawid.size(), nack, NULL)) {
LOG(ERROR) << "Duplicated consumer PEL entry loading a stream consumer group";
streamFreeNACK(nack);
ec_ = RdbError(errc::duplicate_key);
return;
}
}
}
}
std::move(cleanup).Cancel();
pv_->ImportRObj(res);
}
void RdbLoader::OpaqueObjLoader::HandleBlob(string_view blob) {
if (rdb_type_ == RDB_TYPE_STRING) {
pv_->SetString(blob);
@@ -996,37 +1113,26 @@ error_code RdbLoader::HandleAux() {
* are required to skip AUX fields they don't understand.
*
* An AUX field is composed of two strings: key and value. */
robj *auxkey, *auxval;
string auxkey, auxval;
auto exp = FetchGenericString(RDB_LOAD_NONE);
if (!exp)
return exp.error();
auxkey = (robj*)exp->first;
exp = FetchGenericString(RDB_LOAD_NONE);
if (!exp) {
decrRefCount(auxkey);
return exp.error();
}
SET_OR_RETURN(FetchGenericString(), auxkey);
SET_OR_RETURN(FetchGenericString(), auxval);
auxval = (robj*)exp->first;
char* auxkey_sds = (sds)auxkey->ptr;
char* auxval_sds = (sds)auxval->ptr;
if (auxkey_sds[0] == '%') {
if (!auxkey.empty() && auxkey[0] == '%') {
/* All the fields with a name starting with '%' are considered
* information fields and are logged at startup with a log
* level of NOTICE. */
LOG(INFO) << "RDB '" << auxkey_sds << "': " << auxval_sds;
} else if (!strcasecmp(auxkey_sds, "repl-stream-db")) {
LOG(INFO) << "RDB '" << auxkey << "': " << auxval;
} else if (auxkey == "repl-stream-db") {
// TODO
} else if (!strcasecmp(auxkey_sds, "repl-id")) {
} else if (auxkey == "repl-id") {
// TODO
} else if (!strcasecmp(auxkey_sds, "repl-offset")) {
} else if (auxkey == "repl-offset") {
// TODO
} else if (!strcasecmp(auxkey_sds, "lua")) {
} else if (auxkey == "lua") {
ServerState* ss = ServerState::tlocal();
Interpreter& script = ss->GetInterpreter();
string_view body{auxval_sds, strlen(auxval_sds)};
string_view body{auxval};
string result;
Interpreter::AddResult add_result = script.AddFunction(body, &result);
if (add_result == Interpreter::ADD_OK) {
@@ -1035,31 +1141,34 @@ error_code RdbLoader::HandleAux() {
} else if (add_result == Interpreter::COMPILE_ERR) {
LOG(ERROR) << "Error when compiling lua scripts";
}
} else if (!strcasecmp(auxkey_sds, "redis-ver")) {
LOG(INFO) << "Loading RDB produced by version " << auxval_sds;
} else if (!strcasecmp(auxkey_sds, "ctime")) {
time_t age = time(NULL) - strtol(auxval_sds, NULL, 10);
if (age < 0)
age = 0;
LOG(INFO) << "RDB age " << strings::HumanReadableElapsedTime(age);
} else if (!strcasecmp(auxkey_sds, "used-mem")) {
long long usedmem = strtoll(auxval_sds, NULL, 10);
LOG(INFO) << "RDB memory usage when created " << strings::HumanReadableNumBytes(usedmem);
} else if (!strcasecmp(auxkey_sds, "aof-preamble")) {
long long haspreamble = strtoll(auxval_sds, NULL, 10);
if (haspreamble)
} else if (auxkey == "redis-ver") {
LOG(INFO) << "Loading RDB produced by version " << auxval;
} else if (auxkey == "ctime") {
int64_t ctime;
if (absl::SimpleAtoi(auxval, &ctime)) {
time_t age = time(NULL) - ctime;
if (age < 0)
age = 0;
LOG(INFO) << "RDB age " << strings::HumanReadableElapsedTime(age);
}
} else if (auxkey == "used-mem") {
long long usedmem;
if (absl::SimpleAtoi(auxval, &usedmem)) {
LOG(INFO) << "RDB memory usage when created " << strings::HumanReadableNumBytes(usedmem);
}
} else if (auxkey == "aof-preamble") {
long long haspreamble;
if (absl::SimpleAtoi(auxval, &haspreamble) && haspreamble) {
LOG(INFO) << "RDB has an AOF tail";
} else if (!strcasecmp(auxkey_sds, "redis-bits")) {
}
} else if (auxkey == "redis-bits") {
/* Just ignored. */
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
LOG(WARNING) << "Unrecognized RDB AUX field: '" << auxkey_sds << "'";
LOG(WARNING) << "Unrecognized RDB AUX field: '" << auxkey << "'";
}
decrRefCount(auxkey);
decrRefCount(auxval);
return kOk;
}
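One effect of the sds-to-std::string switch above is that numeric AUX values ("ctime", "used-mem", "aof-preamble") are now parsed with absl::SimpleAtoi, which reports failure on malformed input instead of strtol silently yielding 0. A minimal sketch in the same spirit; ParseAuxInt is a hypothetical helper, not part of the commit:

#include <cstdint>
#include <string>

#include "absl/strings/numbers.h"

// Returns the parsed integer, or `fallback` when the AUX value is not a valid number.
inline int64_t ParseAuxInt(const std::string& auxval, int64_t fallback) {
  int64_t parsed;
  return absl::SimpleAtoi(auxval, &parsed) ? parsed : fallback;
}

// e.g. for a "used-mem"-style field:  int64_t usedmem = ParseAuxInt(auxval, 0);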
@@ -1092,7 +1201,7 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
DbSlice& db_slice = EngineShard::tlocal()->db_slice();
for (const auto& item : ib) {
std::string_view key{item.key, sdslen(item.key)};
std::string_view key{item.key};
PrimeValue pv;
OpaqueObjLoader visitor(item.val.rdb_type, &pv);
std::visit(visitor, item.val.obj);
@@ -1109,8 +1218,6 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << key << "' in DB " << db_ind;
}
sdsfree(item.key);
}
}
@@ -1134,7 +1241,7 @@ size_t RdbLoader::StrLen(const RdbVariant& tset) {
return 0;
}
auto RdbLoader::FetchGenericString(int flags) -> io::Result<OpaqueBuf> {
auto RdbLoader::FetchGenericString() -> io::Result<string> {
bool isencoded;
size_t len;
@@ -1145,58 +1252,41 @@ auto RdbLoader::FetchGenericString(int flags) -> io::Result<OpaqueBuf> {
case RDB_ENC_INT8:
case RDB_ENC_INT16:
case RDB_ENC_INT32:
return FetchIntegerObject(len, flags);
return FetchIntegerObject(len);
case RDB_ENC_LZF:
return FetchLzfStringObject(flags);
return FetchLzfStringObject();
default:
LOG(ERROR) << "Unknown RDB string encoding len " << len;
return Unexpected(errc::rdb_file_corrupted);
}
}
bool encode = (flags & RDB_LOAD_ENC) != 0;
bool plain = (flags & RDB_LOAD_PLAIN) != 0;
bool sds = (flags & RDB_LOAD_SDS) != 0;
string res;
if (plain || sds) {
if (plain && len == 0) {
return make_pair(nullptr, 0);
}
char* buf = plain ? (char*)zmalloc(len) : sdsnewlen(SDS_NOINIT, len);
error_code ec = FetchBuf(len, buf);
if (len > 0) {
res.resize(len);
error_code ec = FetchBuf(len, res.data());
if (ec) {
if (plain)
zfree(buf);
else
sdsfree(buf);
return make_unexpected(ec);
}
return make_pair(buf, len);
}
robj* o = encode ? createStringObject(SDS_NOINIT, len) : createRawStringObject(SDS_NOINIT, len);
error_code ec = FetchBuf(len, o->ptr);
if (ec) {
decrRefCount(o);
return make_unexpected(ec);
}
return make_pair(o, len);
return res;
}
auto RdbLoader::FetchLzfStringObject(int flags) -> io::Result<OpaqueBuf> {
auto RdbLoader::FetchLzfStringObject() -> io::Result<string> {
bool zerocopy_decompress = true;
const uint8_t* cbuf = NULL;
char* val = NULL;
uint64_t clen, len;
SET_OR_UNEXPECT(LoadLen(NULL), clen);
SET_OR_UNEXPECT(LoadLen(NULL), len);
CHECK_LE(len, 1ULL << 29);
if (len > 1ULL << 29 || len <= clen || clen == 0) {
LOG(ERROR) << "Bad compressed string";
return Unexpected(rdb::rdb_file_corrupted);
}
if (mem_buf_.InputLen() >= clen) {
cbuf = mem_buf_.InputBuffer().data();
@@ -1212,18 +1302,9 @@ auto RdbLoader::FetchLzfStringObject(int flags) -> io::Result<OpaqueBuf> {
cbuf = compr_buf_.data();
}
bool plain = (flags & RDB_LOAD_PLAIN) != 0;
bool sds = (flags & RDB_LOAD_SDS) != 0;
DCHECK_EQ(false, plain && sds);
string res(len, 0);
/* Allocate our target according to the uncompressed size. */
if (plain) {
val = (char*)zmalloc(len);
} else {
val = sdsnewlen(SDS_NOINIT, len);
}
if (lzf_decompress(cbuf, clen, val, len) == 0) {
if (lzf_decompress(cbuf, clen, res.data(), len) == 0) {
LOG(ERROR) << "Invalid LZF compressed string";
return Unexpected(errc::rdb_file_corrupted);
}
@@ -1233,17 +1314,10 @@ auto RdbLoader::FetchLzfStringObject(int flags) -> io::Result<OpaqueBuf> {
if (zerocopy_decompress)
mem_buf_.ConsumeInput(clen);
if (plain || sds) {
return make_pair(val, len);
}
return make_pair(createObject(OBJ_STRING, val), len);
return res;
}
auto RdbLoader::FetchIntegerObject(int enctype, int flags) -> io::Result<OpaqueBuf> {
bool plain = (flags & RDB_LOAD_PLAIN) != 0;
bool sds = (flags & RDB_LOAD_SDS) != 0;
bool encode = (flags & RDB_LOAD_ENC) != 0;
auto RdbLoader::FetchIntegerObject(int enctype) -> io::Result<string> {
long long val;
if (enctype == RDB_ENC_INT8) {
@@ -1256,17 +1330,10 @@ auto RdbLoader::FetchIntegerObject(int enctype, int flags) -> io::Result<OpaqueBuf> {
return Unexpected(errc::invalid_encoding);
}
if (plain || sds) {
char buf[LONG_STR_SIZE], *p;
int len = ll2string(buf, sizeof(buf), val);
p = plain ? (char*)zmalloc(len) : sdsnewlen(SDS_NOINIT, len);
memcpy(p, buf, len);
return make_pair(p, len);
}
char buf[32];
absl::numbers_internal::FastIntToBuffer(val, buf);
robj* o = encode ? createStringObjectFromLongLongForValue(val)
: createObject(OBJ_STRING, sdsfromlonglong(val));
return make_pair(o, 16);
return string(buf);
}
io::Result<double> RdbLoader::FetchBinaryDouble() {
@@ -1311,19 +1378,11 @@ io::Result<double> RdbLoader::FetchDouble() {
return val;
}
auto RdbLoader::ReadKey() -> io::Result<sds> {
auto res = FetchGenericString(RDB_LOAD_SDS);
if (res) {
sds k = (sds)res->first;
DVLOG(2) << "Read " << std::string_view(k, sdslen(k));
return k;
}
return res.get_unexpected();
auto RdbLoader::ReadKey() -> io::Result<string> {
return FetchGenericString();
}
auto RdbLoader::ReadObj(int rdbtype) -> io::Result<OpaqueObj> {
io::Result<robj*> res_obj = nullptr;
switch (rdbtype) {
case RDB_TYPE_STRING: {
/* Read string value */
@@ -1349,17 +1408,13 @@ auto RdbLoader::ReadObj(int rdbtype) -> io::Result<OpaqueObj> {
case RDB_TYPE_LIST_QUICKLIST_2:
return ReadListQuicklist(rdbtype);
case RDB_TYPE_STREAM_LISTPACKS:
res_obj = ReadStreams();
return ReadStreams();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
return Unexpected(errc::invalid_encoding);
}
if (!res_obj)
return make_unexpected(res_obj.error());
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
return OpaqueObj{*res_obj, rdbtype};
return Unexpected(errc::invalid_encoding);
}
auto RdbLoader::ReadStringObj() -> io::Result<RdbVariant> {
@@ -1575,95 +1630,64 @@ auto RdbLoader::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(load_trace), rdbtype};
}
auto RdbLoader::ReadStreams() -> io::Result<robj*> {
auto RdbLoader::ReadStreams() -> io::Result<OpaqueObj> {
uint64_t listpacks;
SET_OR_UNEXPECT(LoadLen(nullptr), listpacks);
robj* o = createStreamObject();
stream* s = (stream*)o->ptr;
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize(listpacks * 2);
auto cleanup = absl::Cleanup([&] { decrRefCount(o); });
while (listpacks--) {
for (size_t i = 0; i < listpacks; ++i) {
/* Get the master ID, the one we'll use as key of the radix tree
* node: the entries inside the listpack itself are delta-encoded
* relatively to this ID. */
sds nodekey;
SET_OR_UNEXPECT(ReadKey(), nodekey);
auto cleanup2 = absl::Cleanup([&] { sdsfree(nodekey); });
RdbVariant stream_id, blob;
SET_OR_UNEXPECT(ReadStringObj(), stream_id);
if (sdslen(nodekey) != sizeof(streamID)) {
if (StrLen(stream_id) != sizeof(streamID)) {
LOG(ERROR) << "Stream node key entry is not the size of a stream ID";
return Unexpected(errc::rdb_file_corrupted);
}
/* Load the listpack. */
OpaqueBuf fetch;
SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch);
if (fetch.second == 0 || fetch.first == nullptr) {
SET_OR_UNEXPECT(ReadStringObj(), blob);
if (StrLen(blob) == 0) {
LOG(ERROR) << "Stream listpacks loading failed";
return Unexpected(errc::rdb_file_corrupted);
}
DCHECK(fetch.first);
uint8_t* lp = (uint8_t*)fetch.first;
if (!streamValidateListpackIntegrity(lp, fetch.second, 0)) {
zfree(lp);
LOG(ERROR) << "Stream listpack integrity check failed.";
return Unexpected(errc::rdb_file_corrupted);
}
unsigned char* first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
* deletion we should remove the radix tree key if the
* resulting listpack is empty. */
LOG(ERROR) << "Empty listpack inside stream";
zfree(lp);
return Unexpected(errc::rdb_file_corrupted);
}
/* Insert the key in the radix tree. */
int retval = raxTryInsert(s->rax_tree, (unsigned char*)nodekey, sizeof(streamID), lp, NULL);
if (!retval) {
LOG(ERROR) << "Listpack re-added with existing key";
return Unexpected(errc::duplicate_key);
}
load_trace->arr[2 * i].rdb_var = std::move(stream_id);
load_trace->arr[2 * i + 1].rdb_var = std::move(blob);
}
load_trace->stream_trace.reset(new StreamTrace);
/* Load total number of items inside the stream. */
SET_OR_UNEXPECT(LoadLen(nullptr), s->length);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);
/* Load the last entry ID. */
SET_OR_UNEXPECT(LoadLen(nullptr), s->last_id.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), s->last_id.seq);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq);
/* Consumer groups loading */
uint64_t cgroups_count;
SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count);
load_trace->stream_trace->cgroup.resize(cgroups_count);
while (cgroups_count--) {
for (size_t i = 0; i < cgroups_count; ++i) {
auto& cgroup = load_trace->stream_trace->cgroup[i];
/* Get the consumer group name and ID. We can then create the
* consumer group ASAP and populate its structure as
* we read more data. */
streamID cg_id;
sds cgname;
SET_OR_UNEXPECT(ReadKey(), cgname);
// sds cgname;
RdbVariant cgname;
SET_OR_UNEXPECT(ReadStringObj(), cgname);
cgroup.name = std::move(cgname);
auto cleanup2 = absl::Cleanup([&] { sdsfree(cgname); });
SET_OR_UNEXPECT(LoadLen(nullptr), cg_id.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), cg_id.seq);
streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, 0);
if (cgroup == NULL) {
LOG(ERROR) << "Duplicated consumer group name " << cgname;
return Unexpected(errc::duplicate_key);
}
std::move(cleanup2).Invoke();
// no need to free cgroup because it's attached to s.
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
/* Load the global PEL for this consumer group, however we'll
* not yet populate the NACK structures with the message
@@ -1673,79 +1697,74 @@ auto RdbLoader::ReadStreams() -> io::Result<robj*> {
uint64_t pel_size;
SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);
while (pel_size--) {
uint8_t rawid[sizeof(streamID)];
error_code ec = FetchBuf(sizeof(rawid), rawid);
cgroup.pel_arr.resize(pel_size);
for (size_t j = 0; j < pel_size; ++j) {
auto& pel = cgroup.pel_arr[j];
error_code ec = FetchBuf(pel.rawid.size(), pel.rawid.data());
if (ec) {
LOG(ERROR) << "Stream PEL ID loading failed.";
return make_unexpected(ec);
}
streamNACK* nack = streamCreateNACK(NULL);
auto cleanup2 = absl::Cleanup([&] { streamFreeNACK(nack); });
// streamNACK* nack = streamCreateNACK(NULL);
// auto cleanup2 = absl::Cleanup([&] { streamFreeNACK(nack); });
SET_OR_UNEXPECT(FetchInt<int64_t>(), nack->delivery_time);
SET_OR_UNEXPECT(LoadLen(nullptr), nack->delivery_count);
SET_OR_UNEXPECT(FetchInt<int64_t>(), pel.delivery_time);
SET_OR_UNEXPECT(LoadLen(nullptr), pel.delivery_count);
if (!raxTryInsert(cgroup->pel, rawid, sizeof(rawid), nack, NULL)) {
/*if (!raxTryInsert(cgroup->pel, rawid, sizeof(rawid), nack, NULL)) {
LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group";
return Unexpected(errc::duplicate_key);
}
std::move(cleanup2).Cancel();
std::move(cleanup2).Cancel();*/
}
/* Now that we loaded our global PEL, we need to load the
* consumers and their local PELs. */
uint64_t consumers_num;
SET_OR_UNEXPECT(LoadLen(nullptr), consumers_num);
cgroup.cons_arr.resize(consumers_num);
while (consumers_num--) {
sds cname;
SET_OR_UNEXPECT(ReadKey(), cname);
for (size_t j = 0; j < consumers_num; ++j) {
auto& consumer = cgroup.cons_arr[j];
SET_OR_UNEXPECT(ReadStringObj(), consumer.name);
streamConsumer* consumer =
streamCreateConsumer(cgroup, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
sdsfree(cname);
if (!consumer) {
LOG(ERROR) << "Duplicate stream consumer detected.";
return Unexpected(errc::duplicate_key);
}
// no need to free consumer because it's attached to cgroup.
SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer->seen_time);
SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.seen_time);
/* Load the PEL about entries owned by this specific
* consumer. */
SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);
while (pel_size--) {
unsigned char rawid[sizeof(streamID)];
error_code ec = FetchBuf(sizeof(rawid), rawid);
consumer.nack_arr.resize(pel_size);
for (size_t k = 0; k < pel_size; ++k) {
auto& nack = consumer.nack_arr[k];
// unsigned char rawid[sizeof(streamID)];
error_code ec = FetchBuf(nack.size(), nack.data());
if (ec) {
LOG(ERROR) << "Stream PEL ID loading failed.";
return make_unexpected(ec);
}
streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, rawid, sizeof(rawid));
/*streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, rawid, sizeof(rawid));
if (nack == raxNotFound) {
LOG(ERROR) << "Consumer entry not found in group global PEL";
return Unexpected(errc::rdb_file_corrupted);
}
}*/
/* Set the NACK consumer, that was left to NULL when
* loading the global PEL. Then set the same shared
* NACK structure also in the consumer-specific PEL. */
/*
nack->consumer = consumer;
if (!raxTryInsert(consumer->pel, rawid, sizeof(rawid), nack, NULL)) {
LOG(ERROR) << "Duplicated consumer PEL entry loading a stream consumer group";
streamFreeNACK(nack);
return Unexpected(errc::duplicate_key);
}
}*/
}
} // while (consumers_num)
} // while (cgroup_num)
std::move(cleanup).Cancel();
return o;
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
}
void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
@@ -1755,19 +1774,19 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
/* Read key */
sds key;
string key;
OpaqueObj val;
// We free key in LoadItemsBuffer.
SET_OR_RETURN(ReadKey(), key);
auto key_cleanup = absl::MakeCleanup([key] { sdsfree(key); });
io::Result<OpaqueObj> io_res = ReadObj(type);
if (!io_res) {
VLOG(1) << "ReadObj error " << io_res.error() << " for key " << key;
return io_res.error();
}
val = std::move(io_res.value());
/* Check if the key already expired. This function is used when loading
@@ -1783,14 +1802,12 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
if (should_expire) {
// decrRefCount(val);
} else {
std::move(key_cleanup).Cancel();
std::string_view str_key(key, sdslen(key));
ShardId sid = Shard(str_key, shard_set->size());
ShardId sid = Shard(key, shard_set->size());
uint64_t expire_at_ms = settings->expiretime;
auto& out_buf = shard_buf_[sid];
out_buf.emplace_back(Item{key, std::move(val), expire_at_ms});
out_buf.emplace_back(Item{std::move(key), std::move(val), expire_at_ms});
constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {

View File

@@ -42,7 +42,6 @@ class RdbLoader {
using MutableBytes = ::io::MutableBytes;
struct ObjSettings;
using OpaqueBuf = std::pair<void*, size_t>;
struct LzfString {
base::PODArray<uint8_t> compressed_blob;
uint64_t uncompressed_len;
@@ -50,7 +49,7 @@ class RdbLoader {
struct LoadTrace;
using RdbVariant = std::variant<long long, robj*, base::PODArray<char>, LzfString,
using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>>;
struct OpaqueObj {
RdbVariant obj;
@@ -65,14 +64,43 @@ class RdbLoader {
};
};
struct StreamPelTrace {
std::array<uint8_t, 16> rawid;
int64_t delivery_time;
uint64_t delivery_count;
};
struct StreamConsumerTrace {
RdbVariant name;
int64_t seen_time;
std::vector<std::array<uint8_t, 16>> nack_arr;
};
struct StreamCGTrace {
RdbVariant name;
uint64_t ms;
uint64_t seq;
std::vector<StreamPelTrace> pel_arr;
std::vector<StreamConsumerTrace> cons_arr;
};
struct StreamTrace {
size_t lp_len;
size_t stream_len;
uint64_t ms, seq;
std::vector<StreamCGTrace> cgroup;
};
struct LoadTrace {
std::vector<LoadBlob> arr;
std::unique_ptr<StreamTrace> stream_trace;
};
class OpaqueObjLoader;
struct Item {
sds key;
std::string key;
OpaqueObj val;
uint64_t expire_ms;
};
@@ -90,17 +118,14 @@ class RdbLoader {
io::Result<uint64_t> LoadLen(bool* is_encoded);
std::error_code FetchBuf(size_t size, void* dest);
// FetchGenericString may return various types. I basically copied the code
// from rdb.c and tried not to shoot myself on the way.
// flags are RDB_LOAD_XXX masks.
io::Result<OpaqueBuf> FetchGenericString(int flags);
io::Result<OpaqueBuf> FetchLzfStringObject(int flags);
io::Result<OpaqueBuf> FetchIntegerObject(int enctype, int flags);
io::Result<std::string> FetchGenericString();
io::Result<std::string> FetchLzfStringObject();
io::Result<std::string> FetchIntegerObject(int enctype);
io::Result<double> FetchBinaryDouble();
io::Result<double> FetchDouble();
::io::Result<sds> ReadKey();
::io::Result<std::string> ReadKey();
::io::Result<OpaqueObj> ReadObj(int rdbtype);
::io::Result<RdbVariant> ReadStringObj();
@@ -114,7 +139,7 @@ class RdbLoader {
::io::Result<OpaqueObj> ReadZSet(int rdbtype);
::io::Result<OpaqueObj> ReadZSetZL();
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
::io::Result<robj*> ReadStreams();
::io::Result<OpaqueObj> ReadStreams();
std::error_code EnsureRead(size_t min_sz) {
if (mem_buf_.InputLen() >= min_sz)