Add more debugging tools for object introspection
This commit is contained in:
parent
038868802f
commit
6d44d72a56
|
@ -44,6 +44,8 @@ struct PopulateBatch {
|
|||
struct ObjInfo {
|
||||
unsigned encoding;
|
||||
unsigned bucket_id = 0;
|
||||
unsigned slot_id = 0;
|
||||
|
||||
int64_t ttl = INT64_MAX;
|
||||
bool has_sec_precision = false;
|
||||
|
||||
|
@ -292,6 +294,7 @@ void DebugCmd::Inspect(string_view key) {
|
|||
}
|
||||
|
||||
ObjInfo oinfo(it->second.Encoding(), it.bucket_id());
|
||||
oinfo.slot_id = it.slot_id();
|
||||
|
||||
if (it->second.HasExpire()) {
|
||||
ExpireIterator exp_it = exp_t->Find(it->first);
|
||||
|
@ -309,6 +312,8 @@ void DebugCmd::Inspect(string_view key) {
|
|||
if (res) {
|
||||
string resp;
|
||||
StrAppend(&resp, "encoding:", strEncoding(res->encoding), " bucket_id:", res->bucket_id);
|
||||
StrAppend(&resp, " slot:", res->slot_id);
|
||||
|
||||
if (res->ttl != INT64_MAX) {
|
||||
StrAppend(&resp, " ttl:", res->ttl, res->has_sec_precision ? "s" : "ms");
|
||||
}
|
||||
|
|
|
@ -10,10 +10,10 @@ extern "C" {
|
|||
}
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "server/blocking_controller.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/blocking_controller.h"
|
||||
#include "server/error.h"
|
||||
#include "server/transaction.h"
|
||||
#include "util/varz.h"
|
||||
|
@ -164,6 +164,105 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
struct ScanOpts {
|
||||
string_view pattern;
|
||||
string_view type_filter;
|
||||
size_t limit = 10;
|
||||
|
||||
unsigned bucket_id = UINT_MAX;
|
||||
};
|
||||
|
||||
bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
if (it->second.HasExpire()) {
|
||||
it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first;
|
||||
}
|
||||
|
||||
if (!IsValid(it))
|
||||
return false;
|
||||
|
||||
bool matches = opts.type_filter.empty() || ObjTypeName(it->second.ObjType()) == opts.type_filter;
|
||||
|
||||
if (!matches)
|
||||
return false;
|
||||
|
||||
if (opts.bucket_id != UINT_MAX && opts.bucket_id != it.bucket_id()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (opts.pattern.empty()) {
|
||||
res->push_back(it->first.ToString());
|
||||
} else {
|
||||
string str = it->first.ToString();
|
||||
if (stringmatchlen(opts.pattern.data(), opts.pattern.size(), str.data(), str.size(), 0) != 1)
|
||||
return false;
|
||||
|
||||
res->push_back(std::move(str));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
|
||||
StringVec* vec) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
DCHECK(db_slice.IsDbValid(op_args.db_ind));
|
||||
|
||||
unsigned cnt = 0;
|
||||
|
||||
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has "
|
||||
<< db_slice.DbSize(op_args.db_ind);
|
||||
|
||||
PrimeTable::cursor cur = *cursor;
|
||||
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
|
||||
do {
|
||||
cur = prime_table->Traverse(
|
||||
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, vec); });
|
||||
} while (cur && cnt < scan_opts.limit);
|
||||
|
||||
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
|
||||
*cursor = cur.value();
|
||||
}
|
||||
|
||||
uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys,
|
||||
ConnectionContext* cntx) {
|
||||
ShardId sid = cursor % 1024;
|
||||
|
||||
EngineShardSet* ess = cntx->shard_set;
|
||||
unsigned shard_count = ess->size();
|
||||
|
||||
// Dash table returns a cursor with its right byte empty. We will use it
|
||||
// for encoding shard index. For now scan has a limitation of 255 shards.
|
||||
CHECK_LT(shard_count, 1024u);
|
||||
|
||||
if (sid >= shard_count) { // protection
|
||||
return 0;
|
||||
}
|
||||
|
||||
cursor >>= 10;
|
||||
|
||||
do {
|
||||
ess->Await(sid, [&] {
|
||||
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
|
||||
|
||||
OpScan(op_args, scan_opts, &cursor, keys);
|
||||
});
|
||||
if (cursor == 0) {
|
||||
++sid;
|
||||
if (unsigned(sid) == shard_count)
|
||||
break;
|
||||
}
|
||||
} while (keys->size() < scan_opts.limit);
|
||||
|
||||
if (sid < shard_count) {
|
||||
cursor = (cursor << 10) | sid;
|
||||
} else {
|
||||
DCHECK_EQ(0u, cursor);
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void GenericFamily::Init(util::ProactorPool* pp) {
|
||||
|
@ -298,8 +397,11 @@ void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
StringVec keys;
|
||||
|
||||
ScanOpts scan_opts;
|
||||
scan_opts.pattern = pattern;
|
||||
scan_opts.limit = 512;
|
||||
do {
|
||||
cursor = ScanGeneric(cursor, pattern, string_view{}, 512, &keys, cntx);
|
||||
cursor = ScanGeneric(cursor, scan_opts, &keys, cntx);
|
||||
} while (cursor != 0 && keys.size() < FLAGS_keys_output_limit);
|
||||
|
||||
(*cntx)->StartArray(keys.size());
|
||||
|
@ -446,14 +548,14 @@ void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view token = ArgS(args, 1);
|
||||
string_view pattern, type_filter;
|
||||
uint64_t cursor = 0;
|
||||
uint32_t limit = 10;
|
||||
|
||||
if (!absl::SimpleAtoi(token, &cursor)) {
|
||||
return (*cntx)->SendError("invalid cursor");
|
||||
}
|
||||
|
||||
ScanOpts scan_opts;
|
||||
|
||||
for (unsigned i = 2; i < args.size(); i += 2) {
|
||||
if (i + 1 == args.size()) {
|
||||
return (*cntx)->SendError(kSyntaxErr);
|
||||
|
@ -463,27 +565,31 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
string_view opt = ArgS(args, i);
|
||||
if (opt == "COUNT") {
|
||||
if (!absl::SimpleAtoi(ArgS(args, i + 1), &limit)) {
|
||||
if (!absl::SimpleAtoi(ArgS(args, i + 1), &scan_opts.limit)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
if (limit == 0)
|
||||
limit = 1;
|
||||
else if (limit > 4096)
|
||||
limit = 4096;
|
||||
if (scan_opts.limit == 0)
|
||||
scan_opts.limit = 1;
|
||||
else if (scan_opts.limit > 4096)
|
||||
scan_opts.limit = 4096;
|
||||
} else if (opt == "MATCH") {
|
||||
pattern = ArgS(args, i + 1);
|
||||
if (pattern == "*")
|
||||
pattern = string_view{};
|
||||
scan_opts.pattern = ArgS(args, i + 1);
|
||||
if (scan_opts.pattern == "*")
|
||||
scan_opts.pattern = string_view{};
|
||||
} else if (opt == "TYPE") {
|
||||
ToLower(&args[i + 1]);
|
||||
type_filter = ArgS(args, i + 1);
|
||||
scan_opts.type_filter = ArgS(args, i + 1);
|
||||
} else if (opt == "BUCKET") {
|
||||
if (!absl::SimpleAtoi(ArgS(args, i + 1), &scan_opts.bucket_id)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
} else {
|
||||
return (*cntx)->SendError(kSyntaxErr);
|
||||
}
|
||||
}
|
||||
|
||||
StringVec keys;
|
||||
cursor = ScanGeneric(cursor, pattern, type_filter, limit, &keys, cntx);
|
||||
cursor = ScanGeneric(cursor, scan_opts, &keys, cntx);
|
||||
|
||||
(*cntx)->StartArray(2);
|
||||
(*cntx)->SendSimpleString(absl::StrCat(cursor));
|
||||
|
@ -608,93 +714,6 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
uint64_t GenericFamily::ScanGeneric(uint64_t cursor, string_view pattern, string_view type_filter,
|
||||
unsigned limit, StringVec* keys, ConnectionContext* cntx) {
|
||||
ShardId sid = cursor % 1024;
|
||||
|
||||
EngineShardSet* ess = cntx->shard_set;
|
||||
unsigned shard_count = ess->size();
|
||||
|
||||
// Dash table returns a cursor with its right byte empty. We will use it
|
||||
// for encoding shard index. For now scan has a limitation of 255 shards.
|
||||
CHECK_LT(shard_count, 1024u);
|
||||
|
||||
if (sid >= shard_count) { // protection
|
||||
return 0;
|
||||
}
|
||||
|
||||
cursor >>= 10;
|
||||
|
||||
do {
|
||||
ess->Await(sid, [&] {
|
||||
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
|
||||
OpScan(op_args, pattern, type_filter, limit, &cursor, keys);
|
||||
});
|
||||
if (cursor == 0) {
|
||||
++sid;
|
||||
if (unsigned(sid) == shard_count)
|
||||
break;
|
||||
}
|
||||
} while (keys->size() < limit);
|
||||
|
||||
if (sid < shard_count) {
|
||||
cursor = (cursor << 10) | sid;
|
||||
} else {
|
||||
DCHECK_EQ(0u, cursor);
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_view type_filter,
|
||||
size_t limit, uint64_t* cursor, StringVec* vec) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
DCHECK(db_slice.IsDbValid(op_args.db_ind));
|
||||
|
||||
unsigned cnt = 0;
|
||||
|
||||
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has "
|
||||
<< db_slice.DbSize(op_args.db_ind);
|
||||
|
||||
PrimeTable::cursor cur = *cursor;
|
||||
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
|
||||
do {
|
||||
cur = prime_table->Traverse(
|
||||
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, pattern, type_filter, vec); });
|
||||
} while (cur && cnt < limit);
|
||||
|
||||
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
|
||||
*cursor = cur.value();
|
||||
}
|
||||
|
||||
bool GenericFamily::ScanCb(const OpArgs& op_args, PrimeIterator it, string_view pattern,
|
||||
string_view type_filter, StringVec* res) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
if (it->second.HasExpire()) {
|
||||
it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first;
|
||||
}
|
||||
|
||||
if (!IsValid(it))
|
||||
return false;
|
||||
|
||||
bool matches = type_filter.empty() || ObjTypeName(it->second.ObjType()) == type_filter;
|
||||
|
||||
if (!matches)
|
||||
return false;
|
||||
|
||||
if (pattern.empty()) {
|
||||
res->push_back(it->first.ToString());
|
||||
} else {
|
||||
string str = it->first.ToString();
|
||||
if (stringmatchlen(pattern.data(), pattern.size(), str.data(), str.size(), 0) != 1)
|
||||
return false;
|
||||
|
||||
res->push_back(std::move(str));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&GenericFamily::x)
|
||||
|
|
|
@ -67,16 +67,6 @@ class GenericFamily {
|
|||
static OpResult<uint32_t> OpExists(const OpArgs& op_args, ArgSlice keys);
|
||||
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
|
||||
bool skip_exists);
|
||||
|
||||
static uint64_t ScanGeneric(uint64_t cursor, std::string_view pattern,
|
||||
std::string_view type_filter, unsigned limit, StringVec* keys,
|
||||
ConnectionContext* cntx);
|
||||
|
||||
static void OpScan(const OpArgs& op_args, std::string_view pattern, std::string_view type_filter,
|
||||
size_t limit, uint64_t* cursor, StringVec* vec);
|
||||
|
||||
static bool ScanCb(const OpArgs& op_args, PrimeIterator it, std::string_view pattern,
|
||||
std::string_view type_filter, StringVec* res);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Reference in New Issue