fix(lua): Fix few lua-related bugs (#157)

1. Correctly parse keys for EVAL command.
2. Support PUBLISH within lua.
3. Remove spurious failure in debug-mode with the increased verbosity level.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-06-16 13:41:09 +03:00 committed by GitHub
parent 46220183ae
commit 32f47be034
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 114 additions and 69 deletions

View File

@ -111,8 +111,8 @@ const char* OptName(CO::CommandOpt fl) {
return "blocking";
case GLOBAL_TRANS:
return "global-trans";
case DESTINATION_KEY:
return "dest-key";
case VARIADIC_KEYS:
return "variadic-keys";
}
return "unknown";
}

View File

@ -30,7 +30,9 @@ enum CommandOpt : uint32_t {
NOSCRIPT = 0x100,
BLOCKING = 0x200, // implies REVERSE_MAPPING
GLOBAL_TRANS = 0x1000,
DESTINATION_KEY = 0x2000,
// arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
VARIADIC_KEYS = 0x2000,
};
const char* OptName(CommandOpt fl);
@ -85,7 +87,7 @@ class CommandId {
}
bool is_multi_key() const {
return (last_key_ != first_key_) || (opt_mask_ & CO::DESTINATION_KEY);
return (last_key_ != first_key_) || (opt_mask_ & CO::VARIADIC_KEYS);
}
int8_t key_arg_step() const {

View File

@ -24,10 +24,10 @@ namespace dfly {
using namespace std;
using namespace util;
using absl::StrCat;
using ::io::Result;
using testing::ElementsAre;
using testing::HasSubstr;
using absl::StrCat;
namespace this_fiber = boost::this_fiber;
namespace {
@ -286,6 +286,11 @@ TEST_F(DflyEngineTest, Eval) {
resp = Run({"eval", "return 77", "2", "foo", "zoo"});
EXPECT_THAT(resp, IntArg(77));
// a,b important here to spawn multiple shards.
resp = Run({"eval", "return redis.call('exists', KEYS[2])", "2", "a", "b"});
EXPECT_EQ(2, GetDebugInfo().shards_count);
EXPECT_THAT(resp, IntArg(0));
}
TEST_F(DflyEngineTest, EvalResp) {
@ -297,23 +302,40 @@ TEST_F(DflyEngineTest, EvalResp) {
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(5), "foo", "17.5"));
}
TEST_F(DflyEngineTest, Hello) {
auto resp = Run({"hello"});
ASSERT_THAT(resp, ArrLen(12));
resp = Run({"hello", "2"});
ASSERT_THAT(resp, ArrLen(12));
TEST_F(DflyEngineTest, EvalPublish) {
auto resp = pp_->at(1)->Await([&] { return Run({"subscribe", "foo"}); });
EXPECT_THAT(resp, ArrLen(3));
EXPECT_THAT(resp.GetVec(), ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING),
"proto", IntArg(2), "id", ArgType(RespExpr::INT64), "mode",
"standalone", "role", "master"));
resp = Run({"eval", "return redis.call('publish', 'foo', 'bar')", "0"});
EXPECT_THAT(resp, IntArg(1));
}
// These are valid arguments to HELLO, however as they are not yet supported the implementation
// is degraded to 'unknown command'.
EXPECT_THAT(Run({"hello", "3"}),
ErrArg("ERR unknown command 'HELLO' with args beginning with: `3`"));
EXPECT_THAT(
Run({"hello", "2", "AUTH", "uname", "pwd"}),
ErrArg("ERR unknown command 'HELLO' with args beginning with: `2`, `AUTH`, `uname`, `pwd`"));
TEST_F(DflyEngineTest, EvalBug59) {
auto resp = Run({"eval", R"(
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
local offset = redis.call("hincrby", KEYS[2], "s", 1)
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
redis.call("xadd", KEYS[1], "MAXLEN", ARGV[2], offset, "d", ARGV[1])
redis.call("expire", KEYS[1], ARGV[3])
if ARGV[4] ~= '' then
local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1]
redis.call("publish", ARGV[4], payload)
end
return {offset, epoch}
)",
"2", "x", "y", "1", "2", "3", "4", "5", "6"});
ASSERT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), "6"));
}
TEST_F(DflyEngineTest, EvalSha) {
@ -339,6 +361,25 @@ TEST_F(DflyEngineTest, EvalSha) {
EXPECT_THAT(resp, "c6459b95a0e81df97af6fdd49b1a9e0287a57363");
}
TEST_F(DflyEngineTest, Hello) {
auto resp = Run({"hello"});
ASSERT_THAT(resp, ArrLen(12));
resp = Run({"hello", "2"});
ASSERT_THAT(resp, ArrLen(12));
EXPECT_THAT(resp.GetVec(), ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING),
"proto", IntArg(2), "id", ArgType(RespExpr::INT64), "mode",
"standalone", "role", "master"));
// These are valid arguments to HELLO, however as they are not yet supported the implementation
// is degraded to 'unknown command'.
EXPECT_THAT(Run({"hello", "3"}),
ErrArg("ERR unknown command 'HELLO' with args beginning with: `3`"));
EXPECT_THAT(
Run({"hello", "2", "AUTH", "uname", "pwd"}),
ErrArg("ERR unknown command 'HELLO' with args beginning with: `2`, `AUTH`, `uname`, `pwd`"));
}
TEST_F(DflyEngineTest, Memcache) {
using MP = MemcacheParser;
@ -437,9 +478,7 @@ TEST_F(DflyEngineTest, OOM) {
}
TEST_F(DflyEngineTest, PSubscribe) {
auto resp = pp_->at(1)->Await([&] {
return Run({"psubscribe", "a*", "b*"});
});
auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); });
EXPECT_THAT(resp, ArrLen(3));
resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); });
EXPECT_THAT(resp, IntArg(1));

