feat(server): support epoll linux api (#351)

This commit is contained in:
Roman Gershman 2022-10-04 11:11:09 +03:00 committed by GitHub
parent af690668ca
commit b616b1e1fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 117 additions and 61 deletions

View File

@ -25,7 +25,7 @@ If applicable, add screenshots to help explain your problem.
**Environment (please complete the following information):** **Environment (please complete the following information):**
- OS: [ubuntu 20.04] - OS: [ubuntu 20.04]
- Kernel: [MUST BE 5.10 or greater] # Command: `uname -a` - Kernel: # Command: `uname -a`
- Containerized?: [Bare Metal, Docker, Docker Compose, Docker Swarm, Kubernetes, Other] - Containerized?: [Bare Metal, Docker, Docker Compose, Docker Swarm, Kubernetes, Other]
- Dragonfly Version: [e.g. 0.3.0] - Dragonfly Version: [e.g. 0.3.0]

View File

@ -81,9 +81,8 @@ For more info about memory efficiency in Dragonfly see [dashtable doc](/docs/das
## Running the server ## Running the server
Dragonfly runs on linux. It uses relatively new linux specific [io-uring API](https://github.com/axboe/liburing) Dragonfly runs on linux. We advice running it on linux version 5.11 or later
for I/O, hence it requires Linux version 5.10 or later. but you can also run Dragonfly on older kernels as well.
Debian/Bullseye, Ubuntu 20.04.4 or later fit these requirements.
### With docker: ### With docker:

View File

@ -2,9 +2,8 @@
## Running the server ## Running the server
Dragonfly runs on linux. It uses relatively new linux specific [io-uring API](https://github.com/axboe/liburing) Dragonfly runs on linux. We advice running it on linux version 5.11 or later
for I/O, hence it requires `Linux verion 5.10` or later. but you can also run Dragonfly on older kernels as well.
Debian/Bullseye, `Ubuntu 20.04.4` or later fit these requirements.
### WARNING: Building from source on older kernels WILL NOT WORK. ### WARNING: Building from source on older kernels WILL NOT WORK.

View File

@ -46,10 +46,6 @@ Continue being great and build your app with the power of DragonflyDB!
## Known issues ## Known issues
#### `Error initializing io_uring`
This likely means your kernel version is too low to run DragonflyDB. Make sure to install
a kernel version that supports `io_uring`.
## More Build Options ## More Build Options
- [Docker Compose Deployment](/contrib/docker/) - [Docker Compose Deployment](/contrib/docker/)

2
helio

@ -1 +1 @@
Subproject commit 723774a0248102f97c8b0e71b5078025386c3ac1 Subproject commit 65325585b54c748fc5039b18961d5f0972e8e045

View File

@ -22,8 +22,6 @@
#include "util/tls/tls_socket.h" #include "util/tls/tls_socket.h"
#endif #endif
#include "util/uring/uring_socket.h"
ABSL_FLAG(bool, tcp_nodelay, false, ABSL_FLAG(bool, tcp_nodelay, false,
"Configures dragonfly connections with socket option TCP_NODELAY"); "Configures dragonfly connections with socket option TCP_NODELAY");
ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console on main TCP port"); ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console on main TCP port");
@ -162,8 +160,8 @@ void Connection::OnShutdown() {
void Connection::OnPreMigrateThread() { void Connection::OnPreMigrateThread() {
// If we migrating to another io_uring we should cancel any pending requests we have. // If we migrating to another io_uring we should cancel any pending requests we have.
if (break_poll_id_ != kuint32max) { if (break_poll_id_ != kuint32max) {
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get()); auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
us->CancelPoll(break_poll_id_); ls->CancelPoll(break_poll_id_);
break_poll_id_ = kuint32max; break_poll_id_ = kuint32max;
} }
} }
@ -173,9 +171,9 @@ void Connection::OnPostMigrateThread() {
if (breaker_cb_) { if (breaker_cb_) {
DCHECK_EQ(kuint32max, break_poll_id_); DCHECK_EQ(kuint32max, break_poll_id_);
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get()); auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
break_poll_id_ = break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); }); ls->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
} }
} }
@ -242,8 +240,7 @@ void Connection::HandleRequests() {
} else { } else {
cc_.reset(service_->CreateContext(peer, this)); cc_.reset(service_->CreateContext(peer, this));
// TODO: to move this interface to LinuxSocketBase so we won't need to cast. auto* us = static_cast<LinuxSocketBase*>(socket_.get());
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
if (breaker_cb_) { if (breaker_cb_) {
break_poll_id_ = break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); }); us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });

View File

@ -1,5 +1,5 @@
add_executable(dragonfly dfly_main.cc) add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib) cxx_link(dragonfly base dragonfly_lib epoll_fiber_lib)
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release") if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release")
# Add core2 only to this file, thus avoiding instructions in this object file that # Add core2 only to this file, thus avoiding instructions in this object file that

View File

