2021-11-28 15:29:26 +08:00
|
|
|
// Copyright 2021, Roman Gershman. All rights reserved.
|
|
|
|
// See LICENSE for licensing terms.
|
2021-11-17 18:51:47 +08:00
|
|
|
//
|
|
|
|
|
|
|
|
#pragma once
|
|
|
|
|
2021-12-20 17:42:55 +08:00
|
|
|
|
|
|
|
#include <xxhash.h>
|
|
|
|
|
2021-11-17 18:51:47 +08:00
|
|
|
#include "server/db_slice.h"
|
|
|
|
#include "util/fibers/fibers_ext.h"
|
|
|
|
#include "util/fibers/fiberqueue_threadpool.h"
|
|
|
|
#include "util/proactor_pool.h"
|
|
|
|
|
|
|
|
namespace dfly {
|
|
|
|
|
|
|
|
class EngineShard {
|
|
|
|
public:
|
|
|
|
|
|
|
|
//EngineShard() is private down below.
|
|
|
|
~EngineShard();
|
|
|
|
|
2021-12-20 17:42:55 +08:00
|
|
|
static void InitThreadLocal(util::ProactorBase* pb);
|
2021-11-17 18:51:47 +08:00
|
|
|
static void DestroyThreadLocal();
|
|
|
|
|
|
|
|
static EngineShard* tlocal() {
|
|
|
|
return shard_;
|
|
|
|
}
|
|
|
|
|
|
|
|
ShardId shard_id() const {
|
2021-12-20 17:42:55 +08:00
|
|
|
return db_slice_.shard_id();
|
|
|
|
}
|
|
|
|
|
|
|
|
DbSlice& db_slice() {
|
|
|
|
return db_slice_;
|
|
|
|
}
|
|
|
|
|
|
|
|
const DbSlice& db_slice() const {
|
|
|
|
return db_slice_;
|
2021-11-17 18:51:47 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
::util::fibers_ext::FiberQueue* GetQueue() {
|
|
|
|
return &queue_;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
2021-12-20 17:42:55 +08:00
|
|
|
EngineShard(util::ProactorBase* pb);
|
2021-11-17 18:51:47 +08:00
|
|
|
|
|
|
|
::util::fibers_ext::FiberQueue queue_;
|
|
|
|
::boost::fibers::fiber fiber_q_;
|
|
|
|
|
2021-12-20 17:42:55 +08:00
|
|
|
DbSlice db_slice_;
|
|
|
|
uint32_t periodic_task_ = 0;
|
|
|
|
|
2021-11-17 18:51:47 +08:00
|
|
|
static thread_local EngineShard* shard_;
|
|
|
|
};
|
|
|
|
|
|
|
|
class EngineShardSet {
|
|
|
|
public:
|
|
|
|
explicit EngineShardSet(util::ProactorPool* pp) : pp_(pp) {
|
|
|
|
}
|
|
|
|
|
|
|
|
uint32_t size() const {
|
|
|
|
return uint32_t(shard_queue_.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
util::ProactorPool* pool() {
|
|
|
|
return pp_;
|
|
|
|
}
|
|
|
|
|
|
|
|
void Init(uint32_t size);
|
2021-12-20 17:42:55 +08:00
|
|
|
void InitThreadLocal(util::ProactorBase* pb);
|
2021-11-17 18:51:47 +08:00
|
|
|
|
|
|
|
template <typename F> auto Await(ShardId sid, F&& f) {
|
|
|
|
return shard_queue_[sid]->Await(std::forward<F>(f));
|
|
|
|
}
|
|
|
|
|
|
|
|
template <typename F> auto Add(ShardId sid, F&& f) {
|
|
|
|
assert(sid < shard_queue_.size());
|
|
|
|
return shard_queue_[sid]->Add(std::forward<F>(f));
|
|
|
|
}
|
|
|
|
|
|
|
|
template <typename U> void RunBriefInParallel(U&& func);
|
2021-12-20 17:42:55 +08:00
|
|
|
template <typename U> void RunBlockingInParallel(U&& func);
|
2021-11-17 18:51:47 +08:00
|
|
|
|
|
|
|
private:
|
|
|
|
util::ProactorPool* pp_;
|
|
|
|
std::vector<util::fibers_ext::FiberQueue*> shard_queue_;
|
|
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @brief
|
|
|
|
*
|
|
|
|
* @tparam U - a function that receives EngineShard* argument and returns void.
|
|
|
|
* @param func
|
|
|
|
*/
|
|
|
|
template <typename U> void EngineShardSet::RunBriefInParallel(U&& func) {
|
|
|
|
util::fibers_ext::BlockingCounter bc{size()};
|
|
|
|
|
|
|
|
for (uint32_t i = 0; i < size(); ++i) {
|
|
|
|
util::ProactorBase* dest = pp_->at(i);
|
|
|
|
dest->AsyncBrief([f = std::forward<U>(func), bc]() mutable {
|
|
|
|
f(EngineShard::tlocal());
|
|
|
|
bc.Dec();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
bc.Wait();
|
|
|
|
}
|
|
|
|
|
2021-12-20 17:42:55 +08:00
|
|
|
template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
|
|
|
|
util::fibers_ext::BlockingCounter bc{size()};
|
|
|
|
|
|
|
|
for (uint32_t i = 0; i < size(); ++i) {
|
|
|
|
util::ProactorBase* dest = pp_->at(i);
|
|
|
|
dest->AsyncFiber([f = std::forward<U>(func), bc]() mutable {
|
|
|
|
f(EngineShard::tlocal());
|
|
|
|
bc.Dec();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
bc.Wait();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Maps a key-like view to a shard index: hashes the bytes [v.data(), v.data() + v.size())
// with XXH64 using a fixed seed and reduces the hash modulo `shard_num`.
// `shard_num` must be non-zero (it is used as a divisor).
template <typename View> inline ShardId Shard(const View& v, ShardId shard_num) {
  constexpr XXH64_hash_t kShardSeed = 120577240643ULL;

  const XXH64_hash_t digest = XXH64(v.data(), v.size(), kShardSeed);
  return static_cast<ShardId>(digest % shard_num);
}
|
|
|
|
|
2021-11-17 18:51:47 +08:00
|
|
|
} // namespace dfly
|