diff --git a/README.md b/README.md index f9c7feb..a68ce39 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ API 1.0 - [X] EXISTS - [X] EXPIRE - [X] EXPIREAT + - [X] KEYS - [X] PING - [X] RENAME - [X] RENAMENX @@ -68,6 +69,7 @@ API 1.0 - [X] TYPE - [ ] SORT - [X] Server Family + - [X] AUTH - [X] QUIT - [X] DBSIZE - [ ] BGSAVE @@ -120,9 +122,7 @@ API 1.0 - [X] ZREVRANGE - [X] ZSCORE - [ ] Not sure whether these are required for the initial release. - - [X] AUTH - [ ] BGREWRITEAOF - - [ ] KEYS - [ ] MONITOR - [ ] RANDOMKEY - [ ] MOVE diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 2cfd5ee..a121688 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -18,6 +18,7 @@ extern "C" { #include "util/varz.h" DEFINE_uint32(dbnum, 16, "Number of databases"); +DEFINE_uint32(keys_output_limit, 8192, "Maximum number of keys output by keys command"); namespace dfly { using namespace std; @@ -309,6 +310,22 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) { } } +void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) { + string_view pattern(ArgS(args, 1)); + uint64_t cursor = 0; + + StringVec keys; + + do { + cursor = ScanGeneric(cursor, pattern, string_view{}, 512, &keys, cntx); + } while (cursor != 0 && keys.size() < FLAGS_keys_output_limit); + + (*cntx)->StartArray(keys.size()); + for (const auto& k : keys) { + (*cntx)->SendBulkString(k); + } +} + void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view msec = ArgS(args, 2); @@ -442,15 +459,9 @@ void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) { void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { string_view token = ArgS(args, 1); - uint64_t cursor = 0; - EngineShardSet* ess = cntx->shard_set; - unsigned shard_count = ess->size(); - uint32_t limit = 10; string_view pattern, type_filter; - - // Dash table returns a cursor with its right byte empty. We will use it - // for encoding shard index. For now scan has a limitation of 255 shards. - CHECK_LT(shard_count, 1024u); + uint64_t cursor = 0; + uint32_t limit = 10; if (!absl::SimpleAtoi(token, &cursor)) { return (*cntx)->SendError("invalid cursor"); @@ -484,31 +495,8 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { } } - ShardId sid = cursor % 1024; - if (sid >= shard_count) { - return (*cntx)->SendError("invalid cursor"); - } - - cursor >>= 10; - - vector keys; - do { - ess->Await(sid, [&] { - OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index}; - OpScan(op_args, pattern, type_filter, limit, &cursor, &keys); - }); - if (cursor == 0) { - ++sid; - if (unsigned(sid) == shard_count) - break; - } - } while (keys.size() < limit); - - if (sid < shard_count) { - cursor = (cursor << 10) | sid; - } else { - DCHECK_EQ(0u, cursor); - } + StringVec keys; + cursor = ScanGeneric(cursor, pattern, type_filter, limit, &keys, cntx); (*cntx)->StartArray(2); (*cntx)->SendSimpleString(absl::StrCat(cursor)); @@ -632,6 +620,44 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, return OpStatus::OK; } +uint64_t GenericFamily::ScanGeneric(uint64_t cursor, string_view pattern, string_view type_filter, + unsigned limit, StringVec* keys, ConnectionContext* cntx) { + ShardId sid = cursor % 1024; + + EngineShardSet* ess = cntx->shard_set; + unsigned shard_count = ess->size(); + + // Dash table returns a cursor with its right byte empty. We will use it + // for encoding shard index. For now scan has a limitation of 255 shards. + CHECK_LT(shard_count, 1024u); + + if (sid >= shard_count) { // protection + return 0; + } + + cursor >>= 10; + + do { + ess->Await(sid, [&] { + OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index}; + OpScan(op_args, pattern, type_filter, limit, &cursor, keys); + }); + if (cursor == 0) { + ++sid; + if (unsigned(sid) == shard_count) + break; + } + } while (keys->size() < limit); + + if (sid < shard_count) { + cursor = (cursor << 10) | sid; + } else { + DCHECK_EQ(0u, cursor); + } + + return cursor; +} + void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_view type_filter, size_t limit, uint64_t* cursor, StringVec* vec) { auto& db_slice = op_args.shard->db_slice(); @@ -698,6 +724,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists) << CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire) << CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt) + << CI{"KEYS", CO::READONLY, 2, 0, 0, 0}.HFUNC(Keys) << CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt) << CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename) << CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) diff --git a/src/server/generic_family.h b/src/server/generic_family.h index ba08e77..ea2be8e 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -8,7 +8,6 @@ #include "server/common.h" #include "server/table.h" - namespace util { class ProactorPool; } // namespace util @@ -44,6 +43,7 @@ class GenericFamily { static void Exists(CmdArgList args, ConnectionContext* cntx); static void Expire(CmdArgList args, ConnectionContext* cntx); static void ExpireAt(CmdArgList args, ConnectionContext* cntx); + static void Keys(CmdArgList args, ConnectionContext* cntx); static void PexpireAt(CmdArgList args, ConnectionContext* cntx); static void Rename(CmdArgList args, ConnectionContext* cntx); @@ -68,6 +68,10 @@ class GenericFamily { static OpResult OpRen(const OpArgs& op_args, std::string_view from, std::string_view to, bool skip_exists); + static uint64_t ScanGeneric(uint64_t cursor, std::string_view pattern, + std::string_view type_filter, unsigned limit, StringVec* keys, + ConnectionContext* cntx); + static void OpScan(const OpArgs& op_args, std::string_view pattern, std::string_view type_filter, size_t limit, uint64_t* cursor, StringVec* vec);