Implement basic XADD, XRANGE, XLEN commands.

Covers most funvtionality for #59.
This commit is contained in:
Roman Gershman 2022-06-08 22:03:46 +03:00 committed by Roman Gershman
parent 819b2f0716
commit a9c0fa8ea4
16 changed files with 351 additions and 76 deletions

View File

@ -11,6 +11,7 @@ extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/stream.h"
#include "redis/util.h"
#include "redis/zmalloc.h" // for non-string objects.
#include "redis/zset.h"
@ -128,6 +129,10 @@ inline void FreeObjZset(unsigned encoding, void* ptr) {
}
}
inline void FreeObjStream(void* ptr) {
freeStream((stream*)ptr);
}
// Deniel's Lemire function validate_ascii_fast() - under Apache/MIT license.
// See https://github.com/lemire/fastvalidate-utf-8/
// The function returns true (1) if all chars passed in src are
@ -282,7 +287,7 @@ void RobjWrapper::Free(pmr::memory_resource* mr) {
LOG(FATAL) << "Unsupported OBJ_MODULE type";
break;
case OBJ_STREAM:
LOG(FATAL) << "Unsupported OBJ_STREAM type";
FreeObjStream(inner_obj_);
break;
default:
LOG(FATAL) << "Unknown object type";

View File

@ -17,6 +17,7 @@ extern "C" {
#include "redis/intset.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/zmalloc.h"
}
@ -240,4 +241,22 @@ TEST_F(CompactObjectTest, FlatSet) {
EXPECT_LT(fs_used + 8 * kTestSize, dict_used);
}
TEST_F(CompactObjectTest, StreamObj) {
robj* stream_obj = createStreamObject();
stream* sm = (stream*)stream_obj->ptr;
robj* item[2];
item[0] = createStringObject("FIELD", 5);
item[1] = createStringObject("VALUE", 5);
ASSERT_EQ(C_OK, streamAppendItem(sm, item, 1, NULL, NULL, 0));
decrRefCount(item[0]);
decrRefCount(item[1]);
cobj_.ImportRObj(stream_obj);
EXPECT_EQ(OBJ_STREAM, cobj_.ObjType());
EXPECT_EQ(OBJ_ENCODING_STREAM, cobj_.Encoding());
EXPECT_FALSE(cobj_.IsInline());
}
} // namespace dfly

View File

@ -93,7 +93,6 @@
* it must be compressed back into a single node.
*
*/
#define RAX_NODE_MAX_SIZE ((1<<29)-1)
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */

View File

@ -6,6 +6,7 @@
#include "object.h"
#include "listpack.h"
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
@ -16,7 +17,7 @@ typedef struct streamID {
} streamID;
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
rax *rax_tree; /* The radix tree holding the stream. */
uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */

View File