@ -26,6 +26,7 @@
#include "server/version.h" #include "server/version.h"
#include "strings/human_readable.h" #include "strings/human_readable.h"
#include "util/accept_server.h" #include "util/accept_server.h"
#include "util/epoll/epoll_pool.h"
#include "util/uring/uring_pool.h" #include "util/uring/uring_pool.h"
#include "util/varz.h" #include "util/varz.h"
@ -52,6 +53,10 @@ ABSL_FLAG(string, unixsocket, "",
"If not empty - specifies path for the Unis socket that will " "If not empty - specifies path for the Unis socket that will "
"be used for listening for incoming connections."); "be used for listening for incoming connections.");
ABSL_FLAG(bool, force_epoll, false,
"If true - uses linux epoll engine underneath."
"Can fit for kernels older than 5.10.");
using namespace util; using namespace util;
using namespace facade; using namespace facade;
using namespace io; using namespace io;
@ -193,6 +198,42 @@ bool CreatePidFile(const string& path) {
return true; return true;
} }
bool ShouldUseEpollAPI(const base::sys::KernelVersion& kver) {
if (GetFlag(FLAGS_force_epoll))
return true;
if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) {
LOG(WARNING) << "Kernel is older than 5.10, switching to epoll engine.";
return true;
}
struct io_uring ring;
io_uring_params params;
memset(&params, 0, sizeof(params));
int iouring_res = io_uring_queue_init_params(1024, &ring, &params);
if (iouring_res == 0) {
io_uring_queue_exit(&ring);
return false;
}
iouring_res = -iouring_res;
if (iouring_res == ENOSYS) {
LOG(WARNING) << "iouring API is not supported. switching to epoll.";
} else if (iouring_res == ENOMEM) {
LOG(WARNING) << "io_uring does not have enough memory. That can happen when your "
"max locked memory is too limited. If you run via docker, "
"try adding '--ulimit memlock=-1' to \"docker run\" command."
"Meanwhile, switching to epoll";
} else {
LOG(WARNING) << "Weird error " << iouring_res << " switching to epoll";
}
return true;
}
} // namespace } // namespace
} // namespace dfly } // namespace dfly
@ -237,28 +278,11 @@ Usage: dragonfly [FLAGS]
CHECK_GT(GetFlag(FLAGS_port), 0u); CHECK_GT(GetFlag(FLAGS_port), 0u);
mi_stats_reset(); mi_stats_reset();
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);
if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) {
LOG(ERROR) << "Kernel 5.10 or later is supported. Exiting...";
return 1;
}
int iouring_res = io_uring_queue_init_params(0, nullptr, nullptr);
if (-iouring_res == ENOSYS) {
LOG(ERROR) << "iouring system call interface is not supported. Exiting...";
return 1;
}
if (GetFlag(FLAGS_dbnum) > dfly::kMaxDbId) { if (GetFlag(FLAGS_dbnum) > dfly::kMaxDbId) {
LOG(ERROR) << "dbnum is too big. Exiting..."; LOG(ERROR) << "dbnum is too big. Exiting...";
return 1; return 1;
} }
CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;
string pidfile_path = GetFlag(FLAGS_pidfile); string pidfile_path = GetFlag(FLAGS_pidfile);
if (!pidfile_path.empty()) { if (!pidfile_path.empty()) {
if (!CreatePidFile(pidfile_path)) { if (!CreatePidFile(pidfile_path)) {
@ -287,14 +311,28 @@ Usage: dragonfly [FLAGS]
mi_option_enable(mi_option_show_errors); mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0); mi_option_set(mi_option_max_warnings, 0);
uring::UringPool pp{1024}; base::sys::KernelVersion kver;
pp.Run(); base::sys::GetKernelVersion(&kver);
AcceptServer acceptor(&pp); CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;
int res = dfly::RunEngine(&pp, &acceptor) ? 0 : -1; unique_ptr<util::ProactorPool> pool;
pp.Stop(); bool use_epoll = ShouldUseEpollAPI(kver);
if (use_epoll) {
pool.reset(new epoll::EpollPool);
} else {
pool.reset(new uring::UringPool(1024)); // 1024 - iouring queue size.
}
pool->Run();
AcceptServer acceptor(pool.get());
int res = dfly::RunEngine(pool.get(), &acceptor) ? 0 : -1;
pool->Stop();
if (!pidfile_path.empty()) { if (!pidfile_path.empty()) {
unlink(pidfile_path.c_str()); unlink(pidfile_path.c_str());

View File

@ -41,6 +41,7 @@ extern "C" {
#include "server/version.h" #include "server/version.h"
#include "strings/human_readable.h" #include "strings/human_readable.h"
#include "util/accept_server.h" #include "util/accept_server.h"
#include "util/fibers/fiber_file.h"
#include "util/uring/uring_file.h" #include "util/uring/uring_file.h"
using namespace std; using namespace std;
@ -66,6 +67,7 @@ using absl::StrCat;
using namespace facade; using namespace facade;
using strings::HumanReadableNumBytes; using strings::HumanReadableNumBytes;
using util::ProactorBase; using util::ProactorBase;
using util::fibers_ext::FiberQueueThreadPool;
using util::http::StringResponse; using util::http::StringResponse;
namespace { namespace {
@ -169,7 +171,8 @@ class LinuxWriteWrapper : public io::Sink {
class RdbSnapshot { class RdbSnapshot {
public: public:
RdbSnapshot() {} RdbSnapshot(FiberQueueThreadPool* fq_tp) : fq_tp_(fq_tp) {
}
error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts); error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts);
void StartInShard(EngineShard* shard); void StartInShard(EngineShard* shard);
@ -187,13 +190,12 @@ class RdbSnapshot {
private: private:
bool started_ = false; bool started_ = false;
FiberQueueThreadPool* fq_tp_;
std::unique_ptr<io::Sink> io_sink_; std::unique_ptr<io::Sink> io_sink_;
std::unique_ptr<RdbSaver> saver_; std::unique_ptr<RdbSaver> saver_;
RdbTypeFreqMap freq_map_; RdbTypeFreqMap freq_map_;
}; };
io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0); io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) { if (res) {
@ -203,15 +205,24 @@ io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
return res; return res;
} }
error_code RdbSnapshot::Start(bool sharded_snapshot, error_code RdbSnapshot::Start(bool sharded_snapshot, const std::string& path,
const std::string& path, const StringVec& lua_scripts) { const StringVec& lua_scripts) {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666); bool is_direct = false;
if (!res) { if (fq_tp_) { // EPOLL
return res.error(); auto res = util::OpenFiberWriteFile(path, fq_tp_);
if (!res)
return res.error();
io_sink_.reset(*res);
} else {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return res.error();
}
io_sink_.reset(new LinuxWriteWrapper(res->release()));
is_direct = kRdbWriteFlags & O_DIRECT;
} }
io_sink_.reset(new LinuxWriteWrapper(res->release())); saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, is_direct));
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT));
return saver_->SaveHeader(lua_scripts); return saver_->SaveHeader(lua_scripts);
} }
@ -222,6 +233,9 @@ error_code RdbSnapshot::SaveBody() {
error_code RdbSnapshot::Close() { error_code RdbSnapshot::Close() {
// TODO: to solve it in a more elegant way. // TODO: to solve it in a more elegant way.
if (fq_tp_) {
return static_cast<io::WriteFile*>(io_sink_.get())->Close();
}
return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close(); return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
} }
@ -330,6 +344,9 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
dfly_cmd_.reset(new DflyCmd(main_listener, this)); dfly_cmd_.reset(new DflyCmd(main_listener, this));
pb_task_ = shard_set->pool()->GetNextProactor(); pb_task_ = shard_set->pool()->GetNextProactor();
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
fq_threadpool_.reset(new FiberQueueThreadPool());
}
// Unlike EngineShard::Heartbeat that runs independently in each shard thread, // Unlike EngineShard::Heartbeat that runs independently in each shard thread,
// this callback runs in a single thread and it aggregates globally stats from all the shards. // this callback runs in a single thread and it aggregates globally stats from all the shards.
@ -489,8 +506,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec&& spec) {
} }
error_code ServerFamily::LoadRdb(const std::string& rdb_file) { error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec; error_code ec;
io::ReadonlyFileOrError res;
if (fq_threadpool_) {
res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get());
} else {
res = uring::OpenRead(rdb_file);
}
if (res) { if (res) {
io::FileSource fs(*res); io::FileSource fs(*res);
@ -777,7 +800,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
abs_path += shard_file; abs_path += shard_file;
VLOG(1) << "Saving to " << abs_path; VLOG(1) << "Saving to " << abs_path;
snapshots[sid].reset(new RdbSnapshot); snapshots[sid].reset(new RdbSnapshot(fq_threadpool_.get()));
auto& snapshot = snapshots[sid]; auto& snapshot = snapshots[sid];
error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts); error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts);
if (local_ec) { if (local_ec) {
@ -800,7 +823,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
path += filename; path += filename;
VLOG(1) << "Saving to " << path; VLOG(1) << "Saving to " << path;
snapshots[0].reset(new RdbSnapshot); snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts); ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts);
if (!ec) { if (!ec) {
@ -1104,12 +1127,15 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
Metrics m = GetMetrics(); Metrics m = GetMetrics();
if (should_enter("SERVER")) { if (should_enter("SERVER")) {
ProactorBase::ProactorKind kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";
ADD_HEADER("# Server"); ADD_HEADER("# Server");
append("redis_version", GetVersion()); append("redis_version", GetVersion());
append("redis_mode", "standalone"); append("redis_mode", "standalone");
append("arch_bits", 64); append("arch_bits", 64);
append("multiplexing_api", "iouring"); append("multiplexing_api", multiplex_api);
append("tcp_port", GetFlag(FLAGS_port)); append("tcp_port", GetFlag(FLAGS_port));
size_t uptime = m.uptime; size_t uptime = m.uptime;

View File

@ -163,6 +163,7 @@ class ServerFamily {
std::atomic_bool is_saving_{false}; std::atomic_bool is_saving_{false};
util::fibers_ext::Done is_snapshot_done_; util::fibers_ext::Done is_snapshot_done_;
std::unique_ptr<util::fibers_ext::FiberQueueThreadPool> fq_threadpool_;
}; };
} // namespace dfly } // namespace dfly