Introduce a naive snapshot flow and implement SaveBody for string/list entries

This commit is contained in:
Roman Gershman 2022-01-22 11:11:09 +02:00
parent 7af6aef4c7
commit 069ed12c68
7 changed files with 334 additions and 35 deletions

View File

@ -42,7 +42,11 @@ class DashTable : public detail::DashTableBase {
using Key_t = _Key;
using Value_t = _Value;
//! Number of "official" buckets that are used to position a key. In other words, does not include
//! stash buckets.
static constexpr unsigned kLogicalBucketNum = Policy::kBucketNum;
//! Total number of buckets in a segment (including stash).
static constexpr unsigned kPhysicalBucketNum = SegmentType::kTotalBuckets;
static constexpr unsigned kBucketSize = Policy::kSlotNum;
static constexpr double kTaxAmount = SegmentType::kTaxSize;
@ -142,29 +146,6 @@ class DashTable : public detail::DashTableBase {
seg->Value(bucket_id_, slot_id_)};
}
#if 0
const Key_t& key() const {
return owner_->segment_[seg_id_]->Key(bucket_id_, slot_id_);
}
// Generally we should not expose this method since hash-tables can not allow changing the keys
// of their key-value pairs. However, we need it for AddOrFind semantics - so we could implement
// without allocations. Use this method with caution - it might break hash-table.
Key_t* mutable_key() {
return &owner_->segment_[seg_id_]->Key(bucket_id_, slot_id_);
};
#endif
typename std::conditional<IsConst, const Value_t&, Value_t&>::type value() const {
return owner_->segment_[seg_id_]->Value(bucket_id_, slot_id_);
}
/*pointer operator->() const {
return std::addressof(value());
}
reference operator*() const {
return value();
}*/
// Make it self-contained. Does not need container::end().
bool is_done() const {

View File

@ -4,7 +4,7 @@ cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc generic_family.cc
list_family.cc main_service.cc memcache_parser.cc rdb_save.cc
list_family.cc main_service.cc memcache_parser.cc rdb_save.cc rdb_snapshot.cc
redis_parser.cc reply_builder.cc server_family.cc string_family.cc transaction.cc)
cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib

View File

