Implement TYPE and SCAN commands. Update readme with the progress

This commit is contained in:
Roman Gershman 2022-01-24 08:49:38 +02:00
parent b2953293cd
commit 3f80b89e19
6 changed files with 167 additions and 28 deletions

View File

@ -35,11 +35,11 @@ API 1.0
- [X] SET
- [ ] SETNX
- [X] GET
- [ ] DECR
- [ ] INCR
- [ ] DECRBY
- [X] DECR
- [X] INCR
- [X] DECRBY
- [X] GETSET
- [ ] INCRBY
- [X] INCRBY
- [X] MGET
- [X] MSET
- [ ] MSETNX
@ -50,27 +50,27 @@ API 1.0
- [X] EXISTS
- [X] EXPIRE
- [X] EXPIREAT
- [X] Ping
- [X] PING
- [X] RENAME
- [X] RENAMENX
- [X] SELECT
- [X] TTL
- [ ] TYPE
- [X] TYPE
- [ ] SORT
- [X] Server Family
- [X] QUIT
- [X] DBSIZE
- [ ] BGSAVE
- [ ] SAVE
- [X] SAVE
- [X] DBSIZE
- [X] DEBUG
- [X] EXEC
- [ ] FLUSHALL
- [X] FLUSHDB
- [ ] INFO
- [X] INFO
- [X] MULTI
- [X] SHUTDOWN
- [ ] LASTSAVE
- [X] LASTSAVE
- [ ] SLAVEOF/REPLICAOF
- [ ] SYNC
- [ ] Set Family
@ -124,7 +124,7 @@ a distributed log format.
API 2.0
- [ ] List Family
- [ ] BLPOP
- [X] BLPOP
- [ ] BRPOP
- [ ] BRPOPLPUSH
- [ ] PubSub family
@ -137,16 +137,20 @@ API 2.0
- [ ] WATCH
- [ ] UNWATCH
- [ ] DISCARD
- [X] Generic Family
- [X] SCAN
Commands that I prefer not to implement before launch:
- [ ] PUNSUBSCRIBE
- [ ] PSUBSCRIBE
And keyspace notifications. For that I would like to deep dive and learn exact use-cases
for this API.
Also, I would omit keyspace notifications. Before supporting them, I would like to take a
deep dive and learn the exact use-cases for this API.
## Milestone Nymph
API 2,3,4 without cluster support, without modules, without memory inspection commands.
Without keyspace notifications.
Without support for keyspace notifications.
Design config support. ~140 commands overall...
## Milestone Molt

View File

@ -112,22 +112,11 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
return make_pair(it, ExpireIterator{});
}
ExpireIterator expire_it;
if (it->second.HasExpire()) { // check expiry state
expire_it = db->expire_table.Find(it->first);
CHECK(IsValid(expire_it));
if (expire_it->second <= now_ms_) {
db->expire_table.Erase(expire_it);
db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it);
return make_pair(MainIterator{}, ExpireIterator{});
}
return ExpireIfNeeded(db_ind, it);
}
return make_pair(it, expire_it);
return make_pair(it, ExpireIterator{});
}
OpResult<pair<MainIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) {
@ -384,4 +373,22 @@ void DbSlice::PostUpdate(DbIndex db_ind, MainIterator it) {
db->stats.obj_memory_usage += it->second.MallocUsed();
}
// Resolves the expiry state of `it`, which must carry an expire flag.
//
// Returns {it, expire-table iterator} while the stored deadline is still in the future
// (strictly greater than now_ms_). Otherwise the entry is removed from the expire table and
// the main table, the db stats are adjusted, and a pair of default-constructed iterators is
// returned.
//
// NOTE(review): the method is declared const but erases through `db` - confirm this is intended.
pair<MainIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, MainIterator it) const {
  DCHECK(it->second.HasExpire());

  auto& db = db_arr_[db_ind];
  auto expire_it = db->expire_table.Find(it->first);

  // An entry flagged with HasExpire() is required to be present in the expire table.
  CHECK(IsValid(expire_it));

  const bool expired = expire_it->second <= now_ms_;
  if (expired) {
    db->expire_table.Erase(expire_it);

    // Stats are updated before the main-table erase because they read the entry.
    db->stats.inline_keys -= it->first.IsInline();
    db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
    db->main_table.Erase(it);

    return make_pair(MainIterator{}, ExpireIterator{});
  }

  return make_pair(it, expire_it);
}
} // namespace dfly

