support lua scripts loading

This commit is contained in:
Roman Gershman 2022-04-30 22:21:05 +03:00
parent 5c9cee171c
commit 370d4cd0ee
6 changed files with 58 additions and 25 deletions

View File

@ -159,7 +159,7 @@ void DebugCmd::Reload(CmdArgList args) {
VLOG(1) << "Performing load";
io::FileSource fs(*res);
RdbLoader loader(&ess);
RdbLoader loader(&ess, sf_.script_mgr());
ec = loader.Load(&fs);
if (ec) {

View File

@ -23,7 +23,6 @@ using facade::MemcacheParser;
class Service : public facade::ServiceInterface {
public:
using error_code = std::error_code;
struct InitOpts {
bool disable_time_update;
@ -71,6 +70,10 @@ class Service : public facade::ServiceInterface {
return registry_.Find(cmd);
}
ScriptMgr* script_mgr() {
return server_family_.script_mgr();
}
private:
static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx);

View File

@ -24,6 +24,8 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/hset_family.h"
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/set_family.h"
#include "strings/human_readable.h"
@ -187,7 +189,9 @@ struct RdbLoader::ObjSettings {
ObjSettings() = default;
};
RdbLoader::RdbLoader(EngineShardSet* ess) : ess_(*ess), mem_buf_{16_KB} {
RdbLoader::RdbLoader(EngineShardSet* ess, ScriptMgr* script_mgr)
: script_mgr_(script_mgr), ess_(*ess), mem_buf_{16_KB} {
shard_buf_.reset(new ItemsBuf[ess_.size()]);
}
@ -251,6 +255,7 @@ error_code RdbLoader::Load(io::Source* src) {
/* Key-specific attributes, set by opcodes before the key type. */
ObjSettings settings;
settings.now = mstime();
size_t keys_loaded = 0;
while (1) {
/* Read type. */
@ -339,6 +344,7 @@ error_code RdbLoader::Load(io::Source* src) {
return RdbError(errc::invalid_rdb_type);
}
++keys_loaded;
RETURN_ON_ERR(LoadKeyValPair(type, &settings));
settings.Reset();
} // main load loop
@ -358,6 +364,7 @@ error_code RdbLoader::Load(io::Source* src) {
absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Done loading RDB, keys loaded: " << keys_loaded;
LOG(INFO) << "Loading finished after " << strings::HumanReadableElapsedTime(seconds);
return kOk;
@ -490,40 +497,52 @@ error_code RdbLoader::HandleAux() {
}
auxval = (robj*)exp->first;
if (((char*)auxkey->ptr)[0] == '%') {
char* auxkey_sds = (sds)auxkey->ptr;
char* auxval_sds = (sds)auxval->ptr;
if (auxkey_sds[0] == '%') {
/* All the fields with a name staring with '%' are considered
* information fields and are logged at startup with a log
* level of NOTICE. */
LOG(INFO) << "RDB '" << (char*)auxkey->ptr << "': " << (char*)auxval->ptr;
} else if (!strcasecmp((char*)auxkey->ptr, "repl-stream-db")) {
LOG(INFO) << "RDB '" << auxkey_sds << "': " << auxval_sds;
} else if (!strcasecmp(auxkey_sds, "repl-stream-db")) {
// TODO
} else if (!strcasecmp((char*)auxkey->ptr, "repl-id")) {
} else if (!strcasecmp(auxkey_sds, "repl-id")) {
// TODO
} else if (!strcasecmp((char*)auxkey->ptr, "repl-offset")) {
} else if (!strcasecmp(auxkey_sds, "repl-offset")) {
// TODO
} else if (!strcasecmp((char*)auxkey->ptr, "lua")) {
LOG(ERROR) << "Lua scripts are not supported";
return RdbError(errc::feature_not_supported);
} else if (!strcasecmp((char*)auxkey->ptr, "redis-ver")) {
LOG(INFO) << "Loading RDB produced by version " << (char*)auxval->ptr;
} else if (!strcasecmp((char*)auxkey->ptr, "ctime")) {
time_t age = time(NULL) - strtol((char*)auxval->ptr, NULL, 10);
} else if (!strcasecmp(auxkey_sds, "lua")) {
ServerState* ss = ServerState::tlocal();
Interpreter& script = ss->GetInterpreter();
string_view body{auxval_sds, strlen(auxval_sds)};
string result;
Interpreter::AddResult add_result = script.AddFunction(body, &result);
if (add_result == Interpreter::ADD_OK) {
if (script_mgr_)
script_mgr_->InsertFunction(result, body);
} else if (add_result == Interpreter::COMPILE_ERR) {
LOG(ERROR) << "Error when compiling lua scripts";
}
} else if (!strcasecmp(auxkey_sds, "redis-ver")) {
LOG(INFO) << "Loading RDB produced by version " << auxval_sds;
} else if (!strcasecmp(auxkey_sds, "ctime")) {
time_t age = time(NULL) - strtol(auxval_sds, NULL, 10);
if (age < 0)
age = 0;
LOG(INFO) << "RDB age " << strings::HumanReadableElapsedTime(age);
} else if (!strcasecmp((char*)auxkey->ptr, "used-mem")) {
long long usedmem = strtoll((char*)auxval->ptr, NULL, 10);
} else if (!strcasecmp(auxkey_sds, "used-mem")) {
long long usedmem = strtoll(auxval_sds, NULL, 10);
LOG(INFO) << "RDB memory usage when created " << strings::HumanReadableNumBytes(usedmem);
} else if (!strcasecmp((char*)auxkey->ptr, "aof-preamble")) {
long long haspreamble = strtoll((char*)auxval->ptr, NULL, 10);
} else if (!strcasecmp(auxkey_sds, "aof-preamble")) {
long long haspreamble = strtoll(auxval_sds, NULL, 10);
if (haspreamble)
LOG(INFO) << "RDB has an AOF tail";
} else if (!strcasecmp((char*)auxkey->ptr, "redis-bits")) {
} else if (!strcasecmp(auxkey_sds, "redis-bits")) {
/* Just ignored. */
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
LOG(WARNING) << "Unrecognized RDB AUX field: '" << (char*)auxkey->ptr << "'";
LOG(WARNING) << "Unrecognized RDB AUX field: '" << auxkey_sds << "'";
}
decrRefCount(auxkey);

View File

@ -14,10 +14,11 @@ extern "C" {
namespace dfly {
class EngineShardSet;
class ScriptMgr;
class RdbLoader {
public:
explicit RdbLoader(EngineShardSet* ess);
explicit RdbLoader(EngineShardSet* ess, ScriptMgr* script_mgr);
~RdbLoader();
@ -82,6 +83,7 @@ class RdbLoader {
static void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);
ScriptMgr* script_mgr_;
EngineShardSet& ess_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> compr_buf_;

View File

@ -72,16 +72,17 @@ TEST_F(RdbTest, Crc) {
TEST_F(RdbTest, LoadEmpty) {
io::FileSource fs = GetSource("empty.rdb");
RdbLoader loader(ess_);
RdbLoader loader(ess_, NULL);
auto ec = loader.Load(&fs);
CHECK(!ec);
}
TEST_F(RdbTest, LoadSmall6) {
io::FileSource fs = GetSource("redis6_small.rdb");
RdbLoader loader(ess_);
RdbLoader loader(ess_, service_->script_mgr());
auto ec = loader.Load(&fs);
CHECK(!ec);
ASSERT_FALSE(ec) << ec.message();
auto resp = Run({"scan", "0"});
ASSERT_THAT(resp, ArrLen(2));
@ -96,6 +97,14 @@ TEST_F(RdbTest, LoadSmall6) {
// TODO: when we implement PEXPIRETIME we will be able to do it directly.
int ttl = CheckedInt({"ttl", "set1"}); // should expire at 1747008000.
EXPECT_GT(ttl + time(NULL), 1747007000); // left 1000 seconds margin in case the clock is off.
Run({"select", "1"});
ASSERT_EQ(10, CheckedInt({"dbsize"}));
ASSERT_EQ(128, CheckedInt({"strlen", "longggggggggggggggkeyyyyyyyyyyyyy:9"}));
resp = Run({"script", "exists", "4ca238f611c9d0ae4e9a75a5dbac22aedc379801",
"282297a0228f48cd3fc6a55de6316f31422f5d17"});
ASSERT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1)));
}
TEST_F(RdbTest, Reload) {

Binary file not shown.