Implement ZADD
This commit is contained in:
parent
2bdde23e1f
commit
6b869b41a7
|
@ -15,6 +15,7 @@ extern const char kSyntaxErr[];
|
|||
extern const char kWrongTypeErr[];
|
||||
extern const char kKeyNotFoundErr[];
|
||||
extern const char kInvalidIntErr[];
|
||||
extern const char kInvalidFloatErr[];
|
||||
extern const char kUintErr[];
|
||||
extern const char kDbIndOutOfRangeErr[];
|
||||
extern const char kInvalidDbIndErr[];
|
||||
|
|
|
@ -44,6 +44,7 @@ const char kSyntaxErr[] = "syntax error";
|
|||
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
|
||||
const char kKeyNotFoundErr[] = "no such key";
|
||||
const char kInvalidIntErr[] = "value is not an integer or out of range";
|
||||
const char kInvalidFloatErr[] = "value is not a valid float";
|
||||
const char kUintErr[] = "value is out of range, must be positive";
|
||||
const char kDbIndOutOfRangeErr[] = "DB index is out of range";
|
||||
const char kInvalidDbIndErr[] = "invalid DB index";
|
||||
|
|
1250
src/redis/t_set.c
1250
src/redis/t_set.c
File diff suppressed because it is too large
Load Diff
|
@ -21,6 +21,8 @@ cxx_test(list_family_test dfly_test_lib LABELS DFLY)
|
|||
cxx_test(set_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/small.rdb LABELS DFLY)
|
||||
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
|
||||
|
||||
|
||||
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
|
||||
add_dependencies(check_dfly dragonfly_test list_family_test
|
||||
|
|
|
@ -5,30 +5,156 @@
|
|||
#include "server/zset_family.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/object.h"
|
||||
#include "redis/zset.h"
|
||||
}
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "facade/error.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
using namespace facade;
|
||||
|
||||
namespace {
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
static const char kNxXxErr[] = "XX and NX options at the same time are not compatible";
|
||||
constexpr unsigned kMaxZiplistValue = 64;
|
||||
|
||||
|
||||
OpResult<MainIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string_view key,
|
||||
size_t member_len) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
if (flags & ZADD_IN_XX) {
|
||||
return db_slice.Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
}
|
||||
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
if (inserted) {
|
||||
robj* zobj = nullptr;
|
||||
|
||||
if (member_len > kMaxZiplistValue) {
|
||||
zobj = createZsetObject();
|
||||
} else {
|
||||
zobj = createZsetListpackObject();
|
||||
}
|
||||
it->second.ImportRObj(zobj);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_ZSET)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
return it;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) {
|
||||
(*cntx)->SendLong(0);
|
||||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
OpResult<MainIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_ZSET);
|
||||
if (!find_res) {
|
||||
return find_res.status();
|
||||
}
|
||||
|
||||
return zsetLength(find_res.value()->second.AsRObj());
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result.status() == OpStatus::WRONG_TYPE) {
|
||||
(*cntx)->SendError(kWrongTypeErr);
|
||||
return;
|
||||
}
|
||||
|
||||
(*cntx)->SendLong(result.value());
|
||||
}
|
||||
|
||||
void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
(*cntx)->SendLong(0);
|
||||
std::string_view key = ArgS(args, 1);
|
||||
|
||||
ZParams zparams;
|
||||
size_t i = 2;
|
||||
for (; i < args.size() - 1; ++i) {
|
||||
ToUpper(&args[i]);
|
||||
|
||||
std::string_view cur_arg = ArgS(args, i);
|
||||
|
||||
if (cur_arg == "XX") {
|
||||
zparams.flags |= ZADD_IN_XX; // update only
|
||||
} else if (cur_arg == "NX") {
|
||||
zparams.flags |= ZADD_IN_NX; // add new only.
|
||||
} else if (cur_arg == "GT") {
|
||||
zparams.flags |= ZADD_IN_GT;
|
||||
} else if (cur_arg == "LT") {
|
||||
zparams.flags |= ZADD_IN_LT;
|
||||
} else if (cur_arg == "CH") {
|
||||
zparams.ch = true;
|
||||
} else if (cur_arg == "INCR") {
|
||||
zparams.flags |= ZADD_IN_INCR;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((args.size() - i) % 2 != 0) {
|
||||
(*cntx)->SendError(kSyntaxErr);
|
||||
return;
|
||||
}
|
||||
|
||||
if ((zparams.flags & ZADD_IN_INCR) && (i + 2 < args.size())) {
|
||||
(*cntx)->SendError("INCR option supports a single increment-element pair");
|
||||
return;
|
||||
}
|
||||
|
||||
unsigned insert_mask = zparams.flags & (ZADD_IN_NX | ZADD_IN_XX);
|
||||
if (insert_mask == (ZADD_IN_NX | ZADD_IN_XX)) {
|
||||
(*cntx)->SendError(kNxXxErr);
|
||||
return;
|
||||
}
|
||||
|
||||
if ((zparams.flags & ZADD_IN_NX) && (zparams.flags & (ZADD_IN_GT | ZADD_IN_LT))) {
|
||||
(*cntx)->SendError("GT, LT, and/or NX options at the same time are not compatible");
|
||||
return;
|
||||
}
|
||||
|
||||
absl::InlinedVector<ScoredMemberView, 4> members;
|
||||
for (; i < args.size(); i += 2) {
|
||||
std::string_view cur_arg = ArgS(args, i);
|
||||
double val;
|
||||
if (!absl::SimpleAtod(cur_arg, &val) || !std::isfinite(val)) {
|
||||
(*cntx)->SendError(kInvalidFloatErr);
|
||||
return;
|
||||
}
|
||||
std::string_view member = ArgS(args, i + 1);
|
||||
members.emplace_back(val, member);
|
||||
}
|
||||
DCHECK(cntx->transaction);
|
||||
|
||||
if (zparams.flags & ZADD_IN_INCR) {
|
||||
LOG(FATAL) << "TBD";
|
||||
return;
|
||||
}
|
||||
|
||||
absl::Span memb_sp{members.data(), members.size()};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpArgs op_args{shard, t->db_index()};
|
||||
return OpAdd(zparams, op_args, key, memb_sp);
|
||||
};
|
||||
|
||||
OpResult<unsigned> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result.status() == OpStatus::WRONG_TYPE) {
|
||||
(*cntx)->SendError(kWrongTypeErr);
|
||||
} else {
|
||||
(*cntx)->SendLong(result.value());
|
||||
}
|
||||
}
|
||||
|
||||
void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -49,6 +175,46 @@ void ZSetFamily::ZScore(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendDouble(0);
|
||||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string_view key,
|
||||
const ScoredMemberSpan& members) {
|
||||
DCHECK(!members.empty());
|
||||
OpResult<MainIterator> res_it =
|
||||
FindZEntry(zparams.flags, op_args, key, members.front().second.size());
|
||||
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
|
||||
unsigned added = 0;
|
||||
unsigned updated = 0;
|
||||
unsigned processed = 0;
|
||||
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
|
||||
for (size_t j = 0; j < members.size(); j++) {
|
||||
const auto& m = members[j];
|
||||
tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size());
|
||||
|
||||
int retflags = 0;
|
||||
int retval = zsetAdd(zobj, m.first, tmp_str, zparams.flags, &retflags, nullptr);
|
||||
|
||||
if (retval == 0) {
|
||||
LOG(FATAL) << "unexpected error in zsetAdd: " << m.first;
|
||||
}
|
||||
|
||||
if (retflags & ZADD_OUT_ADDED)
|
||||
added++;
|
||||
if (retflags & ZADD_OUT_UPDATED)
|
||||
updated++;
|
||||
if (!(retflags & ZADD_OUT_NOP))
|
||||
processed++;
|
||||
}
|
||||
res_it.value()->second.SyncRObj();
|
||||
|
||||
return zparams.ch ? added + updated : added;
|
||||
}
|
||||
|
||||
#define HFUNC(x) SetHandler(&ZSetFamily::x)
|
||||
|
||||
void ZSetFamily::Register(CommandRegistry* registry) {
|
||||
|
|
|
@ -26,6 +26,19 @@ class ZSetFamily {
|
|||
static void ZRem(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZScore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
struct ZParams {
|
||||
unsigned flags = 0; // mask of ZADD_IN_ macros.
|
||||
bool ch = false; // Corresponds to CH option.
|
||||
};
|
||||
|
||||
using ScoredMemberView = std::pair<double, std::string_view>;
|
||||
using ScoredMemberSpan = absl::Span<ScoredMemberView>;
|
||||
template <typename T> using OpResult = facade::OpResult<T>;
|
||||
|
||||
static OpResult<unsigned> OpAdd(const ZParams& zparams, const OpArgs& op_args,
|
||||
std::string_view key, const ScoredMemberSpan& members);
|
||||
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/zset_family.h"
|
||||
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/facade_test.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/test_utils.h"
|
||||
|
||||
using namespace testing;
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
using namespace boost;
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class ZSetFamilyTest : public BaseFamilyTest {
|
||||
protected:
|
||||
};
|
||||
|
||||
TEST_F(ZSetFamilyTest, Add) {
|
||||
auto resp = Run({"zadd", "x", "1.1", "a"});
|
||||
EXPECT_THAT(resp[0], IntArg(1));
|
||||
resp = Run({"zadd", "x", "2", "a"});
|
||||
EXPECT_THAT(resp[0], IntArg(0));
|
||||
resp = Run({"zadd", "x", "ch", "3", "a"});
|
||||
EXPECT_THAT(resp[0], IntArg(1));
|
||||
}
|
||||
|
||||
|
||||
} // namespace dfly
|
Loading…
Reference in New Issue