feat(streams): implement rdb load for streams

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-06-17 10:27:46 +03:00
parent 07b92841fe
commit d1d64eb014
7 changed files with 225 additions and 39 deletions

View File

@ -369,11 +369,12 @@ void freeModuleObject(robj *o) {
zfree(mv);
}
void freeStreamObject(robj *o) {
freeStream(o->ptr);
}
#endif
/* Release the stream held in the object's 'ptr' field by passing it to
 * freeStream(). Only the payload is released here; the robj wrapper itself
 * is not freed by this function. */
void freeStreamObject(robj *o) {
freeStream(o->ptr);
}
void incrRefCount(robj *o) {
if (o->refcount < OBJ_FIRST_SPECIAL_REFCOUNT) {
o->refcount++;
@ -399,8 +400,7 @@ void decrRefCount(robj *o) {
// freeModuleObject(o);
break;
case OBJ_STREAM:
serverPanic("Unsupported OBJ_STREAM type");
//freeStreamObject(o);
freeStreamObject(o);
break;
default: serverPanic("Unknown object type"); break;
}
@ -658,7 +658,7 @@ size_t stringObjectLen(robj *o) {
}
}
// ROMAN: Copied from the DISABLED part below
// ROMAN: Copied from the DISABLED part below
int getLongLongFromObject(robj *o, long long *target) {
long long value;
@ -850,7 +850,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
}
} else if (o->type == OBJ_STREAM) {
serverPanic("OBJ_STREAM not supported");
#ifdef ROMAN_CLIENT_DISABLE
#ifdef ROMAN_CLIENT_DISABLE
stream *s = o->ptr;
asize = sizeof(*o)+sizeof(*s);
asize += streamRadixTreeMemoryUsage(s->rax);
@ -911,10 +911,10 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
}
raxStop(&ri);
}
#endif
#endif
} else if (o->type == OBJ_MODULE) {
serverPanic("OBJ_MODULE not supported");
#ifdef ROMAN_CLIENT_DISABLE
#ifdef ROMAN_CLIENT_DISABLE
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
if (mt->mem_usage != NULL) {
@ -922,7 +922,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
} else {
asize = 0;
}
#endif
#endif
} else {
serverPanic("Unknown object type");
}
@ -1175,7 +1175,7 @@ sds getMemoryDoctorReport(void) {
return s;
}
#endif
#endif
/* Set the object LRU/LFU depending on server.maxmemory_policy.
* The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU.

View File

@ -66,7 +66,7 @@ void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
#endif
#endif
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@ -420,7 +420,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is used to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
*
*
* When 'use_id' is used alongside with a zero 'seq-given', the sequence
* part of the passed ID is ignored and the function will attempt to use an
* auto-generated sequence.
@ -966,7 +966,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK)
return -1;
i++;
args->trim_strategy = TRIM_STRATEGY_MINID;
args->trim_strategy_arg_idx = i;
@ -1026,9 +1026,9 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
} else {
/* User didn't provide LIMIT, we must set it. */
if (args->approx_trim) {
/* In order to prevent from trimming to do too much work and
/* In order to prevent from trimming to do too much work and
* cause latency spikes we limit the amount of work it can do.
* We have to cap args->limit from both sides in case
* We have to cap args->limit from both sides in case
* stream_node_max_entries is 0 or too big (could cause overflow)
*/
args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
@ -1044,7 +1044,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
return i;
}
#endif
#endif
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
@ -1382,7 +1382,7 @@ void streamLastValidID(stream *s, streamID *maxid)
streamIteratorStop(&si);
}
#if ROMAN_ENABLE
#if ROMAN_ENABLE
/* Emit a reply in the client output buffer by formatting a Stream ID
* in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */
@ -1396,7 +1396,7 @@ void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
setDeferredReplyBulkSds(c, dr, replyid);
}
#endif
#endif
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
@ -1453,7 +1453,7 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
return 0;
}
#if ROMAN_ENABLE
#if ROMAN_ENABLE
/* Replies with a consumer group's current lag, that is the number of messages
* in the stream that are yet to be delivered. In case that the lag isn't
* available due to fragmentation, the reply to the client is a null. */
@ -1490,7 +1490,7 @@ void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
/* This function returns a value that is the ID's logical read counter, or its
* distance (the number of entries) from the first entry ever to have been added
* to the stream.
*
*
* A counter is returned only in one of the following cases:
* 1. The ID is the same as the stream's last ID. In this case, the returned
* is the same as the stream's entries_added counter.
@ -1624,7 +1624,7 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
decrRefCount(argv[4]);
}
#endif
#endif
/* Send the stream items in the specified range to the client 'c'. The range
* the client will receive is between start and end inclusive, if 'count' is
@ -1706,7 +1706,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
/* A valid counter and no future tombstones mean we can
/* A valid counter and no future tombstones mean we can
* increment the read counter to keep tracking the group's
* progress. */
group->entries_read++;
@ -1882,7 +1882,7 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
* to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
* form is accepted and the argument is set to 0 unless the sequence part is
* specified.
*
*
* If 'c' is set to NULL, no reply is sent to the client. */
int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) {
char buf[128];
@ -1954,7 +1954,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
/* Helper for parsing a stream ID that is a range query interval. When the
* exclude argument is NULL, streamParseIDOrReply() is called and the interval
* is treated as close (inclusive). Otherwise, the exclude argument is set if
* is treated as close (inclusive). Otherwise, the exclude argument is set if
* the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is
* called in that case.
*/
@ -1962,13 +1962,13 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude,
char *p = o->ptr;
size_t len = sdslen(p);
int invalid = 0;
if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
if (exclude != NULL && *exclude) {
robj *t = createStringObject(p+1,len-1);
invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR);
decrRefCount(t);
} else
} else
invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
if (invalid)
return C_ERR;
@ -2095,7 +2095,7 @@ void xrangeGenericCommand(client *c, int rev) {
robj *startarg = rev ? c->argv[3] : c->argv[2];
robj *endarg = rev ? c->argv[2] : c->argv[3];
int startex = 0, endex = 0;
/* Parse start and end IDs. */
if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK)
return;
@ -2447,7 +2447,7 @@ cleanup: /* Cleanup. */
zfree(groups);
}
#endif
#endif
/* -----------------------------------------------------------------------
* Low level implementation of consumer groups
@ -2515,15 +2515,12 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg;
}
#if ROMAN_ENABLE
/* Create a consumer with the specified name in the group 'cg' and return.
* If the consumer exists, return NULL. As a side effect, when the consumer
* is successfully created, the key space will be notified and dirty++ unless
* the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
if (cg == NULL) return NULL;
int notify = !(flags & SCC_NO_NOTIFY);
int dirty = !(flags & SCC_NO_DIRTIFY);
streamConsumer *consumer = zmalloc(sizeof(*consumer));
int success = raxTryInsert(cg->consumers,(unsigned char*)name,
sdslen(name),consumer,NULL);
@ -2534,13 +2531,11 @@ streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid
consumer->name = sdsdup(name);
consumer->pel = raxNew();
consumer->seen_time = mstime();
if (dirty) server.dirty++;
if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
return consumer;
}
#endif
/* Lookup the consumer with the specified name in the group 'cg'. Its last
/* Lookup the consumer with the specified name in the group 'cg'. Its last
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
if (cg == NULL) return NULL;
@ -3943,6 +3938,8 @@ NULL
}
}
#endif
/* Validate the integrity stream listpack entries structure. Both in term of a
* valid listpack, but also that the structure of the entries matches a valid
* stream. return 1 if valid 0 if not valid. */
@ -4035,4 +4032,3 @@ int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
return 1;
}
#endif

