Compare commits

...

3 Commits

Author SHA1 Message Date
Roman Gershman 4fdcfa04cc . 2022-05-10 08:07:19 +00:00
Roman Gershman 1a9b8b0527 . 2022-05-10 06:49:08 +03:00
Roman Gershman 560d3f2440 Implement CLIENT LIST and CLIENT SETNAME. 2022-05-10 06:35:37 +03:00
15 changed files with 188 additions and 27 deletions

View File

@ -177,7 +177,8 @@ API 2.0
- [ ] WATCH
- [ ] UNWATCH
- [X] DISCARD
- [ ] CLIENT KILL/LIST/UNPAUSE/PAUSE/GETNAME/SETNAME/REPLY/TRACKINGINFO
- [X] CLIENT LIST/SETNAME
- [ ] CLIENT KILL/UNPAUSE/PAUSE/GETNAME/REPLY/TRACKINGINFO
- [X] COMMAND
- [X] COMMAND COUNT
- [ ] COMMAND GETKEYS/INFO

2
helio

@ -1 +1 @@
Subproject commit 0b73223565a12a096fa4262ccd66c2cd12209a11
Subproject commit 75f2c88c625ac13692f1165cf1f93f8a7096d799

View File

@ -50,6 +50,9 @@ class ConnectionContext {
bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber.
virtual void OnClose() {}
std::string GetContextInfo() const { return std::string{}; }
private:
Connection* owner_;
std::unique_ptr<SinkReplyBuilder> rbuilder_;

View File

@ -103,7 +103,6 @@ struct Connection::Shutdown {
}
};
struct Connection::Request {
absl::FixedArray<MutableSlice, 6> args;
@ -122,6 +121,8 @@ struct Connection::Request {
Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
static atomic_uint32_t next_id{1};
protocol_ = protocol;
constexpr size_t kReqSz = sizeof(Connection::Request);
@ -136,6 +137,12 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
memcache_parser_.reset(new MemcacheParser);
break;
}
creation_time_ = time(nullptr);
last_interaction_ = creation_time_;
memset(name_, 0, sizeof(name_));
memset(phase_, 0, sizeof(phase_));
id_ = next_id.fetch_add(1, memory_order_relaxed);
}
Connection::~Connection() {
@ -259,6 +266,24 @@ void Connection::SendMsgVecAsync(absl::Span<const std::string_view> msg_vec,
}
}
string Connection::GetClientInfo() const {
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
string res;
auto le = lsb->LocalEndpoint();
auto re = lsb->RemoteEndpoint();
time_t now = time(nullptr);
absl::StrAppend(&res, "id=", id_, " addr=", re.address().to_string(), ":", re.port());
absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port());
absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_);
absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_);
absl::StrAppend(&res, " phase=", phase_, " ");
absl::StrAppend(&res, cc_->GetContextInfo());
return res;
}
io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
size_t last_len = 0;
do {
@ -295,6 +320,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// At the start we read from the socket to determine the HTTP/Memstore protocol.
// Therefore we may already have some data in the buffer.
if (io_buf_.InputLen() > 0) {
SetPhase("process");
if (redis_parser_) {
parse_status = ParseRedis();
} else {
@ -379,6 +405,7 @@ auto Connection::ParseRedis() -> ParserStatus {
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) {
RespToArgList(args, &arg_vec);
service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get());
last_interaction_ = time(nullptr);
} else {
// Dispatch via queue to speedup input reading.
Request* req = FromArgs(std::move(args), tlh);
@ -465,11 +492,45 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
error_code ec;
ParserStatus parse_status = OK;
uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me();
auto* me = fibers::context::active();
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
do {
FetchBuilderStats(stats, builder);
io::MutableBytes append_buf = io_buf_.AppendBuffer();
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
SetPhase("readsock");
uring::Proactor::IoResult io_res;
uint32_t io_flags;
auto waker = [&](uring::Proactor::IoResult res, uint32_t flags, int64_t) {
io_res = res;
io_flags = flags;
SetPhase("recvwake");
fibers::context::active()->schedule(me);
};
uring::SubmitEntry se = proactor->GetSubmitEntry(waker, 0);
msghdr msg;
memset(&msg, 0, sizeof(msg));
iovec iv{append_buf.data(), append_buf.size()};
msg.msg_iov = &iv;
msg.msg_iovlen = 1;
se.PrepRecvMsg(lsb->native_handle(), &msg, 0);
me->suspend();
::io::Result<size_t> recv_sz;
if (io_res > 0) {
recv_sz = io_res;
} else {
error_code ec(-io_res, system_category());
LOG(ERROR) << "socket error " << lsb->native_handle() << " " << ec;
recv_sz = nonstd::make_unexpected(std::move(ec));
}
// ::io::Result<size_t> recv_sz = peer->Recv(append_buf);
last_interaction_ = time(nullptr);
if (!recv_sz) {
ec = recv_sz.error();
@ -480,6 +541,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
io_buf_.CommitWrite(*recv_sz);
stats->io_read_bytes += *recv_sz;
++stats->io_read_cnt;
SetPhase("process");
if (redis_parser_)
parse_status = ParseRedis();
@ -554,6 +616,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
builder->SetBatchMode(!dispatch_q_.empty());
cc_->async_dispatch = true;
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
last_interaction_ = time(nullptr);
cc_->async_dispatch = false;
}
req->~Request();

View File

@ -52,6 +52,16 @@ class Connection : public util::Connection {
void SendMsgVecAsync(absl::Span<const std::string_view> msg_vec,
util::fibers_ext::BlockingCounter bc);
void SetName(std::string_view name) {
CopyCharBuf(name, sizeof(name_), name_);
}
void SetPhase(std::string_view phase) {
CopyCharBuf(phase, sizeof(phase_), phase_);
}
std::string GetClientInfo() const;
protected:
void OnShutdown() override;
@ -60,6 +70,13 @@ class Connection : public util::Connection {
void HandleRequests() final;
static void CopyCharBuf(std::string_view src, unsigned dest_len, char* dest) {
src = src.substr(0, dest_len - 1);
if (!src.empty())
memcpy(dest, src.data(), src.size());
dest[src.size()] = '\0';
}
//
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);
@ -77,6 +94,9 @@ class Connection : public util::Connection {
util::HttpListenerBase* http_listener_;
SSL_CTX* ctx_;
ServiceInterface* service_;
time_t creation_time_, last_interaction_;
char name_[16];
char phase_[16];
std::unique_ptr<ConnectionContext> cc_;
@ -88,7 +108,10 @@ class Connection : public util::Connection {
util::fibers_ext::EventCount evc_;
unsigned parser_error_ = 0;
uint32_t id_;
Protocol protocol_;
struct Shutdown;
std::unique_ptr<Shutdown> shutdown_;
BreakerCb breaker_cb_;

View File

@ -38,11 +38,15 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
Service service(pool);
service.Init(acceptor);
Listener* main_listener = new Listener{Protocol::REDIS, &service};
Service::InitOpts opts;
opts.disable_time_update = true;
service.Init(acceptor, main_listener, opts);
const char* bind_addr = FLAGS_bind.empty() ? nullptr : FLAGS_bind.c_str();
error_code ec =
acceptor->AddListener(bind_addr, FLAGS_port, new Listener{Protocol::REDIS, &service});
error_code ec = acceptor->AddListener(bind_addr, FLAGS_port, main_listener);
LOG_IF(FATAL, ec) << "Cound not open port " << FLAGS_port << ", error: " << ec.message();

View File

@ -43,6 +43,7 @@ DECLARE_string(requirepass);
DEFINE_uint64(maxmemory, 0,
"Limit on maximum-memory that is used by the database."
"0 - means the program will automatically determine its maximum memory usage");
DEFINE_uint32(thread_shards_num, 0, "");
namespace dfly {
@ -307,10 +308,14 @@ Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(th
Service::~Service() {
}
void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
const InitOpts& opts) {
InitRedisTables();
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
uint32_t shard_num = FLAGS_thread_shards_num;
CHECK_LE(shard_num, pp_.size());
if (shard_num == 0)
shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set_.Init(shard_num);
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
@ -324,7 +329,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
server_family_.Init(acceptor);
server_family_.Init(acceptor, main_interface);
cmd_req.Init(&pp_, {"type"});
}

View File

@ -33,7 +33,8 @@ class Service : public facade::ServiceInterface {
explicit Service(util::ProactorPool* pp);
~Service();
void Init(util::AcceptServer* acceptor, const InitOpts& opts = InitOpts{});
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
const InitOpts& opts = InitOpts{});
void Shutdown();

View File

@ -6,6 +6,7 @@
#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h> // for master_id_ generation.
#include <absl/strings/str_join.h>
#include <absl/strings/match.h>
#include <mimalloc-types.h>
#include <sys/resource.h>
@ -18,6 +19,7 @@ extern "C" {
#include "base/logging.h"
#include "io/proc_reader.h"
#include "facade/dragonfly_connection.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/debugcmd.h"
@ -74,6 +76,11 @@ error_code CreateDirs(fs::path dir_path) {
return ec;
}
string UnknowSubCmd(string_view subcmd, string cmd) {
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try ", cmd, " HELP.");
}
} // namespace
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
@ -85,9 +92,10 @@ ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service-
ServerFamily::~ServerFamily() {
}
void ServerFamily::Init(util::AcceptServer* acceptor) {
void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
main_listener_ = main_listener;
pb_task_ = ess_.pool()->GetNextProactor();
auto cache_cb = [] {
@ -336,12 +344,27 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "SETNAME") {
if (sub_cmd == "SETNAME" && args.size() == 3) {
cntx->owner()->SetName(ArgS(args, 2));
return (*cntx)->SendOk();
} else if (sub_cmd == "LIST") {
vector<string> client_info;
fibers::mutex mu;
auto cb = [&](Connection* conn) {
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
string info = dcon->GetClientInfo();
lock_guard lk(mu);
client_info.push_back(move(info));
};
main_listener_->TraverseConnections(cb);
string result = absl::StrJoin(move(client_info), "\n");
result.append("\n");
return (*cntx)->SendBulkString(result);
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
(*cntx)->SendError(kSyntaxErr);
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
@ -365,9 +388,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
});
return (*cntx)->SendOk();
} else {
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try CONFIG HELP.");
return (*cntx)->SendError(err, kSyntaxErr);
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
}
}

View File

@ -12,6 +12,7 @@
namespace util {
class AcceptServer;
class ListenerInterface;
} // namespace util
namespace dfly {
@ -39,7 +40,7 @@ class ServerFamily {
ServerFamily(Service* service);
~ServerFamily();
void Init(util::AcceptServer* acceptor);
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener);
void Register(CommandRegistry* registry);
void Shutdown();
@ -96,6 +97,7 @@ class ServerFamily {
EngineShardSet& ess_;
util::AcceptServer* acceptor_ = nullptr;
util::ListenerInterface* main_listener_ = nullptr;
util::ProactorBase* pb_task_ = nullptr;
mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;

View File

@ -34,6 +34,8 @@ DEFINE_VARZ(VarzQps, set_qps);
DEFINE_VARZ(VarzQps, get_qps);
constexpr uint32_t kMaxStrLen = 1 << 28;
constexpr uint32_t kMinTieredLen = TieredStorage::kMinBlobLen;
string GetString(EngineShard* shard, const PrimeValue& pv) {
string res;
@ -160,7 +162,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
EngineShard* shard = db_slice_.shard_owner();
if (shard->tiered_storage()) { // external storage enabled.
if (value.size() >= 64) {
if (value.size() >= kMinTieredLen) {
shard->tiered_storage()->UnloadItem(params.db_index, it);
}
}
@ -205,6 +207,16 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
// overwrite existing entry.
prime_value.SetString(value);
if (value.size() >= kMinTieredLen) { // external storage enabled.
EngineShard* shard = db_slice_.shard_owner();
if (shard->tiered_storage()) {
shard->tiered_storage()->UnloadItem(params.db_index, it);
}
}
db_slice_.PostUpdate(params.db_index, it);
return OpStatus::OK;
@ -288,13 +300,23 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
uint64_t inside_usec = 0;
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpGet(OpArgs{shard, t->db_index()}, key);
uint64_t start = absl::GetCurrentTimeNanos();
auto res = OpGet(OpArgs{shard, t->db_index()}, key);
inside_usec = (absl::GetCurrentTimeNanos() - start) / 1000;
// LOG_IF(INFO, inside_usec > 3000) << "Get took " << usec << " us";
return res;
};
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
Transaction* trans = cntx->transaction;
auto* pb = util::ProactorBase::me();
uint64_t start = pb->GetMonotonicTimeNs();
OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
uint64_t usec = (pb->GetMonotonicTimeNs() - start) / 1000;
// LOG_IF(INFO, usec > 6000) << "RGet took " << usec << " us, inside " << inside_usec;
if (result) {
DVLOG(1) << "GET " << trans->DebugId() << ": " << key << " " << result.value();

View File

@ -6,12 +6,10 @@
#include <absl/container/flat_hash_map.h>
#include "base/ring_buffer.h"
#include "core/external_alloc.h"
#include "server/common.h"
#include "server/io_mgr.h"
#include "server/table.h"
#include "util/fibers/event_count.h"
namespace dfly {
@ -20,6 +18,8 @@ class DbSlice;
class TieredStorage {
public:
enum : uint16_t { kMinBlobLen = 64 };
explicit TieredStorage(DbSlice* db_slice);
~TieredStorage();
@ -76,16 +76,16 @@ class TieredStorage {
// map of cursor -> pending size
// absl::flat_hash_map<uint64_t, size_t> pending_upload;
// multi_cnt_ - counts how many unloaded items exists in the batch at specified page offset.
// here multi_cnt_.first is (file_offset in 4k pages) and
// multi_cnt_.second is MultiBatch object storing number of allocated records in the batch
// and its capacity (/ 4k).
struct MultiBatch {
uint16_t used; // number of used bytes
uint16_t used; // number of used bytes
uint16_t reserved; // in 4k pages.
MultiBatch(uint16_t mem_used) : used(mem_used) {}
MultiBatch(uint16_t mem_used) : used(mem_used) {
}
};
absl::flat_hash_map<uint32_t, MultiBatch> multi_cnt_;

View File

@ -521,6 +521,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
}
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
uint64_t at_cb_run = 0;
bool notified = false;
if (schedule_fast) { // Single shard (local) optimization.
// We never resize shard_data because that would affect MULTI transaction correctness.
DCHECK_EQ(1u, shard_data_.size());
@ -538,6 +540,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// call PollExecute that runs the callback which calls DecreaseRunCnt.
// As a result WaitForShardCallbacks below is unblocked.
auto schedule_cb = [&] {
auto* pb = util::ProactorBase::me();
at_cb_run = pb->GetMonotonicTimeNs();
bool run_eager = ScheduleUniqueShard(EngineShard::tlocal());
if (run_eager) {
// it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned.
@ -548,7 +552,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
}
};
ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
notified = ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
// after_sched = pb->GetMonotonicTimeNs();
} else {
// Transaction spans multiple shards or it's global (like flushdb) or multi.
if (!multi_)
@ -557,7 +562,14 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
}
DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
auto* pb = util::ProactorBase::me();
uint64_t start = pb->GetMonotonicTimeNs();
WaitForShardCallbacks();
uint64_t end = pb->GetMonotonicTimeNs();
uint64_t usec = (end - start) / 1000;
LOG_IF(INFO, usec > 8000) << "waitcbs took " << usec << " us, since notify "
<< (end - notify_ts_) / 1000 << ", since run: " << (end - at_cb_run) / 1000
<< " notified:" << notified;
DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
cb_ = nullptr;
@ -1113,6 +1125,10 @@ inline uint32_t Transaction::DecreaseRunCnt() {
// We use release so that no stores will be reordered after.
uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
if (res == 1) {
auto* pb = util::ProactorBase::me();
uint64_t start = pb->GetMonotonicTimeNs();
notify_ts_.store(start, memory_order_relaxed);
run_ec_.notify();
}
return res;

View File

@ -268,6 +268,7 @@ class Transaction {
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
util::fibers_ext::EventCount run_ec_;
std::atomic_uint64_t notify_ts_{0};
// shard_data spans all the shards in ess_.
// I wish we could use a dense array of size [0..uniq_shards] but since

View File

@ -32,7 +32,6 @@ def fill_hset(args, redis):
def main():
# Check processor architecture
parser = argparse.ArgumentParser(description='fill hset entities')
parser.add_argument(
'-p', type=int, help='redis port', dest='port', default=6380)