@ -48,7 +48,6 @@ struct SliceEvents {
SliceEvents& operator+=(const SliceEvents& o);
};
class DbSlice {
struct InternalDbStats {
// Number of inline keys.
@ -144,6 +143,11 @@ class DbSlice {
return id < db_arr_.size() && bool(db_arr_[id]);
}
std::pair<PrimeTable*, ExpireTable*> GetTables(DbIndex id) {
return std::pair<PrimeTable*, ExpireTable*>(&db_arr_[id]->main_table,
&db_arr_[id]->expire_table);
}
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;

View File

@ -15,6 +15,7 @@ extern "C" {
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/rdb_snapshot.h"
#include "util/fibers/simple_channel.h"
namespace dfly {
@ -107,6 +108,46 @@ inline unsigned SerializeLen(uint64_t len, uint8_t* buf) {
return 1 + 8;
}
uint8_t RdbObjectType(const robj* o) {
switch (o->type) {
case OBJ_STRING:
return RDB_TYPE_STRING;
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return RDB_TYPE_LIST_QUICKLIST;
LOG(FATAL) << ("Unknown list encoding");
break;
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return RDB_TYPE_SET_INTSET;
else if (o->encoding == OBJ_ENCODING_HT)
return RDB_TYPE_SET;
LOG(FATAL) << ("Unknown set encoding");
break;
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return RDB_TYPE_ZSET_ZIPLIST;
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return RDB_TYPE_ZSET_2;
LOG(FATAL) << ("Unknown sorted set encoding");
break;
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return RDB_TYPE_HASH_ZIPLIST;
else if (o->encoding == OBJ_ENCODING_HT)
return RDB_TYPE_HASH;
LOG(FATAL) << ("Unknown hash encoding");
break;
case OBJ_STREAM:
return RDB_TYPE_STREAM_LISTPACKS;
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
default:
LOG(FATAL) << ("Unknown object type");
}
return 0; /* avoid warning */
}
} // namespace
RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) {
@ -115,6 +156,106 @@ RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(n
RdbSerializer::~RdbSerializer() {
}
error_code RdbSerializer::SaveKeyVal(string_view key, const robj* val, uint64_t expire_ms) {
uint8_t buf[16];
/* Save the expire time */
if (expire_ms > 0) {
buf[0] = RDB_OPCODE_EXPIRETIME_MS;
absl::little_endian::Store64(buf + 1, expire_ms);
RETURN_ON_ERR(WriteRaw(Bytes{buf, 9}));
}
uint8_t rdb_type = RdbObjectType(val);
RETURN_ON_ERR(WriteOpcode(rdb_type));
RETURN_ON_ERR(SaveString(key));
return SaveObject(val);
}
error_code RdbSerializer::SaveKeyVal(string_view key, string_view value, uint64_t expire_ms) {
uint8_t buf[16];
/* Save the expire time */
if (expire_ms > 0) {
buf[0] = RDB_OPCODE_EXPIRETIME_MS;
absl::little_endian::Store64(buf + 1, expire_ms);
RETURN_ON_ERR(WriteRaw(Bytes{buf, 9}));
}
DVLOG(2) << "Saving keyval start " << key;
RETURN_ON_ERR(WriteOpcode(RDB_TYPE_STRING));
RETURN_ON_ERR(SaveString(key));
RETURN_ON_ERR(SaveString(value));
return error_code{};
}
error_code RdbSerializer::SaveObject(const robj* o) {
if (o->type == OBJ_STRING) {
/* Save a string value */
return SaveStringObject(o);
}
if (o->type == OBJ_LIST) {
/* Save a list value */
DCHECK_EQ(OBJ_ENCODING_QUICKLIST, o->encoding);
const quicklist* ql = reinterpret_cast<const quicklist*>(o->ptr);
quicklistNode* node = ql->head;
DVLOG(1) << "Saving list of length " << ql->len;
RETURN_ON_ERR(SaveLen(ql->len));
while (node) {
if (quicklistNodeIsCompressed(node)) {
void* data;
size_t compress_len = quicklistGetLzf(node, &data);
RETURN_ON_ERR(
SaveLzfBlob(Bytes{reinterpret_cast<uint8_t*>(data), compress_len}, node->sz));
} else {
RETURN_ON_ERR(SaveString(node->entry, node->sz));
}
node = node->next;
}
return error_code{};
}
LOG(FATAL) << "Not implemented " << o->type;
return error_code{};
}
error_code RdbSerializer::SaveStringObject(const robj* obj) {
/* Avoid to decode the object, then encode it again, if the
* object is already integer encoded. */
if (obj->encoding == OBJ_ENCODING_INT) {
return SaveLongLongAsString(long(obj->ptr));
}
CHECK(sdsEncodedObject(obj));
sds s = reinterpret_cast<sds>(obj->ptr);
return SaveString(std::string_view{s, sdslen(s)});
}
/* Save a long long value as either an encoded string or a string. */
error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
uint8_t buf[32];
unsigned enclen = EncodeInteger(value, buf);
if (enclen > 0) {
return WriteRaw(Bytes{buf, enclen});
}
/* Encode as string */
enclen = ll2string((char*)buf, 32, value);
DCHECK_LT(enclen, 32u);
RETURN_ON_ERR(SaveLen(enclen));
return WriteRaw(Bytes{buf, enclen});
}
// TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
@ -209,16 +350,15 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
return error_code{};
}
using StringChannel =
::util::fibers_ext::SimpleChannel<std::string, base::mpmc_bounded_queue<std::string>>;
struct RdbSaver::Impl {
RdbSerializer serializer;
StringChannel channel;
RdbSnapshot::StringChannel channel;
vector<unique_ptr<RdbSnapshot>> handles;
// We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed.
Impl(unsigned sz) : channel{128, sz} {}
Impl(unsigned sz) : channel{128, sz}, handles(sz) {
}
};
RdbSaver::RdbSaver(EngineShardSet* ess, ::io::Sink* sink) : ess_(ess), sink_(sink) {
@ -244,6 +384,7 @@ std::error_code RdbSaver::SaveHeader() {
error_code RdbSaver::SaveBody() {
RETURN_ON_ERR(impl_->serializer.FlushMem());
VLOG(1) << "SaveBody";
size_t num_written = 0;
string val;
@ -265,6 +406,10 @@ error_code RdbSaver::SaveBody() {
vals.clear();
}
for (auto& ptr : impl_->handles) {
ptr->Join();
}
VLOG(1) << "Blobs written " << num_written;
RETURN_ON_ERR(SaveEpilog());
@ -273,9 +418,12 @@ error_code RdbSaver::SaveBody() {
}
void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
LOG(FATAL) << "TBD";
}
auto pair = shard->db_slice().GetTables(0);
auto s = make_unique<RdbSnapshot>(pair.first, pair.second, &impl_->channel);
s->Start();
impl_->handles[shard->shard_id()] = move(s);
}
error_code RdbSaver::SaveAux() {
static_assert(sizeof(void*) == 8, "");

View File

@ -5,6 +5,7 @@
extern "C" {
#include "redis/lzfP.h"
#include "redis/object.h"
}
#include "base/io_buf.h"
@ -14,7 +15,6 @@ namespace dfly {
class EngineShardSet;
class EngineShard;
class RdbSerializer {
public:
RdbSerializer(::io::Sink* s = nullptr);
@ -30,6 +30,7 @@ class RdbSerializer {
return WriteRaw(::io::Bytes{&opcode, 1});
}
std::error_code SaveKeyVal(std::string_view key, const robj* val, uint64_t expire_ms);
std::error_code SaveKeyVal(std::string_view key, std::string_view value, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code SaveString(std::string_view val);
@ -44,6 +45,9 @@ class RdbSerializer {
private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const robj* o);
std::error_code SaveStringObject(const robj* obj);
std::error_code SaveLongLongAsString(int64_t value);
::io::Sink* sink_ = nullptr;
std::unique_ptr<LZF_HSLOT[]> lzf_;
@ -51,7 +55,6 @@ class RdbSerializer {
base::PODArray<uint8_t> tmp_buf_;
};
class RdbSaver {
public:
RdbSaver(EngineShardSet* ess, ::io::Sink* sink);
@ -63,6 +66,8 @@ class RdbSaver {
void StartSnapshotInShard(EngineShard* shard);
private:
struct Impl;
std::error_code SaveEpilog();
std::error_code SaveAux();
@ -71,7 +76,6 @@ class RdbSaver {
EngineShardSet* ess_;
::io::Sink* sink_;
struct Impl;
std::unique_ptr<Impl> impl_;
};

120
server/rdb_snapshot.cc Normal file
View File

@ -0,0 +1,120 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/rdb_snapshot.h"
extern "C" {
#include "redis/object.h"
}
#include <bitset>
#include "base/logging.h"
#include "server/rdb_save.h"
#include "util/fiber_sched_algo.h"
namespace dfly {
using namespace boost;
using namespace std;
using namespace util;
using namespace chrono_literals;
RdbSnapshot::RdbSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest)
: prime_table_(prime), dest_(dest) {
}
RdbSnapshot::~RdbSnapshot() {
}
void RdbSnapshot::Start() {
DCHECK(!fb_.joinable());
VLOG(1) << "DbSaver::Start";
sfile_.reset(new io::StringFile);
rdb_serializer_.reset(new RdbSerializer(sfile_.get()));
fb_ = fibers::fiber([this] { FiberFunc(); });
}
void RdbSnapshot::Join() {
fb_.join();
}
static_assert(sizeof(PrimeTable::const_iterator) == 16);
void RdbSnapshot::PhysicalCb(MainIterator it) {
error_code ec;
string tmp;
auto key = it->first.GetSlice(&tmp);
// TODO: fetch expire.
if (it->second.ObjType() == OBJ_STRING) {
ec = rdb_serializer_->SaveKeyVal(key, it->second.ToString(), 0);
} else {
robj* obj = it->second.AsRObj();
ec = rdb_serializer_->SaveKeyVal(key, obj, 0);
}
CHECK(!ec); // we write to StringFile.
++processed_;
}
// Serializes all the entries with version less than top_version.
void RdbSnapshot::FiberFunc() {
this_fiber::properties<FiberProps>().set_name("RdbSnapshot");
uint64_t cursor = 0;
// it's important that cb will run uninterrupted.
// so no I/O work inside it.
// We flush our string file to disk in the traverse loop below.
auto save_cb = [&](const MainIterator& it) {
this->PhysicalCb(it);
return false;
};
uint64_t last_yield = 0;
do {
DVLOG(2) << "traverse cusrsor " << cursor;
// Traverse a single logical bucket but do not update its versions.
// we can not update a version because entries in the same bucket share part of the version.
// Therefore we save first, and then update version in one atomic swipe.
uint64_t next = prime_table_->Traverse(cursor, save_cb);
cursor = next;
// Flush if needed.
FlushSfile();
if (processed_ >= last_yield + 200) {
this_fiber::yield();
last_yield = processed_;
// flush in case other fibers (writes commands that pushed previous values) filled the file.
FlushSfile();
}
} while (cursor > 0);
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec);
FlushSfile();
dest_->StartClosing();
VLOG(1) << "Exit RdbProducer fiber with " << processed_ << " processed";
}
void RdbSnapshot::FlushSfile() {
// Flush the string file if needed.
if (!sfile_->val.empty()) {
// Make sure we flush everything from membuffer in order to preserve the atomicity of keyvalue
// serializations.
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec); // stringfile always succeeds.
string tmp = std::move(sfile_->val); // important to move before pushing!
dest_->Push(std::move(tmp));
}
}
} // namespace dfly

42
server/rdb_snapshot.h Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "io/file.h"
#include "server/table.h"
#include "util/fibers/simple_channel.h"
namespace dfly {
class RdbSerializer;
class RdbSnapshot {
public:
using StringChannel =
::util::fibers_ext::SimpleChannel<std::string, base::mpmc_bounded_queue<std::string>>;
RdbSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest);
~RdbSnapshot();
void Start();
void Join();
private:
void FiberFunc();
void FlushSfile();
void PhysicalCb(MainIterator it);
::boost::fibers::fiber fb_;
std::unique_ptr<io::StringFile> sfile_;
std::unique_ptr<RdbSerializer> rdb_serializer_;
PrimeTable* prime_table_;
StringChannel* dest_;
uint64_t processed_ = 0;
};
} // namespace dfly