More printings in rdb_test in order to catch the snapshotting bug
This commit is contained in:
parent
a049128ab6
commit
d3f598eb88
|
@ -62,5 +62,5 @@ jobs:
|
||||||
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -R rdb_test
|
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -R rdb_test
|
||||||
echo Run ctest -V -L DFLY
|
echo Run ctest -V -L DFLY
|
||||||
#GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1
|
#GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1
|
||||||
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -L DFLY
|
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=2,snapshot=2 ctest -V -L DFLY
|
||||||
# GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
|
# GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
|
||||||
|
|
|
@ -187,7 +187,7 @@ error_code RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv, ui
|
||||||
unsigned encoding = pv.Encoding();
|
unsigned encoding = pv.Encoding();
|
||||||
uint8_t rdb_type = RdbObjectType(obj_type, encoding);
|
uint8_t rdb_type = RdbObjectType(obj_type, encoding);
|
||||||
|
|
||||||
DVLOG(2) << "Saving keyval start " << key;
|
DVLOG(3) << "Saving keyval start " << key;
|
||||||
|
|
||||||
++type_freq_map_[rdb_type];
|
++type_freq_map_[rdb_type];
|
||||||
RETURN_ON_ERR(WriteOpcode(rdb_type));
|
RETURN_ON_ERR(WriteOpcode(rdb_type));
|
||||||
|
@ -238,7 +238,7 @@ error_code RdbSerializer::SaveListObject(const robj* obj) {
|
||||||
RETURN_ON_ERR(SaveLen(ql->len));
|
RETURN_ON_ERR(SaveLen(ql->len));
|
||||||
|
|
||||||
while (node) {
|
while (node) {
|
||||||
DVLOG(2) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container
|
DVLOG(3) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container
|
||||||
<< "/" << node->sz;
|
<< "/" << node->sz;
|
||||||
if (QL_NODE_IS_PLAIN(node)) {
|
if (QL_NODE_IS_PLAIN(node)) {
|
||||||
if (quicklistNodeIsCompressed(node)) {
|
if (quicklistNodeIsCompressed(node)) {
|
||||||
|
@ -604,7 +604,7 @@ std::error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
|
||||||
|
|
||||||
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
||||||
RETURN_ON_ERR(impl_->serializer.FlushMem());
|
RETURN_ON_ERR(impl_->serializer.FlushMem());
|
||||||
VLOG(1) << "SaveBody";
|
VLOG(1) << "SaveBody , snapshots count: " << impl_->shard_snapshots.size();
|
||||||
|
|
||||||
size_t num_written = 0;
|
size_t num_written = 0;
|
||||||
SliceSnapshot::DbRecord record;
|
SliceSnapshot::DbRecord record;
|
||||||
|
@ -635,6 +635,7 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
||||||
last_db_index = record.db_index;
|
last_db_index = record.db_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DVLOG(2) << "Pulled " << record.id;
|
||||||
channel_bytes += record.value.size();
|
channel_bytes += record.value.size();
|
||||||
io_error = aligned_buf_.Write(record.value);
|
io_error = aligned_buf_.Write(record.value);
|
||||||
record.value.clear();
|
record.value.clear();
|
||||||
|
@ -651,8 +652,10 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
||||||
DCHECK(!channel.TryPop(record));
|
DCHECK(!channel.TryPop(record));
|
||||||
VLOG(1) << "Blobs written " << num_written << " pulled bytes: " << channel_bytes
|
VLOG(1) << "Blobs written " << num_written << " pulled bytes: " << channel_bytes
|
||||||
<< " pushed bytes: " << pushed_bytes;
|
<< " pushed bytes: " << pushed_bytes;
|
||||||
if (io_error)
|
if (io_error) {
|
||||||
|
VLOG(1) << "io error " << io_error;
|
||||||
return io_error;
|
return io_error;
|
||||||
|
}
|
||||||
|
|
||||||
RETURN_ON_ERR(SaveEpilog());
|
RETURN_ON_ERR(SaveEpilog());
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,6 @@ extern "C" {
|
||||||
}
|
}
|
||||||
|
|
||||||
#include <absl/flags/reflection.h>
|
#include <absl/flags/reflection.h>
|
||||||
|
|
||||||
#include <mimalloc.h>
|
#include <mimalloc.h>
|
||||||
|
|
||||||
#include "base/flags.h"
|
#include "base/flags.h"
|
||||||
|
@ -26,8 +25,8 @@ using namespace testing;
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace util;
|
using namespace util;
|
||||||
using namespace facade;
|
using namespace facade;
|
||||||
using absl::StrCat;
|
|
||||||
using absl::SetFlag;
|
using absl::SetFlag;
|
||||||
|
using absl::StrCat;
|
||||||
|
|
||||||
ABSL_DECLARE_FLAG(int32, list_compress_depth);
|
ABSL_DECLARE_FLAG(int32, list_compress_depth);
|
||||||
ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
|
ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
|
||||||
|
@ -85,7 +84,11 @@ TEST_F(RdbTest, LoadEmpty) {
|
||||||
TEST_F(RdbTest, LoadSmall6) {
|
TEST_F(RdbTest, LoadSmall6) {
|
||||||
io::FileSource fs = GetSource("redis6_small.rdb");
|
io::FileSource fs = GetSource("redis6_small.rdb");
|
||||||
RdbLoader loader(service_->script_mgr());
|
RdbLoader loader(service_->script_mgr());
|
||||||
auto ec = loader.Load(&fs);
|
|
||||||
|
// must run in proactor thread in order to avoid polluting the serverstate
|
||||||
|
// in the main, testing thread.
|
||||||
|
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
|
||||||
|
|
||||||
ASSERT_FALSE(ec) << ec.message();
|
ASSERT_FALSE(ec) << ec.message();
|
||||||
|
|
||||||
auto resp = Run({"scan", "0"});
|
auto resp = Run({"scan", "0"});
|
||||||
|
@ -215,6 +218,14 @@ TEST_F(RdbTest, SaveManyDbs) {
|
||||||
ASSERT_EQ(2, metrics.db.size());
|
ASSERT_EQ(2, metrics.db.size());
|
||||||
EXPECT_EQ(50000, metrics.db[0].key_count);
|
EXPECT_EQ(50000, metrics.db[0].key_count);
|
||||||
EXPECT_EQ(10000, metrics.db[1].key_count);
|
EXPECT_EQ(10000, metrics.db[1].key_count);
|
||||||
|
if (metrics.db[1].key_count != 10000) {
|
||||||
|
Run({"select", "1"});
|
||||||
|
resp = Run({"scan", "0", "match", "ab*"});
|
||||||
|
StringVec vec = StrArray(resp.GetVec()[1]);
|
||||||
|
for (const auto& s: vec) {
|
||||||
|
LOG(ERROR) << "Bad key: " << s;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -69,6 +69,7 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
|
||||||
}
|
}
|
||||||
error_code ec = rdb_serializer_->SaveEntry(pk, pv, expire_time);
|
error_code ec = rdb_serializer_->SaveEntry(pk, pv, expire_time);
|
||||||
CHECK(!ec); // we write to StringFile.
|
CHECK(!ec); // we write to StringFile.
|
||||||
|
++num_records_in_blob_;
|
||||||
|
|
||||||
if (db_indx != savecb_current_db_) {
|
if (db_indx != savecb_current_db_) {
|
||||||
ec = rdb_serializer_->FlushMem();
|
ec = rdb_serializer_->FlushMem();
|
||||||
|
@ -76,8 +77,15 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
|
||||||
string tmp = std::move(sfile_->val);
|
string tmp = std::move(sfile_->val);
|
||||||
channel_bytes_ += tmp.size();
|
channel_bytes_ += tmp.size();
|
||||||
DCHECK(!dest_->IsClosing());
|
DCHECK(!dest_->IsClosing());
|
||||||
|
DbRecord rec{.db_index = db_indx,
|
||||||
|
.id = rec_id_,
|
||||||
|
.num_records = num_records_in_blob_,
|
||||||
|
.value = std::move(tmp)};
|
||||||
|
DVLOG(2) << "Pushed " << rec_id_;
|
||||||
|
++rec_id_;
|
||||||
|
num_records_in_blob_ = 0;
|
||||||
|
|
||||||
dest_->Push(DbRecord{db_indx, std::move(tmp)});
|
dest_->Push(std::move(rec));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,7 +157,14 @@ bool SliceSnapshot::FlushSfile(bool force) {
|
||||||
|
|
||||||
string tmp = std::move(sfile_->val); // important to move before pushing!
|
string tmp = std::move(sfile_->val); // important to move before pushing!
|
||||||
channel_bytes_ += tmp.size();
|
channel_bytes_ += tmp.size();
|
||||||
dest_->Push(DbRecord{savecb_current_db_, std::move(tmp)});
|
DbRecord rec{.db_index = savecb_current_db_,
|
||||||
|
.id = rec_id_,
|
||||||
|
.num_records = num_records_in_blob_,
|
||||||
|
.value = std::move(tmp)};
|
||||||
|
DVLOG(2) << "Pushed " << rec_id_;
|
||||||
|
++rec_id_;
|
||||||
|
num_records_in_blob_ = 0;
|
||||||
|
dest_->Push(std::move(rec));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,8 @@ class SliceSnapshot {
|
||||||
// RdbSaver adds "select" opcodes when necessary in order to maintain consistency.
|
// RdbSaver adds "select" opcodes when necessary in order to maintain consistency.
|
||||||
struct DbRecord {
|
struct DbRecord {
|
||||||
DbIndex db_index;
|
DbIndex db_index;
|
||||||
|
uint64_t id;
|
||||||
|
uint32_t num_records;
|
||||||
std::string value;
|
std::string value;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -68,6 +70,8 @@ class SliceSnapshot {
|
||||||
|
|
||||||
size_t channel_bytes_ = 0;
|
size_t channel_bytes_ = 0;
|
||||||
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
|
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
|
||||||
|
uint64_t rec_id_ = 0;
|
||||||
|
uint32_t num_records_in_blob_ = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
Loading…
Reference in New Issue