Add global state to the server to prevent multiple exclusive operations to run concurrently

This commit is contained in:
Roman Gershman 2022-01-26 09:05:15 +02:00
parent 254e640a19
commit b83c201e30
5 changed files with 91 additions and 2 deletions

View File

@ -2,12 +2,12 @@
// See LICENSE for licensing terms.
//
#include "server/common_types.h"
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "server/common_types.h"
#include "server/error.h"
#include "server/global_state.h"
#include "server/server_state.h"
namespace dfly {
@ -51,6 +51,20 @@ const char kUintErr[] = "value is out of range, must be positive";
const char kDbIndOutOfRangeErr[] = "DB index is out of range";
const char kInvalidDbIndErr[] = "invalid DB index";
const char* GlobalState::Name(S s) {
switch (s) {
case GlobalState::IDLE:
return "IDLE";
case GlobalState::LOADING:
return "LOADING";
case GlobalState::SAVING:
return "SAVING";
case GlobalState::SHUTTING_DOWN:
return "SHUTTING DOWN";
}
ABSL_INTERNAL_UNREACHABLE;
}
} // namespace dfly
namespace std {

54
server/global_state.h Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <atomic>
#include <utility>
namespace dfly {
// Switches from IDLE state to any of the non-idle states.
// Switched from non-idle to IDLE.
// Refuses switching from non-idle to another non-idle state directly.
//
class GlobalState {
public:
enum S : uint8_t {
IDLE,
LOADING,
SAVING,
SHUTTING_DOWN,
};
GlobalState(S s = IDLE) : s_(s) {
}
const char* Name() const {
return Name(s_.load(std::memory_order_relaxed));
}
static const char* Name(S s);
std::pair<S, bool> Next(S s) {
S current{IDLE};
bool res = s_.compare_exchange_strong(current, s, std::memory_order_acq_rel);
return std::make_pair(current, res);
}
// Switches to IDLE and returns the previous state.
S Clear() {
return s_.exchange(IDLE, std::memory_order_acq_rel);
}
// Returns the current state.
S Load() const {
return s_.load(std::memory_order_acq_rel);
}
private:
std::atomic<S> s_;
};
} // namespace dfly

View File

@ -87,6 +87,11 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
void Service::Shutdown() {
VLOG(1) << "Service::Shutdown";
auto [current, switched] = server_family_.global_state()->Next(GlobalState::SHUTTING_DOWN);
// TODO: to introduce BlockingNext that waits until the state is switched to idle.
CHECK(switched) << "TBD " << GlobalState::Name(current);
engine_varz.reset();
request_latency_usec.Shutdown();
ping_qps.Shutdown();

View File

@ -140,6 +140,16 @@ void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
static unsigned fl_index = 1;
auto [current, switched] = global_state_.Next(GlobalState::SAVING);
if (!switched) {
string error = absl::StrCat(GlobalState::Name(current), " - can not save database");
return cntx->SendError(error);
}
absl::Cleanup rev_state = [this] {
global_state_.Clear();
};
fs::path dir_path(FLAGS_dir);
error_code ec;

View File

@ -6,6 +6,7 @@
#include "server/common_types.h"
#include "server/engine_shard_set.h"
#include "server/global_state.h"
#include "util/proactor_pool.h"
namespace util {
@ -36,6 +37,10 @@ class ServerFamily {
Metrics GetMetrics() const;
GlobalState* global_state() {
return &global_state_;
}
private:
uint32_t shard_count() const {
return ess_.size();
@ -56,6 +61,7 @@ class ServerFamily {
util::AcceptServer* acceptor_ = nullptr;
std::atomic<int64_t> last_save_; // in seconds.
GlobalState global_state_;
};
} // namespace dfly