Add consumer part of RdbSave flow

This commit is contained in:
Roman Gershman 2022-01-21 23:37:42 +02:00
parent d3ccd5b836
commit 7af6aef4c7
3 changed files with 77 additions and 17 deletions

View File

@ -15,6 +15,7 @@ extern "C" {
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "util/fibers/simple_channel.h"
namespace dfly {
@ -75,7 +76,6 @@ unsigned TryIntegerEncoding(string_view input, uint8_t* dest) {
return EncodeInteger(value, dest);
}
/* Saves an encoded length. The first two bits in the first byte are used to
* hold the encoding type. See the RDB_* definitions for more information
* on the types of encoding. buf must be at least 9 bytes.
@ -109,7 +109,6 @@ inline unsigned SerializeLen(uint64_t len, uint8_t* buf) {
} // namespace
RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) {
}
@ -131,8 +130,8 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
if (ib.empty()) {
RETURN_ON_ERR(sink_->Write(buf));
} else {
iovec v[2] = { {.iov_base = const_cast<uint8_t*>(ib.data()), .iov_len = ib.size()},
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
iovec v[2] = {{.iov_base = const_cast<uint8_t*>(ib.data()), .iov_len = ib.size()},
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v)));
mem_buf_.ConsumeInput(ib.size());
}
@ -199,7 +198,6 @@ error_code RdbSerializer::SaveLen(size_t len) {
return WriteRaw(Bytes{buf, enclen});
}
error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_len) {
/* Data compressed! Let's save it on disk */
uint8_t opcode = (RDB_ENCVAL << 6) | RDB_ENC_LZF;
@ -211,9 +209,23 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
return error_code{};
}
using StringChannel =
::util::fibers_ext::SimpleChannel<std::string, base::mpmc_bounded_queue<std::string>>;
struct RdbSaver::Impl {
RdbSerializer serializer;
StringChannel channel;
// We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed.
Impl(unsigned sz) : channel{128, sz} {}
};
RdbSaver::RdbSaver(EngineShardSet* ess, ::io::Sink* sink) : ess_(ess), sink_(sink) {
CHECK_NOTNULL(sink_);
serializer_.set_sink(sink_);
impl_.reset(new Impl(ess->size()));
impl_->serializer.set_sink(sink_);
}
RdbSaver::~RdbSaver() {
@ -224,12 +236,46 @@ std::error_code RdbSaver::SaveHeader() {
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
CHECK_EQ(9u, sz);
RETURN_ON_ERR(serializer_.WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(impl_->serializer.WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(SaveAux());
return error_code{};
}
error_code RdbSaver::SaveBody() {
RETURN_ON_ERR(impl_->serializer.FlushMem());
size_t num_written = 0;
string val;
vector<string> vals;
vector<iovec> ivec;
auto& channel = impl_->channel;
while (channel.Pop(val)) {
vals.emplace_back(std::move(val));
while (channel.TryPop(val)) {
vals.emplace_back(std::move(val));
}
ivec.resize(vals.size());
for (size_t i = 0; i < ivec.size(); ++i) {
ivec[i].iov_base = vals[i].data();
ivec[i].iov_len = vals[i].size();
}
RETURN_ON_ERR(sink_->Write(ivec.data(), ivec.size()));
vals.clear();
}
VLOG(1) << "Blobs written " << num_written;
RETURN_ON_ERR(SaveEpilog());
return error_code{};
}
void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
LOG(FATAL) << "TBD";
}
error_code RdbSaver::SaveAux() {
static_assert(sizeof(void*) == 8, "");
@ -256,23 +302,26 @@ error_code RdbSaver::SaveEpilog() {
uint8_t buf[8];
uint64_t chksum;
auto& ser = impl_->serializer;
/* EOF opcode */
RETURN_ON_ERR(serializer_.WriteOpcode(RDB_OPCODE_EOF));
RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_EOF));
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
chksum = 0;
absl::little_endian::Store64(buf, chksum);
RETURN_ON_ERR(serializer_.WriteRaw(buf));
RETURN_ON_ERR(ser.WriteRaw(buf));
return serializer_.FlushMem();
return ser.FlushMem();
}
error_code RdbSaver::SaveAuxFieldStrStr(string_view key, string_view val) {
RETURN_ON_ERR(serializer_.WriteOpcode(RDB_OPCODE_AUX));
RETURN_ON_ERR(serializer_.SaveString(key));
RETURN_ON_ERR(serializer_.SaveString(val));
auto& ser = impl_->serializer;
RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_AUX));
RETURN_ON_ERR(ser.SaveString(key));
RETURN_ON_ERR(ser.SaveString(val));
return error_code{};
}

View File

@ -58,17 +58,21 @@ class RdbSaver {
~RdbSaver();
std::error_code SaveHeader();
std::error_code SaveEpilog();
std::error_code SaveBody();
void StartSnapshotInShard(EngineShard* shard);
private:
std::error_code SaveEpilog();
std::error_code SaveAux();
std::error_code SaveAuxFieldStrStr(std::string_view key, std::string_view val);
std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val);
EngineShardSet* ess_;
::io::Sink* sink_;
RdbSerializer serializer_;
struct Impl;
std::unique_ptr<Impl> impl_;
};
} // namespace dfly

View File

@ -169,7 +169,14 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
ec = saver.SaveHeader();
if (!ec) {
ec = saver.SaveEpilog();
auto cb = [&saver](Transaction* t, EngineShard* shard) {
saver.StartSnapshotInShard(shard);
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(std::move(cb));
// perform snapshot serialization, block the current fiber until it completes.
ec = saver.SaveBody();
}
if (ec) {