diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index d52dc84..7cca191 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -33,7 +33,7 @@ uint32_t CommandId::OptCount(uint32_t mask) { } CommandRegistry::CommandRegistry() { - CommandId cd("COMMAND", CO::RANDOM | CO::LOADING | CO::NOSCRIPT, -1, 0, 0, 0); + CommandId cd("COMMAND", CO::LOADING | CO::NOSCRIPT, -1, 0, 0, 0); cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); }); @@ -101,8 +101,6 @@ const char* OptName(CO::CommandOpt fl) { return "fast"; case LOADING: return "loading"; - case RANDOM: - return "random"; case ADMIN: return "admin"; case NOSCRIPT: diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 439edf9..1c082f3 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -25,14 +25,14 @@ enum CommandOpt : uint32_t { LOADING = 8, DENYOOM = 0x10, // use-memory in redis. REVERSE_MAPPING = 0x20, - RANDOM = 0x40, + + // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. + VARIADIC_KEYS = 0x40, + ADMIN = 0x80, // implies NOSCRIPT, NOSCRIPT = 0x100, BLOCKING = 0x200, // implies REVERSE_MAPPING GLOBAL_TRANS = 0x1000, - - // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. - VARIADIC_KEYS = 0x2000, }; const char* OptName(CommandOpt fl); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 693bfc1..ef64740 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -740,8 +740,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx) << CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) << CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan) - << CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl) - << CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl) + << CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl) + << CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl) << CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type) << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del); } diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index b84c0df..8a74dbd 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -945,7 +945,7 @@ void HSetFamily::Register(CommandRegistry* registry) { // TODO: add options support << CI{"HRANDFIELD", CO::READONLY, 2, 1, 1, 1}.HFUNC(HRandField) - << CI{"HSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(HScan) + << CI{"HSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(HScan) << CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet) << CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx) << CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index acb90bb..f21ef5e 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -12,6 +12,7 @@ extern "C" { #include "redis/intset.h" #include "redis/listpack.h" #include "redis/rdb.h" +#include "redis/stream.h" #include "redis/util.h" #include "redis/ziplist.h" #include "redis/zmalloc.h" @@ -236,6 +237,10 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) { return SaveZSetObject(pv.AsRObj()); } + if (obj_type == OBJ_STREAM) { + return SaveStreamObject(pv.AsRObj()); + } + LOG(ERROR) << "Not implemented " << obj_type; return make_error_code(errc::function_not_supported); } @@ -375,6 +380,83 @@ error_code RdbSerializer::SaveZSetObject(const robj* obj) { return error_code{}; } +error_code RdbSerializer::SaveStreamObject(const robj* obj) { + /* Store how many listpacks we have inside the radix tree. */ + stream* s = (stream*)obj->ptr; + rax* rax = s->rax_tree; + + RETURN_ON_ERR(SaveLen(raxSize(rax))); + + /* Serialize all the listpacks inside the radix tree as they are, + * when loading back, we'll use the first entry of each listpack + * to insert it back into the radix tree. */ + raxIterator ri; + raxStart(&ri, rax); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + uint8_t* lp = (uint8_t*)ri.data; + size_t lp_bytes = lpBytes(lp); + error_code ec = SaveString((uint8_t*)ri.key, ri.key_len); + if (ec) { + raxStop(&ri); + return ec; + } + + ec = SaveString(lp, lp_bytes); + if (ec) { + raxStop(&ri); + return ec; + } + } + raxStop(&ri); + + /* Save the number of elements inside the stream. We cannot obtain + * this easily later, since our macro nodes should be checked for + * number of items: not a great CPU / space tradeoff. */ + + RETURN_ON_ERR(SaveLen(s->length)); + + /* Save the last entry ID. */ + RETURN_ON_ERR(SaveLen(s->last_id.ms)); + RETURN_ON_ERR(SaveLen(s->last_id.seq)); + + /* The consumer groups and their clients are part of the stream + * type, so serialize every consumer group. */ + + /* Save the number of groups. */ + size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0; + RETURN_ON_ERR(SaveLen(num_cgroups)); + + if (num_cgroups) { + /* Serialize each consumer group. */ + raxStart(&ri, s->cgroups); + raxSeek(&ri, "^", NULL, 0); + + auto cleanup = absl::MakeCleanup([&] { raxStop(&ri); }); + + while (raxNext(&ri)) { + streamCG* cg = (streamCG*)ri.data; + + /* Save the group name. */ + RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len)); + + /* Last ID. */ + RETURN_ON_ERR(SaveLen(s->last_id.ms)); + + RETURN_ON_ERR(SaveLen(s->last_id.seq)); + + /* Save the global PEL. */ + RETURN_ON_ERR(SaveStreamPEL(cg->pel, true)); + + /* Save the consumers of this group. */ + + RETURN_ON_ERR(SaveStreamConsumers(cg)); + } + } + + return error_code{}; +} + /* Save a long long value as either an encoded string or a string. */ error_code RdbSerializer::SaveLongLongAsString(int64_t value) { uint8_t buf[32]; @@ -424,6 +506,71 @@ error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) { return ec; } +error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) { + /* Number of entries in the PEL. */ + + RETURN_ON_ERR(SaveLen(raxSize(pel))); + + /* Save each entry. */ + raxIterator ri; + raxStart(&ri, pel); + raxSeek(&ri, "^", NULL, 0); + auto cleanup = absl::MakeCleanup([&] { raxStop(&ri); }); + + while (raxNext(&ri)) { + /* We store IDs in raw form as 128 big big endian numbers, like + * they are inside the radix tree key. */ + RETURN_ON_ERR(WriteRaw(Bytes{ri.key, sizeof(streamID)})); + + if (nacks) { + streamNACK* nack = (streamNACK*)ri.data; + uint8_t buf[8]; + absl::little_endian::Store64(buf, nack->delivery_time); + RETURN_ON_ERR(WriteRaw(buf)); + RETURN_ON_ERR(SaveLen(nack->delivery_count)); + + /* We don't save the consumer name: we'll save the pending IDs + * for each consumer in the consumer PEL, and resolve the consumer + * at loading time. */ + } + } + + return error_code{}; +} + +error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { + /* Number of consumers in this consumer group. */ + + RETURN_ON_ERR(SaveLen(raxSize(cg->consumers))); + + /* Save each consumer. */ + raxIterator ri; + raxStart(&ri, cg->consumers); + raxSeek(&ri, "^", NULL, 0); + auto cleanup = absl::MakeCleanup([&] { raxStop(&ri); }); + uint8_t buf[8]; + + while (raxNext(&ri)) { + streamConsumer* consumer = (streamConsumer*)ri.data; + + /* Consumer name. */ + RETURN_ON_ERR(SaveString(ri.key, ri.key_len)); + + /* Last seen time. */ + absl::little_endian::Store64(buf, consumer->seen_time); + RETURN_ON_ERR(WriteRaw(buf)); + + /* Consumer PEL, without the ACKs (see last parameter of the function + * passed with value of 0), at loading time we'll lookup the ID + * in the consumer group global PEL and will put a reference in the + * consumer local PEL. */ + + RETURN_ON_ERR(SaveStreamPEL(consumer->pel, false)); + } + + return error_code{}; +} + // TODO: if buf is large enough, it makes sense to write both mem_buf and buf // directly to sink_. error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index df984a4..133fdfe 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -16,6 +16,10 @@ extern "C" { #include "server/common.h" #include "server/table.h" + +typedef struct rax rax; +typedef struct streamCG streamCG; + namespace dfly { class EngineShard; @@ -114,9 +118,12 @@ class RdbSerializer { std::error_code SaveSetObject(const PrimeValue& pv); std::error_code SaveHSetObject(const robj* obj); std::error_code SaveZSetObject(const robj* obj); + std::error_code SaveStreamObject(const robj* obj); std::error_code SaveLongLongAsString(int64_t value); std::error_code SaveBinaryDouble(double val); std::error_code SaveListPackAsZiplist(uint8_t* lp); + std::error_code SaveStreamPEL(rax* pel, bool nacks); + std::error_code SaveStreamConsumers(streamCG* cg); ::io::Sink* sink_ = nullptr; AlignedBuffer* aligned_buf_ = nullptr; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 4154a22..00a035c 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -115,7 +115,7 @@ TEST_F(RdbTest, LoadSmall6) { EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1))); } -TEST_F(RdbTest, LoadStream) { +TEST_F(RdbTest, Stream) { io::FileSource fs = GetSource("redis6_stream.rdb"); RdbLoader loader(service_->script_mgr()); @@ -124,6 +124,21 @@ TEST_F(RdbTest, LoadStream) { auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); ASSERT_FALSE(ec) << ec.message(); + + auto resp = Run({"type", "key:10"}); + EXPECT_EQ(resp, "stream"); + resp = Run({"xinfo", "groups", "key:0"}); + EXPECT_THAT(resp, ArrLen(2)); + + resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1 + EXPECT_THAT(resp, ArrLen(8)); + EXPECT_THAT(resp.GetVec(), ElementsAre("name", "g2", "consumers", "0", "pending", "0", + "last-delivered-id", "1655444851523-1")); + + resp = Run({"xinfo", "groups", "key:2"}); + EXPECT_THAT(resp, ArrLen(0)); + + Run({"save"}); } TEST_F(RdbTest, Reload) { @@ -233,7 +248,7 @@ TEST_F(RdbTest, SaveManyDbs) { Run({"select", "1"}); resp = Run({"scan", "0", "match", "ab*"}); StringVec vec = StrArray(resp.GetVec()[1]); - for (const auto& s: vec) { + for (const auto& s : vec) { LOG(ERROR) << "Bad key: " << s; } } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 598d7b3..21bc735 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1131,13 +1131,13 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"CLIENT", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Client) << CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config) << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) - << CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) + << CI{"DEBUG", CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) << CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll) << CI{"INFO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Info) << CI{"HELLO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Hello) - << CI{"LASTSAVE", CO::LOADING | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave) - << CI{"LATENCY", CO::NOSCRIPT | CO::LOADING | CO::RANDOM | CO::FAST, -2, 0, 0, 0}.HFUNC( + << CI{"LASTSAVE", CO::LOADING | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave) + << CI{"LATENCY", CO::NOSCRIPT | CO::LOADING | CO::FAST, -2, 0, 0, 0}.HFUNC( Latency) << CI{"MEMORY", kMemOpts, -2, 0, 0, 0}.HFUNC(Memory) << CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 86bf494..3c990f1 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -1172,10 +1172,10 @@ void SetFamily::Register(CommandRegistry* registry) { << CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove) << CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem) << CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard) - << CI{"SPOP", CO::WRITE | CO::RANDOM | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop) + << CI{"SPOP", CO::WRITE | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop) << CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion) << CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore) - << CI{"SSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(SScan); + << CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan); } uint32_t SetFamily::MaxIntsetEntries() { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 4cd2804..fab9586 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -255,9 +255,9 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { return s->length; } -OpResult> OpListGroups(const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); +OpResult> OpListGroups(DbIndex db_index, string_view key, EngineShard* shard) { + auto& db_slice = shard->db_slice(); + OpResult res_it = db_slice.Find(db_index, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -719,13 +719,16 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { if (args.size() >= 3) { string_view key = ArgS(args, 2); + ShardId sid = Shard(key, shard_set->size()); + if (sub_cmd == "GROUPS") { - auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpListGroups(op_args, key); + // We do not use transactional xemantics for xinfo since it's informational command. + auto cb = [&]() { + EngineShard* shard = EngineShard::tlocal(); + return OpListGroups(cntx->db_index(), key, shard); }; - OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult> result = shard_set->Await(sid, std::move(cb)); if (result) { (*cntx)->StartArray(result->size()); for (const auto& ginfo : *result) { @@ -865,7 +868,7 @@ void StreamFamily::Register(CommandRegistry* registry) { *registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, 1}.HFUNC(XDel) << CI{"XGROUP", CO::WRITE | CO::DENYOOM, -2, 2, 2, 1}.HFUNC(XGroup) - << CI{"XINFO", CO::READONLY, -2, 2, 2, 1}.HFUNC(XInfo) + << CI{"XINFO", CO::READONLY | CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(XInfo) << CI{"XLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(XLen) << CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange) << CI{"XREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRevRange) diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 2630a5f..96e7c17 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1895,7 +1895,7 @@ void ZSetFamily::Register(CommandRegistry* registry) { << CI{"ZREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRange) << CI{"ZREVRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByScore) << CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank) - << CI{"ZSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(ZScan) + << CI{"ZSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(ZScan) << CI{"ZUNIONSTORE", kUnionMask, -4, 3, 3, 1}.HFUNC(ZUnionStore); }