Add HSCAN and HINCRBYFLOAT commands
This commit is contained in:
parent
a5b59dde25
commit
7c29ea445f
|
@ -132,7 +132,6 @@ API 2.0
|
|||
- [X] BLPOP
|
||||
- [X] BRPOP
|
||||
- [ ] BRPOPLPUSH
|
||||
- [ ] BLMOVE
|
||||
- [X] LINSERT
|
||||
- [X] LPUSHX
|
||||
- [X] RPUSHX
|
||||
|
@ -160,12 +159,12 @@ API 2.0
|
|||
- [X] HMGET
|
||||
- [X] HLEN
|
||||
- [X] HINCRBY
|
||||
- [ ] HINCRBYFLOAT
|
||||
- [X] HINCRBYFLOAT
|
||||
- [X] HGETALL
|
||||
- [X] HKEYS
|
||||
- [X] HSETNX
|
||||
- [X] HVALS
|
||||
- [ ] HSCAN
|
||||
- [X] HSCAN
|
||||
- [ ] PubSub family
|
||||
- [X] PUBLISH
|
||||
- [ ] PUBSUB
|
||||
|
|
|
@ -11,6 +11,8 @@ extern "C" {
|
|||
#include "redis/util.h"
|
||||
}
|
||||
|
||||
#include <double-conversion/double-to-string.h>
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "facade/error.h"
|
||||
#include "server/command_registry.h"
|
||||
|
@ -23,6 +25,8 @@ using namespace std;
|
|||
namespace dfly {
|
||||
|
||||
using namespace facade;
|
||||
using namespace double_conversion;
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr size_t kMaxListPackLen = 1024;
|
||||
|
@ -235,6 +239,38 @@ void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
string_view field = ArgS(args, 2);
|
||||
string_view incrs = ArgS(args, 3);
|
||||
double dval = 0;
|
||||
|
||||
if (!absl::SimpleAtod(incrs, &dval)) {
|
||||
return (*cntx)->SendError(kInvalidFloatErr);
|
||||
}
|
||||
|
||||
IncrByParam param{dval};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m);
|
||||
};
|
||||
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
if (status == OpStatus::OK) {
|
||||
(*cntx)->SendDouble(get<double>(param));
|
||||
} else {
|
||||
switch (status) {
|
||||
case OpStatus::INVALID_VALUE:
|
||||
(*cntx)->SendError("hash value is not a float");
|
||||
break;
|
||||
default:
|
||||
(*cntx)->SendError(status);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) {
|
||||
HGetGeneric(args, cntx, FIELDS);
|
||||
}
|
||||
|
@ -266,6 +302,37 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g
|
|||
}
|
||||
}
|
||||
|
||||
void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
std::string_view token = ArgS(args, 2);
|
||||
|
||||
uint64_t cursor = 0;
|
||||
|
||||
if (!absl::SimpleAtoi(token, &cursor)) {
|
||||
return (*cntx)->SendError("invalid cursor");
|
||||
}
|
||||
|
||||
if (args.size() > 3) {
|
||||
return (*cntx)->SendError("scan options are not supported yet");
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpScan(OpArgs{shard, t->db_index()}, key, &cursor);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result.status() != OpStatus::WRONG_TYPE) {
|
||||
(*cntx)->StartArray(2);
|
||||
(*cntx)->SendSimpleString(absl::StrCat(cursor));
|
||||
(*cntx)->StartArray(result->size());
|
||||
for (const auto& k : *result) {
|
||||
(*cntx)->SendBulkString(k);
|
||||
}
|
||||
} else {
|
||||
(*cntx)->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
ToLower(&args[0]);
|
||||
|
@ -722,7 +789,46 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
|
|||
int exist_res = hashTypeGetValue(hset, op_args.shard->tmp_str1, &vstr, &vlen, &old_val);
|
||||
|
||||
if (holds_alternative<double>(*param)) {
|
||||
LOG(FATAL) << "TBD";
|
||||
long double value;
|
||||
double incr = get<double>(*param);
|
||||
if (exist_res == C_OK) {
|
||||
if (vstr) {
|
||||
const char* exist_val = reinterpret_cast<char*>(vstr);
|
||||
|
||||
if (!string2ld(exist_val, vlen, &value)) {
|
||||
stats->listpack_bytes += lpb;
|
||||
|
||||
return OpStatus::INVALID_VALUE;
|
||||
}
|
||||
} else {
|
||||
value = old_val;
|
||||
}
|
||||
value += incr;
|
||||
|
||||
if (isnan(value) || isinf(value)) {
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
}
|
||||
} else {
|
||||
value = incr;
|
||||
}
|
||||
|
||||
char buf[128];
|
||||
StringBuilder sb(buf, sizeof(buf));
|
||||
CHECK(DoubleToStringConverter::EcmaScriptConverter().ToShortest(value, &sb));
|
||||
char* str = sb.Finalize();
|
||||
string_view sval{str};
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
uint8_t* lp = (uint8_t*)hset->ptr;
|
||||
|
||||
lp = LpInsert(lp, field, sval, false).first;
|
||||
hset->ptr = lp;
|
||||
stats->listpack_bytes += lpBytes(lp);
|
||||
} else {
|
||||
sds news = sdsnewlen(str, sval.size());
|
||||
hashTypeSet(hset, op_args.shard->tmp_str1, news, HASH_SET_TAKE_VALUE);
|
||||
}
|
||||
param->emplace<double>(value);
|
||||
} else {
|
||||
if (exist_res == C_OK && vstr) {
|
||||
const char* exist_val = reinterpret_cast<char*>(vstr);
|
||||
|
@ -763,6 +869,54 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpResult<StringVec> HSetFamily::OpScan(const OpArgs& op_args, std::string_view key,
|
||||
uint64_t* cursor) {
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_HASH);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
uint32_t count = 20;
|
||||
robj* hset = it->second.AsRObj();
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
uint8_t* lp = (uint8_t*)hset->ptr;
|
||||
uint8_t* lp_elem = lpFirst(lp);
|
||||
|
||||
DCHECK(lp_elem); // empty containers are not allowed.
|
||||
|
||||
int64_t ele_len;
|
||||
unsigned char intbuf[LP_INTBUF_SIZE];
|
||||
|
||||
// We do single pass on listpack for this operation.
|
||||
do {
|
||||
uint8_t* elem = lpGet(lp_elem, &ele_len, intbuf);
|
||||
DCHECK(elem);
|
||||
res.emplace_back(reinterpret_cast<char*>(elem), size_t(ele_len));
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to value
|
||||
} while (lp_elem);
|
||||
} else {
|
||||
dict* ht = (dict*)hset->ptr;
|
||||
long maxiterations = count * 10;
|
||||
void* privdata = &res;
|
||||
auto scanCb = [](void* privdata, const dictEntry* de) {
|
||||
StringVec* res = (StringVec*)privdata;
|
||||
sds val = (sds)de->key;
|
||||
res->emplace_back(val, sdslen(val));
|
||||
val = (sds)de->v.val;
|
||||
res->emplace_back(val, sdslen(val));
|
||||
};
|
||||
|
||||
do {
|
||||
*cursor = dictScan(ht, *cursor, scanCb, NULL, privdata);
|
||||
} while (*cursor && maxiterations-- && res.size() < count);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&HSetFamily::x)
|
||||
|
@ -776,10 +930,13 @@ void HSetFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"HMGET", CO::FAST | CO::READONLY, -3, 1, 1, 1}.HFUNC(HMGet)
|
||||
<< CI{"HMSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
|
||||
<< CI{"HINCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HIncrBy)
|
||||
<< CI{"HINCRBYFLOAT", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(
|
||||
HIncrByFloat)
|
||||
<< CI{"HKEYS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HKeys)
|
||||
|
||||
// TODO: add options support
|
||||
<< CI{"HRANDFIELD", CO::READONLY, 2, 1, 1, 1}.HFUNC(HRandField)
|
||||
<< CI{"HSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(HScan)
|
||||
<< CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
|
||||
<< CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx)
|
||||
<< CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen)
|
||||
|
|
|
@ -33,7 +33,8 @@ class HSetFamily {
|
|||
static void HKeys(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HVals(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HGetAll(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
static void HIncrByFloat(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HScan(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HSet(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HSetNx(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HStrLen(CmdArgList args, ConnectionContext* cntx);
|
||||
|
@ -62,6 +63,8 @@ class HSetFamily {
|
|||
using IncrByParam = std::variant<double, int64_t>;
|
||||
static OpStatus OpIncrBy(const OpArgs& op_args, std::string_view key, std::string_view field,
|
||||
IncrByParam* param);
|
||||
|
||||
static OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -745,7 +745,7 @@ OpResult<double> StringFamily::OpIncrFloat(const OpArgs& op_args, std::string_vi
|
|||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
char buf[64];
|
||||
char buf[128];
|
||||
StringBuilder sb(buf, sizeof(buf));
|
||||
|
||||
if (inserted) {
|
||||
|
|
Loading…
Reference in New Issue