2021-11-16 16:39:38 +08:00
|
|
|
// Copyright 2021, Beeri 15. All rights reserved.
|
|
|
|
// Author: Roman Gershman (romange@gmail.com)
|
|
|
|
//
|
|
|
|
|
|
|
|
#include "server/dragonfly_connection.h"
|
|
|
|
|
|
|
|
#include <boost/fiber/operations.hpp>
|
|
|
|
|
|
|
|
#include "base/io_buf.h"
|
|
|
|
#include "base/logging.h"
|
2021-11-20 00:00:14 +08:00
|
|
|
#include "server/command_registry.h"
|
|
|
|
#include "server/conn_context.h"
|
2021-11-16 16:39:38 +08:00
|
|
|
#include "server/main_service.h"
|
2021-11-23 18:39:35 +08:00
|
|
|
#include "server/memcache_parser.h"
|
2021-11-16 21:04:32 +08:00
|
|
|
#include "server/redis_parser.h"
|
2021-11-16 16:39:38 +08:00
|
|
|
#include "util/fiber_sched_algo.h"
|
2021-11-20 00:00:14 +08:00
|
|
|
#include "util/tls/tls_socket.h"
|
2021-11-16 16:39:38 +08:00
|
|
|
|
|
|
|
using namespace util;
|
|
|
|
using namespace std;
|
|
|
|
namespace this_fiber = boost::this_fiber;
|
|
|
|
namespace fibers = boost::fibers;
|
|
|
|
|
|
|
|
namespace dfly {
|
|
|
|
namespace {
|
|
|
|
|
2021-11-19 00:38:20 +08:00
|
|
|
// Owning vector of mutable argument spans. RespToArgList() fills it from parsed
// RESP expressions and ParseRedis() wraps its contents in a CmdArgList for
// command dispatch.
using CmdArgVec = std::vector<MutableStrSpan>;
|
2021-11-16 16:39:38 +08:00
|
|
|
|
2021-11-19 00:38:20 +08:00
|
|
|
// Writes a "-ERR Protocol error: ..." reply to `peer` describing why the Redis
// parser rejected the input.
//
// pres - the parser result that caused the failure.
// peer - socket the reply is sent on. A failed send is logged and otherwise
//        ignored.
void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
  string res("-ERR Protocol error: ");
  if (pres == RedisParser::BAD_BULKLEN) {
    res.append("invalid bulk length\r\n");
  } else if (pres == RedisParser::BAD_ARRAYLEN) {
    res.append("invalid multibulk length\r\n");
  } else {
    // The result originates from parsing network input, so an unexpected value
    // must not abort the whole process. Report it and answer generically.
    LOG(ERROR) << "Unexpected parser result " << static_cast<int>(pres);
    res.append("invalid request\r\n");
  }

  auto size_res = peer->Send(::io::Buffer(res));
  if (!size_res) {
    LOG(WARNING) << "Error " << size_res.error();
  }
}
|
|
|
|
|
|
|
|
// Reinterprets a span of bytes as a mutable span of chars covering the same
// memory and the same number of elements. No data is copied.
inline MutableStrSpan ToMSS(absl::Span<uint8_t> span) {
  char* const chars = reinterpret_cast<char*>(span.data());
  return MutableStrSpan{chars, span.size()};
}
|
|
|
|
|
|
|
|
void RespToArgList(const RespVec& src, CmdArgVec* dest) {
|
|
|
|
dest->resize(src.size());
|
|
|
|
for (size_t i = 0; i < src.size(); ++i) {
|
|
|
|
(*dest)[i] = ToMSS(src[i].GetBuf());
|
|
|
|
}
|
|
|
|
}
|
2021-11-16 16:39:38 +08:00
|
|
|
|
|
|
|
// Value passed to the base::IoBuf constructor for each connection's read
// buffer in Connection::InputLoop (presumably a capacity in bytes - confirm
// against IoBuf).
constexpr size_t kMinReadSize = 256;
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
// Registry of shutdown callbacks keyed by the handle returned at registration.
struct Connection::Shutdown {
  absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;

  // Handle that the next Add() call will hand out; starts at 1.
  ShutdownHandle next_handle = 1;

  // Stores `cb` under a fresh handle and returns that handle.
  ShutdownHandle Add(ShutdownCb cb) {
    const ShutdownHandle assigned = next_handle;
    ++next_handle;
    map[assigned] = std::move(cb);
    return assigned;
  }

