implement partial streams API (#141)

* feat(stream): implement xrevrange, xrange, xlen, xinfo and xgroup, xdel.
1. add tests that cover xrange,xrevrange and various error states for xadd.
2. Implement 8 stream commands overall. Fixes #59.

* feat(stream): support xadd/maxlen option


Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-06-12 17:41:01 +03:00 committed by GitHub
parent 15fd451487
commit d5cd317a13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 676 additions and 49 deletions

View File

@ -204,7 +204,26 @@ with respect to Memcached and Redis APIs.
### API 3 ### API 3
### API 4 ### API 4
### API 5 ### API 5
### API 6 - [X] Stream Family
- [X] XADD
- [ ] XCLAIM
- [X] XDEL
- [X] XGROUP CREATE/DELCONSUMER/DESTROY/HELP/SETID
- [ ] XGROUP CREATECONSUMER
- [X] XINFO GROUPS/HELP
- [ ] XINFO CONSUMERS/GROUPS/STREAM
- [X] XLEN
- [ ] XPENDING
- [X] XRANGE
- [ ] XREAD
- [ ] XREADGROUP
- [X] XREVRANGE
- [X] XSETID
- [ ] XTRIM
### API 6,7
- [ ] Stream Family
- [ ] XAUTOCLAIM
## Notes ## Notes
Some commands were implemented as decorators along the way: Some commands were implemented as decorators along the way:

View File

@ -99,6 +99,11 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
return 0; return 0;
} }
size_t MallocUsedStream(unsigned encoding, void* streamv) {
// stream* str_obj = (stream*)streamv;
return 0; // TODO
}
inline void FreeObjHash(unsigned encoding, void* ptr) { inline void FreeObjHash(unsigned encoding, void* ptr) {
switch (encoding) { switch (encoding) {
case OBJ_ENCODING_HT: case OBJ_ENCODING_HT:
@ -228,6 +233,8 @@ size_t RobjWrapper::MallocUsed() const {
return MallocUsedHSet(encoding_, inner_obj_); return MallocUsedHSet(encoding_, inner_obj_);
case OBJ_ZSET: case OBJ_ZSET:
return MallocUsedZSet(encoding_, inner_obj_); return MallocUsedZSet(encoding_, inner_obj_);
case OBJ_STREAM:
return MallocUsedStream(encoding_, inner_obj_);
default: default:
LOG(FATAL) << "Not supported " << type_; LOG(FATAL) << "Not supported " << type_;

View File

@ -11,6 +11,8 @@ namespace facade {
std::string WrongNumArgsError(std::string_view cmd); std::string WrongNumArgsError(std::string_view cmd);
std::string InvalidExpireTime(std::string_view cmd); std::string InvalidExpireTime(std::string_view cmd);
std::string UnknownSubCmd(std::string_view subcmd, std::string_view cmd);
extern const char kSyntaxErr[]; extern const char kSyntaxErr[];
extern const char kWrongTypeErr[]; extern const char kWrongTypeErr[];

View File

@ -57,6 +57,12 @@ string InvalidExpireTime(string_view cmd) {
return absl::StrCat("invalid expire time in '", cmd, "' command"); return absl::StrCat("invalid expire time in '", cmd, "' command");
} }
string UnknownSubCmd(string_view subcmd, string_view cmd) {
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, "'. Try ",
cmd, " HELP.");
}
const char kSyntaxErr[] = "syntax error"; const char kSyntaxErr[] = "syntax error";
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value"; const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
const char kKeyNotFoundErr[] = "no such key"; const char kKeyNotFoundErr[] = "no such key";

View File

@ -21,6 +21,9 @@ enum class OpStatus : uint16_t {
INVALID_FLOAT, INVALID_FLOAT,
INVALID_INT, INVALID_INT,
SYNTAX_ERR, SYNTAX_ERR,
BUSY_GROUP,
STREAM_ID_SMALL,
ENTRIES_ADDED_SMALL,
}; };
class OpResultBase { class OpResultBase {

View File

@ -243,6 +243,9 @@ void RedisReplyBuilder::SendError(OpStatus status) {
case OpStatus::OUT_OF_MEMORY: case OpStatus::OUT_OF_MEMORY:
SendError(kOutOfMemory); SendError(kOutOfMemory);
break; break;
case OpStatus::BUSY_GROUP:
SendError("-BUSYGROUP Consumer Group name already exists");
break;
default: default:
LOG(ERROR) << "Unsupported status " << status; LOG(ERROR) << "Unsupported status " << status;
SendError("Internal error"); SendError("Internal error");

View File

@ -131,7 +131,7 @@ void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer); streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b); int streamCompareID(streamID *a, streamID *b);
@ -149,5 +149,9 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx);
void streamFreeCG(streamCG *cg);
void streamDelConsumer(streamCG *cg, streamConsumer *consumer);
void streamLastValidID(stream *s, streamID *maxid);
int streamIDEqZero(streamID *id);
#endif #endif

View File

@ -2485,7 +2485,7 @@ void streamFreeConsumer(streamConsumer *sc) {
* specified name, last server ID and reads counter. If a consumer group with * specified name, last server ID and reads counter. If a consumer group with
* the same name already exists NULL is returned, otherwise the pointer to the * the same name already exists NULL is returned, otherwise the pointer to the
* consumer group is returned. */ * consumer group is returned. */
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read) {
if (s->cgroups == NULL) s->cgroups = raxNew(); if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL; return NULL;

View File

@ -77,9 +77,9 @@ DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner),
} }
void DebugCmd::Run(CmdArgList args) { void DebugCmd::Run(CmdArgList args) {
std::string_view subcmd = ArgS(args, 1); string_view subcmd = ArgS(args, 1);
if (subcmd == "HELP") { if (subcmd == "HELP") {
std::string_view help_arr[] = { string_view help_arr[] = {
"DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:", "DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"OBJECT <key>", "OBJECT <key>",
" Show low-level info about `key` and associated value.", " Show low-level info about `key` and associated value.",
@ -122,8 +122,7 @@ void DebugCmd::Run(CmdArgList args) {
return Inspect(key); return Inspect(key);
} }
string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, string reply = UnknownSubCmd(subcmd, "DEBUG");
"'. Try DEBUG HELP.");
return (*cntx_)->SendError(reply, kSyntaxErrType); return (*cntx_)->SendError(reply, kSyntaxErrType);
} }
@ -132,7 +131,7 @@ void DebugCmd::Reload(CmdArgList args) {
for (size_t i = 2; i < args.size(); ++i) { for (size_t i = 2; i < args.size(); ++i) {
ToUpper(&args[i]); ToUpper(&args[i]);
std::string_view opt = ArgS(args, i); string_view opt = ArgS(args, i);
VLOG(1) << "opt " << opt; VLOG(1) << "opt " << opt;
if (opt == "NOSAVE") { if (opt == "NOSAVE") {
@ -161,7 +160,7 @@ void DebugCmd::Reload(CmdArgList args) {
Load(last_save_file); Load(last_save_file);
} }
void DebugCmd::Load(std::string_view filename) { void DebugCmd::Load(string_view filename) {
GlobalState new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); GlobalState new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state != GlobalState::LOADING) { if (new_state != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored"; LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
@ -202,8 +201,7 @@ void DebugCmd::Load(std::string_view filename) {
void DebugCmd::Populate(CmdArgList args) { void DebugCmd::Populate(CmdArgList args) {
if (args.size() < 3 || args.size() > 5) { if (args.size() < 3 || args.size() > 5) {
return (*cntx_)->SendError( return (*cntx_)->SendError(UnknownSubCmd("populate", "DEBUG"));
"Unknown subcommand or wrong number of arguments for 'populate'. Try DEBUG HELP.");
} }
uint64_t total_count = 0; uint64_t total_count = 0;

View File

@ -993,7 +993,7 @@ void Service::Function(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendOk(); return (*cntx)->SendOk();
} }
string err = StrCat("Unknown subcommand '", sub_cmd, "'. Try FUNCTION HELP."); string err = UnknownSubCmd(sub_cmd, "FUNCTION");
return (*cntx)->SendError(err, kSyntaxErrType); return (*cntx)->SendError(err, kSyntaxErrType);
} }

View File

@ -87,11 +87,6 @@ error_code CreateDirs(fs::path dir_path) {
return ec; return ec;
} }
string UnknownSubCmd(string_view subcmd, string cmd) {
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, "'. Try ",
cmd, " HELP.");
}
string UnknownCmd(string cmd, CmdArgList args) { string UnknownCmd(string cmd, CmdArgList args) {
return absl::StrCat("unknown command '", cmd, "' with args beginning with: ", return absl::StrCat("unknown command '", cmd, "' with args beginning with: ",
StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter())); StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter()));
@ -689,8 +684,7 @@ void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendLong(1); return (*cntx)->SendLong(1);
} }
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, string err = UnknownSubCmd(sub_cmd, "MEMORY");
"'. Try MEMORY HELP.");
return (*cntx)->SendError(err, kSyntaxErrType); return (*cntx)->SendError(err, kSyntaxErrType);
} }

View File

@ -43,6 +43,40 @@ struct RangeId {
bool exclude = false; bool exclude = false;
}; };
struct AddOpts {
ParsedStreamId parsed_id;
uint32_t max_limit = kuint32max;
bool max_limit_approx = false;
};
struct GroupInfo {
string name;
size_t consumer_size;
size_t pending_size;
streamID last_id;
};
struct RangeOpts {
ParsedStreamId start;
ParsedStreamId end;
bool is_rev = false;
uint32_t count = kuint32max;
};
const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument";
const char kXGroupKeyNotFound[] =
"The XGROUP subcommand requires the key to exist. "
"Note that for CREATE you may want to use the MKSTREAM option to create "
"an empty stream automatically.";
inline string StreamIdRepr(const streamID& id) {
return absl::StrCat(id.ms, "-", id.seq);
};
inline string NoGroupError(string_view key, string_view cgroup) {
return absl::StrCat("-NOGROUP No such consumer group '", cgroup, "' for key name '", key, "'");
}
bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamId* dest) { bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamId* dest) {
if (strid.empty() || strid.size() > 127) if (strid.empty() || strid.size() > 127)
return false; return false;
@ -70,15 +104,13 @@ bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamI
} }
/* Parse <ms>-<seq> form. */ /* Parse <ms>-<seq> form. */
streamID result{.ms = 0, .seq = 0}; streamID result{.ms = 0, .seq = missing_seq};
size_t dash_pos = strid.find('-'); size_t dash_pos = strid.find('-');
if (!absl::SimpleAtoi(strid.substr(0, dash_pos), &result.ms)) if (!absl::SimpleAtoi(strid.substr(0, dash_pos), &result.ms))
return false; return false;
if (dash_pos == string_view::npos) { if (dash_pos != string_view::npos) {
result.seq = missing_seq;
} else {
if (dash_pos + 1 == strid.size()) if (dash_pos + 1 == strid.size())
return false; return false;
@ -106,7 +138,7 @@ bool ParseRangeId(string_view id, RangeId* dest) {
return ParseID(id, dest->exclude, 0, &dest->parsed_id); return ParseID(id, dest->exclude, 0, &dest->parsed_id);
} }
OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const ParsedStreamId& parsed_id, OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts,
CmdArgList args) { CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0); DCHECK(!args.empty() && args.size() % 2 == 0);
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
@ -132,7 +164,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const ParsedStr
db_slice.PreUpdate(op_args.db_ind, it); db_slice.PreUpdate(op_args.db_ind, it);
} }
stream* str = (stream*)it->second.RObjPtr(); stream* stream_inst = (stream*)it->second.RObjPtr();
// we should really get rid of this monstrousity and rewrite streamAppendItem ourselves here. // we should really get rid of this monstrousity and rewrite streamAppendItem ourselves here.
unique_ptr<robj*[]> objs(new robj*[args.size()]); unique_ptr<robj*[]> objs(new robj*[args.size()]);
@ -141,21 +173,33 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const ParsedStr
} }
streamID result_id; streamID result_id;
const auto& parsed_id = opts.parsed_id;
streamID passed_id = parsed_id.val; streamID passed_id = parsed_id.val;
int res = streamAppendItem(str, objs.get(), args.size() / 2, &result_id, int res = streamAppendItem(stream_inst, objs.get(), args.size() / 2, &result_id,
parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq); parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq);
for (size_t i = 0; i < args.size(); ++i) {
decrRefCount(objs[i]);
}
if (res != C_OK) { if (res != C_OK) {
if (errno == ERANGE) if (errno == ERANGE)
return OpStatus::OUT_OF_RANGE; return OpStatus::OUT_OF_RANGE;
if (errno == EDOM)
return OpStatus::STREAM_ID_SMALL;
return OpStatus::OUT_OF_MEMORY; return OpStatus::OUT_OF_MEMORY;
} }
if (opts.max_limit < kuint32max) {
/* Notify xtrim event if needed. */
streamTrimByLength(stream_inst, opts.max_limit, opts.max_limit_approx);
// TODO: when replicating, we should propagate it as exact limit in case of trimming.
}
return result_id; return result_id;
} }
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const ParsedStreamId& start, OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) {
const ParsedStreamId& end, uint32_t count) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it) if (!res_it)
@ -163,7 +207,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const Parsed
RecordVec result; RecordVec result;
if (count == 0) if (opts.count == 0)
return result; return result;
streamIterator si; streamIterator si;
@ -171,10 +215,9 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const Parsed
streamID id; streamID id;
CompactObj& cobj = (*res_it)->second; CompactObj& cobj = (*res_it)->second;
stream* s = (stream*)cobj.RObjPtr(); stream* s = (stream*)cobj.RObjPtr();
int rev = 0; streamID sstart = opts.start.val, send = opts.end.val;
streamID sstart = start.val, send = end.val;
streamIteratorStart(&si, s, &sstart, &send, rev); streamIteratorStart(&si, s, &sstart, &send, opts.is_rev);
while (streamIteratorGetID(&si, &id, &numfields)) { while (streamIteratorGetID(&si, &id, &numfields)) {
Record rec; Record rec;
rec.id = id; rec.id = id;
@ -193,7 +236,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const Parsed
result.push_back(move(rec)); result.push_back(move(rec));
if (count == result.size()) if (opts.count == result.size())
break; break;
} }
@ -212,32 +255,360 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
return s->length; return s->length;
} }
inline string StreamIdRepr(const streamID& id) { OpResult<vector<GroupInfo>> OpListGroups(const OpArgs& op_args, string_view key) {
return absl::StrCat(id.ms, "-", id.seq); auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
vector<GroupInfo> result;
CompactObj& cobj = (*res_it)->second;
stream* s = (stream*)cobj.RObjPtr();
if (s->cgroups) {
result.reserve(raxSize(s->cgroups));
raxIterator ri;
raxStart(&ri, s->cgroups);
raxSeek(&ri, "^", NULL, 0);
while (raxNext(&ri)) {
streamCG* cg = (streamCG*)ri.data;
GroupInfo ginfo;
ginfo.name.assign(reinterpret_cast<char*>(ri.key), ri.key_len);
ginfo.consumer_size = raxSize(cg->consumers);
ginfo.pending_size = raxSize(cg->pel);
ginfo.last_id = cg->last_id;
result.push_back(std::move(ginfo));
}
raxStop(&ri);
}
return result;
}
struct CreateOpts {
string_view gname;
string_view id;
}; };
const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument"; OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
CompactObj& cobj = (*res_it)->second;
stream* s = (stream*)cobj.RObjPtr();
streamID id;
ParsedStreamId parsed_id;
if (opts.id == "$") {
id = s->last_id;
} else {
if (ParseID(opts.id, true, 0, &parsed_id)) {
id = parsed_id.val;
} else {
return OpStatus::SYNTAX_ERR;
}
}
streamCG* cg = streamCreateCG(s, opts.gname.data(), opts.gname.size(), &id, 0);
if (cg) {
return OpStatus::OK;
}
return OpStatus::BUSY_GROUP;
}
OpResult<pair<stream*, streamCG*>> FindGroup(const OpArgs& op_args, string_view key,
string_view gname) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
CompactObj& cobj = (*res_it)->second;
pair<stream*, streamCG*> res;
res.first = (stream*)cobj.RObjPtr();
shard->tmp_str1 = sdscpylen(shard->tmp_str1, gname.data(), gname.size());
res.second = streamLookupCG(res.first, shard->tmp_str1);
return res;
}
// XGROUP DESTROY key groupname
OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) {
OpResult<pair<stream*, streamCG*>> cgr_res = FindGroup(op_args, key, gname);
if (!cgr_res)
return cgr_res.status();
stream* s = cgr_res->first;
streamCG* scg = cgr_res->second;
if (scg) {
raxRemove(s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL);
streamFreeCG(scg);
return OpStatus::OK;
}
return OpStatus::SKIPPED;
}
// XGROUP DELCONSUMER key groupname consumername
OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_view gname,
string_view consumer_name) {
OpResult<pair<stream*, streamCG*>> cgroup_res = FindGroup(op_args, key, gname);
if (!cgroup_res)
return cgroup_res.status();
streamCG* cg = cgroup_res->second;
if (cg == nullptr)
return OpStatus::SKIPPED;
long long pending = 0;
auto* shard = op_args.shard;
shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size());
streamConsumer* consumer = streamLookupConsumer(cg, shard->tmp_str1, SLC_NO_REFRESH);
if (consumer) {
pending = raxSize(consumer->pel);
streamDelConsumer(cg, consumer);
}
return pending;
}
OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, string_view id) {
OpResult<pair<stream*, streamCG*>> cgr_res = FindGroup(op_args, key, gname);
if (!cgr_res)
return cgr_res.status();
streamCG* cg = cgr_res->second;
if (cg == nullptr)
return OpStatus::SKIPPED;
streamID sid;
ParsedStreamId parsed_id;
if (id == "$") {
sid = cgr_res->first->last_id;
} else {
if (ParseID(id, true, 0, &parsed_id)) {
sid = parsed_id.val;
} else {
return OpStatus::SYNTAX_ERR;
}
}
cg->last_id = sid;
return OpStatus::OK;
}
OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
CompactObj& cobj = (*res_it)->second;
stream* stream_inst = (stream*)cobj.RObjPtr();
long long entries_added = -1;
streamID max_xdel_id{0, 0};
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
* item, otherwise the fundamental ID monotonicity assumption is violated. */
if (stream_inst->length > 0) {
streamID maxid;
streamID id = sid;
streamLastValidID(stream_inst, &maxid);
if (streamCompareID(&id, &maxid) < 0) {
return OpStatus::STREAM_ID_SMALL;
}
/* If an entries_added was provided, it can't be lower than the length. */
if (entries_added != -1 && stream_inst->length > uint64_t(entries_added)) {
return OpStatus::ENTRIES_ADDED_SMALL;
}
}
stream_inst->last_id = sid;
if (entries_added != -1)
stream_inst->entries_added = entries_added;
if (!streamIDEqZero(&max_xdel_id))
stream_inst->max_deleted_entry_id = max_xdel_id;
return OpStatus::OK;
}
OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, absl::Span<streamID> ids) {
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
CompactObj& cobj = (*res_it)->second;
stream* stream_inst = (stream*)cobj.RObjPtr();
uint32_t deleted = 0;
bool first_entry = false;
for (size_t j = 0; j < ids.size(); j++) {
streamID id = ids[j];
if (!streamDeleteItem(stream_inst, &id))
continue;
/* We want to know if the first entry in the stream was deleted
* so we can later set the new one. */
if (streamCompareID(&id, &stream_inst->first_id) == 0) {
first_entry = 1;
}
/* Update the stream's maximal tombstone if needed. */
if (streamCompareID(&id, &stream_inst->max_deleted_entry_id) > 0) {
stream_inst->max_deleted_entry_id = id;
}
deleted++;
}
/* Update the stream's first ID. */
if (deleted) {
if (stream_inst->length == 0) {
stream_inst->first_id.ms = 0;
stream_inst->first_id.seq = 0;
} else if (first_entry) {
streamGetEdgeID(stream_inst, 1, 1, &stream_inst->first_id);
}
}
return deleted;
}
void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
if (args.size() < 2)
return (*cntx)->SendError(UnknownSubCmd("CREATE", "XGROUP"));
CreateOpts opts;
opts.gname = ArgS(args, 0);
opts.id = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpCreate(op_args, key, opts);
};
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
switch (result) {
case OpStatus::KEY_NOTFOUND:
return (*cntx)->SendError(kXGroupKeyNotFound);
default:
(*cntx)->SendError(result);
}
}
void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpDestroyGroup(op_args, key, gname);
};
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
switch (result) {
case OpStatus::OK:
return (*cntx)->SendLong(1);
case OpStatus::SKIPPED:
return (*cntx)->SendLong(0);
case OpStatus::KEY_NOTFOUND:
return (*cntx)->SendError(kXGroupKeyNotFound);
default:
(*cntx)->SendError(result);
}
}
void DelConsumer(string_view key, string_view gname, string_view consumer,
ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpDelConsumer(op_args, key, gname, consumer);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
switch (result.status()) {
case OpStatus::OK:
return (*cntx)->SendLong(*result);
case OpStatus::SKIPPED:
return (*cntx)->SendError(NoGroupError(key, gname));
case OpStatus::KEY_NOTFOUND:
return (*cntx)->SendError(kXGroupKeyNotFound);
default:
(*cntx)->SendError(result.status());
}
}
void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContext* cntx) {
string_view id = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpSetId(op_args, key, gname, id);
};
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
switch (result) {
case OpStatus::SKIPPED:
return (*cntx)->SendError(NoGroupError(key, gname));
case OpStatus::KEY_NOTFOUND:
return (*cntx)->SendError(kXGroupKeyNotFound);
default:
(*cntx)->SendError(result);
}
}
} // namespace } // namespace
void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
unsigned id_indx = 2;
AddOpts add_opts;
// TODO: args parsing for (; id_indx < args.size(); ++id_indx) {
string_view id = ArgS(args, 2); ToUpper(&args[id_indx]);
ParsedStreamId parsed_id; string_view arg = ArgS(args, id_indx);
if (arg == "MAXLEN") {
if (id_indx + 2 >= args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
++id_indx;
if (ArgS(args, id_indx) == "~") {
add_opts.max_limit_approx = true;
++id_indx;
}
arg = ArgS(args, id_indx);
if (!absl::SimpleAtoi(arg, &add_opts.max_limit)) {
return (*cntx)->SendError(kSyntaxErr);
}
} else {
break;
}
}
args.remove_prefix(3); args.remove_prefix(id_indx);
if (args.empty() || args.size() % 2 == 1) { if (args.size() < 3 || args.size() % 2 == 0) {
return (*cntx)->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); return (*cntx)->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
} }
if (!ParseID(id, true, 0, &parsed_id)) { string_view id = ArgS(args, 0);
if (!ParseID(id, true, 0, &add_opts.parsed_id)) {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType); return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
} }
args.remove_prefix(1);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()}; OpArgs op_args{shard, t->db_index()};
return OpAdd(op_args, key, parsed_id, args); return OpAdd(op_args, key, add_opts, args);
}; };
OpResult<streamID> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<streamID> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -245,9 +616,134 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendBulkString(StreamIdRepr(*add_result)); return (*cntx)->SendBulkString(StreamIdRepr(*add_result));
} }
if (add_result.status() == OpStatus::STREAM_ID_SMALL) {
return (*cntx)->SendError(
"The ID specified in XADD is equal or smaller than "
"the target stream top item");
}
return (*cntx)->SendError(add_result.status()); return (*cntx)->SendError(add_result.status());
} }
void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
absl::InlinedVector<streamID, 8> ids(ids.size());
for (size_t i = 0; i < args.size(); ++i) {
ParsedStreamId parsed_id;
string_view str_id = ArgS(args, i);
if (!ParseID(str_id, true, 0, &parsed_id)) {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
}
ids[i] = parsed_id.val;
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpDel(op_args, key, absl::Span{ids.data(), ids.size()});
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
return (*cntx)->SendLong(*result);
}
(*cntx)->SendError(result.status());
}
void StreamFamily::XGroup(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "HELP") {
string_view help_arr[] = {
"CREATE <key> <groupname> <id|$> [option]",
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>"
" Remove the specified group.",
"SETID <key> <groupname> <id|$>",
" Set the current group ID.",
};
return (*cntx)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr));
}
if (args.size() >= 3) {
string_view key = ArgS(args, 2);
if (sub_cmd == "CREATE") {
args.remove_prefix(3);
return CreateGroup(std::move(args), key, cntx);
}
if (sub_cmd == "DESTROY" && args.size() == 4) {
string_view gname = ArgS(args, 3);
return DestroyGroup(key, gname, cntx);
}
if (sub_cmd == "DELCONSUMER" && args.size() == 5) {
string_view gname = ArgS(args, 3);
string_view cname = ArgS(args, 4);
return DelConsumer(key, gname, cname, cntx);
}
if (sub_cmd == "SETID" && args.size() >= 5) {
string_view gname = ArgS(args, 3);
args.remove_prefix(4);
return SetId(key, gname, std::move(args), cntx);
}
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "XGROUP"));
}
void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "HELP") {
string_view help_arr[] = {
"CONSUMERS <key> <groupname>",
" Show consumers of <groupname>.",
"GROUPS <key>",
" Show the stream consumer groups.",
"STREAM <key> [FULL [COUNT <count>]",
" Show information about the stream.",
};
return (*cntx)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr));
}
if (args.size() >= 3) {
string_view key = ArgS(args, 2);
if (sub_cmd == "GROUPS") {
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpListGroups(op_args, key);
};
OpResult<vector<GroupInfo>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->StartArray(result->size());
for (const auto& ginfo : *result) {
absl::AlphaNum an1(ginfo.consumer_size);
absl::AlphaNum an2(ginfo.pending_size);
string last_id = StreamIdRepr(ginfo.last_id);
string_view arr[8] = {"name", ginfo.name, "consumers", an1.Piece(),
"pending", an2.Piece(), "last-delivered-id", last_id};
(*cntx)->SendStringArr(absl::Span<string_view>{arr, 8});
}
return;
}
return (*cntx)->SendError(result.status());
}
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "XINFO"));
}
void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
@ -264,11 +760,46 @@ void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) {
} }
void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) {
XRangeGeneric(std::move(args), false, cntx);
}
void StreamFamily::XRevRange(CmdArgList args, ConnectionContext* cntx) {
XRangeGeneric(std::move(args), true, cntx);
}
void StreamFamily::XSetId(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view idstr = ArgS(args, 2);
ParsedStreamId parsed_id;
if (!ParseID(idstr, true, 0, &parsed_id)) {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpSetId2(op_args, key, parsed_id.val);
};
OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb));
switch (result) {
case OpStatus::STREAM_ID_SMALL:
return (*cntx)->SendError(
"The ID specified in XSETID is smaller than the target stream top item");
case OpStatus::ENTRIES_ADDED_SMALL:
return (*cntx)->SendError(
"The entries_added specified in XSETID is smaller than "
"the target stream length");
default:
return (*cntx)->SendError(result);
}
}
void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext* cntx) {
string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
string_view start = ArgS(args, 2); string_view start = ArgS(args, 2);
string_view end = ArgS(args, 3); string_view end = ArgS(args, 3);
uint32_t count = kuint32max; RangeOpts range_opts;
RangeId rs, re; RangeId rs, re;
if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType); return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
@ -290,14 +821,18 @@ void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) {
string_view opt = ArgS(args, 4); string_view opt = ArgS(args, 4);
string_view val = ArgS(args, 5); string_view val = ArgS(args, 5);
if (opt != "COUNT" || !absl::SimpleAtoi(val, &count)) { if (opt != "COUNT" || !absl::SimpleAtoi(val, &range_opts.count)) {
return (*cntx)->SendError(kSyntaxErr); return (*cntx)->SendError(kSyntaxErr);
} }
} }
range_opts.start = rs.parsed_id;
range_opts.end = re.parsed_id;
range_opts.is_rev = is_rev;
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()}; OpArgs op_args{shard, t->db_index()};
return OpRange(op_args, key, rs.parsed_id, re.parsed_id, count); return OpRange(op_args, key, range_opts);
}; };
OpResult<RecordVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<RecordVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -327,8 +862,13 @@ void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId; using CI = CommandId;
*registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd) *registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, 1}.HFUNC(XDel)
<< CI{"XGROUP", CO::WRITE | CO::DENYOOM, -2, 2, 2, 1}.HFUNC(XGroup)
<< CI{"XINFO", CO::READONLY, -2, 2, 2, 1}.HFUNC(XInfo)
<< CI{"XLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(XLen) << CI{"XLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(XLen)
<< CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange); << CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange)
<< CI{"XREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRevRange)
<< CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId);
} }
} // namespace dfly } // namespace dfly

