Bind redis parser to dragonfly connection

This commit is contained in:
Roman Gershman 2021-11-16 15:04:32 +02:00
parent f2bc27e283
commit 48589604fc
6 changed files with 57 additions and 11 deletions

View File

@ -9,6 +9,7 @@
#include "base/io_buf.h" #include "base/io_buf.h"
#include "base/logging.h" #include "base/logging.h"
#include "server/main_service.h" #include "server/main_service.h"
#include "server/redis_parser.h"
#include "util/fiber_sched_algo.h" #include "util/fiber_sched_algo.h"
using namespace util; using namespace util;
@ -41,6 +42,7 @@ struct Connection::Shutdown {
Connection::Connection(Service* service) Connection::Connection(Service* service)
: service_(service) { : service_(service) {
redis_parser_.reset(new RedisParser);
} }
Connection::~Connection() { Connection::~Connection() {
@ -84,7 +86,7 @@ void Connection::HandleRequests() {
void Connection::InputLoop(FiberSocketBase* peer) { void Connection::InputLoop(FiberSocketBase* peer) {
base::IoBuf io_buf{kMinReadSize}; base::IoBuf io_buf{kMinReadSize};
ParserStatus status = OK;
std::error_code ec; std::error_code ec;
do { do {
@ -93,13 +95,17 @@ void Connection::InputLoop(FiberSocketBase* peer) {
if (!recv_sz) { if (!recv_sz) {
ec = recv_sz.error(); ec = recv_sz.error();
status = OK;
break; break;
} }
io_buf.CommitWrite(*recv_sz); io_buf.CommitWrite(*recv_sz);
ec = peer->Write(io_buf.InputBuffer()); status = ParseRedis(&io_buf, peer);
if (ec) if (status == NEED_MORE) {
status = OK;
} else if (status != OK) {
break; break;
}
} while (peer->IsOpen()); } while (peer->IsOpen());
if (ec && !FiberSocketBase::IsConnClosed(ec)) { if (ec && !FiberSocketBase::IsConnClosed(ec)) {
@ -107,4 +113,37 @@ void Connection::InputLoop(FiberSocketBase* peer) {
} }
} }
auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) -> ParserStatus {
RespVec args;
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
error_code ec;
do {
result = redis_parser_->Parse(io_buf->InputBuffer(), &consumed, &args);
if (result == RedisParser::OK && !args.empty()) {
RespExpr& first = args.front();
if (first.type == RespExpr::STRING) {
DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
}
CHECK_EQ(RespExpr::STRING, first.type); // TODO
string_view sv = ToSV(first.GetBuf());
if (sv == "PING") {
ec = peer->Write(io::Buffer("PONG\r\n"));
}
}
io_buf->ConsumeInput(consumed);
} while (RedisParser::OK == result && !ec);
parser_error_ = result;
if (result == RedisParser::OK)
return OK;
if (result == RedisParser::INPUT_PENDING)
return NEED_MORE;
return ERROR;
}
} // namespace dfly } // namespace dfly

View File

@ -6,9 +6,12 @@
#include "util/connection.h" #include "util/connection.h"
#include "base/io_buf.h"
namespace dfly { namespace dfly {
class Service; class Service;
class RedisParser;
class Connection : public util::Connection { class Connection : public util::Connection {
public: public:
@ -26,13 +29,17 @@ class Connection : public util::Connection {
void OnShutdown() override; void OnShutdown() override;
private: private:
enum ParserStatus { OK, NEED_MORE, ERROR };
void HandleRequests() final; void HandleRequests() final;
void InputLoop(util::FiberSocketBase* peer); void InputLoop(util::FiberSocketBase* peer);
void DispatchFiber(util::FiberSocketBase* peer);
ParserStatus ParseRedis(base::IoBuf* buf, util::FiberSocketBase* peer);
std::unique_ptr<RedisParser> redis_parser_;
Service* service_; Service* service_;
unsigned parser_error_ = 0;
struct Shutdown; struct Shutdown;
std::unique_ptr<Shutdown> shutdown_; std::unique_ptr<Shutdown> shutdown_;
}; };

View File

@ -32,14 +32,14 @@ namespace std {
ostream& operator<<(ostream& os, const dfly::RespExpr& e) { ostream& operator<<(ostream& os, const dfly::RespExpr& e) {
using dfly::RespExpr; using dfly::RespExpr;
using dfly::ToAbsl; using dfly::ToSV;
switch (e.type) { switch (e.type) {
case RespExpr::INT64: case RespExpr::INT64:
os << "i" << get<int64_t>(e.u); os << "i" << get<int64_t>(e.u);
break; break;
case RespExpr::STRING: case RespExpr::STRING:
os << "'" << ToAbsl(get<RespExpr::Buffer>(e.u)) << "'"; os << "'" << ToSV(e.GetBuf()) << "'";
break; break;
case RespExpr::NIL: case RespExpr::NIL:
os << "nil"; os << "nil";
@ -51,7 +51,7 @@ ostream& operator<<(ostream& os, const dfly::RespExpr& e) {
os << dfly::RespSpan{*get<RespExpr::Vec*>(e.u)}; os << dfly::RespSpan{*get<RespExpr::Vec*>(e.u)};
break; break;
case RespExpr::ERROR: case RespExpr::ERROR:
os << "e(" << ToAbsl(get<RespExpr::Buffer>(e.u)) << ")"; os << "e(" << ToSV(e.GetBuf()) << ")";
break; break;
} }

View File

@ -39,7 +39,7 @@ class RespExpr {
using RespVec = RespExpr::Vec; using RespVec = RespExpr::Vec;
using RespSpan = absl::Span<const RespExpr>; using RespSpan = absl::Span<const RespExpr>;
inline std::string_view ToAbsl(const absl::Span<uint8_t>& s) { inline std::string_view ToSV(const absl::Span<uint8_t>& s) {
return std::string_view{reinterpret_cast<char*>(s.data()), s.size()}; return std::string_view{reinterpret_cast<char*>(s.data()), s.size()};
} }

View File

@ -103,7 +103,7 @@ vector<int64_t> ToIntArr(const RespVec& vec) {
vector<int64_t> res; vector<int64_t> res;
for (auto a : vec) { for (auto a : vec) {
int64_t val; int64_t val;
std::string_view s = ToAbsl(a.GetBuf()); std::string_view s = ToSV(a.GetBuf());
CHECK(absl::SimpleAtoi(s, &val)) << s; CHECK(absl::SimpleAtoi(s, &val)) << s;
res.push_back(val); res.push_back(val);
} }

View File

@ -74,7 +74,7 @@ inline ::testing::PolymorphicMatcher<RespTypeMatcher> ArgType(RespExpr::Type t)
} }
inline bool operator==(const RespExpr& left, const char* s) { inline bool operator==(const RespExpr& left, const char* s) {
return left.type == RespExpr::STRING && ToAbsl(left.GetBuf()) == s; return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
} }
void PrintTo(const RespExpr::Vec& vec, std::ostream* os); void PrintTo(const RespExpr::Vec& vec, std::ostream* os);