View File

@ -511,21 +511,23 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (under_script) {
DCHECK(dfly_cntx->transaction);
OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args);
if (!key_index_res)
return (*cntx)->SendError(key_index_res.status());
if (IsTransactional(cid)) {
OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args);
if (!key_index_res)
return (*cntx)->SendError(key_index_res.status());
const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = ArgS(args, i);
if (!dfly_cntx->conn_state.script_info->keys.contains(key)) {
return (*cntx)->SendError("script tried accessing undeclared key");
const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = ArgS(args, i);
if (!dfly_cntx->conn_state.script_info->keys.contains(key)) {
return (*cntx)->SendError("script tried accessing undeclared key");
}
}
dfly_cntx->transaction->SetExecCmd(cid);
OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);
if (st != OpStatus::OK)
return (*cntx)->SendError(st);
}
dfly_cntx->transaction->SetExecCmd(cid);
OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);
if (st != OpStatus::OK)
return (*cntx)->SendError(st);
} else {
DCHECK(dfly_cntx->transaction == nullptr);
@ -1081,18 +1083,21 @@ void Service::RegisterCommands() {
constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi)
<< CI{"DISCARD", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.MFUNC(Discard)
<< CI{"EVAL", CO::NOSCRIPT, -3, 3, 3, 1}.MFUNC(Eval).SetValidator(&EvalValidator)
<< CI{"EVALSHA", CO::NOSCRIPT, -3, 3, 3, 1}.MFUNC(EvalSha).SetValidator(&EvalValidator)
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec)
<< CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish)
<< CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe)
<< CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe)
<< CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe)
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function);
registry_
<< CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi)
<< CI{"DISCARD", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.MFUNC(Discard)
<< CI{"EVAL", CO::NOSCRIPT | CO::VARIADIC_KEYS, -3, 3, 3, 1}.MFUNC(Eval).SetValidator(
&EvalValidator)
<< CI{"EVALSHA", CO::NOSCRIPT | CO::VARIADIC_KEYS, -3, 3, 3, 1}.MFUNC(EvalSha).SetValidator(
&EvalValidator)
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec)
<< CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish)
<< CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe)
<< CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe)
<< CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe)
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function);
StreamFamily::Register(&registry_);
StringFamily::Register(&registry_);
@ -1116,6 +1121,13 @@ void Service::RegisterCommands() {
LOG(INFO) << " " << key << ": with " << key_len << " keys";
}
});
LOG(INFO) << "Non-transactional commands are: ";
registry_.Traverse([](std::string_view name, const CI& cid) {
if (!IsTransactional(&cid)) {
LOG(INFO) << " " << name;
}
});
}
}

View File

@ -16,6 +16,7 @@ namespace dfly {
using namespace std;
using namespace util;
using absl::StrCat;
thread_local Transaction::TLTmpSpace Transaction::tmp_space;
@ -51,7 +52,8 @@ Transaction::Transaction(const CommandId* cid) : cid_(cid) {
}
Transaction::~Transaction() {
DVLOG(2) << "Transaction " << DebugId() << " destroyed";
DVLOG(2) << "Transaction " << StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, ")")
<< " destroyed";
}
/**
@ -286,7 +288,7 @@ void Transaction::SetExecCmd(const CommandId* cid) {
string Transaction::DebugId() const {
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
}
// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
@ -496,9 +498,7 @@ void Transaction::ScheduleInternal() {
if (!is_active(i))
continue;
shard_set->Add(i, [] {
EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr);
});
shard_set->Add(i, [] { EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); });
}
}
}
@ -1156,14 +1156,19 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS);
KeyIndex key_index;
int num_custom_keys = -1;
if (cid->opt_mask() & CO::DESTINATION_KEY) {
key_index.bonus = 1;
if (cid->opt_mask() & CO::VARIADIC_KEYS) {
if (args.size() < 3) {
return OpStatus::SYNTAX_ERR;
}
string_view name{cid->name()};
if (!absl::StartsWith(name, "EVAL")) {
key_index.bonus = 1; // Z<xxx>STORE commands
}
string_view num(ArgS(args, 2));
if (!absl::SimpleAtoi(num, &num_custom_keys) || num_custom_keys < 0)
return OpStatus::INVALID_INT;
@ -1185,20 +1190,7 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
return key_index;
}
string_view name{cid->name()};
if (name == "EVAL" || name == "EVALSHA") {
DCHECK_GE(args.size(), 3u);
uint32_t num_keys;
CHECK(absl::SimpleAtoi(ArgS(args, 2), &num_keys));
key_index.start = 3;
key_index.end = 3 + num_keys;
key_index.step = 1;
return key_index;
}
LOG(FATAL) << "TBD: Not supported";
LOG(FATAL) << "TBD: Not supported " << cid->name();
return key_index;
}

View File

@ -1875,7 +1875,7 @@ OpResult<unsigned> ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key
#define HFUNC(x) SetHandler(&ZSetFamily::x)
void ZSetFamily::Register(CommandRegistry* registry) {
constexpr uint32_t kUnionMask = CO::WRITE | CO::DESTINATION_KEY | CO::REVERSE_MAPPING;
constexpr uint32_t kUnionMask = CO::WRITE | CO::VARIADIC_KEYS | CO::REVERSE_MAPPING;
*registry << CI{"ZADD", CO::FAST | CO::WRITE | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(ZAdd)
<< CI{"ZCARD", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(ZCard)