diff --git a/CMakeLists.txt b/CMakeLists.txt index 888178a..663d7c3 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,4 +24,5 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}) include_directories(helio) add_subdirectory(helio) +add_subdirectory(core) add_subdirectory(server) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt new file mode 100644 index 0000000..3891ec4 --- /dev/null +++ b/core/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library(dfly_core tx_queue.cc dragonfly_core.cc) +cxx_link(dfly_core base absl::flat_hash_map) + +cxx_test(dfly_core_test dfly_core) \ No newline at end of file diff --git a/core/dfly_core_test.cc b/core/dfly_core_test.cc new file mode 100644 index 0000000..c2992ad --- /dev/null +++ b/core/dfly_core_test.cc @@ -0,0 +1,58 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/tx_queue.h" + +#include "base/gtest.h" +#include "core/intent_lock.h" + +namespace dfly { + +class TxQueueTest : public ::testing::Test { + protected: + TxQueueTest() { + } + + uint64_t Pop() { + if (pq_.Empty()) + return uint64_t(-1); + TxQueue::ValueType val = pq_.Front(); + pq_.PopFront(); + + return std::get(val); + } + + TxQueue pq_; +}; + +TEST_F(TxQueueTest, Basic) { + pq_.Insert(4); + pq_.Insert(3); + pq_.Insert(2); + + ASSERT_EQ(2, Pop()); + ASSERT_EQ(3, Pop()); + ASSERT_EQ(4, Pop()); + ASSERT_TRUE(pq_.Empty()); + + pq_.Insert(10); + ASSERT_EQ(10, Pop()); +} + +class IntentLockTest : public ::testing::Test { + protected: + IntentLock lk_; +}; + +TEST_F(IntentLockTest, Basic) { + ASSERT_TRUE(lk_.Acquire(IntentLock::SHARED)); + ASSERT_FALSE(lk_.Acquire(IntentLock::EXCLUSIVE)); + lk_.Release(IntentLock::EXCLUSIVE); + + ASSERT_FALSE(lk_.Check(IntentLock::EXCLUSIVE)); + lk_.Release(IntentLock::SHARED); + ASSERT_TRUE(lk_.Check(IntentLock::EXCLUSIVE)); +} + +} // namespace dfly diff --git a/server/resp_expr.cc b/core/dragonfly_core.cc similarity index 68% rename from server/resp_expr.cc rename to core/dragonfly_core.cc index 7d583e0..9631197 100644 --- a/server/resp_expr.cc +++ b/core/dragonfly_core.cc @@ -2,9 +2,9 @@ // See LICENSE for licensing terms. // -#include "server/resp_expr.h" - #include "base/logging.h" +#include "core/intent_lock.h" +#include "core/resp_expr.h" namespace dfly { @@ -26,10 +26,25 @@ const char* RespExpr::TypeName(Type t) { ABSL_INTERNAL_UNREACHABLE; } +const char* IntentLock::ModeName(Mode m) { + switch (m) { + case IntentLock::SHARED: + return "SHARED"; + case IntentLock::EXCLUSIVE: + return "EXCLUSIVE"; + } + ABSL_INTERNAL_UNREACHABLE; +} + +void IntentLock::VerifyDebug() { + constexpr uint32_t kMsb = 1ULL << (sizeof(cnt_[0]) * 8 - 1); + DCHECK_EQ(0u, cnt_[0] & kMsb); + DCHECK_EQ(0u, cnt_[1] & kMsb); +} + } // namespace dfly namespace std { - ostream& operator<<(ostream& os, const dfly::RespExpr& e) { using dfly::RespExpr; using dfly::ToSV; @@ -39,7 +54,7 @@ ostream& operator<<(ostream& os, const dfly::RespExpr& e) { os << "i" << get(e.u); break; case RespExpr::STRING: - os << "'" << ToSV(e.GetBuf()) << "'"; + os << "'" << ToSV(get(e.u)) << "'"; break; case RespExpr::NIL: os << "nil"; @@ -51,7 +66,7 @@ ostream& operator<<(ostream& os, const dfly::RespExpr& e) { os << dfly::RespSpan{*get(e.u)}; break; case RespExpr::ERROR: - os << "e(" << ToSV(e.GetBuf()) << ")"; + os << "e(" << ToSV(get(e.u)) << ")"; break; } @@ -71,4 +86,4 @@ ostream& operator<<(ostream& os, dfly::RespSpan ras) { return os; } -} // namespace std \ No newline at end of file +} // namespace std diff --git a/core/intent_lock.h b/core/intent_lock.h new file mode 100644 index 0000000..1c202f2 --- /dev/null +++ b/core/intent_lock.h @@ -0,0 +1,54 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#include + +#pragma once + +namespace dfly { + +// SHARED - can be acquired multiple times as long as other intents are absent. +// EXCLUSIVE - is acquired only if it's the only lock recorded. +// BLOCKED_READY - can not be acquired - it's recorded for intent purposes. +// Transactions at the head of tx-queue are considered to be the ones that acquired the lock +class IntentLock { + public: + enum Mode { SHARED = 0, EXCLUSIVE = 1 }; + + // Returns true if lock was acquired. In any case, the intent is recorded. + bool Acquire(Mode m) { + ++cnt_[m]; + + if (cnt_[1 ^ int(m)]) + return false; + return m == SHARED || cnt_[EXCLUSIVE] == 1; + } + + bool Check(Mode m) const { + unsigned s = cnt_[EXCLUSIVE]; + if (s) + return false; + + return (m == SHARED) ? true : IsFree(); + } + + void Release(Mode m, unsigned val = 1) { + assert(cnt_[m] >= val); + + cnt_[m] -= val; + // return cnt_[m] == 0 ? cnt_[1 ^ int(m)] : 0; + } + + bool IsFree() const { + return (cnt_[0] | cnt_[1]) == 0; + } + + static const char* ModeName(Mode m); + + void VerifyDebug(); + + private: + unsigned cnt_[2] = {0, 0}; +}; + +} // namespace dfly diff --git a/server/op_status.h b/core/op_status.h similarity index 100% rename from server/op_status.h rename to core/op_status.h diff --git a/server/resp_expr.h b/core/resp_expr.h similarity index 100% rename from server/resp_expr.h rename to core/resp_expr.h diff --git a/core/tx_queue.cc b/core/tx_queue.cc new file mode 100644 index 0000000..955eae7 --- /dev/null +++ b/core/tx_queue.cc @@ -0,0 +1,115 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#include "core/tx_queue.h" + +#include "base/logging.h" + +namespace dfly { + +TxQueue::TxQueue(std::function sf) + : score_fun_(sf), vec_(32) { + for (size_t i = 0; i < vec_.size(); ++i) { + vec_[i].next = i + 1; + } +} + +auto TxQueue::Insert(Transaction* t) -> Iterator { + if (next_free_ >= vec_.size()) { + Grow(); + } + DCHECK_LT(next_free_, vec_.size()); + DCHECK_EQ(FREE_TAG, vec_[next_free_].tag); + + Iterator res = next_free_; + vec_[next_free_].u.trans = t; + vec_[next_free_].tag = TRANS_TAG; + DVLOG(1) << "Insert " << next_free_ << " " << t; + LinkFree(score_fun_(t)); + return res; +} + +auto TxQueue::Insert(uint64_t val) -> Iterator { + if (next_free_ >= vec_.size()) { + Grow(); + } + DCHECK_LT(next_free_, vec_.size()); + + Iterator res = next_free_; + + vec_[next_free_].u.uval = val; + vec_[next_free_].tag = UINT_TAG; + + LinkFree(val); + return res; +} + +void TxQueue::LinkFree(uint64_t weight) { + uint32_t taken = next_free_; + next_free_ = vec_[taken].next; + + if (size_ == 0) { + head_ = taken; + vec_[head_].next = vec_[head_].prev = head_; + } else { + uint32_t cur = vec_[head_].prev; + while (true) { + if (Rank(vec_[cur]) < weight) { + Link(cur, taken); + break; + } + if (cur == head_) { + Link(vec_[head_].prev, taken); + head_ = taken; + break; + } + cur = vec_[cur].prev; + } + } + ++size_; +} + +void TxQueue::Grow() { + size_t start = vec_.size(); + DVLOG(1) << "Grow from " << start << " to " << start * 2; + + vec_.resize(start * 2); + for (size_t i = start; i < vec_.size(); ++i) { + vec_[i].next = i + 1; + } +} + +void TxQueue::Remove(Iterator it) { + DCHECK_GT(size_, 0u); + DCHECK_LT(it, vec_.size()); + DCHECK_NE(FREE_TAG, vec_[it].tag); + + DVLOG(1) << "Remove " << it << " " << vec_[it].u.trans; + Iterator next = kEnd; + if (size_ > 1) { + Iterator prev = vec_[it].prev; + next = vec_[it].next; + + vec_[prev].next = next; + vec_[next].prev = prev; + } + --size_; + vec_[it].next = next_free_; + vec_[it].tag = FREE_TAG; + next_free_ = it; + if (head_ == it) { + head_ = next; + } +} + +uint64_t TxQueue::Rank(const QRecord& r) const { + switch (r.tag) { + case UINT_TAG: + return r.u.uval; + case TRANS_TAG: + return score_fun_(r.u.trans); + } + return 0; +} + +} // namespace dfly diff --git a/core/tx_queue.h b/core/tx_queue.h new file mode 100644 index 0000000..73fc138 --- /dev/null +++ b/core/tx_queue.h @@ -0,0 +1,111 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include +#include +#include +#include + +namespace dfly { + +class Transaction; + +class TxQueue { + void Link(uint32_t p, uint32_t n) { + uint32_t next = vec_[p].next; + vec_[n].next = next; + vec_[n].prev = p; + vec_[p].next = n; + vec_[next].prev = n; + } + + public: + // uint64_t is used for unit-tests. + using ValueType = std::variant; + using Iterator = uint32_t; + enum { kEnd = Iterator(-1) }; + + TxQueue(std::function score_fun = nullptr); + + // returns iterator to that item the list + Iterator Insert(Transaction* t); + + Iterator Insert(uint64_t val); + void Remove(Iterator); + + ValueType At(Iterator it) const { + switch (vec_[it].tag) { + case TRANS_TAG: + return vec_[it].u.trans; + case UINT_TAG: + return vec_[it].u.uval; + } + return 0u; + } + + ValueType Front() const { + return At(head_); + } + + void PopFront() { + Remove(head_); + } + + size_t size() const { + return size_; + } + + bool Empty() const { + return size_ == 0; + } + + //! returns the score of the tail record. Can be called only if !Empty(). + uint64_t TailScore() const { + return Rank(vec_[vec_[head_].prev]); + } + + //! returns the score of the head record. Can be called only if !Empty(). + uint64_t HeadScore() const { + return Rank(vec_[head_]); + } + + //! Can be called only if !Empty(). + Iterator Head() const { + return head_; + } + + private: + enum { TRANS_TAG = 0, UINT_TAG = 11, FREE_TAG = 12 }; + + void Grow(); + void LinkFree(uint64_t rank); + + struct QRecord { + union { + Transaction* trans; + uint64_t uval; + } u; + + uint32_t tag : 8; + uint32_t next : 24; + uint32_t prev; + + QRecord() : tag(FREE_TAG), prev(kEnd) { + } + }; + + static_assert(sizeof(QRecord) == 16, ""); + + uint64_t Rank(const QRecord& r) const; + + std::function score_fun_; + std::vector vec_; + uint32_t next_free_ = 0, head_ = 0; + size_t size_ = 0; + + TxQueue(const TxQueue&) = delete; +}; + +} // namespace dfly diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 9a656ea..c12806d 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -5,9 +5,9 @@ add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc main_service.cc memcache_parser.cc - redis_parser.cc resp_expr.cc reply_builder.cc string_family.cc) + redis_parser.cc reply_builder.cc string_family.cc) -cxx_link(dragonfly_lib uring_fiber_lib +cxx_link(dragonfly_lib dfly_core uring_fiber_lib fibers_ext strings_lib http_server_lib tls_lib) add_library(dfly_test_lib test_utils.cc) diff --git a/server/common_types.h b/server/common_types.h index 28cad92..f1b4463 100644 --- a/server/common_types.h +++ b/server/common_types.h @@ -12,9 +12,17 @@ namespace dfly { +enum class Protocol : uint8_t { + MEMCACHE = 1, + REDIS = 2 +}; + using DbIndex = uint16_t; using ShardId = uint16_t; +using TxId = uint64_t; +using TxClock = uint64_t; +using ArgSlice = absl::Span; using MutableStrSpan = absl::Span; using CmdArgList = absl::Span; using CmdArgVec = std::vector; @@ -23,8 +31,15 @@ constexpr DbIndex kInvalidDbId = DbIndex(-1); constexpr ShardId kInvalidSid = ShardId(-1); class CommandId; +class Transaction; class EngineShard; +struct KeyLockArgs { + DbIndex db_index; + ArgSlice args; + unsigned key_step; +}; + inline std::string_view ArgS(CmdArgList args, size_t i) { auto arg = args[i]; return std::string_view(arg.data(), arg.size()); diff --git a/server/conn_context.h b/server/conn_context.h index f9109b8..37f3748 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -4,8 +4,8 @@ #pragma once -#include "server/reply_builder.h" #include "server/common_types.h" +#include "server/reply_builder.h" namespace dfly { @@ -35,6 +35,7 @@ class ConnectionContext : public ReplyBuilder { ConnectionContext(::io::Sink* stream, Connection* owner); // TODO: to introduce proper accessors. + Transaction* transaction = nullptr; const CommandId* cid = nullptr; EngineShardSet* shard_set = nullptr; diff --git a/server/db_slice.h b/server/db_slice.h index 29d6e2f..c23de79 100644 --- a/server/db_slice.h +++ b/server/db_slice.h @@ -5,7 +5,7 @@ #pragma once #include "server/common_types.h" -#include "server/op_status.h" +#include "core/op_status.h" #include "server/table.h" namespace util { diff --git a/server/dfly_protocol.h b/server/dfly_protocol.h deleted file mode 100644 index 522875a..0000000 --- a/server/dfly_protocol.h +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2021, Roman Gershman. All rights reserved. -// See LICENSE for licensing terms. -// - -#pragma once - -#include - -namespace dfly { - -enum class Protocol : uint8_t { - MEMCACHE = 1, - REDIS = 2 -}; - -} // namespace dfly diff --git a/server/dragonfly_connection.h b/server/dragonfly_connection.h index f7c3483..be96b7b 100644 --- a/server/dragonfly_connection.h +++ b/server/dragonfly_connection.h @@ -9,9 +9,8 @@ #include #include "base/io_buf.h" +#include "core/resp_expr.h" #include "server/common_types.h" -#include "server/dfly_protocol.h" -#include "server/resp_expr.h" #include "util/connection.h" #include "util/fibers/event_count.h" diff --git a/server/dragonfly_listener.h b/server/dragonfly_listener.h index af824f8..10a1ed1 100644 --- a/server/dragonfly_listener.h +++ b/server/dragonfly_listener.h @@ -5,7 +5,7 @@ #pragma once #include "util/listener_interface.h" -#include "server/dfly_protocol.h" +#include "server/common_types.h" typedef struct ssl_ctx_st SSL_CTX; diff --git a/server/main_service.cc b/server/main_service.cc index bb47395..c8ce021 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -13,8 +13,9 @@ #include "base/logging.h" #include "server/conn_context.h" #include "server/debugcmd.h" -#include "util/metrics/metrics.h" +#include "server/error.h" #include "server/string_family.h" +#include "util/metrics/metrics.h" #include "util/uring/uring_fiber_algo.h" #include "util/varz.h" diff --git a/server/redis_parser.h b/server/redis_parser.h index a3ec9bb..600dd7d 100644 --- a/server/redis_parser.h +++ b/server/redis_parser.h @@ -5,7 +5,7 @@ #include -#include "resp_expr.h" +#include "core/resp_expr.h" namespace dfly { diff --git a/server/reply_builder.h b/server/reply_builder.h index 8a83137..3dc3e9f 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -3,9 +3,9 @@ // #include +#include "core/op_status.h" #include "io/sync_stream_interface.h" -#include "server/dfly_protocol.h" -#include "server/op_status.h" +#include "server/common_types.h" namespace dfly {