From b616b1e1fd96978dca898553bbf58c7cc058a1da Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 4 Oct 2022 11:11:09 +0300 Subject: [PATCH] feat(server): support epoll linux api (#351) --- .github/ISSUE_TEMPLATE/bug_report.md | 2 +- README.md | 9 ++- docs/build-from-source.md | 7 +-- docs/quick-start/README.md | 6 +- helio | 2 +- src/facade/dragonfly_connection.cc | 13 ++--- src/server/CMakeLists.txt | 2 +- src/server/dfly_main.cc | 82 ++++++++++++++++++++-------- src/server/server_family.cc | 54 +++++++++++++----- src/server/server_family.h | 1 + 10 files changed, 117 insertions(+), 61 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 2c6eabb..90ce9d6 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -25,7 +25,7 @@ If applicable, add screenshots to help explain your problem. **Environment (please complete the following information):** - OS: [ubuntu 20.04] - - Kernel: [MUST BE 5.10 or greater] # Command: `uname -a` + - Kernel: # Command: `uname -a` - Containerized?: [Bare Metal, Docker, Docker Compose, Docker Swarm, Kubernetes, Other] - Dragonfly Version: [e.g. 0.3.0] diff --git a/README.md b/README.md index 5af2f72..9a2c75f 100644 --- a/README.md +++ b/README.md @@ -81,9 +81,8 @@ For more info about memory efficiency in Dragonfly see [dashtable doc](/docs/das ## Running the server -Dragonfly runs on linux. It uses relatively new linux specific [io-uring API](https://github.com/axboe/liburing) -for I/O, hence it requires Linux version 5.10 or later. -Debian/Bullseye, Ubuntu 20.04.4 or later fit these requirements. +Dragonfly runs on linux. We advice running it on linux version 5.11 or later +but you can also run Dragonfly on older kernels as well. ### With docker: @@ -141,9 +140,9 @@ In addition, it has Dragonfly specific arguments options: `keys` is a dangerous command. We truncate its result to avoid blowup in memory when fetching too many keys. * `dbnum` - maximum number of supported databases for `select`. * `cache_mode` - see [Cache](#novel-cache-design) section below. - * `hz` - key expiry evaluation frequency. Default is 1000. Lower frequency uses less cpu when + * `hz` - key expiry evaluation frequency. Default is 1000. Lower frequency uses less cpu when idle at the expense of precision in key eviction. - * `save_schedule` - glob spec for the UTC time to save a snapshot which matches HH:MM (24h time). default: "" + * `save_schedule` - glob spec for the UTC time to save a snapshot which matches HH:MM (24h time). default: "" * `keys_output_limit` - Maximum number of keys output by keys command. default: 8192 diff --git a/docs/build-from-source.md b/docs/build-from-source.md index bcac65c..0a1bdb8 100644 --- a/docs/build-from-source.md +++ b/docs/build-from-source.md @@ -2,9 +2,8 @@ ## Running the server -Dragonfly runs on linux. It uses relatively new linux specific [io-uring API](https://github.com/axboe/liburing) -for I/O, hence it requires `Linux verion 5.10` or later. -Debian/Bullseye, `Ubuntu 20.04.4` or later fit these requirements. +Dragonfly runs on linux. We advice running it on linux version 5.11 or later +but you can also run Dragonfly on older kernels as well. ### WARNING: Building from source on older kernels WILL NOT WORK. @@ -58,7 +57,7 @@ OK 1) "hello" 127.0.0.1:6379> get hello "world" -127.0.0.1:6379> +127.0.0.1:6379> ``` ## Step 6 diff --git a/docs/quick-start/README.md b/docs/quick-start/README.md index 5c2db58..72f5a34 100644 --- a/docs/quick-start/README.md +++ b/docs/quick-start/README.md @@ -37,7 +37,7 @@ OK 1) "hello" 127.0.0.1:6379> get hello "world" -127.0.0.1:6379> +127.0.0.1:6379> ``` ## Step 3 @@ -46,10 +46,6 @@ Continue being great and build your app with the power of DragonflyDB! ## Known issues -#### `Error initializing io_uring` - -This likely means your kernel version is too low to run DragonflyDB. Make sure to install -a kernel version that supports `io_uring`. ## More Build Options - [Docker Compose Deployment](/contrib/docker/) diff --git a/helio b/helio index 723774a..6532558 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 723774a0248102f97c8b0e71b5078025386c3ac1 +Subproject commit 65325585b54c748fc5039b18961d5f0972e8e045 diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 50ed77b..45a817a 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -22,8 +22,6 @@ #include "util/tls/tls_socket.h" #endif -#include "util/uring/uring_socket.h" - ABSL_FLAG(bool, tcp_nodelay, false, "Configures dragonfly connections with socket option TCP_NODELAY"); ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console on main TCP port"); @@ -162,8 +160,8 @@ void Connection::OnShutdown() { void Connection::OnPreMigrateThread() { // If we migrating to another io_uring we should cancel any pending requests we have. if (break_poll_id_ != kuint32max) { - uring::UringSocket* us = static_cast(socket_.get()); - us->CancelPoll(break_poll_id_); + auto* ls = static_cast(socket_.get()); + ls->CancelPoll(break_poll_id_); break_poll_id_ = kuint32max; } } @@ -173,9 +171,9 @@ void Connection::OnPostMigrateThread() { if (breaker_cb_) { DCHECK_EQ(kuint32max, break_poll_id_); - uring::UringSocket* us = static_cast(socket_.get()); + auto* ls = static_cast(socket_.get()); break_poll_id_ = - us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); }); + ls->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); }); } } @@ -242,8 +240,7 @@ void Connection::HandleRequests() { } else { cc_.reset(service_->CreateContext(peer, this)); - // TODO: to move this interface to LinuxSocketBase so we won't need to cast. - uring::UringSocket* us = static_cast(socket_.get()); + auto* us = static_cast(socket_.get()); if (breaker_cb_) { break_poll_id_ = us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); }); diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 4a00787..0c9aa9d 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -1,5 +1,5 @@ add_executable(dragonfly dfly_main.cc) -cxx_link(dragonfly base dragonfly_lib) +cxx_link(dragonfly base dragonfly_lib epoll_fiber_lib) if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release") # Add core2 only to this file, thus avoiding instructions in this object file that diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index d5e604b..86377f6 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -26,6 +26,7 @@ #include "server/version.h" #include "strings/human_readable.h" #include "util/accept_server.h" +#include "util/epoll/epoll_pool.h" #include "util/uring/uring_pool.h" #include "util/varz.h" @@ -52,6 +53,10 @@ ABSL_FLAG(string, unixsocket, "", "If not empty - specifies path for the Unis socket that will " "be used for listening for incoming connections."); +ABSL_FLAG(bool, force_epoll, false, + "If true - uses linux epoll engine underneath." + "Can fit for kernels older than 5.10."); + using namespace util; using namespace facade; using namespace io; @@ -193,6 +198,42 @@ bool CreatePidFile(const string& path) { return true; } +bool ShouldUseEpollAPI(const base::sys::KernelVersion& kver) { + if (GetFlag(FLAGS_force_epoll)) + return true; + + if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) { + LOG(WARNING) << "Kernel is older than 5.10, switching to epoll engine."; + return true; + } + + struct io_uring ring; + io_uring_params params; + memset(¶ms, 0, sizeof(params)); + + int iouring_res = io_uring_queue_init_params(1024, &ring, ¶ms); + + if (iouring_res == 0) { + io_uring_queue_exit(&ring); + return false; + } + + iouring_res = -iouring_res; + + if (iouring_res == ENOSYS) { + LOG(WARNING) << "iouring API is not supported. switching to epoll."; + } else if (iouring_res == ENOMEM) { + LOG(WARNING) << "io_uring does not have enough memory. That can happen when your " + "max locked memory is too limited. If you run via docker, " + "try adding '--ulimit memlock=-1' to \"docker run\" command." + "Meanwhile, switching to epoll"; + } else { + LOG(WARNING) << "Weird error " << iouring_res << " switching to epoll"; + } + + return true; +} + } // namespace } // namespace dfly @@ -237,28 +278,11 @@ Usage: dragonfly [FLAGS] CHECK_GT(GetFlag(FLAGS_port), 0u); mi_stats_reset(); - base::sys::KernelVersion kver; - base::sys::GetKernelVersion(&kver); - - if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) { - LOG(ERROR) << "Kernel 5.10 or later is supported. Exiting..."; - return 1; - } - - int iouring_res = io_uring_queue_init_params(0, nullptr, nullptr); - if (-iouring_res == ENOSYS) { - LOG(ERROR) << "iouring system call interface is not supported. Exiting..."; - return 1; - } - if (GetFlag(FLAGS_dbnum) > dfly::kMaxDbId) { LOG(ERROR) << "dbnum is too big. Exiting..."; return 1; } - CHECK_LT(kver.major, 99u); - dfly::kernel_version = kver.kernel * 100 + kver.major; - string pidfile_path = GetFlag(FLAGS_pidfile); if (!pidfile_path.empty()) { if (!CreatePidFile(pidfile_path)) { @@ -287,14 +311,28 @@ Usage: dragonfly [FLAGS] mi_option_enable(mi_option_show_errors); mi_option_set(mi_option_max_warnings, 0); - uring::UringPool pp{1024}; - pp.Run(); + base::sys::KernelVersion kver; + base::sys::GetKernelVersion(&kver); - AcceptServer acceptor(&pp); + CHECK_LT(kver.major, 99u); + dfly::kernel_version = kver.kernel * 100 + kver.major; - int res = dfly::RunEngine(&pp, &acceptor) ? 0 : -1; + unique_ptr pool; - pp.Stop(); + bool use_epoll = ShouldUseEpollAPI(kver); + if (use_epoll) { + pool.reset(new epoll::EpollPool); + } else { + pool.reset(new uring::UringPool(1024)); // 1024 - iouring queue size. + } + + pool->Run(); + + AcceptServer acceptor(pool.get()); + + int res = dfly::RunEngine(pool.get(), &acceptor) ? 0 : -1; + + pool->Stop(); if (!pidfile_path.empty()) { unlink(pidfile_path.c_str()); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index df4db0d..3ef40f4 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -41,6 +41,7 @@ extern "C" { #include "server/version.h" #include "strings/human_readable.h" #include "util/accept_server.h" +#include "util/fibers/fiber_file.h" #include "util/uring/uring_file.h" using namespace std; @@ -66,6 +67,7 @@ using absl::StrCat; using namespace facade; using strings::HumanReadableNumBytes; using util::ProactorBase; +using util::fibers_ext::FiberQueueThreadPool; using util::http::StringResponse; namespace { @@ -169,7 +171,8 @@ class LinuxWriteWrapper : public io::Sink { class RdbSnapshot { public: - RdbSnapshot() {} + RdbSnapshot(FiberQueueThreadPool* fq_tp) : fq_tp_(fq_tp) { + } error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts); void StartInShard(EngineShard* shard); @@ -187,13 +190,12 @@ class RdbSnapshot { private: bool started_ = false; - + FiberQueueThreadPool* fq_tp_; std::unique_ptr io_sink_; std::unique_ptr saver_; RdbTypeFreqMap freq_map_; }; - io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { io::Result res = lf_->WriteSome(v, len, offset_, 0); if (res) { @@ -203,15 +205,24 @@ io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { return res; } -error_code RdbSnapshot::Start(bool sharded_snapshot, - const std::string& path, const StringVec& lua_scripts) { - auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666); - if (!res) { - return res.error(); +error_code RdbSnapshot::Start(bool sharded_snapshot, const std::string& path, + const StringVec& lua_scripts) { + bool is_direct = false; + if (fq_tp_) { // EPOLL + auto res = util::OpenFiberWriteFile(path, fq_tp_); + if (!res) + return res.error(); + io_sink_.reset(*res); + } else { + auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666); + if (!res) { + return res.error(); + } + io_sink_.reset(new LinuxWriteWrapper(res->release())); + is_direct = kRdbWriteFlags & O_DIRECT; } - io_sink_.reset(new LinuxWriteWrapper(res->release())); - saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT)); + saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, is_direct)); return saver_->SaveHeader(lua_scripts); } @@ -222,6 +233,9 @@ error_code RdbSnapshot::SaveBody() { error_code RdbSnapshot::Close() { // TODO: to solve it in a more elegant way. + if (fq_tp_) { + return static_cast(io_sink_.get())->Close(); + } return static_cast(io_sink_.get())->Close(); } @@ -330,6 +344,9 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m dfly_cmd_.reset(new DflyCmd(main_listener, this)); pb_task_ = shard_set->pool()->GetNextProactor(); + if (pb_task_->GetKind() == ProactorBase::EPOLL) { + fq_threadpool_.reset(new FiberQueueThreadPool()); + } // Unlike EngineShard::Heartbeat that runs independently in each shard thread, // this callback runs in a single thread and it aggregates globally stats from all the shards. @@ -489,8 +506,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec&& spec) { } error_code ServerFamily::LoadRdb(const std::string& rdb_file) { - io::ReadonlyFileOrError res = uring::OpenRead(rdb_file); error_code ec; + io::ReadonlyFileOrError res; + + if (fq_threadpool_) { + res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get()); + } else { + res = uring::OpenRead(rdb_file); + } if (res) { io::FileSource fs(*res); @@ -777,7 +800,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er abs_path += shard_file; VLOG(1) << "Saving to " << abs_path; - snapshots[sid].reset(new RdbSnapshot); + snapshots[sid].reset(new RdbSnapshot(fq_threadpool_.get())); auto& snapshot = snapshots[sid]; error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts); if (local_ec) { @@ -800,7 +823,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er path += filename; VLOG(1) << "Saving to " << path; - snapshots[0].reset(new RdbSnapshot); + snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get())); ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts); if (!ec) { @@ -1104,12 +1127,15 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { Metrics m = GetMetrics(); if (should_enter("SERVER")) { + ProactorBase::ProactorKind kind = ProactorBase::me()->GetKind(); + const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll"; + ADD_HEADER("# Server"); append("redis_version", GetVersion()); append("redis_mode", "standalone"); append("arch_bits", 64); - append("multiplexing_api", "iouring"); + append("multiplexing_api", multiplex_api); append("tcp_port", GetFlag(FLAGS_port)); size_t uptime = m.uptime; diff --git a/src/server/server_family.h b/src/server/server_family.h index 2e65efe..496dcf7 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -163,6 +163,7 @@ class ServerFamily { std::atomic_bool is_saving_{false}; util::fibers_ext::Done is_snapshot_done_; + std::unique_ptr fq_threadpool_; }; } // namespace dfly