Serve http requests from redis port

This commit is contained in:
Roman Gershman 2022-01-28 23:11:01 +02:00
parent e68977a7bf
commit 7d5ad8cc5b
5 changed files with 56 additions and 49 deletions

2
helio

@ -1 +1 @@
Subproject commit f7c9d00016ea48866632583918b042b744f261b9
Subproject commit bf4af91fad4ccd2fa1a2f64d01547a0701e4978a

View File

@ -3,28 +3,25 @@
//
#include "base/init.h"
#include "server/main_service.h"
#include "server/dragonfly_listener.h"
#include "server/main_service.h"
#include "util/accept_server.h"
#include "util/uring/uring_pool.h"
#include "util/varz.h"
DEFINE_int32(http_port, 8080, "Http port.");
DECLARE_uint32(port);
DECLARE_uint32(memcache_port);
using namespace util;
using namespace std;
namespace dfly {
void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) {
Service service(pool);
service.RegisterHttp(http);
service.Init(acceptor);
if (http) {
service.RegisterHttp(http);
}
acceptor->AddListener(FLAGS_port, new Listener{Protocol::REDIS, &service});
if (FLAGS_memcache_port > 0) {
acceptor->AddListener(FLAGS_memcache_port, new Listener{Protocol::MEMCACHE, &service});
@ -38,7 +35,6 @@ void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http)
} // namespace dfly
int main(int argc, char* argv[]) {
MainInitGuard guard(&argc, &argv);
@ -48,19 +44,11 @@ int main(int argc, char* argv[]) {
pp.Run();
AcceptServer acceptor(&pp);
HttpListener<>* http_listener = nullptr;
unique_ptr<HttpListener<>> http_listener(new HttpListener<>);
if (FLAGS_http_port >= 0) {
http_listener = new HttpListener<>;
http_listener->enable_metrics();
http_listener->enable_metrics();
// Ownership over http_listener is moved to the acceptor.
uint16_t port = acceptor.AddListener(FLAGS_http_port, http_listener);
LOG(INFO) << "Started http service on port " << port;
}
dfly::RunEngine(&pp, &acceptor, http_listener);
dfly::RunEngine(&pp, &acceptor, http_listener.get());
pp.Stop();

View File

@ -124,8 +124,10 @@ void Connection::HandleRequests() {
this_fiber::properties<FiberProps>().set_name("DflyConnection");
int val = 1;
CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
auto remote_ep = socket_->RemoteEndpoint();
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
CHECK_EQ(0, setsockopt(lsb->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
auto remote_ep = lsb->RemoteEndpoint();
std::unique_ptr<tls::TlsSocket> tls_sock;
if (ctx_) {
@ -141,36 +143,46 @@ void Connection::HandleRequests() {
}
FiberSocketBase* peer = tls_sock ? (FiberSocketBase*)tls_sock.get() : socket_.get();
cc_.reset(new ConnectionContext(peer, this));
cc_->shard_set = &service_->shard_set();
io::Result<bool> http_res = CheckForHttpProto(peer);
// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
if (http_res) {
if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
HttpConnection http_conn{service_->http_listener()};
http_conn.SetSocket(peer);
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
io_buf_.ConsumeInput(io_buf_.InputLen());
if (!ec) {
http_conn.HandleRequests();
}
http_conn.ReleaseSocket();
} else {
cc_.reset(new ConnectionContext(peer, this));
cc_->shard_set = &service_->shard_set();
bool poll_armed = true;
uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) {
VLOG(1) << "Got event " << mask;
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING;
if (cc_->transaction) {
cc_->transaction->BreakOnClose();
// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
bool poll_armed = true;
uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) {
VLOG(1) << "Got event " << mask;
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING;
if (cc_->transaction) {
cc_->transaction->BreakOnClose();
}
evc_.notify(); // Notify dispatch fiber.
poll_armed = false;
});
InputLoop(peer);
if (poll_armed) {
us->CancelPoll(poll_id);
}
}
evc_.notify(); // Notify dispatch fiber.
poll_armed = false;
});
io::Result<bool> check_res = CheckForHttpProto(peer);
if (!check_res)
return;
if (*check_res) {
LOG(INFO) << "HTTP1.1 identified";
}
InputLoop(peer);
if (poll_armed) {
us->CancelPoll(poll_id);
}
VLOG(1) << "Closed connection for peer " << remote_ep;
}
@ -195,6 +207,7 @@ io::Result<bool> Connection::CheckForHttpProto(util::FiberSocketBase* peer) {
}
last_len = io_buf_.InputLen();
} while (last_len < 1024);
return false;
}

View File

@ -51,7 +51,7 @@ constexpr size_t kMaxThreadSize = 1024;
} // namespace
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) {
Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(this) {
CHECK(pp);
// We support less than 1024 threads and we support less than 1024 shards.
@ -255,6 +255,7 @@ bool Service::IsShardSetLocked() const {
void Service::RegisterHttp(HttpListenerBase* listener) {
CHECK_NOTNULL(listener);
http_listener_ = listener;
}
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {

View File

@ -57,6 +57,8 @@ class Service {
return pp_;
}
util::HttpListenerBase* http_listener() { return http_listener_; }
private:
static void Quit(CmdArgList args, ConnectionContext* cntx);
@ -67,10 +69,13 @@ class Service {
base::VarzValue::Map GetVarzStats();
util::ProactorPool& pp_;
CommandRegistry registry_;
EngineShardSet shard_set_;
util::ProactorPool& pp_;
ServerFamily server_family_;
util::HttpListenerBase* http_listener_ = nullptr;
};
} // namespace dfly