diff --git a/core/dash.h b/core/dash.h index 15a83a9..e78cfbd 100644 --- a/core/dash.h +++ b/core/dash.h @@ -42,7 +42,11 @@ class DashTable : public detail::DashTableBase { using Key_t = _Key; using Value_t = _Value; + //! Number of "official" buckets that are used to position a key. In other words, does not include + //! stash buckets. static constexpr unsigned kLogicalBucketNum = Policy::kBucketNum; + + //! Total number of buckets in a segment (including stash). static constexpr unsigned kPhysicalBucketNum = SegmentType::kTotalBuckets; static constexpr unsigned kBucketSize = Policy::kSlotNum; static constexpr double kTaxAmount = SegmentType::kTaxSize; @@ -142,29 +146,6 @@ class DashTable : public detail::DashTableBase { seg->Value(bucket_id_, slot_id_)}; } -#if 0 - const Key_t& key() const { - return owner_->segment_[seg_id_]->Key(bucket_id_, slot_id_); - } - - // Generally we should not expose this method since hash-tables can not allow changing the keys - // of their key-value pairs. However, we need it for AddOrFind semantics - so we could implement - // without allocations. Use this method with caution - it might break hash-table. - Key_t* mutable_key() { - return &owner_->segment_[seg_id_]->Key(bucket_id_, slot_id_); - }; -#endif - typename std::conditional::type value() const { - return owner_->segment_[seg_id_]->Value(bucket_id_, slot_id_); - } - - /*pointer operator->() const { - return std::addressof(value()); - } - - reference operator*() const { - return value(); - }*/ // Make it self-contained. Does not need container::end(). bool is_done() const { diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 5c96d99..8e545e4 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -4,7 +4,7 @@ cxx_link(dragonfly base dragonfly_lib) add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc generic_family.cc - list_family.cc main_service.cc memcache_parser.cc rdb_save.cc + list_family.cc main_service.cc memcache_parser.cc rdb_save.cc rdb_snapshot.cc redis_parser.cc reply_builder.cc server_family.cc string_family.cc transaction.cc) cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib diff --git a/server/db_slice.h b/server/db_slice.h index 3d0de57..97eef9b 100644 --- a/server/db_slice.h +++ b/server/db_slice.h @@ -48,7 +48,6 @@ struct SliceEvents { SliceEvents& operator+=(const SliceEvents& o); }; - class DbSlice { struct InternalDbStats { // Number of inline keys. @@ -144,6 +143,11 @@ class DbSlice { return id < db_arr_.size() && bool(db_arr_[id]); } + std::pair GetTables(DbIndex id) { + return std::pair(&db_arr_[id]->main_table, + &db_arr_[id]->expire_table); + } + // Returns existing keys count in the db. size_t DbSize(DbIndex db_ind) const; diff --git a/server/rdb_save.cc b/server/rdb_save.cc index b717fa5..0c4a3cd 100644 --- a/server/rdb_save.cc +++ b/server/rdb_save.cc @@ -15,6 +15,7 @@ extern "C" { #include "base/logging.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/rdb_snapshot.h" #include "util/fibers/simple_channel.h" namespace dfly { @@ -107,6 +108,46 @@ inline unsigned SerializeLen(uint64_t len, uint8_t* buf) { return 1 + 8; } +uint8_t RdbObjectType(const robj* o) { + switch (o->type) { + case OBJ_STRING: + return RDB_TYPE_STRING; + case OBJ_LIST: + if (o->encoding == OBJ_ENCODING_QUICKLIST) + return RDB_TYPE_LIST_QUICKLIST; + LOG(FATAL) << ("Unknown list encoding"); + break; + case OBJ_SET: + if (o->encoding == OBJ_ENCODING_INTSET) + return RDB_TYPE_SET_INTSET; + else if (o->encoding == OBJ_ENCODING_HT) + return RDB_TYPE_SET; + LOG(FATAL) << ("Unknown set encoding"); + break; + case OBJ_ZSET: + if (o->encoding == OBJ_ENCODING_ZIPLIST) + return RDB_TYPE_ZSET_ZIPLIST; + else if (o->encoding == OBJ_ENCODING_SKIPLIST) + return RDB_TYPE_ZSET_2; + LOG(FATAL) << ("Unknown sorted set encoding"); + break; + case OBJ_HASH: + if (o->encoding == OBJ_ENCODING_ZIPLIST) + return RDB_TYPE_HASH_ZIPLIST; + else if (o->encoding == OBJ_ENCODING_HT) + return RDB_TYPE_HASH; + LOG(FATAL) << ("Unknown hash encoding"); + break; + case OBJ_STREAM: + return RDB_TYPE_STREAM_LISTPACKS; + case OBJ_MODULE: + return RDB_TYPE_MODULE_2; + default: + LOG(FATAL) << ("Unknown object type"); + } + return 0; /* avoid warning */ +} + } // namespace RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) { @@ -115,6 +156,106 @@ RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(n RdbSerializer::~RdbSerializer() { } +error_code RdbSerializer::SaveKeyVal(string_view key, const robj* val, uint64_t expire_ms) { + uint8_t buf[16]; + + /* Save the expire time */ + if (expire_ms > 0) { + buf[0] = RDB_OPCODE_EXPIRETIME_MS; + absl::little_endian::Store64(buf + 1, expire_ms); + RETURN_ON_ERR(WriteRaw(Bytes{buf, 9})); + } + + uint8_t rdb_type = RdbObjectType(val); + RETURN_ON_ERR(WriteOpcode(rdb_type)); + + RETURN_ON_ERR(SaveString(key)); + + return SaveObject(val); +} + +error_code RdbSerializer::SaveKeyVal(string_view key, string_view value, uint64_t expire_ms) { + uint8_t buf[16]; + + /* Save the expire time */ + if (expire_ms > 0) { + buf[0] = RDB_OPCODE_EXPIRETIME_MS; + absl::little_endian::Store64(buf + 1, expire_ms); + RETURN_ON_ERR(WriteRaw(Bytes{buf, 9})); + } + + DVLOG(2) << "Saving keyval start " << key; + + RETURN_ON_ERR(WriteOpcode(RDB_TYPE_STRING)); + + RETURN_ON_ERR(SaveString(key)); + + RETURN_ON_ERR(SaveString(value)); + + return error_code{}; +} + +error_code RdbSerializer::SaveObject(const robj* o) { + if (o->type == OBJ_STRING) { + /* Save a string value */ + return SaveStringObject(o); + } + + if (o->type == OBJ_LIST) { + /* Save a list value */ + DCHECK_EQ(OBJ_ENCODING_QUICKLIST, o->encoding); + const quicklist* ql = reinterpret_cast(o->ptr); + quicklistNode* node = ql->head; + DVLOG(1) << "Saving list of length " << ql->len; + RETURN_ON_ERR(SaveLen(ql->len)); + + while (node) { + if (quicklistNodeIsCompressed(node)) { + void* data; + size_t compress_len = quicklistGetLzf(node, &data); + RETURN_ON_ERR( + SaveLzfBlob(Bytes{reinterpret_cast(data), compress_len}, node->sz)); + } else { + RETURN_ON_ERR(SaveString(node->entry, node->sz)); + } + node = node->next; + } + return error_code{}; + } + + LOG(FATAL) << "Not implemented " << o->type; + return error_code{}; +} + +error_code RdbSerializer::SaveStringObject(const robj* obj) { + /* Avoid to decode the object, then encode it again, if the + * object is already integer encoded. */ + if (obj->encoding == OBJ_ENCODING_INT) { + return SaveLongLongAsString(long(obj->ptr)); + } + + CHECK(sdsEncodedObject(obj)); + sds s = reinterpret_cast(obj->ptr); + + return SaveString(std::string_view{s, sdslen(s)}); +} + +/* Save a long long value as either an encoded string or a string. */ +error_code RdbSerializer::SaveLongLongAsString(int64_t value) { + uint8_t buf[32]; + unsigned enclen = EncodeInteger(value, buf); + if (enclen > 0) { + return WriteRaw(Bytes{buf, enclen}); + } + + /* Encode as string */ + enclen = ll2string((char*)buf, 32, value); + DCHECK_LT(enclen, 32u); + + RETURN_ON_ERR(SaveLen(enclen)); + return WriteRaw(Bytes{buf, enclen}); +} + // TODO: if buf is large enough, it makes sense to write both mem_buf and buf // directly to sink_. error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { @@ -209,16 +350,15 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_ return error_code{}; } -using StringChannel = - ::util::fibers_ext::SimpleChannel>; - struct RdbSaver::Impl { RdbSerializer serializer; - StringChannel channel; + RdbSnapshot::StringChannel channel; + vector> handles; // We pass K=sz to say how many producers are pushing data in order to maintain // correct closing semantics - channel is closing when K producers marked it as closed. - Impl(unsigned sz) : channel{128, sz} {} + Impl(unsigned sz) : channel{128, sz}, handles(sz) { + } }; RdbSaver::RdbSaver(EngineShardSet* ess, ::io::Sink* sink) : ess_(ess), sink_(sink) { @@ -244,6 +384,7 @@ std::error_code RdbSaver::SaveHeader() { error_code RdbSaver::SaveBody() { RETURN_ON_ERR(impl_->serializer.FlushMem()); + VLOG(1) << "SaveBody"; size_t num_written = 0; string val; @@ -265,6 +406,10 @@ error_code RdbSaver::SaveBody() { vals.clear(); } + for (auto& ptr : impl_->handles) { + ptr->Join(); + } + VLOG(1) << "Blobs written " << num_written; RETURN_ON_ERR(SaveEpilog()); @@ -273,9 +418,12 @@ error_code RdbSaver::SaveBody() { } void RdbSaver::StartSnapshotInShard(EngineShard* shard) { - LOG(FATAL) << "TBD"; -} + auto pair = shard->db_slice().GetTables(0); + auto s = make_unique(pair.first, pair.second, &impl_->channel); + s->Start(); + impl_->handles[shard->shard_id()] = move(s); +} error_code RdbSaver::SaveAux() { static_assert(sizeof(void*) == 8, ""); diff --git a/server/rdb_save.h b/server/rdb_save.h index 0ff78c5..556b5d5 100644 --- a/server/rdb_save.h +++ b/server/rdb_save.h @@ -5,6 +5,7 @@ extern "C" { #include "redis/lzfP.h" +#include "redis/object.h" } #include "base/io_buf.h" @@ -14,7 +15,6 @@ namespace dfly { class EngineShardSet; class EngineShard; - class RdbSerializer { public: RdbSerializer(::io::Sink* s = nullptr); @@ -30,6 +30,7 @@ class RdbSerializer { return WriteRaw(::io::Bytes{&opcode, 1}); } + std::error_code SaveKeyVal(std::string_view key, const robj* val, uint64_t expire_ms); std::error_code SaveKeyVal(std::string_view key, std::string_view value, uint64_t expire_ms); std::error_code WriteRaw(const ::io::Bytes& buf); std::error_code SaveString(std::string_view val); @@ -44,6 +45,9 @@ class RdbSerializer { private: std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); + std::error_code SaveObject(const robj* o); + std::error_code SaveStringObject(const robj* obj); + std::error_code SaveLongLongAsString(int64_t value); ::io::Sink* sink_ = nullptr; std::unique_ptr lzf_; @@ -51,7 +55,6 @@ class RdbSerializer { base::PODArray tmp_buf_; }; - class RdbSaver { public: RdbSaver(EngineShardSet* ess, ::io::Sink* sink); @@ -63,6 +66,8 @@ class RdbSaver { void StartSnapshotInShard(EngineShard* shard); private: + struct Impl; + std::error_code SaveEpilog(); std::error_code SaveAux(); @@ -71,7 +76,6 @@ class RdbSaver { EngineShardSet* ess_; ::io::Sink* sink_; - struct Impl; std::unique_ptr impl_; }; diff --git a/server/rdb_snapshot.cc b/server/rdb_snapshot.cc new file mode 100644 index 0000000..bf784cd --- /dev/null +++ b/server/rdb_snapshot.cc @@ -0,0 +1,120 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/rdb_snapshot.h" + +extern "C" { +#include "redis/object.h" +} + +#include + +#include "base/logging.h" +#include "server/rdb_save.h" +#include "util/fiber_sched_algo.h" + +namespace dfly { +using namespace boost; +using namespace std; +using namespace util; +using namespace chrono_literals; + +RdbSnapshot::RdbSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest) + : prime_table_(prime), dest_(dest) { +} + +RdbSnapshot::~RdbSnapshot() { +} + +void RdbSnapshot::Start() { + DCHECK(!fb_.joinable()); + + VLOG(1) << "DbSaver::Start"; + sfile_.reset(new io::StringFile); + + rdb_serializer_.reset(new RdbSerializer(sfile_.get())); + fb_ = fibers::fiber([this] { FiberFunc(); }); +} + +void RdbSnapshot::Join() { + fb_.join(); +} + +static_assert(sizeof(PrimeTable::const_iterator) == 16); + +void RdbSnapshot::PhysicalCb(MainIterator it) { + error_code ec; + + string tmp; + + auto key = it->first.GetSlice(&tmp); + // TODO: fetch expire. + if (it->second.ObjType() == OBJ_STRING) { + ec = rdb_serializer_->SaveKeyVal(key, it->second.ToString(), 0); + } else { + robj* obj = it->second.AsRObj(); + ec = rdb_serializer_->SaveKeyVal(key, obj, 0); + } + CHECK(!ec); // we write to StringFile. + ++processed_; +} + +// Serializes all the entries with version less than top_version. +void RdbSnapshot::FiberFunc() { + this_fiber::properties().set_name("RdbSnapshot"); + + uint64_t cursor = 0; + + // it's important that cb will run uninterrupted. + // so no I/O work inside it. + // We flush our string file to disk in the traverse loop below. + auto save_cb = [&](const MainIterator& it) { + this->PhysicalCb(it); + + return false; + }; + + uint64_t last_yield = 0; + do { + DVLOG(2) << "traverse cusrsor " << cursor; + + // Traverse a single logical bucket but do not update its versions. + // we can not update a version because entries in the same bucket share part of the version. + // Therefore we save first, and then update version in one atomic swipe. + uint64_t next = prime_table_->Traverse(cursor, save_cb); + + cursor = next; + + // Flush if needed. + FlushSfile(); + if (processed_ >= last_yield + 200) { + this_fiber::yield(); + last_yield = processed_; + + // flush in case other fibers (writes commands that pushed previous values) filled the file. + FlushSfile(); + } + } while (cursor > 0); + auto ec = rdb_serializer_->FlushMem(); + CHECK(!ec); + + FlushSfile(); + dest_->StartClosing(); + + VLOG(1) << "Exit RdbProducer fiber with " << processed_ << " processed"; +} + +void RdbSnapshot::FlushSfile() { + // Flush the string file if needed. + if (!sfile_->val.empty()) { + // Make sure we flush everything from membuffer in order to preserve the atomicity of keyvalue + // serializations. + auto ec = rdb_serializer_->FlushMem(); + CHECK(!ec); // stringfile always succeeds. + string tmp = std::move(sfile_->val); // important to move before pushing! + dest_->Push(std::move(tmp)); + } +} + +} // namespace dfly diff --git a/server/rdb_snapshot.h b/server/rdb_snapshot.h new file mode 100644 index 0000000..8946f77 --- /dev/null +++ b/server/rdb_snapshot.h @@ -0,0 +1,42 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "io/file.h" +#include "server/table.h" +#include "util/fibers/simple_channel.h" + +namespace dfly { + +class RdbSerializer; + +class RdbSnapshot { + public: + using StringChannel = + ::util::fibers_ext::SimpleChannel>; + + RdbSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest); + ~RdbSnapshot(); + + void Start(); + void Join(); + + private: + void FiberFunc(); + void FlushSfile(); + void PhysicalCb(MainIterator it); + + ::boost::fibers::fiber fb_; + + std::unique_ptr sfile_; + std::unique_ptr rdb_serializer_; + + PrimeTable* prime_table_; + StringChannel* dest_; + + uint64_t processed_ = 0; +}; + +} // namespace dfly