Add KEYS command

This commit is contained in:
Roman Gershman 2022-04-19 11:38:23 +03:00
parent b74c5398a4
commit a5b59dde25
3 changed files with 67 additions and 36 deletions

View File

@ -60,6 +60,7 @@ API 1.0
- [X] EXISTS - [X] EXISTS
- [X] EXPIRE - [X] EXPIRE
- [X] EXPIREAT - [X] EXPIREAT
- [X] KEYS
- [X] PING - [X] PING
- [X] RENAME - [X] RENAME
- [X] RENAMENX - [X] RENAMENX
@ -68,6 +69,7 @@ API 1.0
- [X] TYPE - [X] TYPE
- [ ] SORT - [ ] SORT
- [X] Server Family - [X] Server Family
- [X] AUTH
- [X] QUIT - [X] QUIT
- [X] DBSIZE - [X] DBSIZE
- [ ] BGSAVE - [ ] BGSAVE
@ -120,9 +122,7 @@ API 1.0
- [X] ZREVRANGE - [X] ZREVRANGE
- [X] ZSCORE - [X] ZSCORE
- [ ] Not sure whether these are required for the initial release. - [ ] Not sure whether these are required for the initial release.
- [X] AUTH
- [ ] BGREWRITEAOF - [ ] BGREWRITEAOF
- [ ] KEYS
- [ ] MONITOR - [ ] MONITOR
- [ ] RANDOMKEY - [ ] RANDOMKEY
- [ ] MOVE - [ ] MOVE

View File

@ -18,6 +18,7 @@ extern "C" {
#include "util/varz.h" #include "util/varz.h"
DEFINE_uint32(dbnum, 16, "Number of databases"); DEFINE_uint32(dbnum, 16, "Number of databases");
DEFINE_uint32(keys_output_limit, 8192, "Maximum number of keys output by keys command");
namespace dfly { namespace dfly {
using namespace std; using namespace std;
@ -309,6 +310,22 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
} }
} }
void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) {
string_view pattern(ArgS(args, 1));
uint64_t cursor = 0;
StringVec keys;
do {
cursor = ScanGeneric(cursor, pattern, string_view{}, 512, &keys, cntx);
} while (cursor != 0 && keys.size() < FLAGS_keys_output_limit);
(*cntx)->StartArray(keys.size());
for (const auto& k : keys) {
(*cntx)->SendBulkString(k);
}
}
void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
string_view msec = ArgS(args, 2); string_view msec = ArgS(args, 2);
@ -442,15 +459,9 @@ void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
string_view token = ArgS(args, 1); string_view token = ArgS(args, 1);
uint64_t cursor = 0;
EngineShardSet* ess = cntx->shard_set;
unsigned shard_count = ess->size();
uint32_t limit = 10;
string_view pattern, type_filter; string_view pattern, type_filter;
uint64_t cursor = 0;
// Dash table returns a cursor with its right byte empty. We will use it uint32_t limit = 10;
// for encoding shard index. For now scan has a limitation of 255 shards.
CHECK_LT(shard_count, 1024u);
if (!absl::SimpleAtoi(token, &cursor)) { if (!absl::SimpleAtoi(token, &cursor)) {
return (*cntx)->SendError("invalid cursor"); return (*cntx)->SendError("invalid cursor");
@ -484,31 +495,8 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
} }
} }
ShardId sid = cursor % 1024; StringVec keys;
if (sid >= shard_count) { cursor = ScanGeneric(cursor, pattern, type_filter, limit, &keys, cntx);
return (*cntx)->SendError("invalid cursor");
}
cursor >>= 10;
vector<string> keys;
do {
ess->Await(sid, [&] {
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
OpScan(op_args, pattern, type_filter, limit, &cursor, &keys);
});
if (cursor == 0) {
++sid;
if (unsigned(sid) == shard_count)
break;
}
} while (keys.size() < limit);
if (sid < shard_count) {
cursor = (cursor << 10) | sid;
} else {
DCHECK_EQ(0u, cursor);
}
(*cntx)->StartArray(2); (*cntx)->StartArray(2);
(*cntx)->SendSimpleString(absl::StrCat(cursor)); (*cntx)->SendSimpleString(absl::StrCat(cursor));
@ -632,6 +620,44 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
return OpStatus::OK; return OpStatus::OK;
} }
uint64_t GenericFamily::ScanGeneric(uint64_t cursor, string_view pattern, string_view type_filter,
unsigned limit, StringVec* keys, ConnectionContext* cntx) {
ShardId sid = cursor % 1024;
EngineShardSet* ess = cntx->shard_set;
unsigned shard_count = ess->size();
// Dash table returns a cursor with its right byte empty. We will use it
// for encoding shard index. For now scan has a limitation of 255 shards.
CHECK_LT(shard_count, 1024u);
if (sid >= shard_count) { // protection
return 0;
}
cursor >>= 10;
do {
ess->Await(sid, [&] {
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
OpScan(op_args, pattern, type_filter, limit, &cursor, keys);
});
if (cursor == 0) {
++sid;
if (unsigned(sid) == shard_count)
break;
}
} while (keys->size() < limit);
if (sid < shard_count) {
cursor = (cursor << 10) | sid;
} else {
DCHECK_EQ(0u, cursor);
}
return cursor;
}
void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_view type_filter, void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_view type_filter,
size_t limit, uint64_t* cursor, StringVec* vec) { size_t limit, uint64_t* cursor, StringVec* vec) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
@ -698,6 +724,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists) << CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire) << CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt) << CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"KEYS", CO::READONLY, 2, 0, 0, 0}.HFUNC(Keys)
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt) << CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename) << CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) << CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)

View File

@ -8,7 +8,6 @@
#include "server/common.h" #include "server/common.h"
#include "server/table.h" #include "server/table.h"
namespace util { namespace util {
class ProactorPool; class ProactorPool;
} // namespace util } // namespace util
@ -44,6 +43,7 @@ class GenericFamily {
static void Exists(CmdArgList args, ConnectionContext* cntx); static void Exists(CmdArgList args, ConnectionContext* cntx);
static void Expire(CmdArgList args, ConnectionContext* cntx); static void Expire(CmdArgList args, ConnectionContext* cntx);
static void ExpireAt(CmdArgList args, ConnectionContext* cntx); static void ExpireAt(CmdArgList args, ConnectionContext* cntx);
static void Keys(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx); static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Rename(CmdArgList args, ConnectionContext* cntx); static void Rename(CmdArgList args, ConnectionContext* cntx);
@ -68,6 +68,10 @@ class GenericFamily {
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to, static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists); bool skip_exists);
static uint64_t ScanGeneric(uint64_t cursor, std::string_view pattern,
std::string_view type_filter, unsigned limit, StringVec* keys,
ConnectionContext* cntx);
static void OpScan(const OpArgs& op_args, std::string_view pattern, std::string_view type_filter, static void OpScan(const OpArgs& op_args, std::string_view pattern, std::string_view type_filter,
size_t limit, uint64_t* cursor, StringVec* vec); size_t limit, uint64_t* cursor, StringVec* vec);