Add LINSERT and INCRBYFLOAT commands

This commit is contained in:
Roman Gershman 2022-04-18 19:45:48 +03:00
parent e163747023
commit f2c05a277d
9 changed files with 168 additions and 14 deletions

View File

@ -133,7 +133,7 @@ API 2.0
- [X] BRPOP
- [ ] BRPOPLPUSH
- [ ] BLMOVE
- [ ] LINSERT
- [X] LINSERT
- [X] LPUSHX
- [X] RPUSHX
- [X] String Family
@ -146,7 +146,7 @@ API 2.0
- [ ] BITPOS
- [ ] GETBIT
- [X] GETRANGE
- [ ] INCRBYFLOAT
- [X] INCRBYFLOAT
- [X] PSETEX
- [ ] SETBIT
- [X] SETRANGE

View File

@ -18,6 +18,7 @@ enum class OpStatus : uint16_t {
WRONG_TYPE,
TIMED_OUT,
OUT_OF_MEMORY,
INVALID_FLOAT,
};
class OpResultBase {

View File

@ -214,6 +214,9 @@ void RedisReplyBuilder::SendError(OpStatus status) {
case OpStatus::OUT_OF_RANGE:
SendError(kIndexOutOfRange);
break;
case OpStatus::INVALID_FLOAT:
SendError(kInvalidFloatErr);
break;
default:
LOG(ERROR) << "Unsupported status " << status;
SendError("Internal error");

View File

@ -1074,7 +1074,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry,
}
void quicklistInsertBefore(quicklistIter *iter, quicklistEntry *entry,
void *value, const size_t sz)
const void *value, const size_t sz)
{
_quicklistInsert(iter, entry, value, sz, 0);
}

View File

@ -166,7 +166,7 @@ void quicklistAppendPlainNode(quicklist *quicklist, unsigned char *data, size_t
void quicklistInsertAfter(quicklistIter *iter, quicklistEntry *entry,
const void *value, const size_t sz);
void quicklistInsertBefore(quicklistIter *iter, quicklistEntry *entry,
void *value, const size_t sz);
const void *value, const size_t sz);
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry);
void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
const void *data, size_t sz);

View File

@ -103,6 +103,16 @@ string ListPop(ListDir dir, quicklist* ql) {
return res;
}
bool ElemCompare(const quicklistEntry& entry, string_view elem) {
if (entry.value) {
return entry.sz == elem.size() &&
(entry.sz == 0 || memcmp(entry.value, elem.data(), entry.sz) == 0);
}
absl::AlphaNum an(entry.longval);
return elem == an.Piece();
}
class BPopper {
public:
explicit BPopper(ListDir dir);
@ -269,10 +279,39 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
}
}
/* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */
void ListFamily::LInsert(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view param = ArgS(args, 2);
string_view pivot = ArgS(args, 3);
string_view elem = ArgS(args, 4);
int where;
ToUpper(&args[2]);
if (param == "AFTER") {
where = LIST_TAIL;
} else if (param == "BEFORE") {
where = LIST_HEAD;
} else {
return (*cntx)->SendError(kSyntaxErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpInsert(OpArgs{shard, t->db_index()}, key, pivot, elem, where);
};
OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) {
return (*cntx)->SendError(kWrongTypeErr);
}
(*cntx)->SendLong(result.value());
}
void ListFamily::LTrim(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
std::string_view s_str = ArgS(args, 2);
std::string_view e_str = ArgS(args, 3);
string_view key = ArgS(args, 1);
string_view s_str = ArgS(args, 2);
string_view e_str = ArgS(args, 3);
int32_t start, end;
if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) {
@ -581,8 +620,40 @@ OpResult<string> ListFamily::OpIndex(const OpArgs& op_args, std::string_view key
return str;
}
OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, std::string_view key,
std::string_view elem, long count) {
OpResult<int> ListFamily::OpInsert(const OpArgs& op_args, string_view key, string_view pivot,
string_view elem, int insert_param) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = QLEntry();
quicklistIter* qiter = quicklistGetIterator(ql, AL_START_HEAD);
bool found = false;
while (quicklistNext(qiter, &entry)) {
if (ElemCompare(entry, pivot)) {
if (insert_param == LIST_TAIL) {
quicklistInsertAfter(qiter, &entry, elem.data(), elem.size());
} else {
DCHECK_EQ(LIST_HEAD, insert_param);
quicklistInsertBefore(qiter, &entry, elem.data(), elem.size());
}
found = true;
break;
}
}
quicklistReleaseIterator(qiter);
if (found) {
return quicklistCount(ql);
}
return -1;
}
OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, string_view key, string_view elem,
long count) {
DCHECK(!elem.empty());
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
@ -618,8 +689,7 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, std::string_view key
return removed;
}
OpStatus ListFamily::OpSet(const OpArgs& op_args, std::string_view key, std::string_view elem,
long index) {
OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) {
DCHECK(!elem.empty());
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
@ -633,7 +703,7 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, std::string_view key, std::str
return OpStatus::OK;
}
OpStatus ListFamily::OpTrim(const OpArgs& op_args, std::string_view key, long start, long end) {
OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
@ -731,6 +801,7 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex)
<< CI{"LINSERT", CO::WRITE, 5, 1, 1, 1}.HFUNC(LInsert)
<< CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange)
<< CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet)
<< CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim)

