From d3ccd5b836cc75d7048ed0e6b2f85fc72ce14d94 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 21 Jan 2022 23:13:27 +0200 Subject: [PATCH] Implement Save of rdb header/epilog --- .vscode/c_cpp_properties.json | 15 ++ helio | 2 +- redis/rdb.h | 150 ++++++++++++++++++ server/CMakeLists.txt | 2 +- server/rdb_save.cc | 286 ++++++++++++++++++++++++++++++++++ server/rdb_save.h | 74 +++++++++ server/server_family.cc | 11 +- 7 files changed, 536 insertions(+), 4 deletions(-) create mode 100644 .vscode/c_cpp_properties.json create mode 100644 redis/rdb.h create mode 100644 server/rdb_save.cc create mode 100644 server/rdb_save.h diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..d9e38de --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,15 @@ +{ + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${default}" + ], + "cStandard": "c17", + "cppStandard": "c++17", + "intelliSenseMode": "${default}", + "compileCommands": "${workspaceFolder}/build-dbg/compile_commands.json" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/helio b/helio index 9db0222..bfd4e2e 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 9db022200ac61ef2faa832aaf03d5d480f945071 +Subproject commit bfd4e2e15b16c187b907c50a45465abdab23edf9 diff --git a/redis/rdb.h b/redis/rdb.h new file mode 100644 index 0000000..78770e5 --- /dev/null +++ b/redis/rdb.h @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __RDB_H +#define __RDB_H + +#include +#include +#include + +#include "object.h" +#include "redis_aux.h" + +/* TBD: include only necessary headers. */ + +/* The current RDB version. When the format changes in a way that is no longer + * backward compatible this number gets incremented. */ +// TODO: should increment to 10 once we start storing RDB_TYPE_ZSET_LISTPACK. +#define RDB_VERSION 9 + +/* Defines related to the dump file format. 
To store 32 bits lengths for short + * keys requires a lot of space, so we check the most significant 2 bits of + * the first byte to interpret the length: + * + * 00|XXXXXX => if the two MSB are 00 the len is the 6 bits of this byte + * 01|XXXXXX XXXXXXXX => 01, the len is 14 bits, 6 bits + 8 bits of next byte + * 10|000000 [32 bit integer] => A full 32 bit len in net byte order will follow + * 10|000001 [64 bit integer] => A full 64 bit len in net byte order will follow + * 11|OBKIND this means: specially encoded object will follow. The six bits + * number specify the kind of object that follows. + * See the RDB_ENC_* defines. + * + * Lengths up to 63 are stored using a single byte, most DB keys, and many + * values, will fit inside. */ +#define RDB_6BITLEN 0 +#define RDB_14BITLEN 1 +#define RDB_32BITLEN 0x80 +#define RDB_64BITLEN 0x81 +#define RDB_ENCVAL 3 +#define RDB_LENERR UINT64_MAX + +/* When a length of a string object stored on disk has the first two bits + * set, the remaining six bits specify a special encoding for the object + * according to the following defines: */ +#define RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */ + +/* Map object types to RDB object types. Macros starting with OBJ_ are for + * memory storage and may change. Instead RDB types must be fixed because + * we store them on disk. */ +#define RDB_TYPE_STRING 0 +#define RDB_TYPE_LIST 1 +#define RDB_TYPE_SET 2 +#define RDB_TYPE_ZSET 3 +#define RDB_TYPE_HASH 4 +#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */ +#define RDB_TYPE_MODULE 6 +#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without + the generating module being loaded. */ +/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ + +/* Object types for encoded objects.
*/ +#define RDB_TYPE_HASH_ZIPMAP 9 +#define RDB_TYPE_LIST_ZIPLIST 10 +#define RDB_TYPE_SET_INTSET 11 +#define RDB_TYPE_ZSET_ZIPLIST 12 +#define RDB_TYPE_HASH_ZIPLIST 13 +#define RDB_TYPE_LIST_QUICKLIST 14 +#define RDB_TYPE_STREAM_LISTPACKS 15 +#define RDB_TYPE_HASH_LISTPACK 16 +#define RDB_TYPE_ZSET_LISTPACK 17 +#define RDB_TYPE_LIST_QUICKLIST_2 18 +/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ + +/* Test if a type is an object type. */ +#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18)) + +/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ +#define RDB_OPCODE_FUNCTION 246 /* engine data */ +#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ +#define RDB_OPCODE_IDLE 248 /* LRU idle time. */ +#define RDB_OPCODE_FREQ 249 /* LFU frequency. */ +#define RDB_OPCODE_AUX 250 /* RDB aux field. */ +#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */ +#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */ +#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */ +#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */ +#define RDB_OPCODE_EOF 255 /* End of the RDB file. */ + +/* Module serialized values sub opcodes */ +#define RDB_MODULE_OPCODE_EOF 0 /* End of module value. */ +#define RDB_MODULE_OPCODE_SINT 1 /* Signed integer. */ +#define RDB_MODULE_OPCODE_UINT 2 /* Unsigned integer. */ +#define RDB_MODULE_OPCODE_FLOAT 3 /* Float. */ +#define RDB_MODULE_OPCODE_DOUBLE 4 /* Double. */ +#define RDB_MODULE_OPCODE_STRING 5 /* String. */ + +/* rdbLoad...() functions flags. */ +#define RDB_LOAD_NONE 0 +#define RDB_LOAD_ENC (1<<0) +#define RDB_LOAD_PLAIN (1<<1) +#define RDB_LOAD_SDS (1<<2) + +/* flags on the purpose of rdb save or load */ +#define RDBFLAGS_NONE 0 /* No special RDB loading. */ +#define RDBFLAGS_AOF_PREAMBLE (1<<0) /* Load/save the RDB as AOF preamble. */ +#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. 
*/ +#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/ +#define RDBFLAGS_FEED_REPL (1<<3) /* Feed replication stream when loading.*/ + +/* When rdbLoadObject() returns NULL, the err flag is + * set to hold the type of error that occurred */ +#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ +#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */ + +// ROMAN: those constants should be factored out to redis_base.h or something. +// Currently moved here from server.h +#define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */ + +#define REDIS_VERSION "999.999.999" + +#endif diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 689a22e..5c96d99 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -4,7 +4,7 @@ cxx_link(dragonfly base dragonfly_lib) add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc generic_family.cc - list_family.cc main_service.cc memcache_parser.cc + list_family.cc main_service.cc memcache_parser.cc rdb_save.cc redis_parser.cc reply_builder.cc server_family.cc string_family.cc transaction.cc) cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib diff --git a/server/rdb_save.cc b/server/rdb_save.cc new file mode 100644 index 0000000..5edbf14 --- /dev/null +++ b/server/rdb_save.cc @@ -0,0 +1,286 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/rdb_save.h" + +#include +#include + +extern "C" { +#include "redis/rdb.h" +#include "redis/util.h" +} + +#include "base/logging.h" +#include "server/engine_shard_set.h" +#include "server/error.h" + +namespace dfly { + +using namespace std; +using base::IoBuf; +using io::Bytes; +using nonstd::make_unexpected; + +namespace { + +/* Encodes the "value" argument as integer when it fits in the supported ranges + * for encoded types. 
If the function successfully encodes the integer, the + * representation is stored in the buffer pointed to by "enc" and the encoded + * length in bytes is returned. Otherwise 0 is returned. */ +unsigned EncodeInteger(long long value, uint8_t* enc) { + if (value >= -(1 << 7) && value <= (1 << 7) - 1) { + enc[0] = (RDB_ENCVAL << 6) | RDB_ENC_INT8; + enc[1] = value & 0xFF; + return 2; + } + + if (value >= -(1 << 15) && value <= (1 << 15) - 1) { + enc[0] = (RDB_ENCVAL << 6) | RDB_ENC_INT16; + enc[1] = value & 0xFF; + enc[2] = (value >> 8) & 0xFF; + return 3; + } + + constexpr long long k31 = (1LL << 31); + if (value >= -k31 && value <= k31 - 1) { + enc[0] = (RDB_ENCVAL << 6) | RDB_ENC_INT32; + enc[1] = value & 0xFF; + enc[2] = (value >> 8) & 0xFF; + enc[3] = (value >> 16) & 0xFF; + enc[4] = (value >> 24) & 0xFF; + return 5; + } + + return 0; +} + +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +unsigned TryIntegerEncoding(string_view input, uint8_t* dest) { + long long value; + + /* Check if it's possible to encode this value as a number */ + if (!absl::SimpleAtoi(input, &value)) + return 0; + absl::AlphaNum alpha(value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (alpha.size() != input.size() || alpha.Piece() != input) + return 0; + + return EncodeInteger(value, dest); +} + + +/* Saves an encoded length. The first two bits in the first byte are used to + * hold the encoding type. See the RDB_* definitions for more information + * on the types of encoding. buf must be at least 9 bytes.
+ * */ + +inline unsigned SerializeLen(uint64_t len, uint8_t* buf) { + if (len < (1 << 6)) { + /* Save a 6 bit len */ + buf[0] = (len & 0xFF) | (RDB_6BITLEN << 6); + return 1; + } + if (len < (1 << 14)) { + /* Save a 14 bit len */ + buf[0] = ((len >> 8) & 0xFF) | (RDB_14BITLEN << 6); + buf[1] = len & 0xFF; + return 2; + } + + if (len <= UINT32_MAX) { + /* Save a 32 bit len */ + buf[0] = RDB_32BITLEN; + absl::big_endian::Store32(buf + 1, len); + return 1 + 4; + } + + /* Save a 64 bit len */ + buf[0] = RDB_64BITLEN; + absl::big_endian::Store64(buf + 1, len); + return 1 + 8; +} + +} // namespace + + +RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) { +} + +RdbSerializer::~RdbSerializer() { +} + +// TODO: if buf is large enough, it makes sense to write both mem_buf and buf +// directly to sink_. +error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { + IoBuf::Bytes dest = mem_buf_.AppendBuffer(); + if (dest.size() >= buf.size()) { + memcpy(dest.data(), buf.data(), buf.size()); + mem_buf_.CommitWrite(buf.size()); + return error_code{}; + } + + io::Bytes ib = mem_buf_.InputBuffer(); + + if (ib.empty()) { + RETURN_ON_ERR(sink_->Write(buf)); + } else { + iovec v[2] = { {.iov_base = const_cast(ib.data()), .iov_len = ib.size()}, + {.iov_base = const_cast(buf.data()), .iov_len = buf.size()}}; + RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v))); + mem_buf_.ConsumeInput(ib.size()); + } + + return error_code{}; +} + +error_code RdbSerializer::FlushMem() { + size_t sz = mem_buf_.InputLen(); + if (sz == 0) + return error_code{}; + + DVLOG(1) << "Write file " << sz << " bytes"; + + // interrupt point. 
+ RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer())); + mem_buf_.ConsumeInput(sz); + return error_code{}; +} + +error_code RdbSerializer::SaveString(string_view val) { + /* Try integer encoding */ + if (val.size() <= 11) { + uint8_t buf[16]; + + unsigned enclen = TryIntegerEncoding(val, buf); + if (enclen > 0) { + return WriteRaw(Bytes{buf, unsigned(enclen)}); + } + } + + /* Try LZF compression - under 20 bytes it's unable to compress even + * aaaaaaaaaaaaaaaaaa so skip it */ + size_t len = val.size(); + if (server.rdb_compression && len > 20) { + size_t comprlen, outlen = len; + tmp_buf_.resize(outlen + 1); + + // Due to stack constraints in fibers we cannot allow large arrays on stack. + // Therefore I am lazily allocating it on heap. It's not fixed in quicklist. + if (!lzf_) { + lzf_.reset(new LZF_HSLOT[1 << HLOG]); + } + + /* We require at least 8 bytes compression for this to be worth it */ + comprlen = lzf_compress(val.data(), len, tmp_buf_.data(), outlen, lzf_.get()); + if (comprlen > 0 && comprlen < len - 8 && comprlen < size_t(len * 0.85)) { + return SaveLzfBlob(Bytes{tmp_buf_.data(), comprlen}, len); + } + } + + /* Store verbatim */ + RETURN_ON_ERR(SaveLen(len)); + if (len > 0) { + Bytes b{reinterpret_cast(val.data()), val.size()}; + RETURN_ON_ERR(WriteRaw(b)); + } + return error_code{}; +} + +error_code RdbSerializer::SaveLen(size_t len) { + uint8_t buf[16]; + unsigned enclen = SerializeLen(len, buf); + return WriteRaw(Bytes{buf, enclen}); +} + + +error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_len) { + /* Data compressed!
Let's save it on disk */ + uint8_t opcode = (RDB_ENCVAL << 6) | RDB_ENC_LZF; + RETURN_ON_ERR(WriteOpcode(opcode)); + RETURN_ON_ERR(SaveLen(src.size())); + RETURN_ON_ERR(SaveLen(uncompressed_len)); + RETURN_ON_ERR(WriteRaw(src)); + + return error_code{}; +} + +RdbSaver::RdbSaver(EngineShardSet* ess, ::io::Sink* sink) : ess_(ess), sink_(sink) { + CHECK_NOTNULL(sink_); + serializer_.set_sink(sink_); +} + +RdbSaver::~RdbSaver() { +} + +std::error_code RdbSaver::SaveHeader() { + char magic[16]; + size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_VERSION); + CHECK_EQ(9u, sz); + + RETURN_ON_ERR(serializer_.WriteRaw(Bytes{reinterpret_cast(magic), sz})); + RETURN_ON_ERR(SaveAux()); + + return error_code{}; +} + + +error_code RdbSaver::SaveAux() { + static_assert(sizeof(void*) == 8, ""); + + int aof_preamble = false; + error_code ec; + + /* Add a few fields about the state when the RDB was created. */ + RETURN_ON_ERR(SaveAuxFieldStrStr("redis-ver", REDIS_VERSION)); + RETURN_ON_ERR(SaveAuxFieldStrInt("redis-bits", 64)); + + RETURN_ON_ERR(SaveAuxFieldStrInt("ctime", time(NULL))); + + // TODO: to implement used-mem caching. + RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", 666666666)); + + RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", aof_preamble)); + + // TODO: "repl-stream-db", "repl-id", "repl-offset" + return error_code{}; +} + +error_code RdbSaver::SaveEpilog() { + uint8_t buf[8]; + uint64_t chksum; + + /* EOF opcode */ + RETURN_ON_ERR(serializer_.WriteOpcode(RDB_OPCODE_EOF)); + + /* CRC64 checksum. It will be zero if checksum computation is disabled, the + * loading code skips the check in this case. 
*/ + chksum = 0; + + absl::little_endian::Store64(buf, chksum); + RETURN_ON_ERR(serializer_.WriteRaw(buf)); + + return serializer_.FlushMem(); +} + +error_code RdbSaver::SaveAuxFieldStrStr(string_view key, string_view val) { + RETURN_ON_ERR(serializer_.WriteOpcode(RDB_OPCODE_AUX)); + RETURN_ON_ERR(serializer_.SaveString(key)); + RETURN_ON_ERR(serializer_.SaveString(val)); + + return error_code{}; +} + +error_code RdbSaver::SaveAuxFieldStrInt(string_view key, int64_t val) { + char buf[LONG_STR_SIZE]; + int vlen = ll2string(buf, sizeof(buf), val); + return SaveAuxFieldStrStr(key, string_view(buf, vlen)); +} + +} // namespace dfly diff --git a/server/rdb_save.h b/server/rdb_save.h new file mode 100644 index 0000000..f785e5d --- /dev/null +++ b/server/rdb_save.h @@ -0,0 +1,74 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +extern "C" { +#include "redis/lzfP.h" +} + +#include "base/io_buf.h" +#include "io/io.h" + +namespace dfly { +class EngineShardSet; +class EngineShard; + + +class RdbSerializer { + public: + RdbSerializer(::io::Sink* s = nullptr); + + ~RdbSerializer(); + + // The ownership stays with the caller. 
+ void set_sink(::io::Sink* s) { + sink_ = s; + } + + std::error_code WriteOpcode(uint8_t opcode) { + return WriteRaw(::io::Bytes{&opcode, 1}); + } + + std::error_code SaveKeyVal(std::string_view key, std::string_view value, uint64_t expire_ms); + std::error_code WriteRaw(const ::io::Bytes& buf); + std::error_code SaveString(std::string_view val); + + std::error_code SaveString(const uint8_t* buf, size_t len) { + return SaveString(std::string_view{reinterpret_cast(buf), len}); + } + + std::error_code SaveLen(size_t len); + + std::error_code FlushMem(); + + private: + std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); + + ::io::Sink* sink_ = nullptr; + std::unique_ptr lzf_; + base::IoBuf mem_buf_; + base::PODArray tmp_buf_; +}; + + +class RdbSaver { + public: + RdbSaver(EngineShardSet* ess, ::io::Sink* sink); + ~RdbSaver(); + + std::error_code SaveHeader(); + std::error_code SaveEpilog(); + + private: + std::error_code SaveAux(); + std::error_code SaveAuxFieldStrStr(std::string_view key, std::string_view val); + std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val); + + EngineShardSet* ess_; + + ::io::Sink* sink_; + RdbSerializer serializer_; +}; + +} // namespace dfly diff --git a/server/server_family.cc b/server/server_family.cc index 853d827..19f6c77 100644 --- a/server/server_family.cc +++ b/server/server_family.cc @@ -21,6 +21,7 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/error.h" #include "server/main_service.h" +#include "server/rdb_save.h" #include "server/server_state.h" #include "server/transaction.h" #include "strings/human_readable.h" @@ -152,7 +153,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { fs::path path = dir_path; path.append(filename); path.concat(absl::StrCat("_", fl_index++)); - LOG(INFO) << "Saving to " << path; + VLOG(1) << "Saving to " << path; // TODO: use io-uring file instead. 
auto res = uring::OpenWrite(path.generic_string()); @@ -164,7 +165,13 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { unique_ptr<::io::WriteFile> wf(*res); auto start = absl::Now(); - ec = wf->Write(string(1024, 'A')); + RdbSaver saver{&ess_, wf.get()}; + + ec = saver.SaveHeader(); + if (!ec) { + ec = saver.SaveEpilog(); + } + if (ec) { cntx->SendError(res.error().message()); return;