Add generic_family_test. Minor cleanups

This commit is contained in:
Roman Gershman 2022-01-04 15:11:37 +02:00
parent 5a7a67fbcf
commit 29cdcb96ec
16 changed files with 195 additions and 18 deletions

View File

@ -10,6 +10,7 @@ namespace dfly {
enum class OpStatus : uint16_t {
OK,
KEY_EXISTS,
KEY_NOTFOUND,
SKIPPED,
WRONG_TYPE,

2
helio

@ -1 +1 @@
Subproject commit b4bfdd0b8e67b293e98d5d3ce92c60ed05646b61
Subproject commit b3675ad8860a315fffbb14a1247cbfec5e5b8ea5

View File

@ -16,4 +16,9 @@ cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext)
cxx_test(redis_parser_test dfly_test_lib LABELS DFLY)
cxx_test(list_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(generic_family_test dfly_test_lib LABELS DFLY)
cxx_test(memcache_parser_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly DEPENDS COMMAND ctest -L DFLY)
add_dependencies(check_dfly redis_parser_test list_family_test string_family_test
generic_family_test memcache_parser_test)

View File

@ -20,6 +20,7 @@ string WrongNumArgsError(std::string_view cmd) {
const char kSyntaxErr[] = "syntax error";
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
const char kKeyNotFoundErr[] = "no such key";
const char kInvalidIntErr[] = "value is not an integer or out of range";
const char kUintErr[] = "value is out of range, must be positive";
const char kDbIndOutOfRangeErr[] = "DB index is out of range";

View File

@ -45,7 +45,7 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned obj_type) const -> OpResult<MainIterator> {
auto [it, expire_it] = FindExt(db_index, key);
if (it == MainIterator{})
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
return it;
@ -57,7 +57,7 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_
auto& db = db_arr_[db_ind];
MainIterator it = db->main_table.find(key);
if (it == MainIterator{}) {
if (!IsValid(it)) {
return make_pair(it, ExpireIterator{});
}
@ -65,7 +65,7 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_
if (it->second.HasExpire()) { // check expiry state
expire_it = db->expire_table.find(it->first);
CHECK(expire_it != ExpireIterator{});
CHECK(IsValid(expire_it));
if (expire_it->second <= now_ms_) {
db->expire_table.erase(expire_it);
@ -108,7 +108,7 @@ void DbSlice::CreateDb(DbIndex index) {
bool DbSlice::Del(DbIndex db_ind, const MainIterator& it) {
auto& db = db_arr_[db_ind];
if (it == MainIterator{}) {
if (!IsValid(it)) {
return false;
}

View File

@ -12,6 +12,7 @@ std::string WrongNumArgsError(std::string_view cmd);
extern const char kSyntaxErr[];
extern const char kWrongTypeErr[];
extern const char kKeyNotFoundErr[];
extern const char kInvalidIntErr[];
extern const char kUintErr[];
extern const char kDbIndOutOfRangeErr[];

View File

@ -128,6 +128,11 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
cntx->SendLong(status == OpStatus::OK);
}
void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> st = RenameGeneric(args, false, cntx);
cntx->SendError(st.status());
}
void GenericFamily::Ttl(CmdArgList args, ConnectionContext* cntx) {
TtlGeneric(args, cntx, TimeUnit::SEC);
}
@ -175,6 +180,26 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendOk();
}
OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_dest,
ConnectionContext* cntx) {
std::string_view key[2] = {ArgS(args, 1), ArgS(args, 2)};
Transaction* transaction = cntx->transaction;
if (transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRen(OpArgs{shard, t->db_index()}, key[0], key[1], skip_exist_dest);
};
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));
return result;
}
// TODO: to finish it
return OpStatus::OK;
}
void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
return cntx->SendBulkString(key);
@ -184,7 +209,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, std::string_view key,
const ExpireParams& params) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key);
if (it == MainIterator{})
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
int64_t abs_msec = (params.unit == TimeUnit::SEC) ? params.ts * 1000 : params.ts;
@ -195,7 +220,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, std::string_view key,
if (abs_msec <= int64_t(db_slice.Now())) {
CHECK(db_slice.Del(op_args.db_ind, it));
} else if (expire_it != ExpireIterator{}) {
} else if (IsValid(expire_it)) {
expire_it->second = abs_msec;
} else {
db_slice.Expire(op_args.db_ind, it, abs_msec);
@ -207,10 +232,10 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, std::string_view key,
OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, std::string_view key) {
auto& db_slice = shard->db_slice();
auto [it, expire] = db_slice.FindExt(t->db_index(), key);
if (it == MainIterator{})
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (expire == ExpireIterator{})
if (!IsValid(expire))
return OpStatus::SKIPPED;
int64_t ttl_ms = expire->second - db_slice.Now();
@ -226,7 +251,7 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, ArgSlice keys) {
for (uint32_t i = 0; i < keys.size(); ++i) {
auto fres = db_slice.FindExt(op_args.db_ind, keys[i]);
if (fres.first == MainIterator{})
if (!IsValid(fres.first))
continue;
res += int(db_slice.Del(op_args.db_ind, fres.first));
}
@ -241,11 +266,33 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys)
for (uint32_t i = 0; i < keys.size(); ++i) {
auto find_res = db_slice.FindExt(op_args.db_ind, keys[i]);
res += (find_res.first != MainIterator{});
res += IsValid(find_res.first);
}
return res;
}
OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, std::string_view from,
std::string_view to, bool skip_exists) {
auto& db_slice = op_args.shard->db_slice();
auto [from_it, expire_it] = db_slice.FindExt(op_args.db_ind, from);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;
auto to_de = db_slice.FindExt(op_args.db_ind, to);
if (IsValid(to_de.first)) {
if (skip_exists)
return OpStatus::KEY_EXISTS;
CHECK(db_slice.Del(op_args.db_ind, to_de.first));
}
uint64_t exp_ts = IsValid(expire_it) ? expire_it->second : 0;
db_slice.AddNew(op_args.db_ind, to, std::move(from_it->second), exp_ts);
CHECK(db_slice.Del(op_args.db_ind, from_it));
return OpStatus::OK;
}
using CI = CommandId;
#define HFUNC(x) SetHandler(&GenericFamily::x)
@ -258,6 +305,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
<< CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl)
<< CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl);

