This commit is contained in:
Roman Gershman 2022-05-21 07:09:46 +03:00
parent 1de6f5317d
commit 543979875a
5 changed files with 40 additions and 2 deletions

View File

@ -390,6 +390,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
auto db_ptr = std::move(db);
DCHECK(!db);
CreateDb(db_ind);
db_arr_[db_ind]->trans_locks.swap(db_ptr->trans_locks);
boost::fibers::fiber([db_ptr = std::move(db_ptr)]() mutable { db_ptr.reset(); }).detach();
@ -401,6 +402,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
for (size_t i = 0; i < db_arr_.size(); ++i) {
if (all_dbs[i]) {
CreateDb(i);
db_arr_[i]->trans_locks.swap(all_dbs[i]->trans_locks);
}
}

View File

@ -371,6 +371,23 @@ TEST_F(ListFamilyTest, BPopRename) {
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
}
TEST_F(ListFamilyTest, BPopFlush) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));
pp_->at(1)->Await([&] {
Run({"flushdb"});
EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"}));
});
pop_fb.join();
}
TEST_F(ListFamilyTest, LRem) {
auto resp = Run({"rpush", kKey1, "a", "b", "a", "c"});
ASSERT_THAT(resp, IntArg(4));

View File

@ -148,12 +148,16 @@ TEST_F(RdbTest, ReloadTtl) {
TEST_F(RdbTest, SaveFlush) {
Run({"debug", "populate", "500000"});
auto save_fb = pp_->at(1)->LaunchFiber([&] {
RespExpr resp = Run({"save"});
ASSERT_EQ(resp, "OK");
});
usleep(1000);
do {
usleep(10);
} while (!service_->server_family().IsSaving());
Run({"flushdb"});
save_fb.join();
auto save_info = service_->server_family().GetLastSaveInfo();
@ -178,7 +182,10 @@ TEST_F(RdbTest, SaveManyDbs) {
RespExpr resp = Run({"save"});
ASSERT_EQ(resp, "OK");
});
usleep(1000);
do {
usleep(10);
} while (!service_->server_family().IsSaving());
pp_->at(1)->Await([&] {
Run({"select", "1"});

View File

@ -336,12 +336,15 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
saver.StartSnapshotInShard(shard);
return OpStatus::OK;
};
trans->ScheduleSingleHop(std::move(cb));
is_saving_.store(true, memory_order_relaxed);
// perform snapshot serialization, block the current fiber until it completes.
RdbTypeFreqMap freq_map;
ec = saver.SaveBody(&freq_map);
is_saving_.store(false, memory_order_relaxed);
absl::flat_hash_map<string_view, size_t> tmp_map;
for (const auto& k_v : freq_map) {
tmp_map[RdbTypeName(k_v.first)] += k_v.second;
@ -375,6 +378,8 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
}
error_code ServerFamily::DoFlush(Transaction* transaction, DbIndex db_ind) {
VLOG(1) << "DoFlush";
transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ?
transaction->Execute(

View File

@ -71,6 +71,12 @@ class ServerFamily {
std::shared_ptr<const LastSaveInfo> GetLastSaveInfo() const;
std::error_code LoadRdb(const std::string& rdb_file);
// used within tests.
bool IsSaving() const {
return is_saving_.load(std::memory_order_relaxed);
}
private:
uint32_t shard_count() const {
return shard_set->size();
@ -119,6 +125,7 @@ class ServerFamily {
time_t start_time_ = 0; // in seconds, epoch time.
std::shared_ptr<LastSaveInfo> lsinfo_; // protected by save_mu_;
std::atomic_bool is_saving_{false};
};
} // namespace dfly