1. Snapshot now passes the db index, and rdb save handles it by inserting select opcodes when needed.
2. Fix a few more bugs in CVCUponInsert.
3. Save lua scripts.
This commit is contained in:
Roman Gershman 2022-05-20 23:25:41 +03:00
parent 599c786c16
commit 343dd22ce5
12 changed files with 134 additions and 37 deletions

View File

@ -355,6 +355,7 @@ class DashTable<_Key, _Value, Policy>::Iterator {
}
// Returns the version stamp of the bucket this iterator currently points at.
// Available only when the table policy enables version tracking (Policy::kUseVersion).
template <bool B = Policy::kUseVersion> std::enable_if_t<B, uint64_t> GetVersion() const {
// Guard against a detached iterator or a stale segment id before dereferencing.
assert(owner_ && seg_id_ < owner_->segment_.size());
return owner_->segment_[seg_id_]->GetVersion(bucket_id_);
}
@ -490,7 +491,7 @@ void DashTable<_Key, _Value, Policy>::CVCUponInsert(uint64_t ver_threshold, cons
// Segment is full, we need to return the whole segment, because it can be split
// and its entries can be reshuffled into different buckets.
for (uint8_t i = 0; i < kPhysicalBucketNum; ++i) {
if (target->GetVersion(i) < ver_threshold) {
if (target->GetVersion(i) < ver_threshold && !target->GetBucket(i).IsEmpty()) {
cb(bucket_iterator{this, seg_id, i});
}
}

View File

@ -126,6 +126,10 @@ template <unsigned NUM_SLOTS, unsigned NUM_STASH_FPS> class BucketBase {
return Size() == NUM_SLOTS;
}
// True when no slot in this bucket is occupied (busy bitmap is all zeros).
bool IsEmpty() const {
return GetBusy() == 0;
}
// Number of occupied slots in this bucket.
unsigned Size() const {
return slotb_.Size();
}
@ -1331,8 +1335,9 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnInsert(uint64_t
uint8_t first = target.Size() > neighbor.Size() ? nid : bid;
unsigned cnt = 0;
if (!bucket_[first].IsFull()) {
if (bucket_[first].GetVersion() < ver_threshold) {
const Bucket& bfirst = bucket_[first];
if (!bfirst.IsFull()) {
if (!bfirst.IsEmpty() && bfirst.GetVersion() < ver_threshold) {
bid_res[cnt++] = first;
}
return cnt;
@ -1348,7 +1353,7 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnInsert(uint64_t
if (bucket_[nid].GetVersion() < ver_threshold)
bid_res[cnt++] = nid;
if (bucket_[after_next].GetVersion() < ver_threshold)
if (!bucket_[after_next].IsEmpty() && bucket_[after_next].GetVersion() < ver_threshold)
bid_res[cnt++] = after_next;
return cnt;
@ -1359,7 +1364,7 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnInsert(uint64_t
if (bucket_[bid].GetVersion() < ver_threshold)
bid_res[cnt++] = bid;
if (bucket_[prev_bid].GetVersion() < ver_threshold)
if (!bucket_[prev_bid].IsEmpty() && bucket_[prev_bid].GetVersion() < ver_threshold)
bid_res[cnt++] = prev_bid;
return cnt;
@ -1370,8 +1375,9 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnInsert(uint64_t
unsigned stash_bid = kNumBuckets + ((bid + i) % STASH_BUCKET_NUM);
const Bucket& stash = bucket_[stash_bid];
if (!stash.IsFull()) {
if (stash.GetVersion() < ver_threshold)
if (!stash.IsEmpty() && stash.GetVersion() < ver_threshold)
bid_res[cnt++] = stash_bid;
return cnt;
}
}

View File

@ -20,8 +20,10 @@
DECLARE_uint32(port);
DECLARE_uint32(dbnum);
DECLARE_uint32(memcache_port);
DECLARE_uint64(maxmemory);
DEFINE_bool(use_large_pages, false, "If true - uses large memory pages for allocations");
DEFINE_string(bind, "",
"Bind address. If empty - binds on all interfaces. "
@ -85,6 +87,12 @@ int main(int argc, char* argv[]) {
LOG(ERROR) << "Kernel 5.11 or later is supported. Exiting...";
return 1;
}
if (FLAGS_dbnum > dfly::kMaxDbId) {
LOG(ERROR) << "dbnum is too big. Exiting...";
return 1;
}
CHECK_LT(kver.minor, 99u);
dfly::kernel_version = kver.major * 100 + kver.minor;

View File

@ -31,6 +31,7 @@ extern "C" {
DECLARE_int32(list_max_listpack_size);
DECLARE_int32(list_compress_depth);
DECLARE_uint32(dbnum);
namespace dfly {
@ -189,7 +190,6 @@ struct RdbLoader::ObjSettings {
ObjSettings() = default;
};
RdbLoader::RdbLoader(EngineShardSet* ess, ScriptMgr* script_mgr)
: script_mgr_(script_mgr), ess_(*ess), mem_buf_{16_KB} {
shard_buf_.reset(new ItemsBuf[ess_.size()]);
@ -302,7 +302,9 @@ error_code RdbLoader::Load(io::Source* src) {
/* SELECTDB: Select the specified database. */
SET_OR_RETURN(LoadLen(nullptr), dbid);
if (dbid > kMaxDbId) {
if (dbid > FLAGS_dbnum) {
LOG(WARNING) << "database id " << dbid << " exceeds dbnum limit. Try increasing the flag.";
return RdbError(errc::bad_db_index);
}

View File

@ -157,6 +157,13 @@ RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(n
RdbSerializer::~RdbSerializer() {
}
// Writes an RDB SELECTDB opcode followed by the length-encoded database id,
// so that subsequent entries are restored into database `dbid` on load.
error_code RdbSerializer::SelectDb(uint32_t dbid) {
uint8_t buf[16];
buf[0] = RDB_OPCODE_SELECTDB;
// SerializeLen encodes dbid after the opcode byte and returns how many
// bytes the encoding occupies.
unsigned enclen = SerializeLen(dbid, buf + 1);
return WriteRaw(Bytes{buf, enclen + 1});
}
// Called by snapshot
error_code RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms) {
uint8_t buf[16];
@ -496,7 +503,7 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
struct RdbSaver::Impl {
// used for serializing non-body components in the calling fiber.
RdbSerializer serializer;
SliceSnapshot::StringChannel channel;
SliceSnapshot::RecordChannel channel;
vector<unique_ptr<SliceSnapshot>> shard_snapshots;
// We pass K=sz to say how many producers are pushing data in order to maintain
@ -515,13 +522,13 @@ RdbSaver::RdbSaver(::io::Sink* sink) : sink_(sink) {
RdbSaver::~RdbSaver() {
}
std::error_code RdbSaver::SaveHeader() {
std::error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
char magic[16];
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
CHECK_EQ(9u, sz);
RETURN_ON_ERR(impl_->serializer.WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(SaveAux());
RETURN_ON_ERR(SaveAux(lua_scripts));
return error_code{};
}
@ -531,16 +538,26 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
VLOG(1) << "SaveBody";
size_t num_written = 0;
string val;
SliceSnapshot::DbRecord record;
vector<string> vals;
vector<iovec> ivec;
uint8_t buf[16];
DbIndex last_db_index = kInvalidDbId;
buf[0] = RDB_OPCODE_SELECTDB;
auto& channel = impl_->channel;
while (channel.Pop(val)) {
vals.emplace_back(std::move(val));
while (channel.TryPop(val)) {
vals.emplace_back(std::move(val));
}
while (channel.Pop(record)) {
do {
if (record.db_index != last_db_index) {
unsigned enclen = SerializeLen(record.db_index, buf + 1);
char* str = (char*)buf;
vals.emplace_back(string(str, enclen + 1));
last_db_index = record.db_index;
}
vals.emplace_back(std::move(record.value));
} while (vals.size() < 256 && channel.TryPop(record));
ivec.resize(vals.size());
for (size_t i = 0; i < ivec.size(); ++i) {
ivec[i].iov_base = vals[i].data();
@ -578,7 +595,7 @@ void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
impl_->shard_snapshots[shard->shard_id()] = move(s);
}
error_code RdbSaver::SaveAux() {
error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {
static_assert(sizeof(void*) == 8, "");
int aof_preamble = false;
@ -590,11 +607,14 @@ error_code RdbSaver::SaveAux() {
RETURN_ON_ERR(SaveAuxFieldStrInt("ctime", time(NULL)));
// TODO: to implement used-mem caching.
RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", used_mem_current.load(memory_order_relaxed)));
RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", aof_preamble));
for (const string& s : lua_scripts) {
RETURN_ON_ERR(SaveAuxFieldStrStr("lua", s));
}
// TODO: "repl-stream-db", "repl-id", "repl-offset"
return error_code{};
}

View File

@ -13,9 +13,9 @@ extern "C" {
#include "base/io_buf.h"
#include "io/io.h"
#include "server/table.h"
#include "server/common.h"
namespace dfly {
class EngineShardSet;
class EngineShard;
// keys are RDB_TYPE_xxx constants.
@ -25,7 +25,7 @@ class RdbSaver {
explicit RdbSaver(::io::Sink* sink);
~RdbSaver();
std::error_code SaveHeader();
std::error_code SaveHeader(const StringVec& lua_scripts);
// Writes the RDB file into sink. Waits for the serialization to finish.
// Fills freq_map with the histogram of rdb types.
@ -40,7 +40,7 @@ class RdbSaver {
std::error_code SaveEpilog();
std::error_code SaveAux();
std::error_code SaveAux(const StringVec& lua_scripts);
std::error_code SaveAuxFieldStrStr(std::string_view key, std::string_view val);
std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val);
@ -48,10 +48,8 @@ class RdbSaver {
std::unique_ptr<Impl> impl_;
};
class RdbSerializer {
public:
RdbSerializer(::io::Sink* s = nullptr);
~RdbSerializer();
@ -65,6 +63,8 @@ class RdbSerializer {
return WriteRaw(::io::Bytes{&opcode, 1});
}
std::error_code SelectDb(uint32_t dbid);
// Must be called in the thread to which `it` belongs.
std::error_code SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf);

View File

@ -173,13 +173,35 @@ TEST_F(RdbTest, SaveManyDbs) {
ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count);
Run({"save"});
auto save_fb = pp_->at(0)->LaunchFiber([&] {
RespExpr resp = Run({"save"});
ASSERT_EQ(resp, "OK");
});
usleep(1000);
pp_->at(1)->Await([&] {
Run({"select", "1"});
for (unsigned i = 0; i < 1000; ++i) {
Run({"set", StrCat("abc", i), "bar"});
}
});
save_fb.join();
auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size());
auto& k_v = save_info->freq_map.front();
EXPECT_EQ("string", k_v.first);
// EXPECT_EQ(60000, k_v.second);
EXPECT_EQ(60000, k_v.second);
auto resp = Run({"debug", "reload", "NOSAVE"});
EXPECT_EQ(resp, "OK");
metrics = service_->server_family().GetMetrics();
ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count);
}
} // namespace dfly

View File

@ -109,4 +109,16 @@ const char* ScriptMgr::Find(std::string_view sha) const {
return it->second.get();
}
// Returns copies of all registered lua script bodies.
// Locks mu_ while iterating db_, which may be mutated concurrently.
vector<string> ScriptMgr::GetLuaScripts() const {
vector<string> res;
lock_guard lk(mu_);
res.reserve(db_.size());
for (const auto& k_v : db_) {
// k_v.second holds the script body as a null-terminated char buffer.
res.emplace_back(k_v.second.get());
}
return res;
}
} // namespace dfly

View File

@ -27,9 +27,11 @@ class ScriptMgr {
// Returns body as null-terminated c-string. NULL if sha is not found.
const char* Find(std::string_view sha) const;
std::vector<std::string> GetLuaScripts() const;
private:
using ScriptKey = std::array<char, 40>;
absl::flat_hash_map<ScriptKey, std::unique_ptr<char[]>> db_;
absl::flat_hash_map<ScriptKey, std::unique_ptr<char[]>> db_; // protected by mu_
mutable ::boost::fibers::mutex mu_;
};

View File

@ -327,7 +327,9 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
RdbSaver saver{wf.get()};
RdbTypeFreqMap freq_map;
shared_ptr<LastSaveInfo> save_info;
ec = saver.SaveHeader();
StringVec lua_scripts = script_mgr_->GetLuaScripts();
ec = saver.SaveHeader(lua_scripts);
if (!ec) {
auto cb = [&saver](Transaction* t, EngineShard* shard) {

View File

@ -25,7 +25,7 @@ using namespace chrono_literals;
namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber;
SliceSnapshot::SliceSnapshot(DbTableArray db_array, DbSlice* slice, StringChannel* dest)
SliceSnapshot::SliceSnapshot(DbTableArray db_array, DbSlice* slice, RecordChannel* dest)
: db_array_(db_array), db_slice_(slice), dest_(dest) {
}
@ -64,8 +64,18 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
expire_time = db_slice_->ExpireTime(eit);
}
if (db_indx != savecb_current_db_) {
FlushSfile(true);
}
error_code ec = rdb_serializer_->SaveEntry(pk, pv, expire_time);
CHECK(!ec); // we write to StringFile.
if (db_indx != savecb_current_db_) {
ec = rdb_serializer_->FlushMem();
CHECK(!ec && !sfile_->val.empty());
string tmp = std::move(sfile_->val);
dest_->Push(DbRecord{db_indx, std::move(tmp)});
}
}
// Serializes all the entries with version less than snapshot_version_.
@ -74,9 +84,10 @@ void SliceSnapshot::FiberFunc() {
absl::StrCat("SliceSnapshot", ProactorBase::GetIndex()));
PrimeTable::cursor cursor;
for (DbIndex db_indx = 0; db_indx < 1; ++db_indx) {
for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
if (!db_array_[db_indx])
continue;
PrimeTable* pt = &db_array_[db_indx]->prime;
VLOG(1) << "Start traversing " << pt->size() << " items";
@ -98,14 +109,17 @@ void SliceSnapshot::FiberFunc() {
this_fiber::yield();
last_yield = serialized_;
DVLOG(2) << "After sleep";
// flush in case other fibers (writes commands that pushed previous values) filled the file.
// flush in case other fibers (writes commands that pushed previous values)
// filled the buffer.
FlushSfile(false);
}
} while (cursor);
DVLOG(2) << "after loop " << this_fiber::properties<FiberProps>().name();
FlushSfile(true);
}
} // for (dbindex)
dest_->StartClosing();
VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/"
@ -131,7 +145,8 @@ bool SliceSnapshot::FlushSfile(bool force) {
VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes";
string tmp = std::move(sfile_->val); // important to move before pushing!
dest_->Push(std::move(tmp));
dest_->Push(DbRecord{savecb_current_db_, std::move(tmp)});
return true;
}

View File

@ -17,10 +17,17 @@ class RdbSerializer;
class SliceSnapshot {
public:
using StringChannel =
::util::fibers_ext::SimpleChannel<std::string, base::mpmc_bounded_queue<std::string>>;
// Each dbrecord should belong to exactly one db.
// RdbSaver adds "select" opcodes when necessary in order to maintain consistency.
struct DbRecord {
DbIndex db_index;
std::string value;
};
SliceSnapshot(DbTableArray db_array, DbSlice* slice, StringChannel* dest);
using RecordChannel =
::util::fibers_ext::SimpleChannel<DbRecord, base::mpmc_bounded_queue<DbRecord>>;
SliceSnapshot(DbTableArray db_array, DbSlice* slice, RecordChannel* dest);
~SliceSnapshot();
void Start();
@ -54,7 +61,7 @@ class SliceSnapshot {
uint64_t snapshot_version_ = 0;
DbSlice* db_slice_;
DbIndex savecb_current_db_; // used by SaveCb
StringChannel* dest_;
RecordChannel* dest_;
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
};