diff --git a/helio b/helio index bfd4e2e..f7c9d00 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit bfd4e2e15b16c187b907c50a45465abdab23edf9 +Subproject commit f7c9d00016ea48866632583918b042b744f261b9 diff --git a/server/debugcmd.cc b/server/debugcmd.cc index a0dc990..4cf6984 100644 --- a/server/debugcmd.cc +++ b/server/debugcmd.cc @@ -5,15 +5,20 @@ #include +#include + #include "base/logging.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/string_family.h" +#include "util/uring/uring_fiber_algo.h" namespace dfly { -using namespace boost; using namespace std; +using namespace util; +namespace this_fiber = ::boost::this_fiber; +using boost::fibers::fiber; struct PopulateBatch { DbIndex dbid; @@ -24,8 +29,8 @@ struct PopulateBatch { } }; -void DoPopulateBatch(std::string_view prefix, size_t val_size, - const SetCmd::SetParams& params, const PopulateBatch& ps) { +void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, + const PopulateBatch& ps) { SetCmd sg(&EngineShard::tlocal()->db_slice()); for (unsigned i = 0; i < ps.sz; ++i) { @@ -99,41 +104,11 @@ void DebugCmd::Populate(CmdArgList args) { } ranges.emplace_back(from, total_count - from); - auto distribute_cb = [this, val_size, prefix](uint64_t from, uint64_t len) { - string key = absl::StrCat(prefix, ":"); - size_t prefsize = key.size(); - DbIndex db_indx = 0; // TODO - std::vector ps(ess_->size(), PopulateBatch{db_indx}); - SetCmd::SetParams params{db_indx}; - - for (uint64_t i = from; i < from + len; ++i) { - absl::StrAppend(&key, i); - ShardId sid = Shard(key, ess_->size()); - key.resize(prefsize); - - auto& pops = ps[sid]; - pops.index[pops.sz++] = i; - if (pops.sz == 32) { - ess_->Add(sid, [=, p = pops] { - DoPopulateBatch(prefix, val_size, params, p); - if (i % 100 == 0) { - this_fiber::yield(); - } - }); - - // we capture pops by value so we can override it here. - pops.sz = 0; - } - } - - ess_->RunBriefInParallel( - [&](EngineShard* shard) { - DoPopulateBatch(prefix, val_size, params, ps[shard->shard_id()]); - }); - }; - vector fb_arr(ranges.size()); + vector fb_arr(ranges.size()); for (size_t i = 0; i < ranges.size(); ++i) { - fb_arr[i] = ess_->pool()->at(i)->LaunchFiber(distribute_cb, ranges[i].first, ranges[i].second); + fb_arr[i] = ess_->pool()->at(i)->LaunchFiber([&] { + this->PopulateRangeFiber(ranges[i].first, ranges[i].second, prefix, val_size); + }); } for (auto& fb : fb_arr) fb.join(); @@ -141,4 +116,39 @@ void DebugCmd::Populate(CmdArgList args) { cntx_->SendOk(); } +void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, + unsigned value_len) { + this_fiber::properties().set_name("populate_range"); + + string key = absl::StrCat(prefix, ":"); + size_t prefsize = key.size(); + DbIndex db_indx = 0; // TODO + std::vector ps(ess_->size(), PopulateBatch{db_indx}); + SetCmd::SetParams params{db_indx}; + + for (uint64_t i = from; i < from + len; ++i) { + absl::StrAppend(&key, i); + ShardId sid = Shard(key, ess_->size()); + key.resize(prefsize); + + auto& pops = ps[sid]; + pops.index[pops.sz++] = i; + if (pops.sz == 32) { + ess_->Add(sid, [=, p = pops] { + DoPopulateBatch(prefix, value_len, params, p); + if (i % 50 == 0) { + this_fiber::yield(); + } + }); + + // we capture pops by value so we can override it here. + pops.sz = 0; + } + } + + ess_->RunBriefInParallel([&](EngineShard* shard) { + DoPopulateBatch(prefix, value_len, params, ps[shard->shard_id()]); + }); +} + } // namespace dfly diff --git a/server/debugcmd.h b/server/debugcmd.h index 5054ba4..68b33f0 100644 --- a/server/debugcmd.h +++ b/server/debugcmd.h @@ -18,6 +18,7 @@ class DebugCmd { private: void Populate(CmdArgList args); + void PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, unsigned value_len); EngineShardSet* ess_; ConnectionContext* cntx_;