Implement CLIENT LIST and CLIENT SETNAME.

This commit is contained in:
Roman Gershman 2022-05-10 06:35:37 +03:00
parent 9e0c705d16
commit 0c5e2a5ecd
13 changed files with 122 additions and 23 deletions

View File

@ -177,7 +177,8 @@ API 2.0
- [ ] WATCH
- [ ] UNWATCH
- [X] DISCARD
- [ ] CLIENT KILL/LIST/UNPAUSE/PAUSE/GETNAME/SETNAME/REPLY/TRACKINGINFO
- [X] CLIENT LIST/SETNAME
- [ ] CLIENT KILL/UNPAUSE/PAUSE/GETNAME/REPLY/TRACKINGINFO
- [X] COMMAND
- [X] COMMAND COUNT
- [ ] COMMAND GETKEYS/INFO

View File

@ -50,6 +50,9 @@ class ConnectionContext {
bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber.
virtual void OnClose() {}
std::string GetContextInfo() const { return std::string{}; }
private:
Connection* owner_;
std::unique_ptr<SinkReplyBuilder> rbuilder_;

View File

@ -103,7 +103,6 @@ struct Connection::Shutdown {
}
};
struct Connection::Request {
absl::FixedArray<MutableSlice, 6> args;
@ -122,6 +121,8 @@ struct Connection::Request {
Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
static atomic_uint32_t next_id{1};
protocol_ = protocol;
constexpr size_t kReqSz = sizeof(Connection::Request);
@ -136,6 +137,12 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
memcache_parser_.reset(new MemcacheParser);
break;
}
creation_time_ = time(nullptr);
last_interaction_ = creation_time_;
memset(name_, 0, sizeof(name_));
memset(phase_, 0, sizeof(phase_));
id_ = next_id.fetch_add(1, memory_order_relaxed);
}
Connection::~Connection() {
@ -259,6 +266,24 @@ void Connection::SendMsgVecAsync(absl::Span<const std::string_view> msg_vec,
}
}
string Connection::GetClientInfo() const {
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
string res;
auto le = lsb->LocalEndpoint();
auto re = lsb->RemoteEndpoint();
time_t now = time(nullptr);
absl::StrAppend(&res, "id=", id_, " addr=", re.address().to_string(), ":", re.port());
absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port());
absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_);
absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_);
absl::StrAppend(&res, " phase=", phase_, " ");
absl::StrAppend(&res, cc_->GetContextInfo());
return res;
}
io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
size_t last_len = 0;
do {
@ -295,6 +320,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// At the start we read from the socket to determine the HTTP/Memstore protocol.
// Therefore we may already have some data in the buffer.
if (io_buf_.InputLen() > 0) {
SetPhase("process");
if (redis_parser_) {
parse_status = ParseRedis();
} else {
@ -379,6 +405,7 @@ auto Connection::ParseRedis() -> ParserStatus {
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) {
RespToArgList(args, &arg_vec);
service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get());
last_interaction_ = time(nullptr);
} else {
// Dispatch via queue to speedup input reading.
Request* req = FromArgs(std::move(args), tlh);
@ -469,7 +496,10 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
FetchBuilderStats(stats, builder);
io::MutableBytes append_buf = io_buf_.AppendBuffer();
SetPhase("readsock");
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
last_interaction_ = time(nullptr);
if (!recv_sz) {
ec = recv_sz.error();
@ -480,6 +510,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
io_buf_.CommitWrite(*recv_sz);
stats->io_read_bytes += *recv_sz;
++stats->io_read_cnt;
SetPhase("process");
if (redis_parser_)
parse_status = ParseRedis();
@ -554,6 +585,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
builder->SetBatchMode(!dispatch_q_.empty());
cc_->async_dispatch = true;
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
last_interaction_ = time(nullptr);
cc_->async_dispatch = false;
}
req->~Request();

View File

@ -52,6 +52,16 @@ class Connection : public util::Connection {
void SendMsgVecAsync(absl::Span<const std::string_view> msg_vec,
util::fibers_ext::BlockingCounter bc);
void SetName(std::string_view name) {
CopyCharBuf(name, sizeof(name_), name_);
}
void SetPhase(std::string_view phase) {
CopyCharBuf(phase, sizeof(phase_), phase_);
}
std::string GetClientInfo() const;
protected:
void OnShutdown() override;
@ -60,6 +70,13 @@ class Connection : public util::Connection {
void HandleRequests() final;
static void CopyCharBuf(std::string_view src, unsigned dest_len, char* dest) {
src = src.substr(0, dest_len - 1);
if (!src.empty())
memcpy(dest, src.data(), src.size());
dest[src.size()] = '\0';
}
//
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);
@ -77,6 +94,9 @@ class Connection : public util::Connection {
util::HttpListenerBase* http_listener_;
SSL_CTX* ctx_;
ServiceInterface* service_;
time_t creation_time_, last_interaction_;
char name_[16];
char phase_[16];
std::unique_ptr<ConnectionContext> cc_;
@ -88,7 +108,10 @@ class Connection : public util::Connection {
util::fibers_ext::EventCount evc_;
unsigned parser_error_ = 0;
uint32_t id_;
Protocol protocol_;
struct Shutdown;
std::unique_ptr<Shutdown> shutdown_;
BreakerCb breaker_cb_;

View File

@ -38,11 +38,15 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
Service service(pool);
service.Init(acceptor);
Listener* main_listener = new Listener{Protocol::REDIS, &service};
Service::InitOpts opts;
opts.disable_time_update = false;
service.Init(acceptor, main_listener, opts);
const char* bind_addr = FLAGS_bind.empty() ? nullptr : FLAGS_bind.c_str();
error_code ec =
acceptor->AddListener(bind_addr, FLAGS_port, new Listener{Protocol::REDIS, &service});
error_code ec = acceptor->AddListener(bind_addr, FLAGS_port, main_listener);
LOG_IF(FATAL, ec) << "Cound not open port " << FLAGS_port << ", error: " << ec.message();

View File

@ -307,7 +307,8 @@ Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(th
Service::~Service() {
}
void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
const InitOpts& opts) {
InitRedisTables();
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
@ -324,7 +325,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
server_family_.Init(acceptor);
server_family_.Init(acceptor, main_interface);
cmd_req.Init(&pp_, {"type"});
}

View File

@ -33,7 +33,8 @@ class Service : public facade::ServiceInterface {
explicit Service(util::ProactorPool* pp);
~Service();
void Init(util::AcceptServer* acceptor, const InitOpts& opts = InitOpts{});
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
const InitOpts& opts = InitOpts{});
void Shutdown();

View File

@ -6,6 +6,7 @@
#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h> // for master_id_ generation.
#include <absl/strings/str_join.h>
#include <absl/strings/match.h>
#include <mimalloc-types.h>
#include <sys/resource.h>
@ -18,6 +19,7 @@ extern "C" {
#include "base/logging.h"
#include "io/proc_reader.h"
#include "facade/dragonfly_connection.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/debugcmd.h"
@ -74,6 +76,11 @@ error_code CreateDirs(fs::path dir_path) {
return ec;
}
string UnknowSubCmd(string_view subcmd, string cmd) {
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try ", cmd, " HELP.");
}
} // namespace
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
@ -85,9 +92,10 @@ ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service-
ServerFamily::~ServerFamily() {
}
void ServerFamily::Init(util::AcceptServer* acceptor) {
void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
main_listener_ = main_listener;
pb_task_ = ess_.pool()->GetNextProactor();
auto cache_cb = [] {
@ -336,12 +344,27 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "SETNAME") {
if (sub_cmd == "SETNAME" && args.size() == 3) {
cntx->owner()->SetName(ArgS(args, 2));
return (*cntx)->SendOk();
} else if (sub_cmd == "LIST") {
vector<string> client_info;
fibers::mutex mu;
auto cb = [&](Connection* conn) {
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
string info = dcon->GetClientInfo();
lock_guard lk(mu);
client_info.push_back(move(info));
};
main_listener_->TraverseConnections(cb);
string result = absl::StrJoin(move(client_info), "\n");
result.append("\n");
return (*cntx)->SendBulkString(result);
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
(*cntx)->SendError(kSyntaxErr);
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
@ -365,9 +388,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
});
return (*cntx)->SendOk();
} else {
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try CONFIG HELP.");
return (*cntx)->SendError(err, kSyntaxErr);
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
}
}

View File

@ -12,6 +12,7 @@
namespace util {
class AcceptServer;
class ListenerInterface;
} // namespace util
namespace dfly {
@ -39,7 +40,7 @@ class ServerFamily {
ServerFamily(Service* service);
~ServerFamily();
void Init(util::AcceptServer* acceptor);
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener);
void Register(CommandRegistry* registry);
void Shutdown();
@ -96,6 +97,7 @@ class ServerFamily {
EngineShardSet& ess_;
util::AcceptServer* acceptor_ = nullptr;
util::ListenerInterface* main_listener_ = nullptr;
util::ProactorBase* pb_task_ = nullptr;
mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;

View File

@ -34,6 +34,8 @@ DEFINE_VARZ(VarzQps, set_qps);
DEFINE_VARZ(VarzQps, get_qps);
constexpr uint32_t kMaxStrLen = 1 << 28;
constexpr uint32_t kMinTieredLen = TieredStorage::kMinBlobLen;
string GetString(EngineShard* shard, const PrimeValue& pv) {
string res;
@ -160,7 +162,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
EngineShard* shard = db_slice_.shard_owner();
if (shard->tiered_storage()) { // external storage enabled.
if (value.size() >= 64) {
if (value.size() >= kMinTieredLen) {
shard->tiered_storage()->UnloadItem(params.db_index, it);
}
}
@ -205,6 +207,16 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
// overwrite existing entry.
prime_value.SetString(value);
if (value.size() >= kMinTieredLen) { // external storage enabled.
EngineShard* shard = db_slice_.shard_owner();
if (shard->tiered_storage()) {
shard->tiered_storage()->UnloadItem(params.db_index, it);
}
}
db_slice_.PostUpdate(params.db_index, it);
return OpStatus::OK;

View File

@ -62,7 +62,7 @@ void BaseFamilyTest::SetUp() {
Service::InitOpts opts;
opts.disable_time_update = true;
service_->Init(nullptr, opts);
service_->Init(nullptr, nullptr, opts);
ess_ = &service_->shard_set();
expire_now_ = absl::GetCurrentTimeNanos() / 1000000;

View File

@ -6,12 +6,10 @@
#include <absl/container/flat_hash_map.h>
#include "base/ring_buffer.h"
#include "core/external_alloc.h"
#include "server/common.h"
#include "server/io_mgr.h"
#include "server/table.h"
#include "util/fibers/event_count.h"
namespace dfly {
@ -20,6 +18,8 @@ class DbSlice;
class TieredStorage {
public:
enum : uint16_t { kMinBlobLen = 64 };
explicit TieredStorage(DbSlice* db_slice);
~TieredStorage();
@ -76,16 +76,16 @@ class TieredStorage {
// map of cursor -> pending size
// absl::flat_hash_map<uint64_t, size_t> pending_upload;
// multi_cnt_ - counts how many unloaded items exists in the batch at specified page offset.
// here multi_cnt_.first is (file_offset in 4k pages) and
// multi_cnt_.second is MultiBatch object storing number of allocated records in the batch
// and its capacity (/ 4k).
struct MultiBatch {
uint16_t used; // number of used bytes
uint16_t used; // number of used bytes
uint16_t reserved; // in 4k pages.
MultiBatch(uint16_t mem_used) : used(mem_used) {}
MultiBatch(uint16_t mem_used) : used(mem_used) {
}
};
absl::flat_hash_map<uint32_t, MultiBatch> multi_cnt_;

View File

@ -32,7 +32,6 @@ def fill_hset(args, redis):
def main():
# Check processor architecture
parser = argparse.ArgumentParser(description='fill hset entities')
parser.add_argument(
'-p', type=int, help='redis port', dest='port', default=6380)