Add AUTH command for 1.0 API

This commit is contained in:
Roman Gershman 2022-02-20 13:49:37 +02:00
parent 29f5052c4d
commit 8d2d49d782
10 changed files with 159 additions and 73 deletions

View File

@ -111,7 +111,7 @@ API 1.0
- [ ] ZREVRANGE
- [ ] ZSCORE
- [ ] Not sure whether these are required for the initial release.
- [ ] AUTH
- [X] AUTH
- [ ] BGREWRITEAOF
- [ ] KEYS
- [ ] MONITOR
@ -122,6 +122,13 @@ In addition, we want to support efficient expiry (TTL) and cache eviction algori
We should implement basic memory management support. For Master/Slave replication we should design
a distributed log format.
### Memchache API
- [X] SET
- [X] GET
- [X] REPLACE
- [X] ADD
- [ ] STATS
API 2.0
- [ ] List Family
- [X] BLPOP

View File

@ -30,7 +30,7 @@ uint32_t CommandId::OptCount(uint32_t mask) {
}
CommandRegistry::CommandRegistry() {
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING, 0, 0, 0, 0);
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING | CO::NOSCRIPT, 0, 0, 0, 0);
cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); });

View File

@ -71,6 +71,7 @@ const char kUintErr[] = "value is out of range, must be positive";
const char kDbIndOutOfRangeErr[] = "DB index is out of range";
const char kInvalidDbIndErr[] = "invalid DB index";
const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL.";
const char kAuthRejected[] = "-WRONGPASS invalid username-password pair or user is disabled.";
const char* GlobalState::Name(S s) {
switch (s) {

View File

@ -37,6 +37,8 @@ struct ConnectionState {
// Whether this connection belongs to replica, i.e. a dragonfly slave is connected to this
// host (master) via this connection to sync from it.
REPL_CONNECTION = 4,
REQ_AUTH = 8,
AUTHENTICATED = 0x10,
};
uint32_t mask = 0; // A bitmask of Mask values.

View File

@ -160,6 +160,9 @@ void Connection::HandleRequests() {
cc_.reset(new ConnectionContext(peer, this));
cc_->shard_set = &service_->shard_set();
if (service_->IsPassProtected())
cc_->conn_state.mask |= ConnectionState::REQ_AUTH;
// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());

View File

@ -19,6 +19,7 @@ extern const char kUintErr[];
extern const char kDbIndOutOfRangeErr[];
extern const char kInvalidDbIndErr[];
extern const char kScriptNotFound[];
extern const char kAuthRejected[];
#ifndef RETURN_ON_ERR

View File

@ -30,6 +30,7 @@ extern "C" {
DEFINE_uint32(port, 6380, "Redis port");
DEFINE_uint32(memcache_port, 0, "Memcached port");
DECLARE_string(requirepass);
namespace dfly {
@ -368,6 +369,15 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
return;
}
string_view cmd_name{cid->name()};
if ((cntx->conn_state.mask & (ConnectionState::REQ_AUTH | ConnectionState::AUTHENTICATED)) ==
ConnectionState::REQ_AUTH) {
if (cmd_name != "AUTH") {
return (*cntx)->SendError("-NOAUTH Authentication required.");
}
}
bool under_script = cntx->conn_state.script_info.has_value();
if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) {
@ -403,7 +413,7 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
return;
}
if (string_view{cid->name()} == "SELECT") {
if (cmd_name == "SELECT") {
(*cntx)->SendError("Can not call SELECT within a transaction");
return;
}
@ -454,7 +464,7 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
}
cntx->cid = cid;
cmd_req.Inc({cid->name()});
cmd_req.Inc({cmd_name});
cid->Invoke(args, cntx);
end_usec = ProactorBase::GetMonotonicTimeNs();
@ -534,6 +544,10 @@ bool Service::IsShardSetLocked() const {
return res.load() != 0;
}
bool Service::IsPassProtected() const {
return !FLAGS_requirepass.empty();
}
void Service::RegisterHttp(HttpListenerBase* listener) {
CHECK_NOTNULL(listener);
http_listener_ = listener;

View File

@ -64,6 +64,8 @@ class Service {
return http_listener_;
}
bool IsPassProtected() const;
private:
static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx);

View File

@ -34,6 +34,7 @@ extern "C" {
DEFINE_string(dir, "", "working directory");
DEFINE_string(dbfilename, "", "the filename to save/load the DB");
DEFINE_string(requirepass, "", "password for AUTH authentication");
DECLARE_uint32(port);
@ -143,6 +144,30 @@ void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendOk();
}
void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 3) {
return (*cntx)->SendError(kSyntaxErr);
}
if (args.size() == 3) {
return (*cntx)->SendError("ACL is not supported yet");
}
if (!(cntx->conn_state.mask & ConnectionState::REQ_AUTH)) {
return (*cntx)->SendError(
"AUTH <password> called without any password configured for the "
"default user. Are you sure your configuration is correct?");
}
string_view pass = ArgS(args, 1);
if (pass == FLAGS_requirepass) {
cntx->conn_state.mask |= ConnectionState::AUTHENTICATED;
(*cntx)->SendOk();
} else {
(*cntx)->SendError(kAuthRejected);
}
}
void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
@ -256,6 +281,19 @@ Metrics ServerFamily::GetMetrics() const {
}
void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 2) {
return (*cntx)->SendError(kSyntaxErr);
}
string_view section;
if (args.size() == 2) {
ToUpper(&args[1]);
section = ArgS(args, 1);
}
string info;
const char kInfo1[] =
R"(# Server
redis_version:1.9.9
@ -264,84 +302,100 @@ arch_bits:64
multiplexing_api:iouring
tcp_port:)";
string info = absl::StrCat(kInfo1, FLAGS_port, "\n");
auto should_enter = [&](string_view name) {
if (!info.empty())
info.push_back('\n');
return section.empty() || section == name;
};
size_t uptime = time(NULL) - start_time_;
absl::StrAppend(&info, "uptime_in_seconds:", uptime, "\n");
absl::StrAppend(&info, "uptime_in_days:", uptime / (3600 * 24), "\n");
if (should_enter("SERVER")) {
absl::StrAppend(&info, kInfo1, FLAGS_port, "\n");
auto sdata_res = io::ReadStatusInfo();
size_t uptime = time(NULL) - start_time_;
absl::StrAppend(&info, "uptime_in_seconds:", uptime, "\n");
absl::StrAppend(&info, "uptime_in_days:", uptime / (3600 * 24), "\n");
}
Metrics m = GetMetrics();
auto sdata_res = io::ReadStatusInfo();
// Clients
absl::StrAppend(&info, "\n# Clients\n");
absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n");
absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n");
absl::StrAppend(&info, "blocked_clients:", 0, "\n");
size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage;
absl::StrAppend(&info, "\n# Memory\n");
// TODO: should be extracted from mimalloc heaps instead of using table stats.
absl::StrAppend(&info, "used_memory:", db_size, "\n");
absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n");
if (sdata_res.has_value()) {
absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n");
absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss),
"\n");
} else {
LOG(ERROR) << "Error fetching /proc/self/status stats";
if (should_enter("CLIENTS")) {
absl::StrAppend(&info, "# Clients\n");
absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n");
absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n");
absl::StrAppend(&info, "blocked_clients:", 0, "\n");
}
// TBD: should be the max of all seen used_memory values.
absl::StrAppend(&info, "used_memory_peak:", -1, "\n");
absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n");
absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n");
absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n");
absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n");
if (should_enter("MEMORY")) {
size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage;
// Stats
absl::StrAppend(&info, "\n# Stats\n");
absl::StrAppend(&info, "instantaneous_ops_per_sec:", m.qps, "\n");
absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n");
absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n");
absl::StrAppend(&info, "total_net_input_bytes:", m.conn_stats.io_read_bytes, "\n");
absl::StrAppend(&info, "total_net_output_bytes:", m.conn_stats.io_write_bytes, "\n");
absl::StrAppend(&info, "instantaneous_input_kbps:", -1, "\n");
absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n");
absl::StrAppend(&info, "rejected_connections:", -1, "\n");
absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n");
absl::StrAppend(&info, "keyspace_hits:", -1, "\n");
absl::StrAppend(&info, "keyspace_misses:", -1, "\n");
absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n");
absl::StrAppend(&info, "total_writes_processed:", m.conn_stats.io_write_cnt, "\n");
absl::StrAppend(&info, "# Memory\n");
absl::StrAppend(&info, "\n# Replication\n");
// TODO: should be extracted from mimalloc heaps instead of using table stats.
absl::StrAppend(&info, "used_memory:", db_size, "\n");
absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n");
if (sdata_res.has_value()) {
absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n");
absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss),
"\n");
} else {
LOG(ERROR) << "Error fetching /proc/self/status stats";
}
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
absl::StrAppend(&info, "role:master\n");
absl::StrAppend(&info, "connected_slaves:", m.conn_stats.num_replicas, "\n");
} else {
absl::StrAppend(&info, "role:slave\n");
// it's safe to access replica_ because replica_ is created before etl.is_master set to false
// and cleared after etl.is_master is set to true. And since the code here that checks for
// is_master and copies shared_ptr is atomic, it1 should be correct.
auto replica_ptr = replica_;
Replica::Info rinfo = replica_ptr->GetInfo();
absl::StrAppend(&info, "master_host:", rinfo.host, "\n");
absl::StrAppend(&info, "master_port:", rinfo.port, "\n");
const char* link = rinfo.master_link_established ? "up" : "down";
absl::StrAppend(&info, "master_link_status:", link, "\n");
absl::StrAppend(&info, "master_last_io_seconds_ago:", rinfo.master_last_io_sec, "\n");
absl::StrAppend(&info, "master_sync_in_progress:", rinfo.sync_in_progress, "\n");
// TBD: should be the max of all seen used_memory values.
absl::StrAppend(&info, "used_memory_peak:", -1, "\n");
absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n");
absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n");
absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n");
absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n");
}
if (should_enter("STATS")) {
absl::StrAppend(&info, "# Stats\n");
absl::StrAppend(&info, "instantaneous_ops_per_sec:", m.qps, "\n");
absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n");
absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n");
absl::StrAppend(&info, "total_net_input_bytes:", m.conn_stats.io_read_bytes, "\n");
absl::StrAppend(&info, "total_net_output_bytes:", m.conn_stats.io_write_bytes, "\n");
absl::StrAppend(&info, "instantaneous_input_kbps:", -1, "\n");
absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n");
absl::StrAppend(&info, "rejected_connections:", -1, "\n");
absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n");
absl::StrAppend(&info, "keyspace_hits:", -1, "\n");
absl::StrAppend(&info, "keyspace_misses:", -1, "\n");
absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n");
absl::StrAppend(&info, "total_writes_processed:", m.conn_stats.io_write_cnt, "\n");
}
if (should_enter("REPLICATION")) {
absl::StrAppend(&info, "# Replication\n");
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
absl::StrAppend(&info, "role:master\n");
absl::StrAppend(&info, "connected_slaves:", m.conn_stats.num_replicas, "\n");
} else {
absl::StrAppend(&info, "role:slave\n");
// it's safe to access replica_ because replica_ is created before etl.is_master set to false
// and cleared after etl.is_master is set to true. And since the code here that checks for
// is_master and copies shared_ptr is atomic, it1 should be correct.
auto replica_ptr = replica_;
Replica::Info rinfo = replica_ptr->GetInfo();
absl::StrAppend(&info, "master_host:", rinfo.host, "\n");
absl::StrAppend(&info, "master_port:", rinfo.port, "\n");
const char* link = rinfo.master_link_established ? "up" : "down";
absl::StrAppend(&info, "master_link_status:", link, "\n");
absl::StrAppend(&info, "master_last_io_seconds_ago:", rinfo.master_last_io_sec, "\n");
absl::StrAppend(&info, "master_sync_in_progress:", rinfo.sync_in_progress, "\n");
}
}
if (should_enter("KEYSPACE")) {
absl::StrAppend(&info, "# Keyspace\n");
absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO
}
absl::StrAppend(&info, "\n# Keyspace\n");
absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO
(*cntx)->SendBulkString(info);
}
@ -444,7 +498,8 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
void ServerFamily::Register(CommandRegistry* registry) {
constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS;
*registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
*registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll)

View File

@ -53,8 +53,9 @@ class ServerFamily {
return ess_.size();
}
void Debug(CmdArgList args, ConnectionContext* cntx);
void Auth(CmdArgList args, ConnectionContext* cntx);
void DbSize(CmdArgList args, ConnectionContext* cntx);
void Debug(CmdArgList args, ConnectionContext* cntx);
void FlushDb(CmdArgList args, ConnectionContext* cntx);
void FlushAll(CmdArgList args, ConnectionContext* cntx);
void Info(CmdArgList args, ConnectionContext* cntx);