@ -75,7 +75,7 @@ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq)
/* Create a new stream data structure. */
stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->rax_tree = raxNew();
s->length = 0;
s->first_id.ms = 0;
s->first_id.seq = 0;
@ -90,7 +90,7 @@ stream *streamNew(void) {
/* Free a stream, including the listpacks stored inside the radix tree. */
void freeStream(stream *s) {
raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
raxFreeWithCallback(s->rax_tree,(void(*)(void*))lpFree);
if (s->cgroups)
raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
zfree(s);
@ -183,7 +183,7 @@ robj *streamDup(robj *o) {
raxIterator ri;
uint64_t rax_key[2];
raxStart(&ri, s->rax);
raxStart(&ri, s->rax_tree);
raxSeek(&ri, "^", NULL, 0);
size_t lp_bytes = 0; /* Total bytes in the listpack. */
unsigned char *lp = NULL; /* listpack pointer. */
@ -194,7 +194,7 @@ robj *streamDup(robj *o) {
unsigned char *new_lp = zmalloc(lp_bytes);
memcpy(new_lp, lp, lp_bytes);
memcpy(rax_key, ri.key, sizeof(rax_key));
raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
raxInsert(new_s->rax_tree, (unsigned char *)&rax_key, sizeof(rax_key),
new_lp, NULL);
}
new_s->length = s->length;
@ -482,7 +482,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
/* Add the new entry. */
raxIterator ri;
raxStart(&ri,s->rax);
raxStart(&ri,s->rax_tree);
raxSeek(&ri,"$",NULL,0);
size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
@ -550,7 +550,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
raxInsert(s->rax_tree,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
}
@ -579,7 +579,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
}
lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
raxInsert(s->rax_tree,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the
* master entry. */
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
@ -664,7 +664,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
/* Insert back into the tree in order to update the listpack pointer. */
if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
raxInsert(s->rax_tree,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
s->entries_added++;
s->last_id = id;
@ -731,7 +731,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxStart(&ri,s->rax_tree);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0;
@ -765,7 +765,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
if (remove_node) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxRemove(s->rax_tree,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
@ -863,7 +863,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}
/* Update the listpack with the new pointer. */
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
raxInsert(s->rax_tree,ri.key,ri.key_len,lp,NULL);
break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */
@ -1085,7 +1085,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
}
/* Seek the correct node in the radix tree. */
raxStart(&si->ri,s->rax);
raxStart(&si->ri,s->rax_tree);
if (!rev) {
if (start && (start->ms || start->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
@ -1302,7 +1302,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
/* If this is the last element in the listpack, we can remove the whole
* node. */
lpFree(lp);
raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
raxRemove(si->stream->rax_tree,si->ri.key,si->ri.key_len,NULL);
} else {
/* In the base case we alter the counters of valid/deleted entries. */
lp = lpReplaceInteger(lp,&p,aux-1);
@ -1312,7 +1312,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
/* Update the listpack with the new pointer. */
if (si->lp != lp)
raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
raxInsert(si->stream->rax_tree,si->ri.key,si->ri.key_len,lp,NULL);
}
/* Update the number of entries counter. */
@ -3668,9 +3668,9 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax));
addReplyLongLong(c,raxSize(s->rax_tree));
addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes);
addReplyLongLong(c,s->rax_tree->numnodes);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
addReplyBulkCString(c,"max-deleted-entry-id");

View File

@ -102,6 +102,8 @@ const char* RdbTypeName(unsigned type) {
return "zset";
case RDB_TYPE_HASH:
return "hash";
case RDB_TYPE_STREAM_LISTPACKS:
return "stream";
}
return "other";
}

View File

@ -124,7 +124,7 @@ void DebugCmd::Run(CmdArgList args) {
string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try DEBUG HELP.");
return (*cntx_)->SendError(reply, kSyntaxErr);
return (*cntx_)->SendError(reply, kSyntaxErrType);
}
void DebugCmd::Reload(CmdArgList args) {

View File

@ -440,6 +440,7 @@ TEST_F(DflyEngineTest, PSubscribe) {
EXPECT_EQ("ab", msg.channel);
EXPECT_EQ("a*", msg.pattern);
}
TEST_F(DflyEngineTest, Unsubscribe) {
auto resp = Run({"unsubscribe", "a"});
EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", "a", IntArg(0)));

View File

@ -309,7 +309,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 2) {
return (*cntx)->SendError(facade::WrongNumArgsError("ping"), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType);
}
// We synchronously block here until the engine sends us the payload and notifies that

View File

@ -337,7 +337,7 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
string_view cmd = ArgS(args, 0);
if (args.size() % 2 != 0) {
return (*cntx)->SendError(facade::WrongNumArgsError(cmd), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);
}
args.remove_prefix(2);

View File

@ -28,6 +28,7 @@ extern "C" {
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/stream_family.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "server/version.h"
@ -61,8 +62,7 @@ namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
using absl::GetFlag;
using absl::StrCat;
using facade::MCReplyBuilder;
using facade::RedisReplyBuilder;
using namespace facade;
namespace {
@ -293,7 +293,7 @@ bool EvalValidator(CmdArgList args, ConnectionContext* cntx) {
}
if (unsigned(num_keys) > args.size() - 3) {
(*cntx)->SendError("Number of keys can't be greater than number of args", kSyntaxErr);
(*cntx)->SendError("Number of keys can't be greater than number of args", kSyntaxErrType);
return false;
}
@ -424,11 +424,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
}
if (cid->key_arg_step() == 2 && (args.size() % 2) == 0) {
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
}
// Validate more complicated cases with custom validators.
@ -994,7 +994,7 @@ void Service::Function(CmdArgList args, ConnectionContext* cntx) {
}
string err = StrCat("Unknown subcommand '", sub_cmd, "'. Try FUNCTION HELP.");
return (*cntx)->SendError(err, kSyntaxErr);
return (*cntx)->SendError(err, kSyntaxErrType);
}
VarzValue::Map Service::GetVarzStats() {
@ -1052,6 +1052,7 @@ void Service::RegisterCommands() {
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function);
StreamFamily::Register(&registry_);
StringFamily::Register(&registry_);
GenericFamily::Register(&registry_);
ListFamily::Register(&registry_);

View File

@ -76,7 +76,7 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try SCRIPT HELP.");
cntx->reply_builder()->SendError(err, kSyntaxErr);
cntx->reply_builder()->SendError(err, kSyntaxErrType);
}
bool ScriptMgr::InsertFunction(std::string_view id, std::string_view body) {

View File

@ -54,13 +54,16 @@ extern "C" mi_stats_t _mi_stats_main;
namespace dfly {
using namespace util;
namespace fibers = ::boost::fibers;
namespace fs = std::filesystem;
namespace uring = util::uring;
using absl::GetFlag;
using absl::StrCat;
using facade::MCReplyBuilder;
using namespace facade;
using strings::HumanReadableNumBytes;
using util::ProactorBase;
using util::http::StringResponse;
namespace {
@ -327,7 +330,7 @@ void AppendMetricWithoutLabels(string_view name, string_view help, const absl::A
AppendMetricValue(name, value, {}, {}, dest);
}
void PrintPrometheusMetrics(const Metrics& m, http::StringResponse* resp) {
void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
// Server metrics
AppendMetricWithoutLabels("up", "", 1, MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("uptime_in_seconds", "", m.uptime, MetricType::GAUGE, &resp->body());
@ -384,8 +387,8 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) {
// The naming of the metrics should be compatible with redis_exporter, see
// https://github.com/oliver006/redis_exporter/blob/master/exporter/exporter.go#L111
auto cb = [this](const http::QueryArgs& args, HttpContext* send) {
http::StringResponse resp = http::MakeStringResponse(boost::beast::http::status::ok);
auto cb = [this](const util::http::QueryArgs& args, util::HttpContext* send) {
StringResponse resp = util::http::MakeStringResponse(boost::beast::http::status::ok);
PrintPrometheusMetrics(this->GetMetrics(), &resp);
return send->Invoke(std::move(resp));
@ -624,7 +627,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "LIST") {
vector<string> client_info;
fibers::mutex mu;
auto cb = [&](Connection* conn) {
auto cb = [&](util::Connection* conn) {
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
string info = dcon->GetClientInfo();
lock_guard lk(mu);
@ -638,7 +641,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
@ -662,7 +665,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
});
return (*cntx)->SendOk();
} else {
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType);
}
}
@ -683,7 +686,7 @@ void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) {
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try MEMORY HELP.");
return (*cntx)->SendError(err, kSyntaxErr);
return (*cntx)->SendError(err, kSyntaxErrType);
}
void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {

View File

@ -6,9 +6,16 @@
#include <absl/strings/str_cat.h>
extern "C" {
#include "redis/object.h"
#include "redis/stream.h"
}
#include "base/logging.h"
#include "facade/error.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
namespace dfly {
@ -18,29 +25,198 @@ using namespace std;
namespace {
using StreamId = pair<int64_t, int64_t>;
struct Record {
StreamId id;
string field;
string value;
streamID id;
vector<pair<string, string>> kv_arr;
};
using RecordVec = vector<Record>;
OpResult<StreamId> OpAdd(const OpArgs& op_args, string_view key, string_view id, CmdArgList args) {
return OpStatus::WRONG_TYPE; // TBD
}
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, string_view start,
string_view end) {
return OpStatus::WRONG_TYPE; // TBD
}
inline string StreamIdRepr(const StreamId& id) {
return absl::StrCat(id.first, "-", id.second);
struct ParsedStreamId {
streamID val;
bool has_seq = false; // Was an ID different than "ms-*" specified? for XADD only.
bool id_given = false; // Was an ID different than "*" specified? for XADD only.
};
struct RangeId {
ParsedStreamId parsed_id;
bool exclude = false;
};
bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamId* dest) {
if (strid.empty() || strid.size() > 127)
return false;
if (strid == "*")
return true;
dest->id_given = true;
dest->has_seq = true;
/* Handle the "-" and "+" special cases. */
if (strid == "-" || strid == "+") {
if (strict)
return false;
if (strid == "-") {
dest->val.ms = 0;
dest->val.seq = 0;
return true;
}
dest->val.ms = UINT64_MAX;
dest->val.seq = UINT64_MAX;
return true;
}
/* Parse <ms>-<seq> form. */
streamID result{.ms = 0, .seq = 0};
size_t dash_pos = strid.find('-');
if (!absl::SimpleAtoi(strid.substr(0, dash_pos), &result.ms))
return false;
if (dash_pos == string_view::npos) {
result.seq = missing_seq;
} else {
if (dash_pos + 1 == strid.size())
return false;
if (dash_pos + 2 == strid.size() && strid[dash_pos + 1] == '*') {
result.seq = 0;
dest->has_seq = false;
} else if (!absl::SimpleAtoi(strid.substr(dash_pos + 1), &result.seq)) {
return false;
}
}
dest->val = result;
return true;
}
bool ParseRangeId(string_view id, RangeId* dest) {
if (id.empty())
return false;
if (id[0] == '(') {
dest->exclude = true;
id.remove_prefix(1);
}
return ParseID(id, dest->exclude, 0, &dest->parsed_id);
}
OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const ParsedStreamId& parsed_id,
CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
auto& db_slice = op_args.shard->db_slice();
pair<PrimeIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind(op_args.db_ind, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
robj* stream_obj = nullptr;
PrimeIterator& it = add_res.first;
if (add_res.second) { // new key
stream_obj = createStreamObject();
it->second.ImportRObj(stream_obj);
} else {
if (it->second.ObjType() != OBJ_STREAM)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
}
stream* str = (stream*)it->second.RObjPtr();
// we should really get rid of this monstrousity and rewrite streamAppendItem ourselves here.
unique_ptr<robj*[]> objs(new robj*[args.size()]);
for (size_t i = 0; i < args.size(); ++i) {
objs[i] = createStringObject(args[i].data(), args[i].size());
}
streamID result_id;
streamID passed_id = parsed_id.val;
int res = streamAppendItem(str, objs.get(), args.size() / 2, &result_id,
parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq);
if (res != C_OK) {
if (errno == ERANGE)
return OpStatus::OUT_OF_RANGE;
return OpStatus::OUT_OF_MEMORY;
}
return result_id;
}
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const ParsedStreamId& start,
const ParsedStreamId& end, uint32_t count) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
RecordVec result;
if (count == 0)
return result;
streamIterator si;
int64_t numfields;
streamID id;
CompactObj& cobj = (*res_it)->second;
stream* s = (stream*)cobj.RObjPtr();
int rev = 0;
streamID sstart = start.val, send = end.val;
streamIteratorStart(&si, s, &sstart, &send, rev);
while (streamIteratorGetID(&si, &id, &numfields)) {
Record rec;
rec.id = id;
rec.kv_arr.reserve(numfields);
/* Emit the field-value pairs. */
while (numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
streamIteratorGetField(&si, &key, &value, &key_len, &value_len);
string skey(reinterpret_cast<char*>(key), key_len);
string sval(reinterpret_cast<char*>(value), value_len);
rec.kv_arr.emplace_back(move(skey), move(sval));
}
result.push_back(move(rec));
if (count == result.size())
break;
}
streamIteratorStop(&si);
return result;
}
OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
CompactObj& cobj = (*res_it)->second;
stream* s = (stream*)cobj.RObjPtr();
return s->length;
}
inline string StreamIdRepr(const streamID& id) {
return absl::StrCat(id.ms, "-", id.seq);
};
const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument";
} // namespace
void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
@ -48,17 +224,23 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
// TODO: args parsing
string_view id = ArgS(args, 2);
args.remove_prefix(2);
ParsedStreamId parsed_id;
args.remove_prefix(3);
if (args.empty() || args.size() % 2 == 1) {
return (*cntx)->SendError(WrongNumArgsError("XADD"), kSyntaxErr);
return (*cntx)->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
}
if (!ParseID(id, true, 0, &parsed_id)) {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpAdd(op_args, key, id, args);
return OpAdd(op_args, key, parsed_id, args);
};
OpResult<StreamId> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult<streamID> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (add_result) {
return (*cntx)->SendBulkString(StreamIdRepr(*add_result));
}
@ -66,31 +248,77 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(add_result.status());
}
void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpLen(op_args, key);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
return (*cntx)->SendLong(*result);
}
return (*cntx)->SendError(result.status());
}
void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view start = ArgS(args, 2);
string_view end = ArgS(args, 1);
string_view end = ArgS(args, 3);
uint32_t count = kuint32max;
// TODO: parse options
RangeId rs, re;
if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRange(op_args, key, start, end);
};
if (rs.exclude && streamIncrID(&rs.parsed_id.val) != C_OK) {
return (*cntx)->SendError("invalid start ID for the interval", kSyntaxErrType);
}
OpResult<RecordVec> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (add_result) {
(*cntx)->StartArray(add_result->size());
for (const auto& item : *add_result) {
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(StreamIdRepr(item.id));
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(item.field);
(*cntx)->SendBulkString(item.value);
if (re.exclude && streamDecrID(&re.parsed_id.val) != C_OK) {
return (*cntx)->SendError("invalid end ID for the interval", kSyntaxErrType);
}
if (args.size() > 4) {
if (args.size() != 6) {
return (*cntx)->SendError(WrongNumArgsError("XRANGE"), kSyntaxErrType);
}
ToUpper(&args[4]);
string_view opt = ArgS(args, 4);
string_view val = ArgS(args, 5);
if (opt != "COUNT" || !absl::SimpleAtoi(val, &count)) {
return (*cntx)->SendError(kSyntaxErr);
}
}
return (*cntx)->SendError(add_result.status());
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRange(op_args, key, rs.parsed_id, re.parsed_id, count);
};
OpResult<RecordVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->StartArray(result->size());
for (const auto& item : *result) {
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(StreamIdRepr(item.id));
(*cntx)->StartArray(item.kv_arr.size() * 2);
for (const auto& k_v : item.kv_arr) {
(*cntx)->SendBulkString(k_v.first);
(*cntx)->SendBulkString(k_v.second);
}
}
}
if (result.status() == OpStatus::KEY_NOTFOUND) {
return (*cntx)->StartArray(0);
}
return (*cntx)->SendError(result.status());
}
#define HFUNC(x) SetHandler(&StreamFamily::x)
@ -99,6 +327,7 @@ void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
*registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd)
<< CI{"XLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(XLen)
<< CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange);
}

