diff --git a/CMakeLists.txt b/CMakeLists.txt index d7cee2e..8b62a57 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,3 +24,4 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}) include_directories(async) add_subdirectory(async) +add_subdirectory(server) diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt new file mode 100644 index 0000000..651f46e --- /dev/null +++ b/server/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(dragonfly dfly_main.cc) +cxx_link(dragonfly base dragonfly_lib) + +add_library(dragonfly_lib dragonfly_listener.cc dragonfly_connection.cc main_service.cc) + +cxx_link(dragonfly_lib uring_fiber_lib + fibers_ext strings_lib http_server_lib) diff --git a/server/dfly_main.cc b/server/dfly_main.cc new file mode 100644 index 0000000..de897be --- /dev/null +++ b/server/dfly_main.cc @@ -0,0 +1,62 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "base/init.h" +#include "server/main_service.h" +#include "server/dragonfly_listener.h" +#include "util/accept_server.h" +#include "util/uring/uring_pool.h" +#include "util/varz.h" + +DEFINE_int32(http_port, 8080, "Http port."); +DECLARE_uint32(port); + +using namespace util; + +namespace dfly { + +void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) { + Service service(pool); + service.Init(acceptor); + + if (http) { + service.RegisterHttp(http); + } + + acceptor->AddListener(FLAGS_port, new Listener{&service}); + acceptor->Run(); + acceptor->Wait(); + + service.Shutdown(); +} + +} // namespace dfly + + +int main(int argc, char* argv[]) { + MainInitGuard guard(&argc, &argv); + + CHECK_GT(FLAGS_port, 0u); + + uring::UringPool pp{1024}; + pp.Run(); + + AcceptServer acceptor(&pp); + HttpListener<>* http_listener = nullptr; + + if (FLAGS_http_port >= 0) { + http_listener = new HttpListener<>; + + // Ownership over http_listener is moved to the acceptor. + uint16_t port = acceptor.AddListener(FLAGS_http_port, http_listener); + + LOG(INFO) << "Started http service on port " << port; + } + + dfly::RunEngine(&pp, &acceptor, http_listener); + + pp.Stop(); + + return 0; +} diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc new file mode 100644 index 0000000..956f17a --- /dev/null +++ b/server/dragonfly_connection.cc @@ -0,0 +1,110 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/dragonfly_connection.h" + +#include + +#include "base/io_buf.h" +#include "base/logging.h" +#include "server/main_service.h" +#include "util/fiber_sched_algo.h" + +using namespace util; +using namespace std; +namespace this_fiber = boost::this_fiber; +namespace fibers = boost::fibers; + +namespace dfly { +namespace { + + + +constexpr size_t kMinReadSize = 256; + +} // namespace + +struct Connection::Shutdown { + absl::flat_hash_map map; + ShutdownHandle next_handle = 1; + + ShutdownHandle Add(ShutdownCb cb) { + map[next_handle] = move(cb); + return next_handle++; + } + + void Remove(ShutdownHandle sh) { + map.erase(sh); + } +}; + +Connection::Connection(Service* service) + : service_(service) { +} + +Connection::~Connection() { +} + +void Connection::OnShutdown() { + VLOG(1) << "Connection::OnShutdown"; + if (shutdown_) { + for (const auto& k_v : shutdown_->map) { + k_v.second(); + } + } +} + +auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle { + if (!shutdown_) { + shutdown_ = make_unique(); + } + return shutdown_->Add(std::move(cb)); +} + +void Connection::UnregisterShutdownHook(ShutdownHandle id) { + if (shutdown_) { + shutdown_->Remove(id); + if (shutdown_->map.empty()) + shutdown_.reset(); + } +} + +void Connection::HandleRequests() { + this_fiber::properties().set_name("DflyConnection"); + + int val = 1; + CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val))); + + FiberSocketBase* peer = socket_.get(); + InputLoop(peer); + + VLOG(1) << "Closed connection for peer " << socket_->RemoteEndpoint(); +} + +void Connection::InputLoop(FiberSocketBase* peer) { + base::IoBuf io_buf{kMinReadSize}; + + std::error_code ec; + + do { + auto buf = io_buf.AppendBuffer(); + ::io::Result recv_sz = peer->Recv(buf); + + if (!recv_sz) { + ec = recv_sz.error(); + break; + } + + io_buf.CommitWrite(*recv_sz); + ec = peer->Write(io_buf.InputBuffer()); + if (ec) + break; + } while (peer->IsOpen()); + + if (ec && !FiberSocketBase::IsConnClosed(ec)) { + LOG(WARNING) << "Socket error " << ec; + } +} + +} // namespace dfly diff --git a/server/dragonfly_connection.h b/server/dragonfly_connection.h new file mode 100644 index 0000000..9aafba7 --- /dev/null +++ b/server/dragonfly_connection.h @@ -0,0 +1,40 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include "util/connection.h" + +namespace dfly { + +class Service; + +class Connection : public util::Connection { + public: + Connection(Service* service); + ~Connection(); + + using error_code = std::error_code; + using ShutdownCb = std::function; + using ShutdownHandle = unsigned; + + ShutdownHandle RegisterShutdownHook(ShutdownCb cb); + void UnregisterShutdownHook(ShutdownHandle id); + + protected: + void OnShutdown() override; + + private: + void HandleRequests() final; + + void InputLoop(util::FiberSocketBase* peer); + void DispatchFiber(util::FiberSocketBase* peer); + + Service* service_; + + struct Shutdown; + std::unique_ptr shutdown_; +}; + +} // namespace dfly diff --git a/server/dragonfly_listener.cc b/server/dragonfly_listener.cc new file mode 100644 index 0000000..449c825 --- /dev/null +++ b/server/dragonfly_listener.cc @@ -0,0 +1,47 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/dragonfly_listener.h" + +#include "base/logging.h" +#include "server/dragonfly_connection.h" +#include "util/proactor_pool.h" + +using namespace util; + +DEFINE_uint32(conn_threads, 0, "Number of threads used for handing server connections"); + + +namespace dfly { + +Listener::Listener(Service* e) : engine_(e) { +} + +Listener::~Listener() { +} + +util::Connection* Listener::NewConnection(ProactorBase* proactor) { + return new Connection{engine_}; +} + +void Listener::PreShutdown() { +} + +void Listener::PostShutdown() { +} + +// We can limit number of threads handling dragonfly connections. +ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) { + uint32_t id = next_id_.fetch_add(1, std::memory_order_relaxed); + uint32_t total = FLAGS_conn_threads; + util::ProactorPool* pp = pool(); + + if (total == 0 || total > pp->size()) { + total = pp->size(); + } + + return pp->at(id % total); +} + +} // namespace dfly diff --git a/server/dragonfly_listener.h b/server/dragonfly_listener.h new file mode 100644 index 0000000..f1e9098 --- /dev/null +++ b/server/dragonfly_listener.h @@ -0,0 +1,31 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include "util/listener_interface.h" + +namespace dfly { + +class Service; + +class Listener : public util::ListenerInterface { + public: + Listener(Service*); + ~Listener(); + + private: + util::Connection* NewConnection(util::ProactorBase* proactor) final; + util::ProactorBase* PickConnectionProactor(util::LinuxSocketBase* sock) final; + + void PreShutdown(); + + void PostShutdown(); + + Service* engine_; + + std::atomic_uint32_t next_id_{0}; +}; + +} // namespace dfly diff --git a/server/main_service.cc b/server/main_service.cc new file mode 100644 index 0000000..15742a4 --- /dev/null +++ b/server/main_service.cc @@ -0,0 +1,64 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/main_service.h" + +#include +#include + +#include "base/logging.h" +#include "util/uring/uring_fiber_algo.h" +#include "util/varz.h" + +DEFINE_uint32(port, 6380, "Redis port"); + + +namespace dfly { + +using namespace std; +using namespace util; +using base::VarzValue; + +namespace fibers = ::boost::fibers; +namespace this_fiber = ::boost::this_fiber; + +namespace { + +DEFINE_VARZ(VarzMapAverage, request_latency_usec); + +std::optional engine_varz; + +} // namespace + +Service::Service(ProactorPool* pp) + : pp_(*pp) { + CHECK(pp); + engine_varz.emplace("engine", [this] { return GetVarzStats(); }); +} + +Service::~Service() { + engine_varz.reset(); +} + +void Service::Init(util::AcceptServer* acceptor) { + request_latency_usec.Init(&pp_); +} + +void Service::Shutdown() { + request_latency_usec.Shutdown(); +} + +void Service::RegisterHttp(HttpListenerBase* listener) { + CHECK_NOTNULL(listener); +} + +VarzValue::Map Service::GetVarzStats() { + VarzValue::Map res; + + res.emplace_back("keys", VarzValue::FromInt(0)); + + return res; +} + +} // namespace dfly diff --git a/server/main_service.h b/server/main_service.h new file mode 100644 index 0000000..fa1d583 --- /dev/null +++ b/server/main_service.h @@ -0,0 +1,40 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include "base/varz_value.h" +#include "util/http/http_handler.h" + +namespace util { +class AcceptServer; +} // namespace util + +namespace dfly { + +class Service { + public: + using error_code = std::error_code; + + explicit Service(util::ProactorPool* pp); + ~Service(); + + void RegisterHttp(util::HttpListenerBase* listener); + + void Init(util::AcceptServer* acceptor); + + void Shutdown(); + + util::ProactorPool& proactor_pool() { + return pp_; + } + + private: + + base::VarzValue::Map GetVarzStats(); + + util::ProactorPool& pp_; +}; + +} // namespace dfly