View File

@ -1,7 +1,7 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc
add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc
common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc
engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc
@ -22,7 +22,8 @@ cxx_test(list_family_test dfly_test_lib LABELS DFLY)
cxx_test(set_family_test dfly_test_lib LABELS DFLY)
cxx_test(stream_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb
testdata/redis6_stream.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)

View File

@ -10,6 +10,7 @@ extern "C" {
#include "redis/listpack.h"
#include "redis/lzfP.h" /* LZF compression library */
#include "redis/rdb.h"
#include "redis/stream.h"
#include "redis/util.h"
#include "redis/ziplist.h"
#include "redis/zmalloc.h"
@ -834,6 +835,9 @@ io::Result<robj*> RdbLoader::ReadObj(int rdbtype) {
case RDB_TYPE_LIST_QUICKLIST:
res_obj = ReadListQuicklist(rdbtype);
break;
case RDB_TYPE_STREAM_LISTPACKS:
res_obj = ReadStreams();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
return Unexpected(errc::invalid_encoding);
@ -1264,6 +1268,179 @@ io::Result<robj*> RdbLoader::ReadListQuicklist(int rdbtype) {
return res;
}
// Loads a stream value (RDB_TYPE_STREAM_LISTPACKS) from the input and returns
// a newly created stream object on success.
//
// The fields are consumed in this order:
//   1. listpack count, then for each listpack: a node key that must be exactly
//      sizeof(streamID) bytes, followed by the listpack blob itself;
//   2. stream length and last entry ID (ms, seq);
//   3. consumer group count, then for each group: name, group ID (ms, seq),
//      the global PEL (raw ID, delivery time, delivery count per entry) and
//      the consumers (name, seen time, and the raw IDs of their own PEL).
//
// On any read error or structural inconsistency the partially built object is
// released and an error is returned: errc::rdb_file_corrupted for malformed
// data, errc::duplicate_key for repeated keys/names/PEL entries, or the
// underlying I/O error.
::io::Result<robj*> RdbLoader::ReadStreams() {
  uint64_t listpacks;
  SET_OR_UNEXPECT(LoadLen(nullptr), listpacks);

  robj* o = createStreamObject();
  stream* s = (stream*)o->ptr;

  // Releases the stream object on every early return; cancelled on success.
  auto cleanup = absl::Cleanup([&] { decrRefCount(o); });

  while (listpacks--) {
    /* Get the master ID, the one we'll use as key of the radix tree
     * node: the entries inside the listpack itself are delta-encoded
     * relatively to this ID. */
    sds nodekey;
    SET_OR_UNEXPECT(ReadKey(), nodekey);
    auto nodekey_cleanup = absl::Cleanup([&] { sdsfree(nodekey); });

    if (sdslen(nodekey) != sizeof(streamID)) {
      LOG(ERROR) << "Stream node key entry is not the size of a stream ID";
      return Unexpected(errc::rdb_file_corrupted);
    }

    /* Load the listpack. */
    OpaqueBuf fetch;
    SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch);
    if (fetch.second == 0 || fetch.first == nullptr) {
      LOG(ERROR) << "Stream listpacks loading failed";
      return Unexpected(errc::rdb_file_corrupted);
    }
    DCHECK(fetch.first);

    // From here on 'lp' is owned by this function until it is successfully
    // inserted into the radix tree, so every failure path must zfree() it.
    uint8_t* lp = (uint8_t*)fetch.first;
    if (!streamValidateListpackIntegrity(lp, fetch.second, 0)) {
      zfree(lp);
      LOG(ERROR) << "Stream listpack integrity check failed.";
      return Unexpected(errc::rdb_file_corrupted);
    }

    unsigned char* first = lpFirst(lp);
    if (first == NULL) {
      /* Serialized listpacks should never be empty, since on
       * deletion we should remove the radix tree key if the
       * resulting listpack is empty. */
      LOG(ERROR) << "Empty listpack inside stream";
      zfree(lp);
      return Unexpected(errc::rdb_file_corrupted);
    }

    /* Insert the key in the radix tree. */
    int retval = raxTryInsert(s->rax_tree, (unsigned char*)nodekey, sizeof(streamID), lp, NULL);
    if (!retval) {
      LOG(ERROR) << "Listpack re-added with existing key";
      // The tree did not take the listpack, so it is still ours to release.
      zfree(lp);
      return Unexpected(errc::duplicate_key);
    }
  }

  /* Load total number of items inside the stream. */
  SET_OR_UNEXPECT(LoadLen(nullptr), s->length);

  /* Load the last entry ID. */
  SET_OR_UNEXPECT(LoadLen(nullptr), s->last_id.ms);
  SET_OR_UNEXPECT(LoadLen(nullptr), s->last_id.seq);

  /* Consumer groups loading */
  uint64_t cgroups_count;
  SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count);

  while (cgroups_count--) {
    /* Get the consumer group name and ID. We can then create the
     * consumer group ASAP and populate its structure as
     * we read more data. */
    streamID cg_id;
    sds cgname;
    SET_OR_UNEXPECT(ReadKey(), cgname);
    auto cgname_cleanup = absl::Cleanup([&] { sdsfree(cgname); });

    SET_OR_UNEXPECT(LoadLen(nullptr), cg_id.ms);
    SET_OR_UNEXPECT(LoadLen(nullptr), cg_id.seq);

    streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, 0);
    if (cgroup == NULL) {
      LOG(ERROR) << "Duplicated consumer group name " << cgname;
      return Unexpected(errc::duplicate_key);
    }

    // The name is no longer needed once the group exists; free it now.
    std::move(cgname_cleanup).Invoke();

    // no need to free cgroup because it's attached to s.

    /* Load the global PEL for this consumer group, however we'll
     * not yet populate the NACK structures with the message
     * owner, since consumers for this group and their messages will
     * be read as a next step. So for now leave them not resolved
     * and later populate it. */
    uint64_t pel_size;
    SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);

    while (pel_size--) {
      uint8_t rawid[sizeof(streamID)];
      error_code ec = FetchBuf(sizeof(rawid), rawid);
      if (ec) {
        LOG(ERROR) << "Stream PEL ID loading failed.";
        return make_unexpected(ec);
      }

      streamNACK* nack = streamCreateNACK(NULL);
      // Until the NACK is stored in the group's PEL it is owned locally.
      auto nack_cleanup = absl::Cleanup([&] { streamFreeNACK(nack); });

      SET_OR_UNEXPECT(FetchInt<int64_t>(), nack->delivery_time);
      SET_OR_UNEXPECT(LoadLen(nullptr), nack->delivery_count);

      if (!raxTryInsert(cgroup->pel, rawid, sizeof(rawid), nack, NULL)) {
        LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group";
        return Unexpected(errc::duplicate_key);
      }

      // Stored in cgroup->pel: the group is now responsible for it.
      std::move(nack_cleanup).Cancel();
    }

    /* Now that we loaded our global PEL, we need to load the
     * consumers and their local PELs. */
    uint64_t consumers_num;
    SET_OR_UNEXPECT(LoadLen(nullptr), consumers_num);

    while (consumers_num--) {
      sds cname;
      SET_OR_UNEXPECT(ReadKey(), cname);

      streamConsumer* consumer =
          streamCreateConsumer(cgroup, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
      sdsfree(cname);
      if (!consumer) {
        LOG(ERROR) << "Duplicate stream consumer detected.";
        return Unexpected(errc::duplicate_key);
      }
      // no need to free consumer because it's attached to cgroup.

      SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer->seen_time);

      /* Load the PEL about entries owned by this specific
       * consumer. */
      SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);

      while (pel_size--) {
        unsigned char rawid[sizeof(streamID)];
        error_code ec = FetchBuf(sizeof(rawid), rawid);
        if (ec) {
          LOG(ERROR) << "Stream PEL ID loading failed.";
          return make_unexpected(ec);
        }

        streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, rawid, sizeof(rawid));
        if (nack == raxNotFound) {
          LOG(ERROR) << "Consumer entry not found in group global PEL";
          return Unexpected(errc::rdb_file_corrupted);
        }

        /* Set the NACK consumer, that was left to NULL when
         * loading the global PEL. Then set the same shared
         * NACK structure also in the consumer-specific PEL. */
        nack->consumer = consumer;
        if (!raxTryInsert(consumer->pel, rawid, sizeof(rawid), nack, NULL)) {
          LOG(ERROR) << "Duplicated consumer PEL entry loading a stream consumer group";
          // Do NOT free 'nack' here: it was obtained from cgroup->pel and is
          // still stored there, so it is released together with the stream by
          // the outer cleanup. Freeing it here as well would free it twice.
          return Unexpected(errc::duplicate_key);
        }
      }
    }  // while (consumers_num)
  }    // while (cgroup_num)

  // Success: hand ownership of the stream object to the caller.
  std::move(cleanup).Cancel();

  return o;
}
void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
DCHECK_LT(key_num, 1U << 31);
DCHECK_LT(expire_num, 1U << 31);

View File

@ -72,6 +72,7 @@ class RdbLoader {
::io::Result<robj*> ReadZSet(int rdbtype);
::io::Result<robj*> ReadZSetZL();
::io::Result<robj*> ReadListQuicklist(int rdbtype);
::io::Result<robj*> ReadStreams();
std::error_code EnsureRead(size_t min_sz) {
if (mem_buf_.InputLen() >= min_sz)

View File

@ -115,6 +115,17 @@ TEST_F(RdbTest, LoadSmall6) {
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1)));
}
// Verifies that an RDB snapshot containing stream data loads without error.
TEST_F(RdbTest, LoadStream) {
  io::FileSource source = GetSource("redis6_stream.rdb");
  RdbLoader stream_loader(service_->script_mgr());

  // Loading has to happen on a proactor thread; doing it on the main testing
  // thread would pollute that thread's serverstate.
  auto load_ec = pp_->at(0)->Await([&stream_loader, &source] {
    return stream_loader.Load(&source);
  });

  ASSERT_FALSE(load_ec) << load_ec.message();
}
TEST_F(RdbTest, Reload) {
absl::FlagSaver fs;

BIN
src/server/testdata/redis6_stream.rdb vendored Normal file

Binary file not shown.