  // Drops the callback registered under `sh`, if there is one.
  void Remove(ShutdownHandle sh) {
    map.erase(sh);
  }
};
|
|
|
|
|
2021-11-23 18:39:35 +08:00
|
|
|
// Creates a connection bound to `service` and instantiates the parser that
// matches `protocol`.
//
// protocol - wire protocol of this connection; selects which parser is created.
// service  - service used later to dispatch parsed commands.
// ctx      - SSL context; when non-null HandleRequests() performs a TLS
//            handshake before serving.
Connection::Connection(Protocol protocol, Service* service, SSL_CTX* ctx)
    : service_(service), ctx_(ctx) {
  protocol_ = protocol;

  // Only one of the two parsers is ever allocated; InputLoop() picks the parse
  // routine by checking which one is set.
  if (protocol == Protocol::REDIS) {
    redis_parser_.reset(new RedisParser);
  } else if (protocol == Protocol::MEMCACHE) {
    memcache_parser_.reset(new MemcacheParser);
  }
}
|
|
|
|
|
|
|
|
// Nothing to release by hand. The destructor is still defined out-of-line in
// this translation unit (presumably because Connection::Shutdown is only
// complete here - confirm against the header).
Connection::~Connection() = default;
|
|
|
|
|
|
|
|
void Connection::OnShutdown() {
|
|
|
|
VLOG(1) << "Connection::OnShutdown";
|
|
|
|
if (shutdown_) {
|
|
|
|
for (const auto& k_v : shutdown_->map) {
|
|
|
|
k_v.second();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Registers `cb` to be run by OnShutdown() and returns the handle that
// identifies it for UnregisterShutdownHook(). The callback registry is created
// on first use.
auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle {
  if (shutdown_ == nullptr) {
    shutdown_ = std::make_unique<Shutdown>();
  }

  Shutdown& registry = *shutdown_;
  return registry.Add(std::move(cb));
}
|
|
|
|
|
|
|
|
// Removes the shutdown callback registered under `id`. The registry itself is
// released once its last callback is gone. No-op when no registry exists.
void Connection::UnregisterShutdownHook(ShutdownHandle id) {
  if (!shutdown_)
    return;

  shutdown_->Remove(id);

  const bool no_hooks_left = shutdown_->map.empty();
  if (no_hooks_left) {
    shutdown_.reset();
  }
}
|
|
|
|
|
|
|
|
void Connection::HandleRequests() {
|
|
|
|
this_fiber::properties<FiberProps>().set_name("DflyConnection");
|
|
|
|
|
|
|
|
int val = 1;
|
|
|
|
CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
|
|
|
|
|
2021-11-20 00:00:14 +08:00
|
|
|
std::unique_ptr<tls::TlsSocket> tls_sock;
|
|
|
|
if (ctx_) {
|
|
|
|
tls_sock.reset(new tls::TlsSocket(socket_.get()));
|
|
|
|
tls_sock->InitSSL(ctx_);
|
|
|
|
|
|
|
|
FiberSocketBase::accept_result aresult = tls_sock->Accept();
|
|
|
|
if (!aresult) {
|
|
|
|
LOG(WARNING) << "Error handshaking " << aresult.error().message();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
VLOG(1) << "TLS handshake succeeded";
|
|
|
|
}
|
|
|
|
FiberSocketBase* peer = tls_sock ? (FiberSocketBase*)tls_sock.get() : socket_.get();
|
2021-11-17 22:38:32 +08:00
|
|
|
cc_.reset(new ConnectionContext(peer, this));
|
|
|
|
cc_->shard_set = &service_->shard_set();
|
|
|
|
|
2021-11-16 16:39:38 +08:00
|
|
|
InputLoop(peer);
|
|
|
|
|
|
|
|
VLOG(1) << "Closed connection for peer " << socket_->RemoteEndpoint();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reads from `peer` and feeds the bytes to the protocol parser until the read
// fails, the parser reports an error, the peer is no longer open or the
// connection context carries an error. On a parser error a protocol-specific
// error reply is sent to the peer before returning.
void Connection::InputLoop(FiberSocketBase* peer) {
  base::IoBuf read_buf{kMinReadSize};
  ParserStatus status = OK;
  std::error_code ec;

  // NOTE(review): nothing in this loop enlarges read_buf. If AppendBuffer() can
  // return an empty span while unparsed input is pending, requests larger than
  // the buffer would stall - confirm IoBuf's growth behavior.
  while (true) {
    auto dest = read_buf.AppendBuffer();
    ::io::Result<size_t> nread = peer->Recv(dest);
    if (!nread) {
      ec = nread.error();
      status = OK;
      break;
    }
    read_buf.CommitWrite(*nread);

    // Exactly one parser exists per connection (see the constructor).
    if (redis_parser_) {
      status = ParseRedis(&read_buf);
    } else {
      DCHECK(memcache_parser_);
      status = ParseMemcache(&read_buf);
    }

    // An incomplete request is not an error: keep reading.
    if (status == NEED_MORE) {
      status = OK;
    } else if (status != OK) {
      break;
    }

    if (!peer->IsOpen() || cc_->ec())
      break;
  }

  if (cc_->ec()) {
    ec = cc_->ec();
  } else if (status == ERROR) {
    VLOG(1) << "Error stats " << status;
    if (redis_parser_) {
      SendProtocolError(RedisParser::Result(parser_error_), peer);
    } else {
      string_view sv{"CLIENT_ERROR bad command line format\r\n"};
      auto size_res = peer->Send(::io::Buffer(sv));
      if (!size_res) {
        LOG(WARNING) << "Error " << size_res.error();
        ec = size_res.error();
      }
    }
  }

  // A connection closed by the peer is routine and not worth a warning.
  if (ec && !FiberSocketBase::IsConnClosed(ec)) {
    LOG(WARNING) << "Socket error " << ec;
  }
}
|
|
|
|
|
2021-11-17 22:38:32 +08:00
|
|
|
// Parses as many complete Redis requests as `io_buf` currently holds and
// dispatches each non-empty one to the service. Whatever the parser reports as
// consumed is removed from the buffer after every parse step.
//
// Stores the last parser result in parser_error_ and returns:
//   OK        - the parser's last result was OK (the loop stopped because the
//               connection context carries an error),
//   NEED_MORE - the parser is waiting for more input,
//   ERROR     - any other parser result.
auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus {
  RespVec parsed;
  CmdArgVec cmd_args;
  uint32_t num_consumed = 0;
  RedisParser::Result parse_res = RedisParser::OK;

  for (;;) {
    parse_res = redis_parser_->Parse(io_buf->InputBuffer(), &num_consumed, &parsed);

    const bool has_command = (parse_res == RedisParser::OK) && !parsed.empty();
    if (has_command) {
      RespExpr& head = parsed.front();
      if (head.type == RespExpr::STRING) {
        DVLOG(2) << "Got Args with first token " << ToSV(head.GetBuf());
      }

      // The argument spans refer to the parsed expressions' buffers, so the
      // input is consumed only after the command has been dispatched.
      RespToArgList(parsed, &cmd_args);
      service_->DispatchCommand(CmdArgList{cmd_args.data(), cmd_args.size()}, cc_.get());
    }
    io_buf->ConsumeInput(num_consumed);

    if (parse_res != RedisParser::OK || cc_->ec())
      break;
  }

  parser_error_ = parse_res;

  if (parse_res == RedisParser::INPUT_PENDING)
    return NEED_MORE;

  return parse_res == RedisParser::OK ? OK : ERROR;
}
|
2021-11-19 00:38:20 +08:00
|
|
|
|
2021-11-23 18:39:35 +08:00
|
|
|
// Parses as many complete memcache commands as `io_buf` currently holds and
// dispatches each of them to the service.
//
// For store commands the payload that follows the command line must be fully
// buffered before dispatch; otherwise nothing is consumed and NEED_MORE is
// returned so the command line is parsed again once more input has arrived.
//
// Except for that early return, stores the last parser result in parser_error_
// and returns:
//   OK        - the parser's last result was OK (the loop stopped because the
//               connection context carries an error),
//   NEED_MORE - the parser is waiting for more input,
//   ERROR     - any other parser result.
auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus {
  MemcacheParser::Result result = MemcacheParser::OK;
  uint32_t consumed = 0;
  MemcacheParser::Command cmd;

  do {
    string_view str = ToSV(io_buf->InputBuffer());
    result = memcache_parser_->Parse(str, &consumed, &cmd);

    if (result != MemcacheParser::OK) {
      io_buf->ConsumeInput(consumed);
      break;
    }

    size_t total_len = consumed;

    // Scoped per command: a command without a payload must be dispatched with
    // an empty value, not with a view left over from an earlier store command
    // whose bytes have already been consumed from the buffer.
    string_view value;
    if (MemcacheParser::IsStoreCmd(cmd.type)) {
      // Payload plus 2 trailing bytes (presumably the terminating "\r\n";
      // they are skipped without being validated).
      total_len += cmd.bytes_len + 2;
      if (io_buf->InputLen() < total_len) {
        return NEED_MORE;
      }
      value = str.substr(consumed, cmd.bytes_len);
    }

    service_->DispatchMC(cmd, value, cc_.get());
    io_buf->ConsumeInput(total_len);
  } while (!cc_->ec());

  parser_error_ = result;

  if (result == MemcacheParser::OK)
    return OK;

  if (result == MemcacheParser::INPUT_PENDING)
    return NEED_MORE;

  return ERROR;
}
|
|
|
|
|
2021-11-16 16:39:38 +08:00
|
|
|
} // namespace dfly
|