docs(rdb): More work on the rdb snapshotting document.

Also add a debugging command to control the replication flow.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2022-08-28 17:27:34 +03:00
parent c72939d0d4
commit ae65c489e5
7 changed files with 132 additions and 22 deletions


@@ -36,22 +36,90 @@ and replay it by distributing entries among `K` replica shards. After all the sn
they will continue with replaying the change log (stable state replication), which is outside the
scope of this document.

## Relaxed point-in-time (TBD)

When DF saves its snapshot file on disk, it maintains snapshot isolation by applying a virtual cut
through all the process shards. Snapshotting may take time, during which DF may process many write
requests. These mutations won't be part of the snapshot, because the cut captures the data as it was
**right before** the snapshotting started. This is perfect for backups. I call this variation
*conservative snapshotting*.

However, when we perform snapshotting for replication, we would like to produce a snapshot that
includes all the data up to the point in time when the snapshotting **finishes**. I call this
*relaxed snapshotting*. The goal of relaxed snapshotting is to avoid keeping a changelog of all the
mutations that happen during the snapshot creation. As a side comment: we could, in theory, support
the same (relaxed) semantics for file snapshots, but it's not necessary, and it might increase the
snapshot sizes.

The snapshotting phase (full-sync) can take a lot of time and adds significant memory pressure on
the system; keeping a change-log aside during the full-sync phase would only add more pressure. We
achieve relaxed snapshotting by pushing the changes directly into the replication sockets without
saving them aside. Of course, we still need point-in-time consistency in order to know when the
snapshotting finished and the stable state replication started.
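That hand-off could be sketched as follows. Note that `ReplicationStream`, `Record`, and `MarkFullSyncEnd` are invented names for illustration only, not Dragonfly's actual wire format: the idea is simply that the stream carries an explicit marker separating the full-sync contents from the stable-state change log.

```cpp
#include <string>
#include <utility>
#include <vector>

// A record in the replication stream: either a serialized entry or a marker
// telling the replica that the snapshot scan has finished.
enum class RecordType { kEntry, kFullSyncEnd };

struct Record {
  RecordType type;
  std::string payload;  // serialized entry; empty for the marker
};

class ReplicationStream {
 public:
  void PushEntry(std::string payload) {
    records_.push_back({RecordType::kEntry, std::move(payload)});
  }

  // Called once the snapshot scan converges; everything pushed after this
  // marker belongs to the stable-state change log.
  void MarkFullSyncEnd() {
    records_.push_back({RecordType::kFullSyncEnd, {}});
  }

  const std::vector<Record>& records() const { return records_; }

 private:
  std::vector<Record> records_;
};
```

The replica consumes records until it sees `kFullSyncEnd`; from that point on it switches to applying the change log.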
## Conservative and relaxed snapshotting variations

Both algorithms maintain a scanning process (fiber) that iteratively goes over the main dictionary
and serializes its data. Before starting the process, the SnapshotShard captures the change epoch of
its shard (this epoch is increased with each write request).

```cpp
SnapshotShard.epoch = shard.epoch++;
```

For the sake of simplicity, we can assume that each entry in the shard maintains its own version
counter. By capturing the epoch number we establish a cut: all entries with
`version <= SnapshotShard.epoch` have not been serialized yet and have not been modified by
concurrent writes.

The DashTable iteration algorithm guarantees convergence and coverage ("at least once"), but it
does not guarantee that each entry is visited *exactly once*. Therefore, we use entry versions for
two things: 1) to avoid serializing the same entry multiple times, and 2) to correctly serialize
entries that are about to be changed by concurrent writes.

Serialization Fiber:

```cpp
for (entry : table) {
  if (entry.version <= cut.epoch) {
    entry.version = cut.epoch + 1;
    SendToSerializationSink(entry);
  }
}
```
To allow concurrent writes during the snapshotting phase, we set up a hook that is triggered on
each entry mutation in the table:

OnWriteHook:

```cpp
...
if (entry.version <= cut.epoch) {
  SendToSerializationSink(entry);
}
...
entry = new_entry;
entry.version = shard.epoch++;  // guaranteed to become > cut.epoch
```
Please note that this hook maintains point-in-time semantics for the conservative variation by
pushing the previous value of the entry into the sink before changing it.

For the relaxed point-in-time variation, however, we do not have to store the old value. Therefore,
we can do the following:
OnWriteHook:

```cpp
if (entry.version <= cut.epoch) {
  SendToSerializationSink(new_entry);  // no need to send the old value
} else {
  // Keep streaming the changes.
  SendToSerializationSink(IncrementalDiff(entry, new_entry));
}
entry = new_entry;
entry.version = shard.epoch++;
```
The change data is sent along with the rest of the snapshot contents, and it requires extending the
existing RDB format to support differential operations (HSET, APPEND, etc.). The Serialization Fiber
loop is the same for this variation.
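
The conservative variation above can be checked end-to-end with a small self-contained model. This is purely a sketch with invented stand-ins (`Shard`, `SnapshotShard`, `Write`, `Sink`), not the actual Dragonfly classes:

```cpp
#include <cstdint>
#include <map>
#include <string>

// Each entry carries a version; the shard keeps a monotonically increasing
// change epoch, bumped on every write.
struct Entry {
  std::string value;
  uint64_t version = 0;
};

struct Shard {
  std::map<std::string, Entry> table;
  uint64_t epoch = 0;
};

class SnapshotShard {
 public:
  // Capture the cut: entries with version <= cut_epoch_ are "before the cut".
  explicit SnapshotShard(Shard* shard) : shard_(shard), cut_epoch_(shard->epoch++) {}

  // OnWriteHook, conservative variation: push the OLD value before mutating,
  // so the sink observes the state as of the cut.
  void Write(const std::string& key, const std::string& new_value) {
    auto it = shard_->table.find(key);
    if (it != shard_->table.end() && it->second.version <= cut_epoch_) {
      Sink(key, it->second.value);
    }
    Entry& e = shard_->table[key];
    e.value = new_value;
    e.version = shard_->epoch++;  // guaranteed to become > cut_epoch_
  }

  // Serialization fiber: a single pass; entries already pushed by Write (or
  // created after the cut) carry version > cut_epoch_ and are skipped.
  void Scan() {
    for (auto& [key, e] : shard_->table) {
      if (e.version <= cut_epoch_) {
        e.version = cut_epoch_ + 1;
        Sink(key, e.value);
      }
    }
  }

  const std::map<std::string, std::string>& snapshot() const { return sink_; }

 private:
  void Sink(const std::string& key, const std::string& value) { sink_[key] = value; }

  Shard* shard_;
  uint64_t cut_epoch_;
  std::map<std::string, std::string> sink_;
};
```

Interleaving `Write` calls with `Scan` yields the table contents as of the cut: overwritten entries contribute their pre-cut values, and keys created after the cut are excluded.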


@@ -34,8 +34,8 @@ using boost::intrusive_ptr;
 using boost::fibers::fiber;
 using namespace facade;
 namespace fs = std::filesystem;
-using absl::StrAppend;
 using absl::GetFlag;
+using absl::StrAppend;
 
 struct PopulateBatch {
   DbIndex dbid;
@@ -84,14 +84,17 @@ void DebugCmd::Run(CmdArgList args) {
         "DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
         "OBJECT <key>",
         "    Show low-level info about `key` and associated value.",
-        "LOAD <filename>"
+        "LOAD <filename>",
         "RELOAD [option ...]",
-        "    Save the RDB on disk (TBD) and reload it back to memory. Valid <option> values:",
+        "    Save the RDB on disk and reload it back to memory. Valid <option> values:",
         "    * NOSAVE: the database will be loaded from an existing RDB file.",
         "    Examples:",
         "    * DEBUG RELOAD NOSAVE: replace the current database with the contents of an",
         "      existing RDB file.",
+        "REPLICA PAUSE/RESUME",
+        "    Stops replica from reconnecting to master, or resumes",
         "WATCHED",
+        "    Shows the watched keys as a result of BLPOP and similar operations.",
         "POPULATE <count> [<prefix>] [<size>]",
         "    Create <count> string keys named key:<num>. If <prefix> is specified then",
         "    it is used instead of the 'key' prefix.",
@@ -110,6 +113,11 @@ void DebugCmd::Run(CmdArgList args) {
   if (subcmd == "RELOAD") {
     return Reload(args);
   }
+
+  if (subcmd == "REPLICA" && args.size() == 3) {
+    return Replica(args);
+  }
+
   if (subcmd == "WATCHED") {
     return Watched();
   }
@@ -161,6 +169,18 @@ void DebugCmd::Reload(CmdArgList args) {
   Load(last_save_file);
 }
 
+void DebugCmd::Replica(CmdArgList args) {
+  args.remove_prefix(2);
+  ToUpper(&args[0]);
+  string_view opt = ArgS(args, 0);
+
+  if (opt == "PAUSE" || opt == "RESUME") {
+    sf_.PauseReplication(opt == "PAUSE");
+    return (*cntx_)->SendOk();
+  }
+  return (*cntx_)->SendError(UnknownSubCmd("replica", "DEBUG"));
+}
+
 void DebugCmd::Load(string_view filename) {
   GlobalState new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
   if (new_state != GlobalState::LOADING) {


@@ -21,6 +21,7 @@ class DebugCmd {
   void Populate(CmdArgList args);
   void PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, unsigned value_len);
   void Reload(CmdArgList args);
+  void Replica(CmdArgList args);
   void Load(std::string_view filename);
   void Inspect(std::string_view key);
   void Watched();


@@ -173,6 +173,9 @@ void Replica::ReplicateFb() {
   while (state_mask_ & R_ENABLED) {
     if ((state_mask_ & R_TCP_CONNECTED) == 0) {
       this_fiber::sleep_for(500ms);
+      if (is_paused_)
+        continue;
+
       ec = ConnectSocket();
       if (ec) {
         LOG(ERROR) << "Error connecting " << ec;
@@ -330,8 +333,8 @@ error_code Replica::InitiatePSync() {
   string id("?");  // corresponds to null master id and null offset
   int64_t offs = -1;
   if (!master_repl_id_.empty()) {  // in case we synced before
-    id = master_repl_id_;          // provide the replication offset and master id
-    offs = repl_offs_;             // to try incremental sync.
+    id = master_repl_id_;  // provide the replication offset and master id
+    offs = repl_offs_;     // to try incremental sync.
   }
   serializer.SendCommand(StrCat("PSYNC ", id, " ", offs));
   RETURN_ON_ERR(serializer.ec());
@@ -519,7 +522,6 @@ error_code Replica::ConsumeRedisStream() {
   time_t last_ack = time(nullptr);
   string ack_cmd;
 
-
   // basically reflection of dragonfly_connection IoLoop function.
   while (!ec) {
     io::MutableBytes buf = io_buf.AppendBuffer();
@@ -562,6 +564,10 @@ auto Replica::GetInfo() const -> Info {
   });
 }
 
+void Replica::Pause(bool pause) {
+  sock_thread_->Await([&] { is_paused_ = pause; });
+}
+
 error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
   VLOG(1) << "ParseAndExecute: input len " << io_buf->InputLen();
   if (parser_->stash_size() > 0) {
@@ -588,7 +594,7 @@ error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
         service_.DispatchCommand(arg_list, &conn_context);
       }
       io_buf->ConsumeInput(consumed);
-        break;
+      break;
     case RedisParser::INPUT_PENDING:
       io_buf->ConsumeInput(consumed);
       break;


@@ -47,6 +47,7 @@ class Replica {
   // Threadsafe, fiber blocking.
   Info GetInfo() const;
+  void Pause(bool pause);
 
  private:
   // The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK.

@@ -97,6 +98,7 @@ class Replica {
   size_t repl_offs_ = 0, ack_offs_ = 0;
   uint64_t last_io_time_ = 0;  // in ns, monotonic clock.
   unsigned state_mask_ = 0;
+  bool is_paused_ = false;
 };
 
 }  // namespace dfly
} // namespace dfly } // namespace dfly


@@ -545,6 +545,17 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) {
   http_base->RegisterCb("/metrics", cb);
 }
 
+void ServerFamily::PauseReplication(bool pause) {
+  unique_lock lk(replicaof_mu_);
+
+  // Switch to primary mode.
+  if (!ServerState::tlocal()->is_master) {
+    auto repl_ptr = replica_;
+    CHECK(repl_ptr);
+    repl_ptr->Pause(pause);
+  }
+}
+
 void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) {
   if (!section.empty()) {
     return cntx->reply_builder()->SendError("");


@@ -91,6 +91,8 @@ class ServerFamily {
   void ConfigureMetrics(util::HttpListenerBase* listener);
 
+  void PauseReplication(bool pause);
+
  private:
   uint32_t shard_count() const {
     return shard_set->size();