diff --git a/README.md b/README.md index 60d0256..f681351 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,7 @@ API 1.0 - [ ] ZREVRANGE - [ ] ZSCORE - [ ] Not sure whether these are required for the initial release. - - [ ] AUTH + - [X] AUTH - [ ] BGREWRITEAOF - [ ] KEYS - [ ] MONITOR @@ -122,6 +122,13 @@ In addition, we want to support efficient expiry (TTL) and cache eviction algori We should implement basic memory management support. For Master/Slave replication we should design a distributed log format. +### Memchache API +- [X] SET +- [X] GET +- [X] REPLACE +- [X] ADD +- [ ] STATS + API 2.0 - [ ] List Family - [X] BLPOP diff --git a/server/command_registry.cc b/server/command_registry.cc index 1029ac8..c2f1d98 100644 --- a/server/command_registry.cc +++ b/server/command_registry.cc @@ -30,7 +30,7 @@ uint32_t CommandId::OptCount(uint32_t mask) { } CommandRegistry::CommandRegistry() { - CommandId cd("COMMAND", CO::RANDOM | CO::LOADING, 0, 0, 0, 0); + CommandId cd("COMMAND", CO::RANDOM | CO::LOADING | CO::NOSCRIPT, 0, 0, 0, 0); cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); }); diff --git a/server/common.cc b/server/common.cc index 5004974..bee449d 100644 --- a/server/common.cc +++ b/server/common.cc @@ -71,6 +71,7 @@ const char kUintErr[] = "value is out of range, must be positive"; const char kDbIndOutOfRangeErr[] = "DB index is out of range"; const char kInvalidDbIndErr[] = "invalid DB index"; const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL."; +const char kAuthRejected[] = "-WRONGPASS invalid username-password pair or user is disabled."; const char* GlobalState::Name(S s) { switch (s) { diff --git a/server/conn_context.h b/server/conn_context.h index 0c5f0d8..ab8ae24 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -37,6 +37,8 @@ struct ConnectionState { // Whether this connection belongs to replica, i.e. a dragonfly slave is connected to this // host (master) via this connection to sync from it. REPL_CONNECTION = 4, + REQ_AUTH = 8, + AUTHENTICATED = 0x10, }; uint32_t mask = 0; // A bitmask of Mask values. diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index d440282..a343cfc 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -160,6 +160,9 @@ void Connection::HandleRequests() { cc_.reset(new ConnectionContext(peer, this)); cc_->shard_set = &service_->shard_set(); + if (service_->IsPassProtected()) + cc_->conn_state.mask |= ConnectionState::REQ_AUTH; + // TODO: to move this interface to LinuxSocketBase so we won't need to cast. uring::UringSocket* us = static_cast(socket_.get()); diff --git a/server/error.h b/server/error.h index a50bef7..daa74fc 100644 --- a/server/error.h +++ b/server/error.h @@ -19,6 +19,7 @@ extern const char kUintErr[]; extern const char kDbIndOutOfRangeErr[]; extern const char kInvalidDbIndErr[]; extern const char kScriptNotFound[]; +extern const char kAuthRejected[]; #ifndef RETURN_ON_ERR diff --git a/server/main_service.cc b/server/main_service.cc index 4774274..71dd3f1 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -30,6 +30,7 @@ extern "C" { DEFINE_uint32(port, 6380, "Redis port"); DEFINE_uint32(memcache_port, 0, "Memcached port"); +DECLARE_string(requirepass); namespace dfly { @@ -368,6 +369,15 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { return; } + string_view cmd_name{cid->name()}; + + if ((cntx->conn_state.mask & (ConnectionState::REQ_AUTH | ConnectionState::AUTHENTICATED)) == + ConnectionState::REQ_AUTH) { + if (cmd_name != "AUTH") { + return (*cntx)->SendError("-NOAUTH Authentication required."); + } + } + bool under_script = cntx->conn_state.script_info.has_value(); if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) { @@ -403,7 +413,7 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { return; } - if (string_view{cid->name()} == "SELECT") { + if (cmd_name == "SELECT") { (*cntx)->SendError("Can not call SELECT within a transaction"); return; } @@ -454,7 +464,7 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { } cntx->cid = cid; - cmd_req.Inc({cid->name()}); + cmd_req.Inc({cmd_name}); cid->Invoke(args, cntx); end_usec = ProactorBase::GetMonotonicTimeNs(); @@ -534,6 +544,10 @@ bool Service::IsShardSetLocked() const { return res.load() != 0; } +bool Service::IsPassProtected() const { + return !FLAGS_requirepass.empty(); +} + void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); http_listener_ = listener; diff --git a/server/main_service.h b/server/main_service.h index 8af4957..b11be68 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -64,6 +64,8 @@ class Service { return http_listener_; } + bool IsPassProtected() const; + private: static void Quit(CmdArgList args, ConnectionContext* cntx); static void Multi(CmdArgList args, ConnectionContext* cntx); diff --git a/server/server_family.cc b/server/server_family.cc index d4e730c..4ffda40 100644 --- a/server/server_family.cc +++ b/server/server_family.cc @@ -34,6 +34,7 @@ extern "C" { DEFINE_string(dir, "", "working directory"); DEFINE_string(dbfilename, "", "the filename to save/load the DB"); +DEFINE_string(requirepass, "", "password for AUTH authentication"); DECLARE_uint32(port); @@ -143,6 +144,30 @@ void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendOk(); } +void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) { + if (args.size() > 3) { + return (*cntx)->SendError(kSyntaxErr); + } + + if (args.size() == 3) { + return (*cntx)->SendError("ACL is not supported yet"); + } + + if (!(cntx->conn_state.mask & ConnectionState::REQ_AUTH)) { + return (*cntx)->SendError( + "AUTH called without any password configured for the " + "default user. Are you sure your configuration is correct?"); + } + + string_view pass = ArgS(args, 1); + if (pass == FLAGS_requirepass) { + cntx->conn_state.mask |= ConnectionState::AUTHENTICATED; + (*cntx)->SendOk(); + } else { + (*cntx)->SendError(kAuthRejected); + } +} + void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[1]); @@ -256,6 +281,19 @@ Metrics ServerFamily::GetMetrics() const { } void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { + if (args.size() > 2) { + return (*cntx)->SendError(kSyntaxErr); + } + + string_view section; + + if (args.size() == 2) { + ToUpper(&args[1]); + section = ArgS(args, 1); + } + + string info; + const char kInfo1[] = R"(# Server redis_version:1.9.9 @@ -264,84 +302,100 @@ arch_bits:64 multiplexing_api:iouring tcp_port:)"; - string info = absl::StrCat(kInfo1, FLAGS_port, "\n"); + auto should_enter = [&](string_view name) { + if (!info.empty()) + info.push_back('\n'); + return section.empty() || section == name; + }; - size_t uptime = time(NULL) - start_time_; - absl::StrAppend(&info, "uptime_in_seconds:", uptime, "\n"); - absl::StrAppend(&info, "uptime_in_days:", uptime / (3600 * 24), "\n"); + if (should_enter("SERVER")) { + absl::StrAppend(&info, kInfo1, FLAGS_port, "\n"); - auto sdata_res = io::ReadStatusInfo(); + size_t uptime = time(NULL) - start_time_; + absl::StrAppend(&info, "uptime_in_seconds:", uptime, "\n"); + absl::StrAppend(&info, "uptime_in_days:", uptime / (3600 * 24), "\n"); + } Metrics m = GetMetrics(); + auto sdata_res = io::ReadStatusInfo(); - // Clients - absl::StrAppend(&info, "\n# Clients\n"); - absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n"); - absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n"); - absl::StrAppend(&info, "blocked_clients:", 0, "\n"); - - size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage; - - absl::StrAppend(&info, "\n# Memory\n"); - - // TODO: should be extracted from mimalloc heaps instead of using table stats. - absl::StrAppend(&info, "used_memory:", db_size, "\n"); - absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n"); - if (sdata_res.has_value()) { - absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); - absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss), - "\n"); - } else { - LOG(ERROR) << "Error fetching /proc/self/status stats"; + if (should_enter("CLIENTS")) { + absl::StrAppend(&info, "# Clients\n"); + absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n"); + absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n"); + absl::StrAppend(&info, "blocked_clients:", 0, "\n"); } - // TBD: should be the max of all seen used_memory values. - absl::StrAppend(&info, "used_memory_peak:", -1, "\n"); - absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n"); - absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); - absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); - absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); + if (should_enter("MEMORY")) { + size_t db_size = m.db.table_mem_usage + m.db.obj_memory_usage; - // Stats - absl::StrAppend(&info, "\n# Stats\n"); - absl::StrAppend(&info, "instantaneous_ops_per_sec:", m.qps, "\n"); - absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n"); - absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n"); - absl::StrAppend(&info, "total_net_input_bytes:", m.conn_stats.io_read_bytes, "\n"); - absl::StrAppend(&info, "total_net_output_bytes:", m.conn_stats.io_write_bytes, "\n"); - absl::StrAppend(&info, "instantaneous_input_kbps:", -1, "\n"); - absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n"); - absl::StrAppend(&info, "rejected_connections:", -1, "\n"); - absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n"); - absl::StrAppend(&info, "keyspace_hits:", -1, "\n"); - absl::StrAppend(&info, "keyspace_misses:", -1, "\n"); - absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n"); - absl::StrAppend(&info, "total_writes_processed:", m.conn_stats.io_write_cnt, "\n"); + absl::StrAppend(&info, "# Memory\n"); - absl::StrAppend(&info, "\n# Replication\n"); + // TODO: should be extracted from mimalloc heaps instead of using table stats. + absl::StrAppend(&info, "used_memory:", db_size, "\n"); + absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(db_size), "\n"); + if (sdata_res.has_value()) { + absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); + absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss), + "\n"); + } else { + LOG(ERROR) << "Error fetching /proc/self/status stats"; + } - ServerState& etl = *ServerState::tlocal(); - - if (etl.is_master) { - absl::StrAppend(&info, "role:master\n"); - absl::StrAppend(&info, "connected_slaves:", m.conn_stats.num_replicas, "\n"); - } else { - absl::StrAppend(&info, "role:slave\n"); - - // it's safe to access replica_ because replica_ is created before etl.is_master set to false - // and cleared after etl.is_master is set to true. And since the code here that checks for - // is_master and copies shared_ptr is atomic, it1 should be correct. - auto replica_ptr = replica_; - Replica::Info rinfo = replica_ptr->GetInfo(); - absl::StrAppend(&info, "master_host:", rinfo.host, "\n"); - absl::StrAppend(&info, "master_port:", rinfo.port, "\n"); - const char* link = rinfo.master_link_established ? "up" : "down"; - absl::StrAppend(&info, "master_link_status:", link, "\n"); - absl::StrAppend(&info, "master_last_io_seconds_ago:", rinfo.master_last_io_sec, "\n"); - absl::StrAppend(&info, "master_sync_in_progress:", rinfo.sync_in_progress, "\n"); + // TBD: should be the max of all seen used_memory values. + absl::StrAppend(&info, "used_memory_peak:", -1, "\n"); + absl::StrAppend(&info, "object_used_memory:", m.db.obj_memory_usage, "\n"); + absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); + absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); + absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); + } + + if (should_enter("STATS")) { + absl::StrAppend(&info, "# Stats\n"); + absl::StrAppend(&info, "instantaneous_ops_per_sec:", m.qps, "\n"); + absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n"); + absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n"); + absl::StrAppend(&info, "total_net_input_bytes:", m.conn_stats.io_read_bytes, "\n"); + absl::StrAppend(&info, "total_net_output_bytes:", m.conn_stats.io_write_bytes, "\n"); + absl::StrAppend(&info, "instantaneous_input_kbps:", -1, "\n"); + absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n"); + absl::StrAppend(&info, "rejected_connections:", -1, "\n"); + absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n"); + absl::StrAppend(&info, "keyspace_hits:", -1, "\n"); + absl::StrAppend(&info, "keyspace_misses:", -1, "\n"); + absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n"); + absl::StrAppend(&info, "total_writes_processed:", m.conn_stats.io_write_cnt, "\n"); + } + + if (should_enter("REPLICATION")) { + absl::StrAppend(&info, "# Replication\n"); + + ServerState& etl = *ServerState::tlocal(); + + if (etl.is_master) { + absl::StrAppend(&info, "role:master\n"); + absl::StrAppend(&info, "connected_slaves:", m.conn_stats.num_replicas, "\n"); + } else { + absl::StrAppend(&info, "role:slave\n"); + + // it's safe to access replica_ because replica_ is created before etl.is_master set to false + // and cleared after etl.is_master is set to true. And since the code here that checks for + // is_master and copies shared_ptr is atomic, it1 should be correct. + auto replica_ptr = replica_; + Replica::Info rinfo = replica_ptr->GetInfo(); + absl::StrAppend(&info, "master_host:", rinfo.host, "\n"); + absl::StrAppend(&info, "master_port:", rinfo.port, "\n"); + const char* link = rinfo.master_link_established ? "up" : "down"; + absl::StrAppend(&info, "master_link_status:", link, "\n"); + absl::StrAppend(&info, "master_last_io_seconds_ago:", rinfo.master_last_io_sec, "\n"); + absl::StrAppend(&info, "master_sync_in_progress:", rinfo.sync_in_progress, "\n"); + } + } + + if (should_enter("KEYSPACE")) { + absl::StrAppend(&info, "# Keyspace\n"); + absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO } - absl::StrAppend(&info, "\n# Keyspace\n"); - absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO (*cntx)->SendBulkString(info); } @@ -444,7 +498,8 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs, void ServerFamily::Register(CommandRegistry* registry) { constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS; - *registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) + *registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth) + << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) << CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) << CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll) diff --git a/server/server_family.h b/server/server_family.h index 4aa1c34..106cbb4 100644 --- a/server/server_family.h +++ b/server/server_family.h @@ -53,8 +53,9 @@ class ServerFamily { return ess_.size(); } - void Debug(CmdArgList args, ConnectionContext* cntx); + void Auth(CmdArgList args, ConnectionContext* cntx); void DbSize(CmdArgList args, ConnectionContext* cntx); + void Debug(CmdArgList args, ConnectionContext* cntx); void FlushDb(CmdArgList args, ConnectionContext* cntx); void FlushAll(CmdArgList args, ConnectionContext* cntx); void Info(CmdArgList args, ConnectionContext* cntx);