View File

@ -30,6 +30,7 @@ class ListFamily {
static void BRPop(CmdArgList args, ConnectionContext* cntx);
static void LLen(CmdArgList args, ConnectionContext* cntx);
static void LIndex(CmdArgList args, ConnectionContext* cntx);
static void LInsert(CmdArgList args, ConnectionContext* cntx);
static void LTrim(CmdArgList args, ConnectionContext* cntx);
static void LRange(CmdArgList args, ConnectionContext* cntx);
static void LRem(CmdArgList args, ConnectionContext* cntx);
@ -49,6 +50,8 @@ class ListFamily {
static OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpIndex(const OpArgs& op_args, std::string_view key, long index);
static OpResult<int> OpInsert(const OpArgs& op_args, std::string_view key, std::string_view pivot,
std::string_view elem, int insert_param);
static OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key,
std::string_view elem, long count);

View File

@ -9,6 +9,8 @@ extern "C" {
}
#include <absl/container/inlined_vector.h>
#include <double-conversion/double-to-string.h>
#include <double-conversion/string-to-double.h>
#include "base/logging.h"
#include "server/command_registry.h"
@ -25,6 +27,7 @@ namespace {
using namespace std;
using namespace facade;
using namespace double_conversion;
using CI = CommandId;
DEFINE_VARZ(VarzQps, set_qps);
@ -262,6 +265,30 @@ void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) {
return IncrByGeneric(key, val, cntx);
}
void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
std::string_view sval = ArgS(args, 2);
double val;
if (!absl::SimpleAtod(sval, &val)) {
return (*cntx)->SendError(kInvalidFloatErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpIncrFloat(OpArgs{shard, t->db_index()}, key, val);
};
OpResult<double> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* builder = (RedisReplyBuilder*)cntx->reply_builder();
DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
if (!result) {
return (*cntx)->SendError(result.status());
}
builder->SendDouble(result.value());
}
void StringFamily::Decr(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
return IncrByGeneric(key, -1, cntx);
@ -713,6 +740,53 @@ OpResult<int64_t> StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view
return new_val;
}
OpResult<double> StringFamily::OpIncrFloat(const OpArgs& op_args, std::string_view key,
double val) {
auto& db_slice = op_args.shard->db_slice();
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
char buf[64];
StringBuilder sb(buf, sizeof(buf));
if (inserted) {
CHECK(DoubleToStringConverter::EcmaScriptConverter().ToShortest(val, &sb));
char* str = sb.Finalize();
it->second.SetString(str);
return val;
}
if (it->second.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;
if (it->second.Size() == 0)
return OpStatus::INVALID_FLOAT;
string tmp;
string_view slice = it->second.GetSlice(&tmp);
double base;
if (!absl::SimpleAtod(slice, &base)) {
return OpStatus::INVALID_FLOAT;
}
base += val;
if (isnan(base) || isinf(base)) {
return OpStatus::INVALID_FLOAT;
}
if (!DoubleToStringConverter::EcmaScriptConverter().ToShortest(base, &sb)) {
return OpStatus::INVALID_FLOAT;
}
char* str = sb.Finalize();
db_slice.PreUpdate(op_args.db_ind, it);
it->second.SetString(str);
db_slice.PostUpdate(op_args.db_ind, it);
return base;
}
OpResult<uint32_t> StringFamily::ExtendOrSet(const OpArgs& op_args, std::string_view key,
std::string_view val, bool prepend) {
auto& db_slice = op_args.shard->db_slice();
@ -783,6 +857,7 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"INCR", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(Incr)
<< CI{"DECR", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(Decr)
<< CI{"INCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(IncrBy)
<< CI{"INCRBYFLOAT", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(IncrByFloat)
<< CI{"DECRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(DecrBy)
<< CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(GetSet)

View File

@ -12,8 +12,8 @@ namespace dfly {
class ConnectionContext;
class CommandRegistry;
using facade::OpStatus;
using facade::OpResult;
using facade::OpStatus;
class SetCmd {
DbSlice* db_slice_;
@ -49,7 +49,6 @@ class StringFamily {
static void Register(CommandRegistry* registry);
private:
static void Append(CmdArgList args, ConnectionContext* cntx);
static void Decr(CmdArgList args, ConnectionContext* cntx);
static void DecrBy(CmdArgList args, ConnectionContext* cntx);
@ -58,6 +57,7 @@ class StringFamily {
static void GetSet(CmdArgList args, ConnectionContext* cntx);
static void Incr(CmdArgList args, ConnectionContext* cntx);
static void IncrBy(CmdArgList args, ConnectionContext* cntx);
static void IncrByFloat(CmdArgList args, ConnectionContext* cntx);
static void MGet(CmdArgList args, ConnectionContext* cntx);
static void MSet(CmdArgList args, ConnectionContext* cntx);
static void MSetNx(CmdArgList args, ConnectionContext* cntx);
@ -89,6 +89,7 @@ class StringFamily {
// if skip_on_missing - returns KEY_NOTFOUND.
static OpResult<int64_t> OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t val,
bool skip_on_missing);
static OpResult<double> OpIncrFloat(const OpArgs& op_args, std::string_view key, double val);
// Returns the length of the extended string. if prepend is false - appends the val.
static OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, std::string_view key,