View File

@ -40,12 +40,16 @@ class GenericFamily {
static void Expire(CmdArgList args, ConnectionContext* cntx);
static void ExpireAt(CmdArgList args, ConnectionContext* cntx);
static void Rename(CmdArgList args, ConnectionContext* cntx);
static void RenameNx(CmdArgList args, ConnectionContext* cntx);
static void Ttl(CmdArgList args, ConnectionContext* cntx);
static void Pttl(CmdArgList args, ConnectionContext* cntx);
static void Echo(CmdArgList args, ConnectionContext* cntx);
static void Select(CmdArgList args, ConnectionContext* cntx);
static OpResult<void> RenameGeneric(CmdArgList args, bool skip_exist_dest,
ConnectionContext* cntx);
static void TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit);
static OpStatus OpExpire(const OpArgs& op_args, std::string_view key, const ExpireParams& params);
@ -53,6 +57,8 @@ class GenericFamily {
static OpResult<uint64_t> OpTtl(Transaction* t, EngineShard* shard, std::string_view key);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, ArgSlice keys);
static OpResult<uint32_t> OpExists(const OpArgs& op_args, ArgSlice keys);
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists);
};
} // namespace dfly

View File

@ -0,0 +1,87 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/generic_family.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/string_family.h"
#include "server/test_utils.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"
using namespace testing;
using namespace std;
using namespace util;
using namespace boost;
using absl::StrCat;
namespace dfly {
class GenericFamilyTest : public BaseFamilyTest {
};
TEST_F(GenericFamilyTest, Expire) {
constexpr uint64_t kNow = 1636070340000;
UpdateTime(kNow);
Run({"set", "key", "val"});
auto resp = Run({"expire", "key", "1"});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(kNow + 1000);
resp = Run({"get", "key"});
EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL)));
Run({"set", "key", "val"});
resp = Run({"expireat", "key", absl::StrCat((kNow + 2000) / 1000)});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"expireat", "key", absl::StrCat((kNow + 3000) / 1000)});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(kNow + 2999);
resp = Run({"get", "key"});
EXPECT_THAT(resp[0], "val");
UpdateTime(kNow + 3000);
resp = Run({"get", "key"});
EXPECT_THAT(resp[0], ArgType(RespExpr::NIL));
}
TEST_F(GenericFamilyTest, Del) {
for (size_t i = 0; i < 1000; ++i) {
Run({"set", StrCat("foo", i), "1"});
Run({"set", StrCat("bar", i), "1"});
}
ASSERT_EQ(2000, CheckedInt({"dbsize"}));
auto exist_fb = pp_->at(0)->LaunchFiber([&] {
for (size_t i = 0; i < 1000; ++i) {
int64_t resp = CheckedInt({"exists", StrCat("foo", i), StrCat("bar", i)});
ASSERT_TRUE(2 == resp || resp == 0) << resp << " " << i;
}
});
auto del_fb = pp_->at(2)->LaunchFiber([&] {
for (size_t i = 0; i < 1000; ++i) {
auto resp = CheckedInt({"del", StrCat("foo", i), StrCat("bar", i)});
ASSERT_EQ(2, resp);
}
});
exist_fb.join();
del_fb.join();
}
TEST_F(GenericFamilyTest, Exists) {
Run({"mset", "x", "0", "y", "1"});
auto resp = Run({"exists", "x", "y", "x"});
EXPECT_THAT(resp[0], IntArg(3));
}
} // namespace dfly

