From d5cd317a135c45c46eb7b04e0cdb1e619e101c0d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 12 Jun 2022 17:41:01 +0300 Subject: [PATCH] implement partial streams API (#141) * feat(stream): implement xrevrange, xrange, xlen, xinfo and xgroup, xdel. 1. add tests that cover xrange,xrevrange and various error states for xadd. 2. Implement 8 stream commands overall. Fixes #59. * feat(stream): support xadd/maxlen option Signed-off-by: Roman Gershman --- docs/api_status.md | 21 +- src/core/compact_object.cc | 7 + src/facade/error.h | 2 + src/facade/facade.cc | 6 + src/facade/op_status.h | 3 + src/facade/reply_builder.cc | 3 + src/redis/stream.h | 6 +- src/redis/t_stream.c | 2 +- src/server/debugcmd.cc | 14 +- src/server/main_service.cc | 2 +- src/server/server_family.cc | 8 +- src/server/stream_family.cc | 598 +++++++++++++++++++++++++++++-- src/server/stream_family.h | 7 +- src/server/stream_family_test.cc | 46 +++ 14 files changed, 676 insertions(+), 49 deletions(-) diff --git a/docs/api_status.md b/docs/api_status.md index e21979a..f6e5453 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -204,7 +204,26 @@ with respect to Memcached and Redis APIs. ### API 3 ### API 4 ### API 5 -### API 6 +- [X] Stream Family + - [X] XADD + - [ ] XCLAIM + - [X] XDEL + - [X] XGROUP CREATE/DELCONSUMER/DESTROY/HELP/SETID + - [ ] XGROUP CREATECONSUMER + - [X] XINFO GROUPS/HELP + - [ ] XINFO CONSUMERS/GROUPS/STREAM + - [X] XLEN + - [ ] XPENDING + - [X] XRANGE + - [ ] XREAD + - [ ] XREADGROUP + - [X] XREVRANGE + - [X] XSETID + - [ ] XTRIM + +### API 6,7 +- [ ] Stream Family + - [ ] XAUTOCLAIM ## Notes Some commands were implemented as decorators along the way: diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 1e85c88..76643c0 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -99,6 +99,11 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) { return 0; } +size_t MallocUsedStream(unsigned encoding, void* streamv) { + // stream* str_obj = (stream*)streamv; + return 0; // TODO +} + inline void FreeObjHash(unsigned encoding, void* ptr) { switch (encoding) { case OBJ_ENCODING_HT: @@ -228,6 +233,8 @@ size_t RobjWrapper::MallocUsed() const { return MallocUsedHSet(encoding_, inner_obj_); case OBJ_ZSET: return MallocUsedZSet(encoding_, inner_obj_); + case OBJ_STREAM: + return MallocUsedStream(encoding_, inner_obj_); default: LOG(FATAL) << "Not supported " << type_; diff --git a/src/facade/error.h b/src/facade/error.h index 7d90e93..51913f7 100644 --- a/src/facade/error.h +++ b/src/facade/error.h @@ -11,6 +11,8 @@ namespace facade { std::string WrongNumArgsError(std::string_view cmd); std::string InvalidExpireTime(std::string_view cmd); +std::string UnknownSubCmd(std::string_view subcmd, std::string_view cmd); + extern const char kSyntaxErr[]; extern const char kWrongTypeErr[]; diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 1a15700..9002a44 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -57,6 +57,12 @@ string InvalidExpireTime(string_view cmd) { return absl::StrCat("invalid expire time in '", cmd, "' command"); } +string UnknownSubCmd(string_view subcmd, string_view cmd) { + return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, "'. Try ", + cmd, " HELP."); +} + + const char kSyntaxErr[] = "syntax error"; const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value"; const char kKeyNotFoundErr[] = "no such key"; diff --git a/src/facade/op_status.h b/src/facade/op_status.h index 434ce24..84b0c96 100644 --- a/src/facade/op_status.h +++ b/src/facade/op_status.h @@ -21,6 +21,9 @@ enum class OpStatus : uint16_t { INVALID_FLOAT, INVALID_INT, SYNTAX_ERR, + BUSY_GROUP, + STREAM_ID_SMALL, + ENTRIES_ADDED_SMALL, }; class OpResultBase { diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 05af745..5372545 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -243,6 +243,9 @@ void RedisReplyBuilder::SendError(OpStatus status) { case OpStatus::OUT_OF_MEMORY: SendError(kOutOfMemory); break; + case OpStatus::BUSY_GROUP: + SendError("-BUSYGROUP Consumer Group name already exists"); + break; default: LOG(ERROR) << "Unsupported status " << status; SendError("Internal error"); diff --git a/src/redis/stream.h b/src/redis/stream.h index 4bf3ba0..745f3ab 100644 --- a/src/redis/stream.h +++ b/src/redis/stream.h @@ -131,7 +131,7 @@ void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); +streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); @@ -149,5 +149,9 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx); +void streamFreeCG(streamCG *cg); +void streamDelConsumer(streamCG *cg, streamConsumer *consumer); +void streamLastValidID(stream *s, streamID *maxid); +int streamIDEqZero(streamID *id); #endif diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index d2028be..c55eb5e 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -2485,7 +2485,7 @@ void streamFreeConsumer(streamConsumer *sc) { * specified name, last server ID and reads counter. If a consumer group with * the same name already exists NULL is returned, otherwise the pointer to the * consumer group is returned. */ -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { +streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read) { if (s->cgroups == NULL) s->cgroups = raxNew(); if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) return NULL; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 3f9498f..d8dbfba 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -77,9 +77,9 @@ DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), } void DebugCmd::Run(CmdArgList args) { - std::string_view subcmd = ArgS(args, 1); + string_view subcmd = ArgS(args, 1); if (subcmd == "HELP") { - std::string_view help_arr[] = { + string_view help_arr[] = { "DEBUG [ [value] [opt] ...]. Subcommands are:", "OBJECT ", " Show low-level info about `key` and associated value.", @@ -122,8 +122,7 @@ void DebugCmd::Run(CmdArgList args) { return Inspect(key); } - string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, - "'. Try DEBUG HELP."); + string reply = UnknownSubCmd(subcmd, "DEBUG"); return (*cntx_)->SendError(reply, kSyntaxErrType); } @@ -132,7 +131,7 @@ void DebugCmd::Reload(CmdArgList args) { for (size_t i = 2; i < args.size(); ++i) { ToUpper(&args[i]); - std::string_view opt = ArgS(args, i); + string_view opt = ArgS(args, i); VLOG(1) << "opt " << opt; if (opt == "NOSAVE") { @@ -161,7 +160,7 @@ void DebugCmd::Reload(CmdArgList args) { Load(last_save_file); } -void DebugCmd::Load(std::string_view filename) { +void DebugCmd::Load(string_view filename) { GlobalState new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); if (new_state != GlobalState::LOADING) { LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored"; @@ -202,8 +201,7 @@ void DebugCmd::Load(std::string_view filename) { void DebugCmd::Populate(CmdArgList args) { if (args.size() < 3 || args.size() > 5) { - return (*cntx_)->SendError( - "Unknown subcommand or wrong number of arguments for 'populate'. Try DEBUG HELP."); + return (*cntx_)->SendError(UnknownSubCmd("populate", "DEBUG")); } uint64_t total_count = 0; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 9230b13..7deb05d 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -993,7 +993,7 @@ void Service::Function(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendOk(); } - string err = StrCat("Unknown subcommand '", sub_cmd, "'. Try FUNCTION HELP."); + string err = UnknownSubCmd(sub_cmd, "FUNCTION"); return (*cntx)->SendError(err, kSyntaxErrType); } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b9b967e..8e0154d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -87,11 +87,6 @@ error_code CreateDirs(fs::path dir_path) { return ec; } -string UnknownSubCmd(string_view subcmd, string cmd) { - return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, "'. Try ", - cmd, " HELP."); -} - string UnknownCmd(string cmd, CmdArgList args) { return absl::StrCat("unknown command '", cmd, "' with args beginning with: ", StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter())); @@ -689,8 +684,7 @@ void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendLong(1); } - string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, - "'. Try MEMORY HELP."); + string err = UnknownSubCmd(sub_cmd, "MEMORY"); return (*cntx)->SendError(err, kSyntaxErrType); } diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 5a961ed..9d041b0 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -43,6 +43,40 @@ struct RangeId { bool exclude = false; }; +struct AddOpts { + ParsedStreamId parsed_id; + uint32_t max_limit = kuint32max; + bool max_limit_approx = false; +}; + +struct GroupInfo { + string name; + size_t consumer_size; + size_t pending_size; + streamID last_id; +}; + +struct RangeOpts { + ParsedStreamId start; + ParsedStreamId end; + bool is_rev = false; + uint32_t count = kuint32max; +}; + +const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument"; +const char kXGroupKeyNotFound[] = + "The XGROUP subcommand requires the key to exist. " + "Note that for CREATE you may want to use the MKSTREAM option to create " + "an empty stream automatically."; + +inline string StreamIdRepr(const streamID& id) { + return absl::StrCat(id.ms, "-", id.seq); +}; + +inline string NoGroupError(string_view key, string_view cgroup) { + return absl::StrCat("-NOGROUP No such consumer group '", cgroup, "' for key name '", key, "'"); +} + bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamId* dest) { if (strid.empty() || strid.size() > 127) return false; @@ -70,15 +104,13 @@ bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamI } /* Parse - form. */ - streamID result{.ms = 0, .seq = 0}; + streamID result{.ms = 0, .seq = missing_seq}; size_t dash_pos = strid.find('-'); if (!absl::SimpleAtoi(strid.substr(0, dash_pos), &result.ms)) return false; - if (dash_pos == string_view::npos) { - result.seq = missing_seq; - } else { + if (dash_pos != string_view::npos) { if (dash_pos + 1 == strid.size()) return false; @@ -106,7 +138,7 @@ bool ParseRangeId(string_view id, RangeId* dest) { return ParseID(id, dest->exclude, 0, &dest->parsed_id); } -OpResult OpAdd(const OpArgs& op_args, string_view key, const ParsedStreamId& parsed_id, +OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts, CmdArgList args) { DCHECK(!args.empty() && args.size() % 2 == 0); auto& db_slice = op_args.shard->db_slice(); @@ -132,7 +164,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const ParsedStr db_slice.PreUpdate(op_args.db_ind, it); } - stream* str = (stream*)it->second.RObjPtr(); + stream* stream_inst = (stream*)it->second.RObjPtr(); // we should really get rid of this monstrousity and rewrite streamAppendItem ourselves here. unique_ptr objs(new robj*[args.size()]); @@ -141,21 +173,33 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const ParsedStr } streamID result_id; + const auto& parsed_id = opts.parsed_id; streamID passed_id = parsed_id.val; - int res = streamAppendItem(str, objs.get(), args.size() / 2, &result_id, + int res = streamAppendItem(stream_inst, objs.get(), args.size() / 2, &result_id, parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq); + + for (size_t i = 0; i < args.size(); ++i) { + decrRefCount(objs[i]); + } + if (res != C_OK) { if (errno == ERANGE) return OpStatus::OUT_OF_RANGE; + if (errno == EDOM) + return OpStatus::STREAM_ID_SMALL; return OpStatus::OUT_OF_MEMORY; } + if (opts.max_limit < kuint32max) { + /* Notify xtrim event if needed. */ + streamTrimByLength(stream_inst, opts.max_limit, opts.max_limit_approx); + // TODO: when replicating, we should propagate it as exact limit in case of trimming. + } return result_id; } -OpResult OpRange(const OpArgs& op_args, string_view key, const ParsedStreamId& start, - const ParsedStreamId& end, uint32_t count) { +OpResult OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) { auto& db_slice = op_args.shard->db_slice(); OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); if (!res_it) @@ -163,7 +207,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const Parsed RecordVec result; - if (count == 0) + if (opts.count == 0) return result; streamIterator si; @@ -171,10 +215,9 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const Parsed streamID id; CompactObj& cobj = (*res_it)->second; stream* s = (stream*)cobj.RObjPtr(); - int rev = 0; - streamID sstart = start.val, send = end.val; + streamID sstart = opts.start.val, send = opts.end.val; - streamIteratorStart(&si, s, &sstart, &send, rev); + streamIteratorStart(&si, s, &sstart, &send, opts.is_rev); while (streamIteratorGetID(&si, &id, &numfields)) { Record rec; rec.id = id; @@ -193,7 +236,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const Parsed result.push_back(move(rec)); - if (count == result.size()) + if (opts.count == result.size()) break; } @@ -212,32 +255,360 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { return s->length; } -inline string StreamIdRepr(const streamID& id) { - return absl::StrCat(id.ms, "-", id.seq); +OpResult> OpListGroups(const OpArgs& op_args, string_view key) { + auto& db_slice = op_args.shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + if (!res_it) + return res_it.status(); + + vector result; + CompactObj& cobj = (*res_it)->second; + stream* s = (stream*)cobj.RObjPtr(); + + if (s->cgroups) { + result.reserve(raxSize(s->cgroups)); + + raxIterator ri; + raxStart(&ri, s->cgroups); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + streamCG* cg = (streamCG*)ri.data; + GroupInfo ginfo; + ginfo.name.assign(reinterpret_cast(ri.key), ri.key_len); + ginfo.consumer_size = raxSize(cg->consumers); + ginfo.pending_size = raxSize(cg->pel); + ginfo.last_id = cg->last_id; + result.push_back(std::move(ginfo)); + } + raxStop(&ri); + } + + return result; +} + +struct CreateOpts { + string_view gname; + string_view id; }; -const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument"; +OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) { + auto* shard = op_args.shard; + auto& db_slice = shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + if (!res_it) + return res_it.status(); + + CompactObj& cobj = (*res_it)->second; + stream* s = (stream*)cobj.RObjPtr(); + streamID id; + ParsedStreamId parsed_id; + if (opts.id == "$") { + id = s->last_id; + } else { + if (ParseID(opts.id, true, 0, &parsed_id)) { + id = parsed_id.val; + } else { + return OpStatus::SYNTAX_ERR; + } + } + + streamCG* cg = streamCreateCG(s, opts.gname.data(), opts.gname.size(), &id, 0); + if (cg) { + return OpStatus::OK; + } + return OpStatus::BUSY_GROUP; +} + +OpResult> FindGroup(const OpArgs& op_args, string_view key, + string_view gname) { + auto* shard = op_args.shard; + auto& db_slice = shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + if (!res_it) + return res_it.status(); + + CompactObj& cobj = (*res_it)->second; + pair res; + res.first = (stream*)cobj.RObjPtr(); + shard->tmp_str1 = sdscpylen(shard->tmp_str1, gname.data(), gname.size()); + res.second = streamLookupCG(res.first, shard->tmp_str1); + + return res; +} + +// XGROUP DESTROY key groupname +OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) { + OpResult> cgr_res = FindGroup(op_args, key, gname); + if (!cgr_res) + return cgr_res.status(); + + stream* s = cgr_res->first; + streamCG* scg = cgr_res->second; + + if (scg) { + raxRemove(s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL); + streamFreeCG(scg); + return OpStatus::OK; + } + + return OpStatus::SKIPPED; +} + +// XGROUP DELCONSUMER key groupname consumername +OpResult OpDelConsumer(const OpArgs& op_args, string_view key, string_view gname, + string_view consumer_name) { + OpResult> cgroup_res = FindGroup(op_args, key, gname); + if (!cgroup_res) + return cgroup_res.status(); + + streamCG* cg = cgroup_res->second; + if (cg == nullptr) + return OpStatus::SKIPPED; + + long long pending = 0; + auto* shard = op_args.shard; + + shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size()); + streamConsumer* consumer = streamLookupConsumer(cg, shard->tmp_str1, SLC_NO_REFRESH); + if (consumer) { + pending = raxSize(consumer->pel); + streamDelConsumer(cg, consumer); + } + + return pending; +} + +OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, string_view id) { + OpResult> cgr_res = FindGroup(op_args, key, gname); + if (!cgr_res) + return cgr_res.status(); + + streamCG* cg = cgr_res->second; + if (cg == nullptr) + return OpStatus::SKIPPED; + + streamID sid; + ParsedStreamId parsed_id; + if (id == "$") { + sid = cgr_res->first->last_id; + } else { + if (ParseID(id, true, 0, &parsed_id)) { + sid = parsed_id.val; + } else { + return OpStatus::SYNTAX_ERR; + } + } + cg->last_id = sid; + + return OpStatus::OK; +} + +OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { + auto* shard = op_args.shard; + auto& db_slice = shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + if (!res_it) + return res_it.status(); + + CompactObj& cobj = (*res_it)->second; + stream* stream_inst = (stream*)cobj.RObjPtr(); + long long entries_added = -1; + streamID max_xdel_id{0, 0}; + + /* If the stream has at least one item, we want to check that the user + * is setting a last ID that is equal or greater than the current top + * item, otherwise the fundamental ID monotonicity assumption is violated. */ + if (stream_inst->length > 0) { + streamID maxid; + streamID id = sid; + streamLastValidID(stream_inst, &maxid); + + if (streamCompareID(&id, &maxid) < 0) { + return OpStatus::STREAM_ID_SMALL; + } + + /* If an entries_added was provided, it can't be lower than the length. */ + if (entries_added != -1 && stream_inst->length > uint64_t(entries_added)) { + return OpStatus::ENTRIES_ADDED_SMALL; + } + } + + stream_inst->last_id = sid; + if (entries_added != -1) + stream_inst->entries_added = entries_added; + if (!streamIDEqZero(&max_xdel_id)) + stream_inst->max_deleted_entry_id = max_xdel_id; + + return OpStatus::OK; +} + +OpResult OpDel(const OpArgs& op_args, string_view key, absl::Span ids) { + auto* shard = op_args.shard; + auto& db_slice = shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + if (!res_it) + return res_it.status(); + + CompactObj& cobj = (*res_it)->second; + stream* stream_inst = (stream*)cobj.RObjPtr(); + + uint32_t deleted = 0; + bool first_entry = false; + + for (size_t j = 0; j < ids.size(); j++) { + streamID id = ids[j]; + if (!streamDeleteItem(stream_inst, &id)) + continue; + + /* We want to know if the first entry in the stream was deleted + * so we can later set the new one. */ + if (streamCompareID(&id, &stream_inst->first_id) == 0) { + first_entry = 1; + } + /* Update the stream's maximal tombstone if needed. */ + if (streamCompareID(&id, &stream_inst->max_deleted_entry_id) > 0) { + stream_inst->max_deleted_entry_id = id; + } + deleted++; + } + + /* Update the stream's first ID. */ + if (deleted) { + if (stream_inst->length == 0) { + stream_inst->first_id.ms = 0; + stream_inst->first_id.seq = 0; + } else if (first_entry) { + streamGetEdgeID(stream_inst, 1, 1, &stream_inst->first_id); + } + } + + return deleted; +} + +void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { + if (args.size() < 2) + return (*cntx)->SendError(UnknownSubCmd("CREATE", "XGROUP")); + + CreateOpts opts; + opts.gname = ArgS(args, 0); + opts.id = ArgS(args, 1); + + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpCreate(op_args, key, opts); + }; + + OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); + switch (result) { + case OpStatus::KEY_NOTFOUND: + return (*cntx)->SendError(kXGroupKeyNotFound); + default: + (*cntx)->SendError(result); + } +} + +void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpDestroyGroup(op_args, key, gname); + }; + + OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); + switch (result) { + case OpStatus::OK: + return (*cntx)->SendLong(1); + case OpStatus::SKIPPED: + return (*cntx)->SendLong(0); + case OpStatus::KEY_NOTFOUND: + return (*cntx)->SendError(kXGroupKeyNotFound); + default: + (*cntx)->SendError(result); + } +} + +void DelConsumer(string_view key, string_view gname, string_view consumer, + ConnectionContext* cntx) { + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpDelConsumer(op_args, key, gname, consumer); + }; + + OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + + switch (result.status()) { + case OpStatus::OK: + return (*cntx)->SendLong(*result); + case OpStatus::SKIPPED: + return (*cntx)->SendError(NoGroupError(key, gname)); + case OpStatus::KEY_NOTFOUND: + return (*cntx)->SendError(kXGroupKeyNotFound); + default: + (*cntx)->SendError(result.status()); + } +} + +void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContext* cntx) { + string_view id = ArgS(args, 0); + + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpSetId(op_args, key, gname, id); + }; + + OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); + switch (result) { + case OpStatus::SKIPPED: + return (*cntx)->SendError(NoGroupError(key, gname)); + case OpStatus::KEY_NOTFOUND: + return (*cntx)->SendError(kXGroupKeyNotFound); + default: + (*cntx)->SendError(result); + } +} + } // namespace void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); + unsigned id_indx = 2; + AddOpts add_opts; - // TODO: args parsing - string_view id = ArgS(args, 2); - ParsedStreamId parsed_id; + for (; id_indx < args.size(); ++id_indx) { + ToUpper(&args[id_indx]); + string_view arg = ArgS(args, id_indx); + if (arg == "MAXLEN") { + if (id_indx + 2 >= args.size()) { + return (*cntx)->SendError(kSyntaxErr); + } + ++id_indx; + if (ArgS(args, id_indx) == "~") { + add_opts.max_limit_approx = true; + ++id_indx; + } + arg = ArgS(args, id_indx); + if (!absl::SimpleAtoi(arg, &add_opts.max_limit)) { + return (*cntx)->SendError(kSyntaxErr); + } + } else { + break; + } + } - args.remove_prefix(3); - if (args.empty() || args.size() % 2 == 1) { + args.remove_prefix(id_indx); + if (args.size() < 3 || args.size() % 2 == 0) { return (*cntx)->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); } - if (!ParseID(id, true, 0, &parsed_id)) { + string_view id = ArgS(args, 0); + + if (!ParseID(id, true, 0, &add_opts.parsed_id)) { return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType); } + args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, key, parsed_id, args); + return OpAdd(op_args, key, add_opts, args); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -245,9 +616,134 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendBulkString(StreamIdRepr(*add_result)); } + if (add_result.status() == OpStatus::STREAM_ID_SMALL) { + return (*cntx)->SendError( + "The ID specified in XADD is equal or smaller than " + "the target stream top item"); + } + return (*cntx)->SendError(add_result.status()); } +void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) { + string_view key = ArgS(args, 1); + args.remove_prefix(2); + + absl::InlinedVector ids(ids.size()); + for (size_t i = 0; i < args.size(); ++i) { + ParsedStreamId parsed_id; + string_view str_id = ArgS(args, i); + if (!ParseID(str_id, true, 0, &parsed_id)) { + return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType); + } + ids[i] = parsed_id.val; + } + + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpDel(op_args, key, absl::Span{ids.data(), ids.size()}); + }; + + OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + if (result || result.status() == OpStatus::KEY_NOTFOUND) { + return (*cntx)->SendLong(*result); + } + + (*cntx)->SendError(result.status()); +} + +void StreamFamily::XGroup(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[1]); + string_view sub_cmd = ArgS(args, 1); + if (sub_cmd == "HELP") { + string_view help_arr[] = { + "CREATE [option]", + " Create a new consumer group. Options are:", + " * MKSTREAM", + " Create the empty stream if it does not exist.", + "CREATECONSUMER ", + " Create a new consumer in the specified group.", + "DELCONSUMER ", + " Remove the specified consumer.", + "DESTROY " + " Remove the specified group.", + "SETID ", + " Set the current group ID.", + }; + return (*cntx)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr)); + } + + if (args.size() >= 3) { + string_view key = ArgS(args, 2); + if (sub_cmd == "CREATE") { + args.remove_prefix(3); + return CreateGroup(std::move(args), key, cntx); + } + + if (sub_cmd == "DESTROY" && args.size() == 4) { + string_view gname = ArgS(args, 3); + return DestroyGroup(key, gname, cntx); + } + + if (sub_cmd == "DELCONSUMER" && args.size() == 5) { + string_view gname = ArgS(args, 3); + string_view cname = ArgS(args, 4); + return DelConsumer(key, gname, cname, cntx); + } + + if (sub_cmd == "SETID" && args.size() >= 5) { + string_view gname = ArgS(args, 3); + args.remove_prefix(4); + return SetId(key, gname, std::move(args), cntx); + } + } + + return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "XGROUP")); +} + +void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[1]); + string_view sub_cmd = ArgS(args, 1); + if (sub_cmd == "HELP") { + string_view help_arr[] = { + "CONSUMERS ", + " Show consumers of .", + "GROUPS ", + " Show the stream consumer groups.", + "STREAM [FULL [COUNT ]", + " Show information about the stream.", + }; + return (*cntx)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr)); + } + + if (args.size() >= 3) { + string_view key = ArgS(args, 2); + if (sub_cmd == "GROUPS") { + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpListGroups(op_args, key); + }; + + OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + if (result) { + (*cntx)->StartArray(result->size()); + for (const auto& ginfo : *result) { + absl::AlphaNum an1(ginfo.consumer_size); + absl::AlphaNum an2(ginfo.pending_size); + string last_id = StreamIdRepr(ginfo.last_id); + string_view arr[8] = {"name", ginfo.name, "consumers", an1.Piece(), + "pending", an2.Piece(), "last-delivered-id", last_id}; + + (*cntx)->SendStringArr(absl::Span{arr, 8}); + } + return; + } + return (*cntx)->SendError(result.status()); + } + } + return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "XINFO")); +} + void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { @@ -264,11 +760,46 @@ void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { } void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) { + XRangeGeneric(std::move(args), false, cntx); +} + +void StreamFamily::XRevRange(CmdArgList args, ConnectionContext* cntx) { + XRangeGeneric(std::move(args), true, cntx); +} + +void StreamFamily::XSetId(CmdArgList args, ConnectionContext* cntx) { + string_view key = ArgS(args, 1); + string_view idstr = ArgS(args, 2); + + ParsedStreamId parsed_id; + if (!ParseID(idstr, true, 0, &parsed_id)) { + return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType); + } + + auto cb = [&](Transaction* t, EngineShard* shard) { + OpArgs op_args{shard, t->db_index()}; + return OpSetId2(op_args, key, parsed_id.val); + }; + + OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); + switch (result) { + case OpStatus::STREAM_ID_SMALL: + return (*cntx)->SendError( + "The ID specified in XSETID is smaller than the target stream top item"); + case OpStatus::ENTRIES_ADDED_SMALL: + return (*cntx)->SendError( + "The entries_added specified in XSETID is smaller than " + "the target stream length"); + default: + return (*cntx)->SendError(result); + } +} + +void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view start = ArgS(args, 2); string_view end = ArgS(args, 3); - uint32_t count = kuint32max; - + RangeOpts range_opts; RangeId rs, re; if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType); @@ -290,14 +821,18 @@ void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) { string_view opt = ArgS(args, 4); string_view val = ArgS(args, 5); - if (opt != "COUNT" || !absl::SimpleAtoi(val, &count)) { + if (opt != "COUNT" || !absl::SimpleAtoi(val, &range_opts.count)) { return (*cntx)->SendError(kSyntaxErr); } } + range_opts.start = rs.parsed_id; + range_opts.end = re.parsed_id; + range_opts.is_rev = is_rev; + auto cb = [&](Transaction* t, EngineShard* shard) { OpArgs op_args{shard, t->db_index()}; - return OpRange(op_args, key, rs.parsed_id, re.parsed_id, count); + return OpRange(op_args, key, range_opts); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -327,8 +862,13 @@ void StreamFamily::Register(CommandRegistry* registry) { using CI = CommandId; *registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd) + << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, 1}.HFUNC(XDel) + << CI{"XGROUP", CO::WRITE | CO::DENYOOM, -2, 2, 2, 1}.HFUNC(XGroup) + << CI{"XINFO", CO::READONLY, -2, 2, 2, 1}.HFUNC(XInfo) << CI{"XLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(XLen) - << CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange); + << CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange) + << CI{"XREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRevRange) + << CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId); } } // namespace dfly \ No newline at end of file diff --git a/src/server/stream_family.h b/src/server/stream_family.h index 9292973..1726967 100644 --- a/src/server/stream_family.h +++ b/src/server/stream_family.h @@ -17,9 +17,14 @@ class StreamFamily { private: static void XAdd(CmdArgList args, ConnectionContext* cntx); + static void XDel(CmdArgList args, ConnectionContext* cntx); + static void XGroup(CmdArgList args, ConnectionContext* cntx); + static void XInfo(CmdArgList args, ConnectionContext* cntx); static void XLen(CmdArgList args, ConnectionContext* cntx); + static void XRevRange(CmdArgList args, ConnectionContext* cntx); static void XRange(CmdArgList args, ConnectionContext* cntx); - + static void XSetId(CmdArgList args, ConnectionContext* cntx); + static void XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext* cntx); }; } // namespace dfly diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index 73485c0..a7e8506 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -36,6 +36,52 @@ TEST_F(StreamFamilyTest, Add) { resp = Run({"xlen", "key"}); EXPECT_THAT(resp, IntArg(1)); + + resp = Run({"xadd", "key", "badid", "f1", "val1"}); + EXPECT_THAT(resp, ErrArg("Invalid stream ID")); +} + +TEST_F(StreamFamilyTest, AddExtended) { + auto resp0 = Run({"xadd", "key", "5", "f1", "v1", "f2", "v2"}); + EXPECT_EQ(resp0, "5-0"); + resp0 = Run({"xrange", "key", "5-0", "5-0"}); + EXPECT_THAT(resp0, ArrLen(2)); + auto sub_arr = resp0.GetVec(); + EXPECT_THAT(sub_arr, ElementsAre("5-0", ArrLen(4))); + sub_arr = sub_arr[1].GetVec(); + EXPECT_THAT(sub_arr, ElementsAre("f1", "v1", "f2", "v2")); + + auto resp1 = Run({"xadd", "key", "maxlen", "1", "*", "field1", "val1"}); + string id1 = string(ToSV(resp1.GetBuf())); + + auto resp2 = Run({"xadd", "key", "maxlen", "1", "*", "field2", "val2"}); + string id2 = string(ToSV(resp2.GetBuf())); + + EXPECT_THAT(Run({"xlen", "key"}), IntArg(1)); + EXPECT_THAT(Run({"xrange", "key", id1, id1}), ArrLen(0)); + + auto resp3 = Run({"xadd", "key", id2, "f1", "val1"}); + EXPECT_THAT(resp3, ErrArg("equal or smaller than")); +} + +TEST_F(StreamFamilyTest, Range) { + Run({"xadd", "key", "1-*", "f1", "v1"}); + Run({"xadd", "key", "1-*", "f2", "v2"}); + auto resp = Run({"xrange", "key", "-", "+"}); + EXPECT_THAT(resp, ArrLen(2)); + auto sub_arr = resp.GetVec(); + EXPECT_THAT(sub_arr, ElementsAre(ArrLen(2), ArrLen(2))); + auto sub0 = sub_arr[0].GetVec(); + auto sub1 = sub_arr[1].GetVec(); + EXPECT_THAT(sub0, ElementsAre("1-0", ArrLen(2))); + EXPECT_THAT(sub1, ElementsAre("1-1", ArrLen(2))); + + resp = Run({"xrevrange", "key", "-", "+"}); + sub_arr = resp.GetVec(); + sub0 = sub_arr[0].GetVec(); + sub1 = sub_arr[1].GetVec(); + EXPECT_THAT(sub0, ElementsAre("1-1", ArrLen(2))); + EXPECT_THAT(sub1, ElementsAre("1-0", ArrLen(2))); } } // namespace dfly \ No newline at end of file