Introduce the initial skeleton code for stream routines

1. Take redis stream code from Redis 7.0 branch (d2b5a579d).
2. Introduce stream_family class and test.
This commit is contained in:
Roman Gershman 2022-06-06 09:53:09 +03:00 committed by Roman Gershman
parent e2c52c47a5
commit 819b2f0716
14 changed files with 6577 additions and 4 deletions

View File

@ -10,7 +10,8 @@ endif()
add_library(redis_lib crc64.c crcspeed.c debug.c dict.c intset.c
listpack.c mt19937-64.c object.c lzf_c.c lzf_d.c sds.c
quicklist.c redis_aux.c siphash.c t_hash.c t_zset.c util.c ziplist.c ${ZMALLOC_SRC})
quicklist.c rax.c redis_aux.c siphash.c t_hash.c t_stream.c t_zset.c
util.c ziplist.c ${ZMALLOC_SRC})
cxx_link(redis_lib ${ZMALLOC_DEPS})

View File

@ -38,6 +38,7 @@
#include "listpack.h"
#include "object.h"
#include "redis_aux.h"
#include "stream.h"
#include "zset.h"
#include "quicklist.h"
#include "util.h"
@ -286,7 +287,6 @@ robj *createZsetListpackObject(void) {
return o;
}
#if ROMAN_ENABLE
robj *createStreamObject(void) {
stream *s = streamNew();
robj *o = createObject(OBJ_STREAM,s);
@ -294,6 +294,8 @@ robj *createStreamObject(void) {
return o;
}
#if ROMAN_ENABLE
robj *createModuleObject(moduleType *mt, void *value) {
moduleValue *mv = zmalloc(sizeof(*mv));
mv->type = mt;

View File

@ -80,6 +80,7 @@ void freeSetObject(robj *o);
void freeZsetObject(robj *o);
void freeHashObject(robj *o);
robj *createObject(int type, void *ptr);
robj *createStreamObject(void);
robj *createStringObject(const char *ptr, size_t len);
robj *createRawStringObject(const char *ptr, size_t len);
robj *createEmbeddedStringObject(const char *ptr, size_t len);

1929
src/redis/rax.c Normal file

File diff suppressed because it is too large Load Diff

216
src/redis/rax.h Normal file
View File

@ -0,0 +1,216 @@
/* Rax -- A radix tree implementation.
*
* Copyright (c) 2017-2018, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef RAX_H
#define RAX_H
#include <stdint.h>
/* Representation of a radix tree as implemented in this file, that contains
* the strings "foo", "foobar" and "footer" after the insertion of each
* word. When the node represents a key inside the radix tree, we write it
* between [], otherwise it is written between ().
*
* This is the vanilla representation:
*
* (f) ""
* \
* (o) "f"
* \
* (o) "fo"
* \
* [t b] "foo"
* / \
* "foot" (e) (a) "foob"
* / \
* "foote" (r) (r) "fooba"
* / \
* "footer" [] [] "foobar"
*
* However, this implementation implements a very common optimization where
* successive nodes having a single child are "compressed" into the node
* itself as a string of characters, each representing a next-level child,
* and only the link to the node representing the last character node is
* provided inside the representation. So the above representation is turned
* into:
*
* ["foo"] ""
* |
* [t b] "foo"
* / \
* "foot" ("er") ("ar") "foob"
* / \
* "footer" [] [] "foobar"
*
* However this optimization makes the implementation a bit more complex.
* For instance if a key "first" is added in the above radix tree, a
* "node splitting" operation is needed, since the "foo" prefix is no longer
* composed of nodes having a single child one after the other. This is the
* above tree and the resulting node splitting after this event happens:
*
*
* (f) ""
* /
* (i o) "f"
* / \
* "firs" ("rst") (o) "fo"
* / \
* "first" [] [t b] "foo"
* / \
* "foot" ("er") ("ar") "foob"
* / \
* "footer" [] [] "foobar"
*
* Similarly after deletion, if a new chain of nodes having a single child
* is created (the chain must also not include nodes that represent keys),
* it must be compressed back into a single node.
*
*/
#define RAX_NODE_MAX_SIZE ((1<<29)-1)
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */
uint32_t isnull:1; /* Associated value is NULL (don't store it). */
uint32_t iscompr:1; /* Node is compressed. */
uint32_t size:29; /* Number of children, or compressed string len. */
/* Data layout is as follows:
*
* If node is not compressed we have 'size' bytes, one for each children
* character, and 'size' raxNode pointers, point to each child node.
* Note how the character is not stored in the children but in the
* edge of the parents:
*
* [header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
*
* if node is compressed (iscompr bit is 1) the node has 1 children.
* In that case the 'size' bytes of the string stored immediately at
* the start of the data section, represent a sequence of successive
* nodes linked one after the other, for which only the last one in
* the sequence is actually represented as a node, and pointed to by
* the current compressed node.
*
* [header iscompr=1][xyz][z-ptr](value-ptr?)
*
* Both compressed and not compressed nodes can represent a key
* with associated data in the radix tree at any level (not just terminal
* nodes).
*
* If the node has an associated key (iskey=1) and is not NULL
* (isnull=0), then after the raxNode pointers pointing to the
* children, an additional value pointer is present (as you can see
* in the representation above as "value-ptr" field).
*/
unsigned char data[];
} raxNode;
typedef struct rax {
raxNode *head;
uint64_t numele;
uint64_t numnodes;
} rax;
/* Stack data structure used by raxLowWalk() in order to, optionally, return
* a list of parent nodes to the caller. The nodes do not have a "parent"
* field for space concerns, so we use the auxiliary stack when needed. */
#define RAX_STACK_STATIC_ITEMS 32
typedef struct raxStack {
void **stack; /* Points to static_items or an heap allocated array. */
size_t items, maxitems; /* Number of items contained and total space. */
/* Up to RAXSTACK_STACK_ITEMS items we avoid to allocate on the heap
* and use this static array of pointers instead. */
void *static_items[RAX_STACK_STATIC_ITEMS];
int oom; /* True if pushing into this stack failed for OOM at some point. */
} raxStack;
/* Optional callback used for iterators and be notified on each rax node,
* including nodes not representing keys. If the callback returns true
* the callback changed the node pointer in the iterator structure, and the
* iterator implementation will have to replace the pointer in the radix tree
* internals. This allows the callback to reallocate the node to perform
* very special operations, normally not needed by normal applications.
*
* This callback is used to perform very low level analysis of the radix tree
* structure, scanning each possible node (but the root node), or in order to
* reallocate the nodes to reduce the allocation fragmentation (this is the
* Redis application for this callback).
*
* This is currently only supported in forward iterations (raxNext) */
typedef int (*raxNodeCallback)(raxNode **noderef);
/* Radix tree iterator state is encapsulated into this data structure. */
#define RAX_ITER_STATIC_LEN 128
#define RAX_ITER_JUST_SEEKED (1<<0) /* Iterator was just seeked. Return current
element for the first iteration and
clear the flag. */
#define RAX_ITER_EOF (1<<1) /* End of iteration reached. */
#define RAX_ITER_SAFE (1<<2) /* Safe iterator, allows operations while
iterating. But it is slower. */
typedef struct raxIterator {
int flags;
rax *rt; /* Radix tree we are iterating. */
unsigned char *key; /* The current string. */
void *data; /* Data associated to this key. */
size_t key_len; /* Current key length. */
size_t key_max; /* Max key len the current key buffer can hold. */
unsigned char key_static_string[RAX_ITER_STATIC_LEN];
raxNode *node; /* Current node. Only for unsafe iteration. */
raxStack stack; /* Stack used for unsafe iteration. */
raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */
} raxIterator;
/* A special pointer returned for not found items. */
extern void *raxNotFound;
/* Exported API. */
rax *raxNew(void);
int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old);
int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old);
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
void *raxFind(rax *rax, unsigned char *s, size_t len);
void raxFree(rax *rax);
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);
int raxRandomWalk(raxIterator *it, size_t steps);
int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key_len);
void raxStop(raxIterator *it);
int raxEOF(raxIterator *it);
void raxShow(rax *rax);
uint64_t raxSize(rax *rax);
unsigned long raxTouch(raxNode *n);
void raxSetDebugMsg(int onoff);
/* Internal API. May be used by the node callback in order to access rax nodes
* in a low level way, so this function is exported as well. */
void raxSetData(raxNode *n, void *data);
#endif