View File

@ -17,7 +17,9 @@ class StreamFamily {
private:
static void XAdd(CmdArgList args, ConnectionContext* cntx);
static void XLen(CmdArgList args, ConnectionContext* cntx);
static void XRange(CmdArgList args, ConnectionContext* cntx);
};
} // namespace dfly

View File

@ -21,8 +21,21 @@ class StreamFamilyTest : public BaseFamilyTest {
};
TEST_F(StreamFamilyTest, Add) {
Run({"xadd", "key", "*", "field", "value"});
Run({"xrange", "key", "-", "+"});
auto resp = Run({"xadd", "key", "*", "field", "value"});
ASSERT_THAT(resp, ArgType(RespExpr::STRING));
string id = string(ToSV(resp.GetBuf()));
EXPECT_TRUE(id.ends_with("-0")) << id;
resp = Run({"xrange", "null", "-", "+"});
EXPECT_THAT(resp, ArrLen(0));
resp = Run({"xrange", "key", "-", "+"});
EXPECT_THAT(resp, ArrLen(2));
auto sub_arr = resp.GetVec();
EXPECT_THAT(sub_arr, ElementsAre(id, ArrLen(2)));
resp = Run({"xlen", "key"});
EXPECT_THAT(resp, IntArg(1));
}
} // namespace dfly