View File

@ -17,9 +17,14 @@ class StreamFamily {
private: private:
static void XAdd(CmdArgList args, ConnectionContext* cntx); static void XAdd(CmdArgList args, ConnectionContext* cntx);
static void XDel(CmdArgList args, ConnectionContext* cntx);
static void XGroup(CmdArgList args, ConnectionContext* cntx);
static void XInfo(CmdArgList args, ConnectionContext* cntx);
static void XLen(CmdArgList args, ConnectionContext* cntx); static void XLen(CmdArgList args, ConnectionContext* cntx);
static void XRevRange(CmdArgList args, ConnectionContext* cntx);
static void XRange(CmdArgList args, ConnectionContext* cntx); static void XRange(CmdArgList args, ConnectionContext* cntx);
static void XSetId(CmdArgList args, ConnectionContext* cntx);
static void XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext* cntx);
}; };
} // namespace dfly } // namespace dfly

View File

@ -36,6 +36,52 @@ TEST_F(StreamFamilyTest, Add) {
resp = Run({"xlen", "key"}); resp = Run({"xlen", "key"});
EXPECT_THAT(resp, IntArg(1)); EXPECT_THAT(resp, IntArg(1));
resp = Run({"xadd", "key", "badid", "f1", "val1"});
EXPECT_THAT(resp, ErrArg("Invalid stream ID"));
}
TEST_F(StreamFamilyTest, AddExtended) {
auto resp0 = Run({"xadd", "key", "5", "f1", "v1", "f2", "v2"});
EXPECT_EQ(resp0, "5-0");
resp0 = Run({"xrange", "key", "5-0", "5-0"});
EXPECT_THAT(resp0, ArrLen(2));
auto sub_arr = resp0.GetVec();
EXPECT_THAT(sub_arr, ElementsAre("5-0", ArrLen(4)));
sub_arr = sub_arr[1].GetVec();
EXPECT_THAT(sub_arr, ElementsAre("f1", "v1", "f2", "v2"));
auto resp1 = Run({"xadd", "key", "maxlen", "1", "*", "field1", "val1"});
string id1 = string(ToSV(resp1.GetBuf()));
auto resp2 = Run({"xadd", "key", "maxlen", "1", "*", "field2", "val2"});
string id2 = string(ToSV(resp2.GetBuf()));
EXPECT_THAT(Run({"xlen", "key"}), IntArg(1));
EXPECT_THAT(Run({"xrange", "key", id1, id1}), ArrLen(0));
auto resp3 = Run({"xadd", "key", id2, "f1", "val1"});
EXPECT_THAT(resp3, ErrArg("equal or smaller than"));
}
TEST_F(StreamFamilyTest, Range) {
Run({"xadd", "key", "1-*", "f1", "v1"});
Run({"xadd", "key", "1-*", "f2", "v2"});
auto resp = Run({"xrange", "key", "-", "+"});
EXPECT_THAT(resp, ArrLen(2));
auto sub_arr = resp.GetVec();
EXPECT_THAT(sub_arr, ElementsAre(ArrLen(2), ArrLen(2)));
auto sub0 = sub_arr[0].GetVec();
auto sub1 = sub_arr[1].GetVec();
EXPECT_THAT(sub0, ElementsAre("1-0", ArrLen(2)));
EXPECT_THAT(sub1, ElementsAre("1-1", ArrLen(2)));
resp = Run({"xrevrange", "key", "-", "+"});
sub_arr = resp.GetVec();
sub0 = sub_arr[0].GetVec();
sub1 = sub_arr[1].GetVec();
EXPECT_THAT(sub0, ElementsAre("1-1", ArrLen(2)));
EXPECT_THAT(sub1, ElementsAre("1-0", ArrLen(2)));
} }
} // namespace dfly } // namespace dfly