44
src/redis/rax_malloc.h Normal file
View File

@ -0,0 +1,44 @@
/* Rax -- A radix tree implementation.
*
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/* Allocator selection.
*
* This file is used in order to change the Rax allocator at compile time.
* Just define the following defines to what you want to use. Also add
* the include of your alternate allocator if needed (not needed in order
* to use the default libc allocator). */
#ifndef RAX_ALLOC_H
#define RAX_ALLOC_H
#include "zmalloc.h"
#define rax_malloc zmalloc
#define rax_realloc zrealloc
#define rax_free zfree
#endif

View File

@ -4,6 +4,7 @@
#include <unistd.h>
#include "crc64.h"
#include "endianconv.h"
#include "object.h"
#include "zmalloc.h"
@ -26,6 +27,9 @@ void InitRedisTables() {
server.hash_max_listpack_value = 32; // decreased from redis default 64.
server.rdb_compression = 1;
server.stream_node_max_bytes = 4096;
server.stream_node_max_entries = 100;
}
// These functions are moved here from server.c
@ -91,6 +95,31 @@ size_t sdsZmallocSize(sds s) {
return zmalloc_size(sh);
}
/* Toggle the 64 bit unsigned integer pointed by *p from little endian to
* big endian */
void memrev64(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
}
uint64_t intrev64(uint64_t v) {
memrev64(&v);
return v;
}
/* Set dictionary type. Keys are SDS strings, values are not used. */
dictType setDictType = {
dictSdsHash, /* hash function */

View File

@ -96,6 +96,9 @@ typedef struct ServerStub {
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
off_t loading_rdb_used_mem;
size_t stream_node_max_bytes;
long long stream_node_max_entries;
} Server;

152
src/redis/stream.h Normal file
View File

@ -0,0 +1,152 @@
#ifndef STREAM_H
#define STREAM_H
#include "util.h"
#include "rax.h"
#include "object.h"
#include "listpack.h"
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
* of the latest generated ID and an incremented sequence. */
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */
streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
uint64_t entries_added; /* All time count of elements added. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
* caring about the radix tree + listpack representation. Technically speaking
* the iterator is only used inside streamReplyWithRange(), so could just
* be implemented inside the function, but practically there is the AOF
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
int skip_tombstones; /* True if not emitting tombstone entries. */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the
* element inside the listpack itself. */
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
/* Consumer group. */
typedef struct streamCG {
streamID last_id; /* Last delivered (not acknowledged) ID for this
group. Consumers that will just ask for more
messages will served with IDs > than this. */
long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
XGROUP SETID, ...), this is the total number of
group reads. In the real world, the reasoning behind
this value is detailed at the top comment of
streamEstimateDistanceFromFirstEverEntry(). */
rax *pel; /* Pending entries list. This is a radix tree that
has every message delivered to consumers (without
the NOACK option) that was yet not acknowledged
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
associated value is a streamNACK structure.*/
rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form
of streamConsumer structures. */
} streamCG;
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
rax *pel; /* Consumer specific pending entries list: all
the pending messages delivered to this
consumer not yet acknowledged. Keys are
big endian message IDs, while values are
the same streamNACK structure referenced
in the "pel" of the consumer group structure
itself, so the value is shared. */
} streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
mstime_t delivery_time; /* Last time this message was delivered. */
uint64_t delivery_count; /* Number of times this message was delivered.*/
streamConsumer *consumer; /* The consumer this message was delivered to
in the last delivery. */
} streamNACK;
/* Stream propagation information, passed to functions in order to propagate
* XCLAIM commands to AOF and slaves. */
typedef struct streamPropInfo {
robj *keyname;
robj *groupname;
} streamPropInfo;
/* Prototypes of exported APIs. */
// struct client;
/* Flags for streamLookupConsumer */
#define SLC_DEFAULT 0
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
/* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
#define SCG_INVALID_ENTRIES_READ -1
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
// size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(streamNACK *na);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);
// void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
robj *streamDup(robj *o);
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
#endif

4038
src/redis/t_stream.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -7,7 +7,7 @@ add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_regist
engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc
list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc
set_family.cc string_family.cc table.cc tiered_storage.cc
set_family.cc stream_family.cc string_family.cc table.cc tiered_storage.cc
transaction.cc zset_family.cc version.cc)
cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib)
@ -20,12 +20,14 @@ cxx_test(generic_family_test dfly_test_lib LABELS DFLY)
cxx_test(hset_family_test dfly_test_lib LABELS DFLY)
cxx_test(list_family_test dfly_test_lib LABELS DFLY)
cxx_test(set_family_test dfly_test_lib LABELS DFLY)
cxx_test(stream_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test list_family_test
generic_family_test memcache_parser_test rdb_test
redis_parser_test string_family_test)
redis_parser_test stream_family_test string_family_test)

105
src/server/stream_family.cc Normal file
View File

@ -0,0 +1,105 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/stream_family.h"
#include <absl/strings/str_cat.h>
#include "facade/error.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/transaction.h"
namespace dfly {
using namespace facade;
using namespace std;
namespace {
using StreamId = pair<int64_t, int64_t>;
struct Record {
StreamId id;
string field;
string value;
};
using RecordVec = vector<Record>;
OpResult<StreamId> OpAdd(const OpArgs& op_args, string_view key, string_view id, CmdArgList args) {
return OpStatus::WRONG_TYPE; // TBD
}
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, string_view start,
string_view end) {
return OpStatus::WRONG_TYPE; // TBD
}
inline string StreamIdRepr(const StreamId& id) {
return absl::StrCat(id.first, "-", id.second);
};
} // namespace
void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
// TODO: args parsing
string_view id = ArgS(args, 2);
args.remove_prefix(2);
if (args.empty() || args.size() % 2 == 1) {
return (*cntx)->SendError(WrongNumArgsError("XADD"), kSyntaxErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpAdd(op_args, key, id, args);
};
OpResult<StreamId> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (add_result) {
return (*cntx)->SendBulkString(StreamIdRepr(*add_result));
}
return (*cntx)->SendError(add_result.status());
}
void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view start = ArgS(args, 2);
string_view end = ArgS(args, 1);
// TODO: parse options
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpRange(op_args, key, start, end);
};
OpResult<RecordVec> add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (add_result) {
(*cntx)->StartArray(add_result->size());
for (const auto& item : *add_result) {
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(StreamIdRepr(item.id));
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(item.field);
(*cntx)->SendBulkString(item.value);
}
}
return (*cntx)->SendError(add_result.status());
}
#define HFUNC(x) SetHandler(&StreamFamily::x)
void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
*registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd)
<< CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange);
}
} // namespace dfly

View File

@ -0,0 +1,23 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "server/common.h"
namespace dfly {
class CommandRegistry;
class ConnectionContext;
class StreamFamily {
public:
static void Register(CommandRegistry* registry);
private:
static void XAdd(CmdArgList args, ConnectionContext* cntx);
static void XRange(CmdArgList args, ConnectionContext* cntx);
};
} // namespace dfly

View File

@ -0,0 +1,28 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/stream_family.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/command_registry.h"
#include "server/test_utils.h"
using namespace testing;
using namespace std;
using namespace util;
namespace dfly {
class StreamFamilyTest : public BaseFamilyTest {
protected:
};
TEST_F(StreamFamilyTest, Add) {
Run({"xadd", "key", "*", "field", "value"});
Run({"xrange", "key", "-", "+"});
}
} // namespace dfly