View File

@ -155,6 +155,10 @@ class DbSlice {
void PreUpdate(DbIndex db_ind, MainIterator it);
void PostUpdate(DbIndex db_ind, MainIterator it);
// Checks whether 'it' has expired. If it has not, returns 'it' together with its expire-table
// iterator. Otherwise, erases the entry from both tables and returns default-constructed iterators.
std::pair<MainIterator, ExpireIterator> ExpireIfNeeded(DbIndex db_ind, MainIterator it) const;
// Current version of this slice.
// We maintain a shared versioning scheme for all databases in the slice.
uint64_t version() const { return version_; }

View File

@ -4,6 +4,10 @@
#include "server/generic_family.h"
extern "C" {
#include "redis/object.h"
}
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
@ -126,6 +130,25 @@ Transaction::RunnableType Renamer::Finalize(bool skip_exist_dest) {
};
}
// Translates an object type code (the OBJ_* constants) into the name sent in the TYPE reply.
// An unrecognized code is logged as an error and mapped to "invalid".
const char* ObjTypeName(int type) {
  const char* name = "invalid";

  switch (type) {
    case OBJ_STRING:
      name = "string";
      break;
    case OBJ_LIST:
      name = "list";
      break;
    case OBJ_SET:
      name = "set";
      break;
    case OBJ_ZSET:
      name = "zset";
      break;
    case OBJ_HASH:
      name = "hash";
      break;
    case OBJ_STREAM:
      name = "stream";
      break;
    default:
      LOG(ERROR) << "Unsupported type " << type;
      break;
  }

  return name;
}
} // namespace
void GenericFamily::Init(util::ProactorPool* pp) {
@ -285,6 +308,24 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendOk();
}
// TYPE key
// Replies with a simple string naming the type of the value stored at `key`, or "none" when
// the lookup callback does not produce a value.
void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
  std::string_view key = ArgS(args, 1);

  // Looks the key up through FindExt and reports its object type code; a finished iterator
  // is mapped to KEY_NOTFOUND.
  auto lookup_type = [&](Transaction* t, EngineShard* shard) -> OpResult<int> {
    auto found = shard->db_slice().FindExt(t->db_index(), key).first;
    if (found.is_done()) {
      return OpStatus::KEY_NOTFOUND;
    }
    return found->second.ObjType();
  };

  OpResult<int> type_res = cntx->transaction->ScheduleSingleHopT(std::move(lookup_type));

  // Any non-OK status is reported to the client as "none".
  if (!type_res) {
    cntx->SendSimpleRespString("none");
    return;
  }

  cntx->SendSimpleRespString(ObjTypeName(type_res.value()));
}
OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_dest,
ConnectionContext* cntx) {
@ -325,6 +366,57 @@ void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendBulkString(key);
}
// SCAN cursor
// Walks the keys of the connection's current database shard by shard.
//
// The client-visible cursor packs two values: its low 10 bits hold the index of the shard
// being traversed, and the remaining bits hold the cursor handed to / returned by OpScan for
// that shard. A cursor of 0 in the reply means that all shards were exhausted.
//
// Only the cursor argument is read here; any further arguments are not inspected by this
// function. The reply is a 2-element array: the next cursor as a bulk string, followed by an
// array of keys.
void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
  constexpr unsigned kShardIdBits = 10;
  constexpr uint64_t kShardIdMask = (uint64_t(1) << kShardIdBits) - 1;  // 1023
  constexpr size_t kMinKeysPerReply = 10;

  std::string_view token = ArgS(args, 1);
  EngineShardSet* ess = cntx->shard_set;
  const unsigned shard_count = ess->size();

  // The shard index has to fit into the kShardIdBits low bits of the cursor.
  CHECK_LT(shard_count, 1024u);

  uint64_t cursor = 0;
  if (!absl::SimpleAtoi(token, &cursor)) {
    return cntx->SendError("invalid cursor");
  }

  // Split the incoming cursor into <per-shard cursor, shard index>.
  ShardId sid = cursor & kShardIdMask;
  if (sid >= shard_count) {
    return cntx->SendError("invalid cursor");
  }
  cursor >>= kShardIdBits;

  vector<string> keys;
  while (true) {
    ess->Await(sid, [&] {
      OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
      OpScan(op_args, &cursor, &keys);
    });

    // A zero cursor means the current shard is finished: advance to the next one and stop
    // altogether once the last shard has been covered.
    if (cursor == 0 && unsigned(++sid) == shard_count)
      break;

    if (keys.size() >= kMinKeysPerReply)
      break;
  }

  // Re-pack the shard index into the cursor unless the whole scan has completed.
  if (sid < shard_count) {
    cursor = (cursor << kShardIdBits) | sid;
  } else {
    DCHECK_EQ(0u, cursor);
  }

  const string cursor_str = absl::StrCat(cursor);
  string reply = absl::StrCat("*2\r\n$", cursor_str.size(), "\r\n", cursor_str, "\r\n*",
                              keys.size(), "\r\n");
  for (const string& key : keys) {
    absl::StrAppend(&reply, "$", key.size(), "\r\n", key, "\r\n");
  }

  return cntx->SendRespBlob(reply);
}
OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
const ExpireParams& params) {
auto& db_slice = op_args.shard->db_slice();
@ -425,6 +517,32 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from,
return OpStatus::OK;
}
void GenericFamily::OpScan(const OpArgs& op_args, uint64_t* cursor, vector<string>* vec) {
auto& db_slice = op_args.shard->db_slice();
DCHECK(db_slice.IsDbValid(op_args.db_ind));
unsigned cnt = 0;
auto scan_cb = [&](MainIterator it) {
if (it->second.HasExpire()) {
it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first;
}
vec->push_back(it->first.ToString());
++cnt;
};
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind
<< " has " << db_slice.DbSize(op_args.db_ind);
uint64_t cur = *cursor;
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
do {
cur = prime_table->Traverse(cur, scan_cb);
} while (cur && cnt < 10);
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur;
*cursor = cur;
}
using CI = CommandId;
#define HFUNC(x) SetHandler(&GenericFamily::x)
@ -439,8 +557,10 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
<< CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan)
<< CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl)
<< CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl);
<< CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type);
}
} // namespace dfly

View File

@ -47,6 +47,8 @@ class GenericFamily {
static void Echo(CmdArgList args, ConnectionContext* cntx);
static void Select(CmdArgList args, ConnectionContext* cntx);
static void Scan(CmdArgList args, ConnectionContext* cntx);
static void Type(CmdArgList args, ConnectionContext* cntx);
static OpResult<void> RenameGeneric(CmdArgList args, bool skip_exist_dest,
ConnectionContext* cntx);
@ -59,6 +61,7 @@ class GenericFamily {
static OpResult<uint32_t> OpExists(const OpArgs& op_args, ArgSlice keys);
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists);
static void OpScan(const OpArgs& op_args, uint64_t* cursor, std::vector<std::string>* vec);
};
} // namespace dfly

View File

@ -54,7 +54,8 @@ constexpr size_t kMaxThreadSize = 1024;
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) {
CHECK(pp);
// We support less than 1024 threads.
// We support less than 1024 threads and we support less than 1024 shards.
// For example, Scan uses 10 bits in cursor to encode shard id it currently traverses.
CHECK_LT(pp->size(), kMaxThreadSize);
RegisterCommands();