View File

@ -225,7 +225,7 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
OpResult<string> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire] = db_slice.FindExt(op_args.db_ind, key);
if (it == MainIterator{})
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
OpResult<string> res = ListPop(op_args.db_ind, it->second, dir);

View File

@ -5,7 +5,7 @@
#include "server/main_service.h"
extern "C" {
#include "redis/redis_aux.h"
#include "redis/redis_aux.h"
}
#include <absl/strings/ascii.h>
@ -201,6 +201,19 @@ void Service::Debug(CmdArgList args, ConnectionContext* cntx) {
return dbg_cmd.Run(args);
}
void Service::DbSize(CmdArgList args, ConnectionContext* cntx) {
atomic_ulong num_keys{0};
shard_set_.RunBriefInParallel(
[&](EngineShard* shard) {
auto db_size = shard->db_slice().DbSize(cntx->conn_state.db_index);
num_keys.fetch_add(db_size, memory_order_relaxed);
},
[](ShardId) { return true; });
return cntx->SendLong(num_keys.load(memory_order_relaxed));
}
VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;
@ -221,7 +234,8 @@ inline CommandId::Handler HandlerFunc(Service* se, ServiceFunc f) {
void Service::RegisterCommands() {
using CI = CommandId;
registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug);
registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize);
StringFamily::Register(&registry_);
GenericFamily::Register(&registry_);

View File

@ -54,6 +54,7 @@ class Service {
private:
void Debug(CmdArgList args, ConnectionContext* cntx);
void DbSize(CmdArgList args, ConnectionContext* cntx);
void RegisterCommands();

View File

@ -7,6 +7,7 @@
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "server/error.h"
using namespace std;
using absl::StrAppend;
@ -151,6 +152,9 @@ void ReplyBuilder::SendError(OpStatus status) {
case OpStatus::OK:
SendOk();
break;
case OpStatus::KEY_NOTFOUND:
SendError(kKeyNotFoundErr);
break;
default:
LOG(ERROR) << "Unsupported status " << status;
SendError("Internal error");

View File

@ -45,7 +45,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
auto [it, expire_it] = db_slice_->FindExt(params.db_index, key);
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_->Now() : 0;
if (it != MainIterator{}) { // existing
if (IsValid(it)) { // existing
if (params.how == SET_IF_NOTEXIST)
return OpStatus::SKIPPED;
@ -68,7 +68,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
OpResult<void> SetCmd::SetExisting(DbIndex db_ind, std::string_view value, uint64_t expire_at_ms,
MainIterator dest, ExpireIterator exp_it) {
if (exp_it != ExpireIterator{} && expire_at_ms) {
if (IsValid(exp_it) && expire_at_ms) {
exp_it->second = expire_at_ms;
} else {
db_slice_->Expire(db_ind, dest, expire_at_ms);

View File

@ -41,4 +41,12 @@ using ExpireTable = absl::flat_hash_map<std::string, uint64_t>;
using MainIterator = MainTable::iterator;
using ExpireIterator = ExpireTable::iterator;
inline bool IsValid(MainIterator it) {
return it != MainIterator{};
}
inline bool IsValid(ExpireIterator it) {
return it != ExpireIterator{};
}
} // namespace dfly

View File

@ -7,6 +7,7 @@
#include <absl/strings/match.h>
#include "base/logging.h"
#include "base/stl_util.h"
#include "server/dragonfly_connection.h"
#include "util/uring/uring_pool.h"
@ -18,7 +19,7 @@ using namespace std;
bool RespMatcher::MatchAndExplain(const RespExpr& e, MatchResultListener* listener) const {
if (e.type != type_) {
*listener << "\nWrong type: " << e.type;
*listener << "\nWrong type: " << RespExpr::TypeName(e.type);
return false;
}
@ -196,7 +197,7 @@ int64_t BaseFamilyTest::CheckedInt(std::initializer_list<std::string_view> list)
if (resp.front().type == RespExpr::NIL) {
return INT64_MIN;
}
CHECK_EQ(RespExpr::STRING, int(resp.front().type));
CHECK_EQ(RespExpr::STRING, int(resp.front().type)) << list;
string_view sv = ToSV(resp.front().GetBuf());
int64_t res;
CHECK(absl::SimpleAtoi(sv, &res)) << "|" << sv << "|";