diff --git a/src/facade/op_status.h b/src/facade/op_status.h index bad993a..434ce24 100644 --- a/src/facade/op_status.h +++ b/src/facade/op_status.h @@ -19,6 +19,8 @@ enum class OpStatus : uint16_t { TIMED_OUT, OUT_OF_MEMORY, INVALID_FLOAT, + INVALID_INT, + SYNTAX_ERR, }; class OpResultBase { diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 31cd810..822aa1c 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -153,7 +153,6 @@ void MCReplyBuilder::SendNotFound() { SendSimpleString("NOT_FOUND"); } - char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len) { StringBuilder sb(dest, dest_len); CHECK(dfly_conv.ToShortest(val, &sb)); @@ -229,6 +228,12 @@ void RedisReplyBuilder::SendError(OpStatus status) { case OpStatus::INVALID_FLOAT: SendError(kInvalidFloatErr); break; + case OpStatus::INVALID_INT: + SendError(kInvalidIntErr); + break; + case OpStatus::SYNTAX_ERR: + SendError(kSyntaxErr); + break; default: LOG(ERROR) << "Unsupported status " << status; SendError("Internal error"); diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 4b785b7..ece7e7d 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -21,9 +21,11 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first int8_t last_key, int8_t step) : name_(name), opt_mask_(mask), arity_(arity), first_key_(first_key), last_key_(last_key), step_key_(step) { - if (mask & CO::ADMIN) { + if (mask & CO::ADMIN) opt_mask_ |= CO::NOSCRIPT; - } + + if (mask & CO::BLOCKING) + opt_mask_ |= CO::REVERSE_MAPPING; } uint32_t CommandId::OptCount(uint32_t mask) { @@ -81,38 +83,6 @@ CommandRegistry& CommandRegistry::operator<<(CommandId cmd) { return *this; } -KeyIndex DetermineKeys(const CommandId* cid, const CmdArgList& args) { - DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS); - - KeyIndex key_index; - - if (cid->first_key_pos() > 0) { - key_index.start = cid->first_key_pos(); - int last = cid->last_key_pos(); - key_index.end = last > 0 ? last + 1 : (int(args.size()) + 1 + last); - key_index.step = cid->key_arg_step(); - - return key_index; - } - - string_view name{cid->name()}; - if (name == "EVAL" || name == "EVALSHA") { - DCHECK_GE(args.size(), 3u); - uint32_t num_keys; - - CHECK(absl::SimpleAtoi(ArgS(args, 2), &num_keys)); - key_index.start = 3; - key_index.end = 3 + num_keys; - key_index.step = 1; - - return key_index; - } - - LOG(FATAL) << "TBD: Not supported"; - - return key_index; -} - namespace CO { const char* OptName(CO::CommandOpt fl) { @@ -125,6 +95,8 @@ const char* OptName(CO::CommandOpt fl) { return "readonly"; case DENYOOM: return "denyoom"; + case REVERSE_MAPPING: + return "reverse-mapping"; case FAST: return "fast"; case LOADING: @@ -139,6 +111,8 @@ const char* OptName(CO::CommandOpt fl) { return "blocking"; case GLOBAL_TRANS: return "global-trans"; + case DESTINATION_KEY: + return "dest-key"; } return "unknown"; } diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 589a567..1bd6a15 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -24,11 +24,13 @@ enum CommandOpt : uint32_t { WRITE = 4, LOADING = 8, DENYOOM = 0x10, // use-memory in redis. + REVERSE_MAPPING = 0x20, RANDOM = 0x40, ADMIN = 0x80, // implies NOSCRIPT, NOSCRIPT = 0x100, - BLOCKING = 0x200, + BLOCKING = 0x200, // implies REVERSE_MAPPING GLOBAL_TRANS = 0x1000, + DESTINATION_KEY = 0x2000, }; const char* OptName(CommandOpt fl); @@ -83,7 +85,7 @@ class CommandId { } bool is_multi_key() const { - return last_key_ != first_key_; + return (last_key_ != first_key_) || (opt_mask_ & CO::DESTINATION_KEY); } int8_t key_arg_step() const { @@ -157,8 +159,4 @@ class CommandRegistry { void Command(CmdArgList args, ConnectionContext* cntx); }; - -// Given the command and the arguments determines the keys range (index). -KeyIndex DetermineKeys(const CommandId* cid, const CmdArgList& args); - } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 0d3f3a5..f7777b4 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -48,9 +48,20 @@ struct KeyLockArgs { // Describes key indices. struct KeyIndex { + // if index is non-zero then adds another key index (usually 1). + // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. + unsigned bonus = 0; unsigned start; unsigned end; // does not include this index (open limit). unsigned step; // 1 for commands like mget. 2 for commands like mset. + + bool HasSingleKey() const { + return bonus == 0 && (start + step >= end); + } + + unsigned num_args() const { + return end - start + (bonus > 0); + } }; struct OpArgs { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index c165ebe..6a1c126 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -354,6 +354,12 @@ void Service::Shutdown() { boost::this_fiber::sleep_for(10ms); } +static void MultiSetError(ConnectionContext* cntx) { + if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { + cntx->conn_state.exec_state = ConnectionState::EXEC_ERROR; + } +} + void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) { CHECK(!args.empty()); DCHECK_NE(0u, shard_set_.size()) << "Init was not called"; @@ -370,11 +376,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) etl.RecordCmd(); ConnectionContext* dfly_cntx = static_cast(cntx); - absl::Cleanup multi_error = [dfly_cntx] { - if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) { - dfly_cntx->conn_state.exec_state = ConnectionState::EXEC_ERROR; - } - }; + absl::Cleanup multi_error([dfly_cntx] { MultiSetError(dfly_cntx); }); if (cid == nullptr) { (*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`"), "unknown_cmd"); @@ -464,7 +466,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) if (under_script) { DCHECK(dfly_cntx->transaction); - KeyIndex key_index = DetermineKeys(cid, args); + OpResult key_index_res = DetermineKeys(cid, args); + if (!key_index_res) + return (*cntx)->SendError(key_index_res.status()); + + const auto& key_index = *key_index_res; for (unsigned i = key_index.start; i < key_index.end; ++i) { string_view key = ArgS(args, i); if (!dfly_cntx->conn_state.script_info->keys.contains(key)) { @@ -472,7 +478,9 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) } } dfly_cntx->transaction->SetExecCmd(cid); - dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); + OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); + if (st != OpStatus::OK) + return (*cntx)->SendError(st); } else { DCHECK(dfly_cntx->transaction == nullptr); @@ -480,7 +488,10 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) dist_trans.reset(new Transaction{cid, &shard_set_}); dfly_cntx->transaction = dist_trans.get(); - dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args); + OpStatus st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args); + if (st != OpStatus::OK) + return (*cntx)->SendError(st); + dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->unique_shard_cnt(); } else { dfly_cntx->transaction = nullptr; @@ -856,7 +867,11 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { cntx->transaction->SetExecCmd(scmd.descr); CmdArgList cmd_arg_list{str_list.data(), str_list.size()}; if (IsTransactional(scmd.descr)) { - cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list); + OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list); + if (st != OpStatus::OK) { + (*cntx)->SendError(st); + break; + } } scmd.descr->Invoke(cmd_arg_list, cntx); if (rb->GetError()) diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 7accd33..7b0a284 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -332,6 +332,18 @@ void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) { } } +void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[1]); + string_view sub_cmd = ArgS(args, 1); + + if (sub_cmd == "SETNAME") { + return (*cntx)->SendOk(); + } + + LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; + (*cntx)->SendError(kSyntaxErr); +} + void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[1]); string_view sub_cmd = ArgS(args, 1); @@ -678,6 +690,18 @@ void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendLong(last_save_); } +void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[1]); + string_view sub_cmd = ArgS(args, 1); + + if (sub_cmd == "LATEST") { + return (*cntx)->StartArray(0); + } + + LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; + (*cntx)->SendError(kSyntaxErr); +} + void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) { CHECK_NOTNULL(acceptor_)->Stop(); (*cntx)->SendOk(); @@ -705,6 +729,7 @@ void ServerFamily::Register(CommandRegistry* registry) { *registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth) << CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) + << CI{"CLIENT", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Client) << CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config) << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) << CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) @@ -713,6 +738,8 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"INFO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Info) << CI{"HELLO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Hello) << CI{"LASTSAVE", CO::LOADING | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave) + << CI{"LATENCY", CO::NOSCRIPT | CO::LOADING | CO::RANDOM | CO::FAST, -2, 0, 0, 0}.HFUNC( + Latency) << CI{"MEMORY", kMemOpts, -2, 0, 0, 0}.HFUNC(Memory) << CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) << CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown) diff --git a/src/server/server_family.h b/src/server/server_family.h index 30df823..8b477d0 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -68,6 +68,7 @@ class ServerFamily { } void Auth(CmdArgList args, ConnectionContext* cntx); + void Client(CmdArgList args, ConnectionContext* cntx); void Config(CmdArgList args, ConnectionContext* cntx); void DbSize(CmdArgList args, ConnectionContext* cntx); void Debug(CmdArgList args, ConnectionContext* cntx); @@ -77,6 +78,7 @@ class ServerFamily { void Info(CmdArgList args, ConnectionContext* cntx); void Hello(CmdArgList args, ConnectionContext* cntx); void LastSave(CmdArgList args, ConnectionContext* cntx); + void Latency(CmdArgList args, ConnectionContext* cntx); void Psync(CmdArgList args, ConnectionContext* cntx); void ReplicaOf(CmdArgList args, ConnectionContext* cntx); void Role(CmdArgList args, ConnectionContext* cntx); @@ -84,6 +86,7 @@ class ServerFamily { void Script(CmdArgList args, ConnectionContext* cntx); void Sync(CmdArgList args, ConnectionContext* cntx); + void _Shutdown(CmdArgList args, ConnectionContext* cntx); void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index e9c5be2..8cd6022 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -904,7 +904,7 @@ void StringFamily::Register(CommandRegistry* registry) { << CI{"DECRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(DecrBy) << CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get) << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(GetSet) - << CI{"MGET", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(MGet) + << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, 1}.HFUNC(MGet) << CI{"MSET", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSet) << CI{"MSETNX", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSetNx) << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(StrLen) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index c8b1256..fa53c89 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -74,36 +74,42 @@ Transaction::~Transaction() { * **/ -void Transaction::InitByArgs(DbIndex index, CmdArgList args) { +OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { db_index_ = index; if (IsGlobal()) { unique_shard_cnt_ = ess_->size(); shard_data_.resize(unique_shard_cnt_); - return; + return OpStatus::OK; } CHECK_GT(args.size(), 1U); // first entry is the command name. DCHECK_EQ(unique_shard_cnt_, 0u); DCHECK(args_.empty()); - KeyIndex key_index = DetermineKeys(cid_, args); + OpResult key_index_res = DetermineKeys(cid_, args); + if (!key_index_res) + return key_index_res.status(); + + const auto& key_index = *key_index_res; if (key_index.start == args.size()) { // eval with 0 keys. CHECK(absl::StartsWith(cid_->name(), "EVAL")); - return; + return OpStatus::OK; } DCHECK_LT(key_index.start, args.size()); DCHECK_GT(key_index.start, 0u); bool incremental_locking = multi_ && multi_->incremental; - bool single_key = !multi_ && (key_index.start + key_index.step) >= key_index.end; + bool single_key = !multi_ && key_index.HasSingleKey(); if (single_key) { DCHECK_GT(key_index.step, 0u); shard_data_.resize(1); // Single key optimization + + // even for a single key we may have multiple arguments per key (MSET). for (unsigned j = key_index.start; j < key_index.start + key_index.step; ++j) { args_.push_back(ArgS(args, j)); } @@ -112,7 +118,7 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { unique_shard_cnt_ = 1; unique_shard_id_ = Shard(key, ess_->size()); - return; + return OpStatus::OK; } // Our shard_data is not sparse, so we must allocate for all threads :( @@ -131,6 +137,8 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { // and regular commands. IntentLock::Mode mode = IntentLock::EXCLUSIVE; bool should_record_locks = false; + bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; + if (multi_) { mode = Mode(); tmp_space.uniq_keys.clear(); @@ -138,11 +146,22 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { should_record_locks = incremental_locking || !multi_->locks_recorded; } + if (key_index.bonus) { // additional one-of key. + DCHECK(key_index.step == 1); + DCHECK(!needs_reverse_mapping); + + string_view key = ArgS(args, key_index.bonus); + uint32_t sid = Shard(key, shard_data_.size()); + shard_index[sid].args.push_back(key); + } + for (unsigned i = key_index.start; i < key_index.end; ++i) { string_view key = ArgS(args, i); uint32_t sid = Shard(key, shard_data_.size()); + shard_index[sid].args.push_back(key); - shard_index[sid].original_index.push_back(i - 1); + if (needs_reverse_mapping) + shard_index[sid].original_index.push_back(i - 1); if (should_record_locks && tmp_space.uniq_keys.insert(key).second) { multi_->locks[key].cnt[int(mode)]++; @@ -150,9 +169,11 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { if (key_index.step == 2) { // value ++i; - auto val = ArgS(args, i); + + string_view val = ArgS(args, i); shard_index[sid].args.push_back(val); - shard_index[sid].original_index.push_back(i - 1); + if (needs_reverse_mapping) + shard_index[sid].original_index.push_back(i - 1); } } @@ -160,8 +181,11 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { multi_->locks_recorded = true; } - args_.resize(key_index.end - key_index.start); - reverse_index_.resize(args_.size()); + args_.resize(key_index.num_args()); + + // we need reverse index only for blocking commands or commands like MSET. + if (needs_reverse_mapping) + reverse_index_.resize(args_.size()); auto next_arg = args_.begin(); auto rev_indx_it = reverse_index_.begin(); @@ -192,11 +216,11 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { uint32_t orig_indx = 0; for (size_t j = 0; j < si.args.size(); ++j) { *next_arg = si.args[j]; - *rev_indx_it = si.original_index[orig_indx]; - + if (needs_reverse_mapping) { + *rev_indx_it++ = si.original_index[orig_indx]; + } ++next_arg; ++orig_indx; - ++rev_indx_it; } } @@ -224,6 +248,8 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); } } + + return OpStatus::OK; } void Transaction::SetExecCmd(const CommandId* cid) { @@ -1126,4 +1152,53 @@ void Transaction::BreakOnClose() { } } + +OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { + DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS); + + KeyIndex key_index; + int num_custom_keys = -1; + + if (cid->opt_mask() & CO::DESTINATION_KEY) { + key_index.bonus = 1; + if (args.size() < 3) { + return OpStatus::SYNTAX_ERR; + } + string_view num(ArgS(args, 2)); + if (!absl::SimpleAtoi(num, &num_custom_keys) || num_custom_keys < 0 || + size_t(num_custom_keys) + 3 > args.size()) + return OpStatus::INVALID_INT; + } + + if (cid->first_key_pos() > 0) { + key_index.start = cid->first_key_pos(); + int last = cid->last_key_pos(); + if (num_custom_keys >= 0) { + key_index.end = key_index.start + num_custom_keys; + } else { + key_index.end = last > 0 ? last + 1 : (int(args.size()) + 1 + last); + } + key_index.step = cid->key_arg_step(); + + return key_index; + } + + string_view name{cid->name()}; + if (name == "EVAL" || name == "EVALSHA") { + DCHECK_GE(args.size(), 3u); + uint32_t num_keys; + + CHECK(absl::SimpleAtoi(ArgS(args, 2), &num_keys)); + key_index.start = 3; + key_index.end = 3 + num_keys; + key_index.step = 1; + + return key_index; + } + + LOG(FATAL) << "TBD: Not supported"; + + return key_index; +} + } // namespace dfly diff --git a/src/server/transaction.h b/src/server/transaction.h index 0cc9956..bc0331c 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -61,7 +61,7 @@ class Transaction { Transaction(const CommandId* cid, EngineShardSet* ess); - void InitByArgs(DbIndex index, CmdArgList args); + OpStatus InitByArgs(DbIndex index, CmdArgList args); void SetExecCmd(const CommandId* cid); @@ -342,4 +342,6 @@ inline uint16_t trans_id(const Transaction* ptr) { return intptr_t(ptr) & 0xFFFF; } +OpResult DetermineKeys(const CommandId* cid, CmdArgList args); + } // namespace dfly diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 3632499..aa2ac87 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -422,10 +422,10 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) { void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { uint8_t* zl = (uint8_t*)zobj_->ptr; - uint8_t *eptr, *sptr; - uint8_t* vstr; - unsigned int vlen; - long long vlong; + uint8_t *eptr, *sptr = nullptr; + uint8_t* vstr = nullptr; + unsigned int vlen = 0; + long long vlong = 0; unsigned offset = params_.offset; unsigned limit = params_.limit; @@ -748,6 +748,10 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendDouble(add_result.new_score); } +void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) { + +} + void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); @@ -979,6 +983,19 @@ void ZSetFamily::ZScan(CmdArgList args, ConnectionContext* cntx) { } } +void ZSetFamily::ZUnionStore(CmdArgList args, ConnectionContext* cntx) { + auto cb = [&](Transaction* t, EngineShard* es) { + auto args = t->ShardArgsInShard(es->shard_id()); + for (auto x : args) { + LOG(INFO) << "arg " << x; + } + return OpStatus::OK; + }; + + OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); + (*cntx)->SendOk(); +} + void ZSetFamily::ZRangeByScoreInternal(string_view key, string_view min_s, string_view max_s, const RangeParams& params, ConnectionContext* cntx) { ZRangeSpec range_spec; @@ -1201,7 +1218,7 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string unsigned processed = 0; sds& tmp_str = op_args.shard->tmp_str1; - double new_score; + double new_score = 0; int retflags = 0; OpStatus res = OpStatus::OK; @@ -1483,6 +1500,7 @@ void ZSetFamily::Register(CommandRegistry* registry) { << CI{"ZCARD", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(ZCard) << CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount) << CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy) + << CI{"ZINTERSTORE", CO::WRITE | CO::DESTINATION_KEY, -4, 1, 1, 1}.HFUNC(ZInterStore) << CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount) << CI{"ZREM", CO::FAST | CO::WRITE, -3, 1, 1, 1}.HFUNC(ZRem) << CI{"ZRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRange) @@ -1496,7 +1514,8 @@ void ZSetFamily::Register(CommandRegistry* registry) { << CI{"ZREVRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZRevRange) << CI{"ZREVRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByScore) << CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank) - << CI{"ZSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(ZScan); + << CI{"ZSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(ZScan) + << CI{"ZUNIONSTORE", CO::WRITE | CO::DESTINATION_KEY, -4, 3, 3, 1}.HFUNC(ZUnionStore); } } // namespace dfly diff --git a/src/server/zset_family.h b/src/server/zset_family.h index d807b1c..dda433e 100644 --- a/src/server/zset_family.h +++ b/src/server/zset_family.h @@ -56,6 +56,7 @@ class ZSetFamily { static void ZCard(CmdArgList args, ConnectionContext* cntx); static void ZCount(CmdArgList args, ConnectionContext* cntx); static void ZIncrBy(CmdArgList args, ConnectionContext* cntx); + static void ZInterStore(CmdArgList args, ConnectionContext* cntx); static void ZLexCount(CmdArgList args, ConnectionContext* cntx); static void ZRange(CmdArgList args, ConnectionContext* cntx); static void ZRank(CmdArgList args, ConnectionContext* cntx); @@ -70,6 +71,7 @@ class ZSetFamily { static void ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx); static void ZRevRank(CmdArgList args, ConnectionContext* cntx); static void ZScan(CmdArgList args, ConnectionContext* cntx); + static void ZUnionStore(CmdArgList args, ConnectionContext* cntx); static void ZRangeByScoreInternal(std::string_view key, std::string_view min_s, std::string_view max_s, const RangeParams& params,