diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index fb5123a..58faf7a 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -2,8 +2,8 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) add_library(dragonfly_lib db_slice.cc dragonfly_listener.cc dragonfly_connection.cc - main_service.cc engine_shard_set.cc - redis_parser.cc resp_expr.cc) + main_service.cc engine_shard_set.cc + redis_parser.cc resp_expr.cc reply_builder.cc) cxx_link(dragonfly_lib uring_fiber_lib fibers_ext strings_lib http_server_lib) diff --git a/server/conn_context.h b/server/conn_context.h new file mode 100644 index 0000000..cd51a99 --- /dev/null +++ b/server/conn_context.h @@ -0,0 +1,28 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include "server/reply_builder.h" + +namespace dfly { + +class Connection; +class EngineShardSet; + +class ConnectionContext : public ReplyBuilder { + public: + ConnectionContext(::io::Sink* stream, Connection* owner) : ReplyBuilder(stream), owner_(owner) { + } + + // TODO: to introduce proper accessors. + EngineShardSet* shard_set = nullptr; + + Connection* owner() { return owner_;} + + private: + Connection* owner_; +}; + +} // namespace dfly diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index 7d8a57a..951a7df 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -10,6 +10,7 @@ #include "base/logging.h" #include "server/main_service.h" #include "server/redis_parser.h" +#include "server/conn_context.h" #include "util/fiber_sched_algo.h" using namespace util; @@ -79,6 +80,9 @@ void Connection::HandleRequests() { CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val))); FiberSocketBase* peer = socket_.get(); + cc_.reset(new ConnectionContext(peer, this)); + cc_->shard_set = &service_->shard_set(); + InputLoop(peer); VLOG(1) << "Closed connection for peer " << socket_->RemoteEndpoint(); @@ -100,7 +104,7 @@ void Connection::InputLoop(FiberSocketBase* peer) { } io_buf.CommitWrite(*recv_sz); - status = ParseRedis(&io_buf, peer); + status = ParseRedis(&io_buf); if (status == NEED_MORE) { status = OK; } else if (status != OK) { @@ -113,12 +117,12 @@ void Connection::InputLoop(FiberSocketBase* peer) { } } -auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) -> ParserStatus { +auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus { RespVec args; uint32_t consumed = 0; RedisParser::Result result = RedisParser::OK; - error_code ec; + do { result = redis_parser_->Parse(io_buf->InputBuffer(), &consumed, &args); @@ -131,15 +135,15 @@ auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) -> CHECK_EQ(RespExpr::STRING, first.type); // TODO string_view sv = ToSV(first.GetBuf()); if (sv == "PING") { - ec = peer->Write(io::Buffer("PONG\r\n")); + cc_->SendSimpleString("PONG"); } else if (sv == "SET") { CHECK_EQ(3u, args.size()); service_->Set(ToSV(args[1].GetBuf()), ToSV(args[2].GetBuf())); - ec = peer->Write(io::Buffer("OK\r\n")); + cc_->SendOk(); } } io_buf->ConsumeInput(consumed); - } while (RedisParser::OK == result && !ec); + } while (RedisParser::OK == result && !cc_->ec()); parser_error_ = result; if (result == RedisParser::OK) diff --git a/server/dragonfly_connection.h b/server/dragonfly_connection.h index e541a1b..7fff4af 100644 --- a/server/dragonfly_connection.h +++ b/server/dragonfly_connection.h @@ -12,6 +12,7 @@ namespace dfly { class Service; class RedisParser; +class ConnectionContext; class Connection : public util::Connection { public: @@ -35,11 +36,14 @@ class Connection : public util::Connection { void InputLoop(util::FiberSocketBase* peer); - ParserStatus ParseRedis(base::IoBuf* buf, util::FiberSocketBase* peer); + ParserStatus ParseRedis(base::IoBuf* buf); std::unique_ptr redis_parser_; + std::unique_ptr cc_; + Service* service_; unsigned parser_error_ = 0; + struct Shutdown; std::unique_ptr shutdown_; }; diff --git a/server/reply_builder.cc b/server/reply_builder.cc new file mode 100644 index 0000000..abeb616 --- /dev/null +++ b/server/reply_builder.cc @@ -0,0 +1,99 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// +#include "server/reply_builder.h" + +#include +#include + +#include "base/logging.h" + +using namespace std; +using absl::StrAppend; + +namespace dfly { + +namespace { + +inline iovec constexpr IoVec(std::string_view s) { + iovec r{const_cast(s.data()), s.size()}; + return r; +} + +constexpr char kCRLF[] = "\r\n"; +constexpr char kErrPref[] = "-ERR "; +constexpr char kSimplePref[] = "+"; + +} // namespace + +RespSerializer::RespSerializer(io::Sink* stream) : sink_(stream) { +} + +void RespSerializer::Send(const iovec* v, uint32_t len) { + error_code ec = sink_->Write(v, len); + if (ec) { + ec_ = ec; + } +} + +void RespSerializer::SendDirect(std::string_view raw) { + iovec v = {IoVec(raw)}; + + Send(&v, 1); +} + +void ReplyBuilder::SendBulkString(std::string_view str) { + char tmp[absl::numbers_internal::kFastToBufferSize + 3]; + tmp[0] = '$'; // Format length + char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size()), tmp + 1); + *next++ = '\r'; + *next++ = '\n'; + + std::string_view lenpref{tmp, size_t(next - tmp)}; + + // 3 parts: length, string and CRLF. + iovec v[3] = {IoVec(lenpref), IoVec(str), IoVec(kCRLF)}; + + return Send(v, ABSL_ARRAYSIZE(v)); +} + +void ReplyBuilder::SendError(std::string_view str) { + if (str[0] == '-') { + iovec v[] = {IoVec(str), IoVec(kCRLF)}; + return Send(v, ABSL_ARRAYSIZE(v)); + } else { + iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)}; + return Send(v, ABSL_ARRAYSIZE(v)); + } +} + +void ReplyBuilder::SendError(OpStatus status) { + switch (status) { + case OpStatus::OK: + SendOk(); + break; + case OpStatus::KEY_NOTFOUND: + SendError("no such key"); + break; + default: + LOG(ERROR) << "Unsupported status " << status; + SendError("Internal error"); + break; + } +} + +void ReplyBuilder::SendNull() { + constexpr char kNullStr[] = "$-1\r\n"; + + iovec v[] = {IoVec(kNullStr)}; + + Send(v, ABSL_ARRAYSIZE(v)); +} + +void ReplyBuilder::SendSimpleString(std::string_view str) { + iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)}; + + Send(v, ABSL_ARRAYSIZE(v)); +} + +} // namespace dfly diff --git a/server/reply_builder.h b/server/reply_builder.h new file mode 100644 index 0000000..97229ef --- /dev/null +++ b/server/reply_builder.h @@ -0,0 +1,63 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// +#include +#include + +#include "io/sync_stream_interface.h" + +#include "server/op_status.h" + +namespace dfly { + +class RespSerializer { + public: + explicit RespSerializer(::io::Sink* sink); + + std::error_code ec() const { + return ec_; + } + + void CloseConnection() { + if (!ec_) + ec_ = std::make_error_code(std::errc::connection_aborted); + } + + //! Sends a string as is without any formatting. raw should be RESP-encoded. + void SendDirect(std::string_view str); + + ::io::Sink* sink() { return sink_; } + + protected: + void Send(const iovec* v, uint32_t len); + + ::io::Sink* sink_; + + private: + std::error_code ec_; +}; + +class ReplyBuilder : public RespSerializer { + public: + explicit ReplyBuilder(::io::Sink* stream) : RespSerializer(stream) { + } + + /// aka "$6\r\nfoobar\r\n" + void SendBulkString(std::string_view str); + + void SendNull(); + + void SendOk() { + return SendSimpleString("OK"); + } + + void SendError(std::string_view str); + void SendError(OpStatus status); + + //! See https://redis.io/topics/protocol + void SendSimpleString(std::string_view str); + +private: +}; + +} // namespace dfly