Add BLPOP support

Roman Gershman 2022-01-12 08:58:07 +02:00
parent 0cf2f57bf2
commit fc63eec1b6
7 changed files with 1070 additions and 78 deletions


@@ -19,6 +19,26 @@ namespace fibers = ::boost::fibers;
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
struct WatchItem {
::boost::intrusive_ptr<Transaction> trans;
WatchItem(Transaction* t) : trans(t) {
}
};
struct EngineShard::WatchQueue {
deque<WatchItem> items;
TxId notify_txid = UINT64_MAX;
// Updated by both coordinator and shard threads but at different times.
enum State { SUSPENDED, ACTIVE } state = SUSPENDED;
void Suspend() {
state = SUSPENDED;
notify_txid = UINT64_MAX;
}
};
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }),
db_slice_(pb->GetIndex(), this) {
@@ -70,6 +90,16 @@ void EngineShard::PollExecution(Transaction* trans) {
DVLOG(1) << "PollExecution " << (trans ? trans->DebugId() : "");
ShardId sid = shard_id();
uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0;
if (trans_mask & Transaction::AWAKED_Q) {
DCHECK(continuation_trans_ == nullptr);
CHECK_EQ(committed_txid_, trans->notify_txid()) << "TBD";
bool keep = trans->RunInShard(this);
if (keep)
return;
}
if (continuation_trans_) {
if (trans == continuation_trans_)
trans = nullptr;
@@ -79,77 +109,83 @@ void EngineShard::PollExecution(Transaction* trans) {
DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
OnTxFinish();
}
}
if (continuation_trans_) {
// Once we start executing transaction we do not continue until it's finished.
// This preserves atomicity property of multi-hop transactions.
return;
}
}
bool has_awaked_trans = HasAwakedTransaction();
Transaction* head = nullptr;
string dbg_id;
if (continuation_trans_ == nullptr && !has_awaked_trans) {
while (!txq_.Empty()) {
auto val = txq_.Front();
head = absl::get<Transaction*>(val);
// The fact that Tx is in the queue already means that the coordinator fiber will not progress,
// hence here it's enough to test for run_count and check local_mask.
bool is_armed = head->IsArmedInShard(sid);
if (!is_armed)
break;
// It could be that head is processed and unblocks a multi-hop transaction.
// The transaction will schedule again and will arm another callback.
// Then we would reach an invalid state by running trans after this loop,
// which is not what we want.
// This function should not process 2 different callbacks for the same transaction.
// Hence we make sure to reset trans if it has been processed via the tx-queue.
if (head == trans)
trans = nullptr;
TxId txid = head->txid();
// committed_txid_ is strictly increasing when processed via TxQueue.
DCHECK_LT(committed_txid_, txid);
// We update committed_txid_ before calling RunInShard() to avoid cases where
// a transaction stalls the execution with IO while another fiber queries this shard for
// committed_txid_ (for example during the scheduling).
committed_txid_ = txid;
if (VLOG_IS_ON(2)) {
dbg_id = head->DebugId();
}
bool keep = head->RunInShard(this);
// We should not access head from this point since the RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
if (keep) {
continuation_trans_ = head;
break;
}
OnTxFinish();
} // while(!txq_.Empty())
} // if (!has_awaked_trans)
// For SUSPENDED_Q - if transaction has not been notified, it will still be
// in the watch queue. We need to unlock an Execute by running a noop.
if (trans_mask & Transaction::SUSPENDED_Q) {
TxId notify_txid = trans->notify_txid();
DCHECK(HasResultConverged(notify_txid));
trans->RunNoop(this);
return;
}
// If trans is out of order, i.e. it locks keys that previous transactions have not locked:
// it may be that there are other transactions that touch those keys, but they are necessarily
// ordered after trans in the queue, hence it's safe to run trans out of order.
if (trans && trans_mask & Transaction::OUT_OF_ORDER) {
DCHECK(trans != head);
DCHECK(!trans->IsMulti()); // multi, global transactions can not be OOO.
DCHECK(trans_mask & Transaction::ARMED);
dbg_id.clear();
if (VLOG_IS_ON(1)) {
dbg_id = trans->DebugId();
}
++stats_.ooo_runs;
bool keep = trans->RunInShard(this);
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
@@ -159,12 +195,230 @@ void EngineShard::PollExecution(Transaction* trans) {
}
}
// Internal function called from ProcessAwakened().
// Marks the queue as active and notifies the first transaction in the queue.
Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) {
wq->state = WatchQueue::ACTIVE;
auto& q = wq->items;
ShardId sid = shard_id();
do {
const WatchItem& wi = q.front();
Transaction* head = wi.trans.get();
if (head->NotifySuspended(committed_txid_, sid)) {
wq->notify_txid = committed_txid_;
return head;
}
q.pop_front();
} while (!q.empty());
return nullptr;
}
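NotifyWatchQueue() drains stale waiters from the front of the queue until one accepts the wakeup. A generic, self-contained sketch of that drain-until-accepted pattern (the container and predicate are hypothetical stand-ins, not the engine types):

#include <deque>
#include <functional>
#include <iostream>

// Generic drain-until-accepted: pop waiters that decline the notification,
// return the first one that accepts, or nullptr if none do.
template <typename T>
T* NotifyFront(std::deque<T>& q, const std::function<bool(T&)>& notify) {
  while (!q.empty()) {
    if (notify(q.front()))
      return &q.front();  // stays at the head until it finishes running
    q.pop_front();        // expired waiter - drop it and try the next one
  }
  return nullptr;
}

int main() {
  std::deque<int> waiters{-1, -2, 42, 7};  // negative == expired waiter
  int* w = NotifyFront<int>(waiters, [](int& v) { return v > 0; });
  std::cout << (w ? *w : 0) << "\n";  // prints 42
  return 0;
}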
// Processes potentially awakened keys and verifies that these are indeed
// awakened to eliminate false positives.
void EngineShard::ProcessAwakened(Transaction* completed_t) {
for (DbIndex index : awakened_indices_) {
WatchTable& wt = watch_map_[index];
for (auto key : wt.awakened_keys) {
string_view sv_key = static_cast<string_view>(key);
auto [it, exp_it] = db_slice_.FindExt(index, sv_key); // Double verify we still got the item.
if (IsValid(it)) {
auto w_it = wt.queue_map.find(sv_key);
CHECK(w_it != wt.queue_map.end());
DVLOG(1) << "NotifyWatchQueue " << key;
Transaction* t2 = NotifyWatchQueue(w_it->second.get());
if (t2) {
awakened_transactions_.insert(t2);
}
}
}
wt.awakened_keys.clear();
}
awakened_indices_.clear();
if (!completed_t)
return;
auto& wt = watch_map_[completed_t->db_index()];
KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id());
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
auto w_it = wt.queue_map.find(key);
if (w_it == wt.queue_map.end() || w_it->second->state != WatchQueue::ACTIVE)
continue;
WatchQueue& wq = *w_it->second;
DCHECK_LE(wq.notify_txid, committed_txid_);
auto& queue = wq.items;
DCHECK(!queue.empty()); // since it's active
if (queue.front().trans == completed_t) {
queue.pop_front();
while (!queue.empty()) {
const WatchItem& bi = queue.front();
Transaction* head = bi.trans.get();
if (head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
}
if (queue.empty()) {
wt.queue_map.erase(w_it);
}
}
}
awakened_transactions_.erase(completed_t);
}
void EngineShard::AddWatched(string_view key, Transaction* me) {
WatchTable& wt = watch_map_[me->db_index()];
auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
if (inserted) {
res->second.reset(new WatchQueue);
}
res->second->items.emplace_back(me);
}
// Runs in O(N) complexity.
bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
WatchTable& wt = watch_map_[me->db_index()];
auto watch_it = wt.queue_map.find(key);
CHECK(watch_it != wt.queue_map.end());
WatchQueue& wq = *watch_it->second;
for (auto j = wq.items.begin(); j != wq.items.end(); ++j) {
if (j->trans == me) {
wq.items.erase(j);
if (wq.items.empty()) {
wt.queue_map.erase(watch_it);
}
return true;
}
}
LOG(FATAL) << "should not happen";
return false;
}
void EngineShard::GCWatched(const KeyLockArgs& largs) {
auto& queue_map = watch_map_[largs.db_index].queue_map;
for (size_t i = 0; i < largs.args.size(); i += largs.key_step) {
auto key = largs.args[i];
auto watch_it = queue_map.find(key);
CHECK(watch_it != queue_map.end());
WatchQueue& wq = *watch_it->second;
DCHECK(!wq.items.empty());
do {
auto local_mask = wq.items.front().trans->GetLocalMask(shard_id());
if ((local_mask & Transaction::EXPIRED_Q) == 0) {
break;
}
wq.items.pop_front();
} while (!wq.items.empty());
if (wq.items.empty()) {
queue_map.erase(watch_it);
}
}
}
// Called from commands like lpush.
void EngineShard::AwakeWatched(DbIndex db_index, const MainIterator& main_it) {
auto it = watch_map_.find(db_index);
if (it == watch_map_.end())
return;
WatchTable& wt = it->second;
if (wt.queue_map.empty()) { /// No blocked transactions.
return;
}
string tmp;
string_view db_key = main_it->first;
auto wit = wt.queue_map.find(db_key);
if (wit == wt.queue_map.end())
return; /// Similarly, nobody watches this key.
string_view key = wit->first;
// Already awakened this key.
if (wt.awakened_keys.find(key) != wt.awakened_keys.end())
return;
wt.awakened_keys.insert(wit->first);
awakened_indices_.insert(db_index);
}
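Note that AwakeWatched() only records a candidate key; verification and notification are deferred to ProcessAwakened() once the writing transaction finishes. A minimal standalone sketch of this two-phase mark/verify pattern, with hypothetical containers standing in for the watch table and DbSlice:

#include <functional>
#include <iostream>
#include <set>
#include <string>
#include <unordered_map>

// Hypothetical stand-ins for the watch table and the database.
std::unordered_map<std::string, std::string> db;  // key -> value
std::set<std::string> watched;                    // keys with blocked waiters
std::set<std::string> awakened_keys;              // phase-1 candidates

// Phase 1 (AwakeWatched): called from LPUSH - just record the candidate.
void AwakeWatched(const std::string& key) {
  if (watched.count(key))
    awakened_keys.insert(key);
}

// Phase 2 (ProcessAwakened): after the transaction finished, re-verify that
// the key still exists before notifying a blocked waiter.
void ProcessAwakened(const std::function<void(const std::string&)>& notify) {
  for (const auto& key : awakened_keys)
    if (db.count(key))  // double-check: the value may already be gone
      notify(key);
  awakened_keys.clear();
}

int main() {
  watched.insert("x");
  db["x"] = "1";
  AwakeWatched("x");
  ProcessAwakened([](const std::string& k) { std::cout << "wake " << k << "\n"; });
  return 0;
}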
void EngineShard::ShutdownMulti(Transaction* multi) {
if (continuation_trans_ == multi) {
continuation_trans_ = nullptr;
}
}
void EngineShard::WaitForConvergence(TxId notifyid, Transaction* t) {
DVLOG(1) << "ConvergeNotification " << shard_id();
waiting_convergence_.emplace(notifyid, t);
}
void EngineShard::OnTxFinish() {
DCHECK(continuation_trans_ == nullptr); // By definition of OnTxFinish.
if (waiting_convergence_.empty())
return;
if (txq_.Empty()) {
for (const auto& k_v : waiting_convergence_) {
NotifyConvergence(k_v.second);
}
waiting_convergence_.clear();
return;
}
TxId txq_score = txq_.HeadScore();
do {
auto tx_waiting = waiting_convergence_.begin();
// Instead of taking the map key, we use the up-to-date notify_txid,
// which could have improved in the meantime. Not important though.
TxId notifyid = tx_waiting->second->notify_txid();
if (notifyid > committed_txid_ && txq_score <= tx_waiting->first)
break;
auto nh = waiting_convergence_.extract(tx_waiting);
NotifyConvergence(nh.mapped());
} while (!waiting_convergence_.empty());
}
void EngineShard::NotifyConvergence(Transaction* tx) {
LOG(FATAL) << "TBD";
}
// There are several cases that contain proof of convergence for this shard:
// 1. txq_ is empty - it means that anything that is going to be scheduled will be scheduled
// with txid > notifyid.
// 2. committed_txid_ > notifyid - similarly, this shard can not affect the result with timestamp
// notifyid.
// 3. committed_txid_ == notifyid: if a transaction is in progress (continuation_trans_ != NULL),
// then this transaction can still affect the result, hence we require that continuation_trans_
// is null, in which case the shard points to the converged result @notifyid.
// 4. Finally, with committed_txid_ < notifyid and continuation_trans_ == nullptr,
// we can check whether the next in line (HeadScore) is after notifyid; in that case we can also
// conclude regarding the result convergence for this shard.
bool EngineShard::HasResultConverged(TxId notifyid) const {
return txq_.Empty() || committed_txid_ > notifyid ||
(continuation_trans_ == nullptr &&
(committed_txid_ == notifyid || txq_.HeadScore() > notifyid));
}
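A small self-contained model of the predicate above, exercising the four convergence cases with hypothetical txid values (the TxQueue is reduced to an optional head score):

#include <cassert>
#include <cstdint>
#include <optional>

using TxId = uint64_t;

// Simplified stand-in for the shard state consulted by HasResultConverged().
struct ShardModel {
  std::optional<TxId> txq_head_score;  // empty == txq_.Empty()
  TxId committed_txid;
  bool has_continuation;

  bool HasResultConverged(TxId notifyid) const {
    return !txq_head_score.has_value() || committed_txid > notifyid ||
           (!has_continuation &&
            (committed_txid == notifyid || *txq_head_score > notifyid));
  }
};

int main() {
  // Case 1: empty queue - converged regardless of the other fields.
  assert((ShardModel{std::nullopt, 5, true}.HasResultConverged(10)));
  // Case 2: committed_txid_ > notifyid.
  assert((ShardModel{TxId{20}, 15, true}.HasResultConverged(10)));
  // Case 3: committed_txid_ == notifyid requires no continuation transaction.
  assert(!(ShardModel{TxId{20}, 10, true}.HasResultConverged(10)));
  assert((ShardModel{TxId{20}, 10, false}.HasResultConverged(10)));
  // Case 4: the next in line (head score) is ordered after notifyid.
  assert((ShardModel{TxId{12}, 7, false}.HasResultConverged(10)));
  assert(!(ShardModel{TxId{9}, 7, false}.HasResultConverged(10)));
  return 0;
}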
void EngineShardSet::Init(uint32_t sz) {
CHECK_EQ(0u, size());


@@ -8,8 +8,11 @@ extern "C" {
#include "redis/sds.h"
}
#include <absl/container/btree_map.h>
#include <absl/container/flat_hash_map.h>
#include <xxhash.h>
#include "base/string_view_sso.h"
#include "core/tx_queue.h" #include "core/tx_queue.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/fiberqueue_threadpool.h"
@ -73,8 +76,34 @@ class EngineShard {
return &shard_lock_; return &shard_lock_;
} }
// Iterates over awakened key candidates in each db, verifies them, and moves the
// corresponding awakened transactions into the global awakened_transactions_ set.
// It has 2 responsibilities.
// 1: to go over potentially awakened keys, verify them and activate watch queues.
// 2: if t is awaked and finished running - to remove it from the head
// of the queue and notify the next one.
// If t is null then the second part is omitted.
void ProcessAwakened(Transaction* t);
// Blocking API
// TODO: consider moving all watched functions to
// EngineShard with separate per db map.
//! AddWatched adds a transaction to the blocking queue.
void AddWatched(std::string_view key, Transaction* me);
bool RemovedWatched(std::string_view key, Transaction* me);
void GCWatched(const KeyLockArgs& lock_args);
void AwakeWatched(DbIndex db_index, const MainIterator& it);
bool HasAwakedTransaction() const {
return !awakened_transactions_.empty();
}
// TODO: Awkward interface. I should solve it somehow.
void ShutdownMulti(Transaction* multi);
void WaitForConvergence(TxId notifyid, Transaction* t);
bool HasResultConverged(TxId notifyid) const;
void IncQuickRun() {
stats_.quick_runs++;
@@ -90,6 +119,29 @@ class EngineShard {
private:
EngineShard(util::ProactorBase* pb, bool update_db_time);
struct WatchQueue;
void OnTxFinish();
void NotifyConvergence(Transaction* tx);
/// Returns the notified transaction,
/// or null if all transactions in the queue have expired.
Transaction* NotifyWatchQueue(WatchQueue* wq);
struct WatchTable {
absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>> queue_map;
// Awakened keys that point to blocked entries that can potentially be unblocked.
// The string views reference the watched keys in queue_map.
absl::flat_hash_set<base::string_view_sso> awakened_keys;
};
absl::flat_hash_map<DbIndex, WatchTable> watch_map_;
absl::flat_hash_set<DbIndex> awakened_indices_;
absl::flat_hash_set<Transaction*> awakened_transactions_;
absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;


@@ -97,6 +97,101 @@ OpResult<string> ListPop(DbIndex db_ind, const MainValue& mv, ListDir dir) {
return res;
}
class BPopper {
public:
explicit BPopper();
// Returns WRONG_TYPE, OK.
// If OK is returned then use result() to fetch the value.
OpStatus Run(Transaction* t, unsigned msec);
const std::pair<std::string_view, std::string_view> result() const {
return std::pair<std::string_view, std::string_view>(key_, value_);
}
bool found() const {
return found_;
}
private:
OpStatus Pop(Transaction* t, EngineShard* shard);
bool found_ = false;
MainIterator find_it_;
ShardId find_sid_ = std::numeric_limits<ShardId>::max();
std::string key_, value_;
};
BPopper::BPopper() {
}
OpStatus BPopper::Run(Transaction* t, unsigned msec) {
OpResult<Transaction::FindFirstResult> result;
using time_point = Transaction::time_point;
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
bool is_multi = t->IsMulti();
if (!is_multi) {
t->Schedule();
}
while (true) {
result = t->FindFirst();
if (result)
break;
if (result.status() != OpStatus::KEY_NOTFOUND) { // Some error occurred.
// We could be registered in the queue due to previous iterations.
t->UnregisterWatch();
return result.status();
}
if (is_multi) {
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
return OpStatus::TIMED_OUT;
}
if (!t->WaitOnWatch(tp)) {
return OpStatus::TIMED_OUT;
}
}
DCHECK_EQ(OpStatus::OK, result.status());
VLOG(1) << "Popping an element";
find_sid_ = result->sid;
find_it_ = result->find_res;
found_ = true;
auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); };
t->Execute(std::move(cb), true);
return OpStatus::OK;
}
OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
DCHECK(found());
if (shard->shard_id() == find_sid_) {
key_ = find_it_->first;
OpResult<string> res = ListPop(t->db_index(), find_it_->second, ListDir::LEFT);
CHECK(res.ok());
value_ = std::move(res.value());
quicklist* ql = GetQL(find_it_->second);
if (quicklistCount(ql) == 0) {
CHECK(shard->db_slice().Del(t->db_index(), find_it_));
}
}
return OpStatus::OK;
}
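BPopper::Run() above is essentially a find-or-block loop: probe the keys across shards, and if all are empty, suspend on the watch queues and retry after a wakeup or time out. A condensed sketch of just that loop skeleton, with the transaction API reduced to hypothetical callables:

#include <functional>
#include <iostream>
#include <optional>
#include <string>

enum class OpStatus { OK, KEY_NOTFOUND, WRONG_TYPE, TIMED_OUT };

// Hypothetical hooks standing in for Transaction::FindFirst / WaitOnWatch.
using FindFn = std::function<std::optional<std::string>()>;
using WaitFn = std::function<bool()>;  // false == timed out

OpStatus RunBlockingPop(const FindFn& find_first, const WaitFn& wait_on_watch,
                        std::string* out) {
  while (true) {
    if (auto found = find_first()) {  // some key is non-empty: pop it
      *out = *found;
      return OpStatus::OK;
    }
    if (!wait_on_watch())  // block until a push notifies us, or time out
      return OpStatus::TIMED_OUT;
  }
}

int main() {
  int wakeups = 0;
  std::string val;
  OpStatus st = RunBlockingPop(
      [&]() -> std::optional<std::string> {
        return wakeups ? std::optional<std::string>("1") : std::nullopt;
      },
      [&] { return ++wakeups <= 1; }, &val);
  std::cout << int(st) << " " << val << "\n";  // prints: 0 1
  return 0;
}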
} // namespace
void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) {
@@ -150,6 +245,42 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
}
}
void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 3u);
float timeout;
auto timeout_str = ArgS(args, args.size() - 1);
if (!absl::SimpleAtof(timeout_str, &timeout)) {
return cntx->SendError("timeout is not a float or out of range");
}
if (timeout < 0) {
return cntx->SendError("timeout is negative");
}
VLOG(1) << "BLPop start " << timeout;
Transaction* transaction = cntx->transaction;
BPopper popper;
OpStatus result = popper.Run(transaction, unsigned(timeout * 1000));
switch (result) {
case OpStatus::WRONG_TYPE:
return cntx->SendError(kWrongTypeErr);
case OpStatus::OK:
break;
case OpStatus::TIMED_OUT:
return cntx->SendNullArray();
default:
LOG(FATAL) << "Unexpected error " << result;
}
CHECK(popper.found());
VLOG(1) << "BLPop returned ";
auto res = popper.result();
std::string_view str_arr[2] = {res.first, res.second};
return cntx->SendStringArr(str_arr);
}
void ListFamily::PushGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
vector<std::string_view> vals(args.size() - 2);
@@ -219,6 +350,9 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
quicklistPush(ql, es->tmp_str, sdslen(es->tmp_str), pos);
}
if (new_key) {
es->AwakeWatched(op_args.db_ind, it);
}
return quicklistCount(ql);
}
@@ -275,6 +409,7 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(LPop)
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(RPop)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex); << CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex);
} }


@@ -22,6 +22,7 @@ class ListFamily {
static void RPush(CmdArgList args, ConnectionContext* cntx);
static void LPop(CmdArgList args, ConnectionContext* cntx);
static void RPop(CmdArgList args, ConnectionContext* cntx);
static void BLPop(CmdArgList args, ConnectionContext* cntx);
static void LLen(CmdArgList args, ConnectionContext* cntx);
static void LIndex(CmdArgList args, ConnectionContext* cntx);


@@ -19,16 +19,21 @@
using namespace testing;
using namespace std;
using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;
namespace dfly {
class ListFamilyTest : public BaseFamilyTest {
protected:
ListFamilyTest() {
num_threads_ = 4;
}
};
const char* kKey1 = "x";
const char* kKey2 = "b";
const char* kKey3 = "c";
TEST_F(ListFamilyTest, Basic) {
auto resp = Run({"lpush", kKey1, "1"});
@@ -53,4 +58,157 @@ TEST_F(ListFamilyTest, Expire) {
EXPECT_THAT(resp[0], IntArg(1));
}
TEST_F(ListFamilyTest, BLPopUnblocking) {
auto resp = Run({"lpush", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"lpush", kKey2, "2"});
ASSERT_THAT(resp, ElementsAre(IntArg(1)));
resp = Run({"blpop", kKey1, kKey2}); // missing "0" delimiter.
ASSERT_THAT(resp[0], ErrArg("timeout is not a float"));
resp = Run({"blpop", kKey1, kKey2, "0"});
ASSERT_EQ(2, GetDebugInfo().shards_count);
EXPECT_THAT(resp, ElementsAre(kKey1, "1"));
resp = Run({"blpop", kKey1, kKey2, "0"});
EXPECT_THAT(resp, ElementsAre(kKey2, "2"));
Run({"set", "z", "1"});
resp = Run({"blpop", "z", "0"});
ASSERT_THAT(resp[0], ErrArg("WRONGTYPE "));
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "y"));
ASSERT_FALSE(IsLocked(0, "z"));
}
TEST_F(ListFamilyTest, BLPopBlocking) {
RespVec resp0, resp1;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
resp0 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop0";
});
this_fiber::sleep_for(50us);
/*auto fb1 = pp_->at(1)->LaunchFiber([&] {
resp1 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop1";
});*/
this_fiber::sleep_for(30us);
pp_->at(1)->AwaitBlocking([&] { Run({"lpush", "x", "2", "1"}); });
fb0.join();
// fb1.join();
// fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'.
// Sometimes the order is switched; we need to think how to fix it.
int64_t epoch0 = GetDebugInfo("IO0").clock;
int64_t epoch1 = GetDebugInfo("IO1").clock;
ASSERT_LT(epoch0, epoch1);
EXPECT_THAT(resp0, ElementsAre("x", "1"));
ASSERT_FALSE(IsLocked(0, "x"));
}
TEST_F(ListFamilyTest, BLPopMultiple) {
RespVec resp0, resp1;
resp0 = Run({"blpop", kKey1, kKey2, "0.01"}); // timeout
EXPECT_THAT(resp0, ElementsAre(ArgType(RespExpr::NIL_ARRAY)));
ASSERT_EQ(2, GetDebugInfo().shards_count);
ASSERT_FALSE(IsLocked(0, kKey1));
ASSERT_FALSE(IsLocked(0, kKey2));
auto fb1 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
resp0 = Run({"blpop", kKey1, kKey2, "0"});
});
pp_->at(1)->AwaitBlocking([&] { Run({"lpush", kKey1, "1", "2", "3"}); });
fb1.join();
EXPECT_THAT(resp0, ElementsAre(StrArg(kKey1), StrArg("3")));
ASSERT_FALSE(IsLocked(0, kKey1));
ASSERT_FALSE(IsLocked(0, kKey2));
ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); });
}
TEST_F(ListFamilyTest, BLPopTimeout) {
RespVec resp = Run({"blpop", kKey1, kKey2, kKey3, "0.01"});
EXPECT_THAT(resp[0], ArgType(RespExpr::NIL_ARRAY));
EXPECT_EQ(3, GetDebugInfo().shards_count);
ASSERT_FALSE(service_->IsLocked(0, kKey1));
// Under Multi
resp = Run({"multi"});
ASSERT_THAT(resp, RespEq("OK"));
Run({"blpop", kKey1, "0"});
resp = Run({"exec"});
EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL_ARRAY)));
ASSERT_FALSE(service_->IsLocked(0, kKey1));
}
TEST_F(ListFamilyTest, BLPopSerialize) {
RespVec blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));
LOG(INFO) << "Starting multi";
TxClock cl1, cl2;
unsigned key1_len1 = 0, key1_len2 = 0;
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
ASSERT_THAT(resp, RespEq("OK"));
Run({"lpush", kKey1, "A"});
resp = Run({"exec"});
// Either this lpush has run first or the one below.
// In any case it must be that between the 2 invocations of lpush (wrapped in multi)
// blpop will be triggered and it will empty the list again. Hence, in any case
// lpush kKey1 here and below should return 1.
EXPECT_THAT(resp, ElementsAre(IntArg(1)));
key1_len1 = get<int64_t>(resp[0].u);
cl1 = GetDebugInfo("IO1").clock;
LOG(INFO) << "push1 ts: " << cl1;
});
auto p2_fb = pp_->at(2)->LaunchFiber([&] {
auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
ASSERT_THAT(resp, RespEq("OK"));
Run({"lpush", kKey1, "B"});
Run({"lpush", kKey2, "C"});
resp = Run({"exec"});
EXPECT_THAT(resp, ElementsAre(IntArg(1), IntArg(1)));
key1_len2 = get<int64_t>(resp[0].u);
cl2 = GetDebugInfo("IO2").clock;
LOG(INFO) << "push2 ts: " << cl2;
});
p1_fb.join();
p2_fb.join();
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(StrArg(kKey1), ArgType(RespExpr::STRING)));
if (cl2 < cl1) {
EXPECT_EQ(blpop_resp[1], "B");
} else {
EXPECT_EQ(blpop_resp[1], "A");
}
}
} // namespace dfly


@@ -24,6 +24,72 @@ std::atomic_uint64_t op_seq{1};
} // namespace
struct Transaction::FindFirstProcessor {
public:
FindFirstProcessor(TxId notify, unsigned size)
: find_res_(size, OpStatus::KEY_NOTFOUND), notify_txid_(notify) {
}
void Find(Transaction* t);
OpResult<Transaction::FindFirstResult> Process(Transaction* t);
private:
OpStatus RunInShard(Transaction* t, EngineShard* shard);
// Holds Find results: (iterator to a found key, and its index in the passed arguments).
// See DbSlice::FindFirst for more details.
// spans all the shards for now.
std::vector<OpResult<std::pair<MainIterator, unsigned>>> find_res_;
TxId notify_txid_;
};
void Transaction::FindFirstProcessor::Find(Transaction* t) {
VLOG(2) << "FindFirst::Find " << t->DebugId();
t->Execute([this](auto* t, auto* s) { return RunInShard(t, s); }, false);
}
OpStatus Transaction::FindFirstProcessor::RunInShard(Transaction* t, EngineShard* shard) {
if (notify_txid_ == kuint64max || shard->committed_txid() == notify_txid_) {
// TODO: to add timestamp logic that provides consistency guarantees for blocking transactions.
auto args = t->ShardArgsInShard(shard->shard_id());
find_res_[shard->shard_id()] = shard->db_slice().FindFirst(t->db_index(), args);
}
return OpStatus::OK;
}
OpResult<Transaction::FindFirstResult> Transaction::FindFirstProcessor::Process(Transaction* t) {
uint32_t min_arg_indx = UINT32_MAX;
FindFirstResult result;
for (size_t sid = 0; sid < find_res_.size(); ++sid) {
const auto& fr = find_res_[sid];
auto status = fr.status();
if (status == OpStatus::KEY_NOTFOUND)
continue;
if (status == OpStatus::WRONG_TYPE) {
return status;
}
DCHECK(fr && IsValid(fr->first));
const auto& it_pos = fr.value();
size_t arg_indx = t->ReverseArgIndex(sid, it_pos.second);
if (arg_indx < min_arg_indx) {
min_arg_indx = arg_indx;
result.sid = sid;
result.find_res = it_pos.first;
}
}
if (result.sid == kInvalidSid) {
return OpStatus::KEY_NOTFOUND;
}
return result;
}
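Process() performs an argmin over the per-shard results: among the shards that found a non-empty key, it picks the one whose key appears earliest in the original BLPOP argument list, preserving the command's key-priority semantics. A minimal sketch of that selection, with a hypothetical per-shard array of argument indices:

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

using ShardId = uint16_t;
constexpr ShardId kInvalidSid = UINT16_MAX;

// Hypothetical per-shard find result: index of the found key in the
// original argument list (empty == KEY_NOTFOUND on that shard).
using FindRes = std::optional<unsigned>;

ShardId PickFirst(const std::vector<FindRes>& find_res) {
  unsigned min_arg_indx = UINT32_MAX;
  ShardId winner = kInvalidSid;
  for (size_t sid = 0; sid < find_res.size(); ++sid) {
    if (!find_res[sid])
      continue;  // this shard found nothing
    if (*find_res[sid] < min_arg_indx) {  // earliest key in the arg list wins
      min_arg_indx = *find_res[sid];
      winner = sid;
    }
  }
  return winner;
}

int main() {
  // BLPOP k0 k1 k2: shard 0 found k2 (index 2), shard 2 found k1 (index 1).
  std::cout << PickFirst({FindRes{2}, std::nullopt, FindRes{1}}) << "\n";  // 2
  return 0;
}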
IntentLock::Mode Transaction::Mode() const {
return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
@@ -164,7 +230,7 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
}
CHECK(next_arg == args_.end());
DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front();
if (unique_shard_cnt_ == 1) {
PerShardData* sd;
@@ -224,18 +290,25 @@ bool Transaction::RunInShard(EngineShard* shard) {
DCHECK(sd.local_mask & ARMED);
sd.local_mask &= ~ARMED;
DCHECK_EQ(sd.local_mask & (SUSPENDED_Q | EXPIRED_Q), 0);
bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
// For multi we unlock the transaction (i.e. its keys) in the UnlockMulti() call.
// Therefore we differentiate between concluding, which says that this specific
// runnable concludes the current operation, and should_release, which tells
// whether we should unlock the keys. should_release is false for multi and
// equal to concluding otherwise.
bool should_release = (coordinator_state_ & COORD_EXEC_CONCLUDING) && !multi_;
IntentLock::Mode mode = Mode();
// We make sure that we lock exactly once for each (multi-hop) transaction inside
// multi-transactions.
if (multi_ && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
DCHECK(!awaked_prerun); // we should not have blocking transaction inside multi block.
sd.local_mask |= KEYLOCK_ACQUIRED;
shard->db_slice().Acquire(mode, GetLockArgs(idx));
}
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED));
@@ -264,7 +337,10 @@ bool Transaction::RunInShard(EngineShard* shard) {
// If it's a final hop we should release the locks.
if (should_release) {
bool is_suspended = sd.local_mask & SUSPENDED_Q;
if (IsGlobal()) {
DCHECK(!awaked_prerun && !is_suspended); // Global transactions can not be blocking.
shard->shard_lock()->Release(Mode());
} else { // not global.
KeyLockArgs largs = GetLockArgs(idx);
@@ -272,9 +348,16 @@ bool Transaction::RunInShard(EngineShard* shard) {
// If a transaction has been suspended, we keep the lock so that future transactions
// touching those keys will be ordered via TxQueue. It's necessary because we preserve
// the atomicity of awaked transactions by halting the TxQueue.
if (!is_suspended) {
shard->db_slice().Release(mode, largs);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
sd.local_mask &= ~OUT_OF_ORDER;
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
shard->ProcessAwakened(awaked_prerun ? this : nullptr);
}
}
@@ -284,11 +367,43 @@ bool Transaction::RunInShard(EngineShard* shard) {
return !should_release; // keep
}
void Transaction::RunNoop(EngineShard* shard) {
DVLOG(1) << "RunNoop " << DebugId();
unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
DCHECK(sd.local_mask & ARMED);
DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
DCHECK(!multi_);
DCHECK(!IsGlobal());
sd.local_mask &= ~ARMED;
if (unique_shard_cnt_ == 1) {
cb_ = nullptr;
local_result_ = OpStatus::OK;
}
if (coordinator_state_ & COORD_EXEC_CONCLUDING) {
KeyLockArgs largs = GetLockArgs(idx);
shard->db_slice().Release(Mode(), largs);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
if (sd.local_mask & SUSPENDED_Q) {
sd.local_mask |= EXPIRED_Q;
shard->GCWatched(largs);
}
}
// Decrease run count after we update all the data in the transaction object.
CHECK_GE(DecreaseRunCnt(), 1u);
}
void Transaction::ScheduleInternal() {
DCHECK_EQ(0u, txid_);
DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
bool span_all = IsGlobal();
bool single_hop = (coordinator_state_ & COORD_EXEC_CONCLUDING);
uint32_t num_shards;
std::function<bool(uint32_t)> is_active;
@@ -297,6 +412,7 @@ void Transaction::ScheduleInternal(bool single_hop) {
if (span_all) {
is_active = [](uint32_t) { return true; };
num_shards = ess_->size();
// Lock shards
auto cb = [mode](EngineShard* shard) { shard->shard_lock()->Acquire(mode); };
ess_->RunBriefInParallel(std::move(cb));
@@ -331,10 +447,11 @@ void Transaction::ScheduleInternal(bool single_hop) {
// OOO can not happen with span-all transactions. We ensure it in ScheduleInShard when we
// refuse to acquire locks for these transactions.
DCHECK(!span_all);
coordinator_state_ |= COORD_OOO;
}
DVLOG(1) << "Scheduled " << DebugId()
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO);
coordinator_state_ |= COORD_SCHED;
break;
}
@@ -348,7 +465,7 @@ void Transaction::ScheduleInternal(bool single_hop) {
CHECK_EQ(0u, success.load(memory_order_relaxed));
}
if (IsOOO()) {
for (auto& sd : shard_data_) {
sd.local_mask |= OUT_OF_ORDER;
}
@@ -363,6 +480,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
cb_ = std::move(cb);
// single hop -> concluding.
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
if (schedule_fast) { // Single shard (local) optimization.
// We never resize shard_data because that would affect MULTI transaction correctness.
@@ -395,8 +515,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
} else {
// Transaction spans multiple shards or it's global (like flushdb) or multi.
if (!multi_)
ScheduleInternal();
ExecuteAsync();
}
DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
@@ -454,6 +574,9 @@ void Transaction::UnlockMulti() {
}
shard->ShutdownMulti(this);
// notify awakened transactions.
shard->ProcessAwakened(nullptr);
shard->PollExecution(nullptr);
this->DecreaseRunCnt();
@@ -474,8 +597,15 @@ void Transaction::UnlockMulti() {
// Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) {
cb_ = std::move(cb);
coordinator_state_ |= COORD_EXEC;
if (conclude) {
coordinator_state_ |= COORD_EXEC_CONCLUDING;
} else {
coordinator_state_ &= ~COORD_EXEC_CONCLUDING;
}
ExecuteAsync();
DVLOG(1) << "Wait on Exec " << DebugId(); DVLOG(1) << "Wait on Exec " << DebugId();
WaitForShardCallbacks(); WaitForShardCallbacks();
@ -485,10 +615,8 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
} }
// Runs in coordinator thread. // Runs in coordinator thread.
void Transaction::ExecuteAsync(bool concluding_cb) { void Transaction::ExecuteAsync() {
DVLOG(1) << "ExecuteAsync " << DebugId() << " concluding " << concluding_cb; DVLOG(1) << "ExecuteAsync " << DebugId();
is_concluding_cb_ = concluding_cb;
DCHECK_GT(unique_shard_cnt_, 0u); DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
@ -584,6 +712,52 @@ void Transaction::RunQuickie(EngineShard* shard) {
cb_ = nullptr; // We can do it because only a single shard runs the callback. cb_ = nullptr; // We can do it because only a single shard runs the callback.
} }
// runs in coordinator thread.
// Marks the transaction as expired but does not remove it from the waiting queue.
void Transaction::ExpireBlocking() {
DVLOG(1) << "ExpireBlocking " << DebugId();
DCHECK(!IsGlobal());
run_count_.store(unique_shard_cnt_, memory_order_release);
auto expire_cb = [this] {
EngineShard* shard = EngineShard::tlocal();
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(Mode(), lock_args);
unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;
// Need to see why I decided to call this.
// My guess - probably to trigger the run of stalled transactions in case
// this shard concurrently awoke this transaction and stalled the processing
// of TxQueue.
shard->PollExecution(nullptr);
CHECK_GE(DecreaseRunCnt(), 1u);
};
if (unique_shard_cnt_ == 1) {
DCHECK_LT(unique_shard_id_, ess_->size());
ess_->Add(unique_shard_id_, std::move(expire_cb));
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
DCHECK_EQ(0, sd.local_mask & ARMED);
if (sd.arg_count == 0)
continue;
ess_->Add(i, expire_cb);
}
}
// Wait for all callbacks to conclude.
WaitForShardCallbacks();
DVLOG(1) << "ExpireBlocking finished " << DebugId();
}
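ExpireBlocking() relies on the same run_count_/run_ec_ rendezvous as the other hops: set the counter to the number of dispatched shard callbacks and block until each one decrements it. A standalone sketch of that rendezvous, using a plain condition variable in place of the fiber event-count:

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Condensed stand-in for run_count_ plus run_ec_ (the fiber event-count).
std::atomic<uint32_t> run_count{0};
std::mutex mu;
std::condition_variable cv;

// Mirrors Transaction::DecreaseRunCnt(): release ordering, notify on zero.
void DecreaseRunCnt() {
  if (run_count.fetch_sub(1, std::memory_order_release) == 1) {
    std::lock_guard<std::mutex> lk(mu);
    cv.notify_all();
  }
}

int main() {
  const unsigned shards = 4;
  run_count.store(shards, std::memory_order_release);

  std::vector<std::thread> pool;
  for (unsigned i = 0; i < shards; ++i)
    pool.emplace_back([] {
      // ... per-shard expire work (release locks, mark EXPIRED_Q) goes here ...
      DecreaseRunCnt();
    });

  // WaitForShardCallbacks(): block until every shard callback has run.
  {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [] { return run_count.load(std::memory_order_acquire) == 0; });
  }
  for (auto& t : pool) t.join();
  std::cout << "all shard callbacks finished\n";
  return 0;
}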
const char* Transaction::Name() const {
return cid_->name();
}
@@ -744,6 +918,123 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
return reverse_index_[shard_data_[shard_id].arg_start + arg_index];
}
bool Transaction::WaitOnWatch(const time_point& tp) {
// Assumes that transaction is pending and scheduled. TODO: To verify it with state machine.
VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
using namespace chrono;
// wake_txid_.store(kuint64max, std::memory_order_relaxed);
Execute([](auto* t, auto* shard) { return t->AddToWatchedShardCb(shard); }, true);
coordinator_state_ |= COORD_BLOCKED;
bool res = true; // returns false if timeout occurs.
auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
notify_txid_.load(memory_order_relaxed) != kuint64max;
};
cv_status status = cv_status::no_timeout;
if (tp == time_point::max()) {
DVLOG(1) << "WaitOnWatch foreva " << DebugId();
blocking_ec_.await(move(wake_cb));
DVLOG(1) << "WaitOnWatch AfterWait";
} else {
DVLOG(1) << "WaitOnWatch TimeWait for "
<< duration_cast<milliseconds>(tp - time_point::clock::now()).count() << " ms";
status = blocking_ec_.await_until(move(wake_cb), tp);
DVLOG(1) << "WaitOnWatch await_until " << int(status);
}
if ((coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout) {
ExpireBlocking();
coordinator_state_ &= ~COORD_BLOCKED;
return false;
}
// We were notified by a shard, so let's make sure that our notifications converged to a stable
// form.
if (unique_shard_cnt_ > 1) {
run_count_.store(unique_shard_cnt_, memory_order_release);
auto converge_cb = [this] {
EngineShard* shard = EngineShard::tlocal();
auto& sd = shard_data_[shard->shard_id()];
TxId notify = notify_txid();
if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) {
CHECK_GE(DecreaseRunCnt(), 1u);
return;
}
shard->WaitForConvergence(notify, this);
};
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
DCHECK_EQ(0, sd.local_mask & ARMED);
if (sd.arg_count == 0)
continue;
ess_->Add(i, converge_cb);
}
// Wait for all callbacks to conclude.
WaitForShardCallbacks();
DVLOG(1) << "Convergence finished " << DebugId();
}
// Lift blocking mask.
coordinator_state_ &= ~COORD_BLOCKED;
return res;
}
void Transaction::UnregisterWatch() {
auto cb = [](Transaction* t, EngineShard* shard) {
t->RemoveFromWatchedShardCb(shard);
return OpStatus::OK;
};
Execute(std::move(cb), true);
}
// Runs only in the shard thread.
OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id());
auto& sd = shard_data_[sid];
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
DCHECK_EQ(0, sd.local_mask & ARMED);
auto args = ShardArgsInShard(shard->shard_id());
for (auto s : args) {
shard->AddWatched(s, this);
}
sd.local_mask |= SUSPENDED_Q;
return OpStatus::OK;
}
// Runs only in the shard thread.
// Quadratic complexity in number of arguments and queue length.
bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id());
auto& sd = shard_data_[sid];
constexpr uint16_t kQueueMask =
Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q;
if ((sd.local_mask & kQueueMask) == 0)
return false;
sd.local_mask &= ~kQueueMask;
// TODO: what if args have keys and values?
auto args = ShardArgsInShard(shard->shard_id());
for (auto s : args) {
shard->RemovedWatched(s, this);
}
return true;
}
inline uint32_t Transaction::DecreaseRunCnt() {
// to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running. We can not put it inside the (res == 1) block because then it's too late.
@@ -751,7 +1042,6 @@ inline uint32_t Transaction::DecreaseRunCnt() {
// We use release so that no stores will be reordered after.
uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
if (res == 1) {
run_ec_.notify();
}
@@ -762,4 +1052,53 @@ bool Transaction::IsGlobal() const {
return (trans_options_ & CO::GLOBAL_TRANS) != 0;
}
// Runs only in the shard thread.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
unsigned sd_id = SidToId(sid);
auto& sd = shard_data_[sd_id];
unsigned local_mask = sd.local_mask;
CHECK_NE(0u, local_mask & SUSPENDED_Q);
DVLOG(1) << "NotifyBlocked " << DebugId() << ", local_mask: " << local_mask;
if (local_mask & Transaction::EXPIRED_Q) {
return false;
}
if (local_mask & SUSPENDED_Q) {
DCHECK_EQ(0u, local_mask & AWAKED_Q);
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
TxId notify_id = notify_txid_.load(memory_order_relaxed);
while (committed_txid < notify_id) {
if (notify_txid_.compare_exchange_weak(notify_id, committed_txid, memory_order_relaxed)) {
// if we improved notify_txid_ - break.
blocking_ec_.notify(); // release barrier.
break;
}
}
return true;
}
CHECK(sd.local_mask & AWAKED_Q);
return true;
}
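The compare_exchange_weak loop in NotifySuspended() is a lock-free monotonic-minimum update: racing shard threads may all try to publish their committed txid, but notify_txid_ only ever decreases. A standalone sketch of the same pattern (names are stand-ins):

#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

constexpr uint64_t kuint64max = UINT64_MAX;
std::atomic<uint64_t> notify_txid{kuint64max};  // stand-in for notify_txid_

// Lower notify_txid to committed_txid iff it improves (decreases) the value.
void ImproveNotifyTxid(uint64_t committed_txid) {
  uint64_t cur = notify_txid.load(std::memory_order_relaxed);
  while (committed_txid < cur) {
    if (notify_txid.compare_exchange_weak(cur, committed_txid,
                                          std::memory_order_relaxed)) {
      // We improved the value; the real code calls blocking_ec_.notify() here.
      break;
    }
    // cur now holds the freshly observed value; retry while we still improve.
  }
}

int main() {
  std::vector<std::thread> shards;
  for (uint64_t txid : {42u, 17u, 99u, 23u})
    shards.emplace_back(ImproveNotifyTxid, txid);
  for (auto& t : shards)
    t.join();
  std::cout << notify_txid.load() << "\n";  // always prints 17, the minimum
  return 0;
}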
void Transaction::BreakOnClose() {
if (coordinator_state_ & COORD_BLOCKED) {
coordinator_state_ |= COORD_CANCELLED;
blocking_ec_.notify();
}
}
auto Transaction::FindFirst() -> OpResult<FindFirstResult> {
FindFirstProcessor processor(notify_txid_.load(memory_order_relaxed), ess_->size());
processor.Find(this);
return processor.Process(this);
}
} // namespace dfly


@@ -51,6 +51,9 @@ class Transaction {
ARMED = 1, // Transaction was armed with the callback.
OUT_OF_ORDER = 2,
KEYLOCK_ACQUIRED = 4,
SUSPENDED_Q = 0x10, // added by the coordination flow (via WaitBlocked()).
AWAKED_Q = 0x20, // awaked by condition (lpush etc)
EXPIRED_Q = 0x40, // timed-out and should be garbage collected from the blocking queue.
};
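A tiny worked example of how the three new queue flags combine with KEYLOCK_ACQUIRED over a blocked watcher's lifecycle; the wakeup and timeout paths below mirror what NotifySuspended() and ExpireBlocking() do to local_mask:

#include <cassert>
#include <cstdint>

// Values mirror the local-state enum above.
constexpr uint16_t KEYLOCK_ACQUIRED = 0x4;
constexpr uint16_t SUSPENDED_Q = 0x10, AWAKED_Q = 0x20, EXPIRED_Q = 0x40;

int main() {
  // AddToWatchedShardCb: the transaction registers on the watch queue.
  uint16_t local_mask = SUSPENDED_Q | KEYLOCK_ACQUIRED;

  // NotifySuspended (wakeup path): swap SUSPENDED_Q for AWAKED_Q;
  // the key lock is kept so that the awaked run stays atomic.
  uint16_t awaked = local_mask;
  awaked &= ~SUSPENDED_Q;
  awaked |= AWAKED_Q;
  assert(awaked == (AWAKED_Q | KEYLOCK_ACQUIRED));

  // ExpireBlocking (timeout path): mark EXPIRED_Q and drop the key lock;
  // GCWatched() later removes expired entries from the watch queue.
  uint16_t expired = local_mask;
  expired |= EXPIRED_Q;
  expired &= ~KEYLOCK_ACQUIRED;
  assert(expired == (SUSPENDED_Q | EXPIRED_Q));
  return 0;
}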
Transaction(const CommandId* cid, EngineShardSet* ess);
@@ -96,7 +99,7 @@ class Transaction {
// Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
// For single hop, use ScheduleSingleHop instead.
void Schedule() {
ScheduleInternal();
}
// if conclude is true, removes the transaction from the pending queue.
@@ -138,6 +141,10 @@ class Transaction {
return unique_shard_cnt_;
}
TxId notify_txid() const {
return notify_txid_.load(std::memory_order_relaxed);
}
bool IsMulti() const {
return bool(multi_);
}
@@ -145,25 +152,58 @@ class Transaction {
bool IsGlobal() const;
bool IsOOO() const {
return coordinator_state_ & COORD_OOO;
}
EngineShardSet* shard_set() {
return ess_;
}
// Registers transaction into watched queue and blocks until a) either notification is received.
// or b) tp is reached. If tp is time_point::max() then waits indefinitely.
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
// Returns false if a timeout occurred, true if it was notified by one of the keys.
bool WaitOnWatch(const time_point& tp);
void UnregisterWatch();
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the
// blocking queue. NotifySuspended may be called from (multiple) shard threads and
// with each call potentially improving the minimal wake_txid at which
// this transaction has been awaked.
bool NotifySuspended(TxId committed_ts, ShardId sid);
void BreakOnClose();
// Called by EngineShard when performing Execute over the tx queue.
// Returns true if transaction should be kept in the queue.
bool RunInShard(EngineShard* shard);
void RunNoop(EngineShard* shard);
//! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
//! Runs in the shard thread.
KeyLockArgs GetLockArgs(ShardId sid) const;
// TODO: iterators do not survive between hops.
// It could happen that FindFirst returns a result but then a different transaction
// grows the table and invalidates find_res. We should return a key, unfortunately,
// and not the iterator.
struct FindFirstResult {
MainIterator find_res;
ShardId sid = kInvalidSid;
};
OpResult<FindFirstResult> FindFirst();
private:
unsigned SidToId(ShardId sid) const {
return sid < shard_data_.size() ? sid : 0;
}
void ScheduleInternal();
void ExpireBlocking();
void ExecuteAsync();
// Optimized version of RunInShard for single shard uncontended cases.
void RunQuickie(EngineShard* shard);
@@ -180,9 +220,9 @@ class Transaction {
// Returns true if operation was cancelled for this shard. Runs in the shard thread.
bool CancelInShard(EngineShard* shard);
// Shard callbacks used within Execute calls.
OpStatus AddToWatchedShardCb(EngineShard* shard);
bool RemoveFromWatchedShardCb(EngineShard* shard);
void WaitForShardCallbacks() {
run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
@@ -199,6 +239,8 @@ class Transaction {
return use_count_.load(std::memory_order_relaxed);
}
struct FindFirstProcessor;
struct PerShardData {
uint32_t arg_start = 0; // Indices into args_ array.
uint16_t arg_count = 0;
@@ -249,6 +291,7 @@ class Transaction {
const CommandId* cid_;
EngineShardSet* ess_;
TxId txid_{0};
std::atomic<TxId> notify_txid_{kuint64max};
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
@@ -258,16 +301,26 @@ class Transaction {
uint32_t trans_options_ = 0;
ShardId unique_shard_id_{kInvalidSid};
DbIndex db_index_ = 0;
// Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
OpStatus local_result_ = OpStatus::OK;
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_EXEC = 2,
// We are running the last execution step in a multi-hop operation.
COORD_EXEC_CONCLUDING = 4,
COORD_BLOCKED = 8,
COORD_CANCELLED = 0x10,
COORD_OOO = 0x20,
};
// Transaction coordinator state, written and read by coordinator thread.
// Can be read by shard threads as long as we respect ordering rules, i.e. when
// they read this variable the coordinator thread is stalled and can not cause data races.
// If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX.
uint8_t coordinator_state_ = 0;
struct PerShardCache {
std::vector<std::string_view> args;