From 7d5ad8cc5bd252ef16e2f08a9c34c756d9f41777 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 28 Jan 2022 23:11:01 +0200 Subject: [PATCH] Serve http requests from redis port --- helio | 2 +- server/dfly_main.cc | 26 ++++--------- server/dragonfly_connection.cc | 67 ++++++++++++++++++++-------------- server/main_service.cc | 3 +- server/main_service.h | 7 +++- 5 files changed, 56 insertions(+), 49 deletions(-) diff --git a/helio b/helio index f7c9d00..bf4af91 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit f7c9d00016ea48866632583918b042b744f261b9 +Subproject commit bf4af91fad4ccd2fa1a2f64d01547a0701e4978a diff --git a/server/dfly_main.cc b/server/dfly_main.cc index a608a8b..45909f2 100644 --- a/server/dfly_main.cc +++ b/server/dfly_main.cc @@ -3,28 +3,25 @@ // #include "base/init.h" -#include "server/main_service.h" #include "server/dragonfly_listener.h" +#include "server/main_service.h" #include "util/accept_server.h" #include "util/uring/uring_pool.h" #include "util/varz.h" -DEFINE_int32(http_port, 8080, "Http port."); DECLARE_uint32(port); DECLARE_uint32(memcache_port); using namespace util; +using namespace std; namespace dfly { void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) { Service service(pool); + + service.RegisterHttp(http); service.Init(acceptor); - - if (http) { - service.RegisterHttp(http); - } - acceptor->AddListener(FLAGS_port, new Listener{Protocol::REDIS, &service}); if (FLAGS_memcache_port > 0) { acceptor->AddListener(FLAGS_memcache_port, new Listener{Protocol::MEMCACHE, &service}); @@ -38,7 +35,6 @@ void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) } // namespace dfly - int main(int argc, char* argv[]) { MainInitGuard guard(&argc, &argv); @@ -48,19 +44,11 @@ int main(int argc, char* argv[]) { pp.Run(); AcceptServer acceptor(&pp); - HttpListener<>* http_listener = nullptr; + unique_ptr> http_listener(new HttpListener<>); - if (FLAGS_http_port >= 0) { - http_listener = new HttpListener<>; - http_listener->enable_metrics(); + http_listener->enable_metrics(); - // Ownership over http_listener is moved to the acceptor. - uint16_t port = acceptor.AddListener(FLAGS_http_port, http_listener); - - LOG(INFO) << "Started http service on port " << port; - } - - dfly::RunEngine(&pp, &acceptor, http_listener); + dfly::RunEngine(&pp, &acceptor, http_listener.get()); pp.Stop(); diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index a0324a6..b033c66 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -124,8 +124,10 @@ void Connection::HandleRequests() { this_fiber::properties().set_name("DflyConnection"); int val = 1; - CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val))); - auto remote_ep = socket_->RemoteEndpoint(); + LinuxSocketBase* lsb = static_cast(socket_.get()); + CHECK_EQ(0, setsockopt(lsb->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val))); + + auto remote_ep = lsb->RemoteEndpoint(); std::unique_ptr tls_sock; if (ctx_) { @@ -141,36 +143,46 @@ void Connection::HandleRequests() { } FiberSocketBase* peer = tls_sock ? (FiberSocketBase*)tls_sock.get() : socket_.get(); - cc_.reset(new ConnectionContext(peer, this)); - cc_->shard_set = &service_->shard_set(); + io::Result http_res = CheckForHttpProto(peer); - // TODO: to move this interface to LinuxSocketBase so we won't need to cast. - uring::UringSocket* us = static_cast(socket_.get()); + if (http_res) { + if (*http_res) { + VLOG(1) << "HTTP1.1 identified"; + HttpConnection http_conn{service_->http_listener()}; + http_conn.SetSocket(peer); + auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer()); + io_buf_.ConsumeInput(io_buf_.InputLen()); + if (!ec) { + http_conn.HandleRequests(); + } + http_conn.ReleaseSocket(); + } else { + cc_.reset(new ConnectionContext(peer, this)); + cc_->shard_set = &service_->shard_set(); - bool poll_armed = true; - uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) { - VLOG(1) << "Got event " << mask; - cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; - if (cc_->transaction) { - cc_->transaction->BreakOnClose(); + // TODO: to move this interface to LinuxSocketBase so we won't need to cast. + uring::UringSocket* us = static_cast(socket_.get()); + + bool poll_armed = true; + uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) { + VLOG(1) << "Got event " << mask; + cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; + if (cc_->transaction) { + cc_->transaction->BreakOnClose(); + } + + evc_.notify(); // Notify dispatch fiber. + poll_armed = false; + }); + + InputLoop(peer); + + if (poll_armed) { + us->CancelPoll(poll_id); + } } - - evc_.notify(); // Notify dispatch fiber. - poll_armed = false; - }); - - io::Result check_res = CheckForHttpProto(peer); - if (!check_res) - return; - if (*check_res) { - LOG(INFO) << "HTTP1.1 identified"; } - InputLoop(peer); - - if (poll_armed) { - us->CancelPoll(poll_id); - } VLOG(1) << "Closed connection for peer " << remote_ep; } @@ -195,6 +207,7 @@ io::Result Connection::CheckForHttpProto(util::FiberSocketBase* peer) { } last_len = io_buf_.InputLen(); } while (last_len < 1024); + return false; } diff --git a/server/main_service.cc b/server/main_service.cc index 6ffb32b..023a77e 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -51,7 +51,7 @@ constexpr size_t kMaxThreadSize = 1024; } // namespace -Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) { +Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(this) { CHECK(pp); // We support less than 1024 threads and we support less than 1024 shards. @@ -255,6 +255,7 @@ bool Service::IsShardSetLocked() const { void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); + http_listener_ = listener; } void Service::Quit(CmdArgList args, ConnectionContext* cntx) { diff --git a/server/main_service.h b/server/main_service.h index 5cc33c4..5041ba6 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -57,6 +57,8 @@ class Service { return pp_; } + util::HttpListenerBase* http_listener() { return http_listener_; } + private: static void Quit(CmdArgList args, ConnectionContext* cntx); @@ -67,10 +69,13 @@ class Service { base::VarzValue::Map GetVarzStats(); + util::ProactorPool& pp_; + CommandRegistry registry_; EngineShardSet shard_set_; - util::ProactorPool& pp_; + ServerFamily server_family_; + util::HttpListenerBase* http_listener_ = nullptr; }; } // namespace dfly