Compare commits
3 Commits
Author | SHA1 | Date |
---|---|---|
Roman Gershman | 4fdcfa04cc | |
Roman Gershman | 1a9b8b0527 | |
Roman Gershman | 560d3f2440 |
|
@ -177,7 +177,8 @@ API 2.0
|
|||
- [ ] WATCH
|
||||
- [ ] UNWATCH
|
||||
- [X] DISCARD
|
||||
- [ ] CLIENT KILL/LIST/UNPAUSE/PAUSE/GETNAME/SETNAME/REPLY/TRACKINGINFO
|
||||
- [X] CLIENT LIST/SETNAME
|
||||
- [ ] CLIENT KILL/UNPAUSE/PAUSE/GETNAME/REPLY/TRACKINGINFO
|
||||
- [X] COMMAND
|
||||
- [X] COMMAND COUNT
|
||||
- [ ] COMMAND GETKEYS/INFO
|
||||
|
|
2
helio
2
helio
|
@ -1 +1 @@
|
|||
Subproject commit 0b73223565a12a096fa4262ccd66c2cd12209a11
|
||||
Subproject commit 75f2c88c625ac13692f1165cf1f93f8a7096d799
|
|
@ -50,6 +50,9 @@ class ConnectionContext {
|
|||
bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber.
|
||||
|
||||
virtual void OnClose() {}
|
||||
|
||||
std::string GetContextInfo() const { return std::string{}; }
|
||||
|
||||
private:
|
||||
Connection* owner_;
|
||||
std::unique_ptr<SinkReplyBuilder> rbuilder_;
|
||||
|
|
|
@ -103,7 +103,6 @@ struct Connection::Shutdown {
|
|||
}
|
||||
};
|
||||
|
||||
|
||||
struct Connection::Request {
|
||||
absl::FixedArray<MutableSlice, 6> args;
|
||||
|
||||
|
@ -122,6 +121,8 @@ struct Connection::Request {
|
|||
Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
|
||||
ServiceInterface* service)
|
||||
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
|
||||
static atomic_uint32_t next_id{1};
|
||||
|
||||
protocol_ = protocol;
|
||||
|
||||
constexpr size_t kReqSz = sizeof(Connection::Request);
|
||||
|
@ -136,6 +137,12 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
|
|||
memcache_parser_.reset(new MemcacheParser);
|
||||
break;
|
||||
}
|
||||
|
||||
creation_time_ = time(nullptr);
|
||||
last_interaction_ = creation_time_;
|
||||
memset(name_, 0, sizeof(name_));
|
||||
memset(phase_, 0, sizeof(phase_));
|
||||
id_ = next_id.fetch_add(1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
Connection::~Connection() {
|
||||
|
@ -259,6 +266,24 @@ void Connection::SendMsgVecAsync(absl::Span<const std::string_view> msg_vec,
|
|||
}
|
||||
}
|
||||
|
||||
string Connection::GetClientInfo() const {
|
||||
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
|
||||
|
||||
string res;
|
||||
auto le = lsb->LocalEndpoint();
|
||||
auto re = lsb->RemoteEndpoint();
|
||||
time_t now = time(nullptr);
|
||||
|
||||
absl::StrAppend(&res, "id=", id_, " addr=", re.address().to_string(), ":", re.port());
|
||||
absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port());
|
||||
absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_);
|
||||
absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_);
|
||||
absl::StrAppend(&res, " phase=", phase_, " ");
|
||||
absl::StrAppend(&res, cc_->GetContextInfo());
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
|
||||
size_t last_len = 0;
|
||||
do {
|
||||
|
@ -295,6 +320,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
|
|||
// At the start we read from the socket to determine the HTTP/Memstore protocol.
|
||||
// Therefore we may already have some data in the buffer.
|
||||
if (io_buf_.InputLen() > 0) {
|
||||
SetPhase("process");
|
||||
if (redis_parser_) {
|
||||
parse_status = ParseRedis();
|
||||
} else {
|
||||
|
@ -379,6 +405,7 @@ auto Connection::ParseRedis() -> ParserStatus {
|
|||
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) {
|
||||
RespToArgList(args, &arg_vec);
|
||||
service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get());
|
||||
last_interaction_ = time(nullptr);
|
||||
} else {
|
||||
// Dispatch via queue to speedup input reading.
|
||||
Request* req = FromArgs(std::move(args), tlh);
|
||||
|
@ -465,11 +492,45 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
|||
error_code ec;
|
||||
ParserStatus parse_status = OK;
|
||||
|
||||
uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me();
|
||||
auto* me = fibers::context::active();
|
||||
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
|
||||
|
||||
do {
|
||||
FetchBuilderStats(stats, builder);
|
||||
|
||||
io::MutableBytes append_buf = io_buf_.AppendBuffer();
|
||||
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
|
||||
SetPhase("readsock");
|
||||
|
||||
uring::Proactor::IoResult io_res;
|
||||
uint32_t io_flags;
|
||||
|
||||
auto waker = [&](uring::Proactor::IoResult res, uint32_t flags, int64_t) {
|
||||
io_res = res;
|
||||
io_flags = flags;
|
||||
SetPhase("recvwake");
|
||||
fibers::context::active()->schedule(me);
|
||||
};
|
||||
uring::SubmitEntry se = proactor->GetSubmitEntry(waker, 0);
|
||||
msghdr msg;
|
||||
memset(&msg, 0, sizeof(msg));
|
||||
iovec iv{append_buf.data(), append_buf.size()};
|
||||
|
||||
msg.msg_iov = &iv;
|
||||
msg.msg_iovlen = 1;
|
||||
|
||||
se.PrepRecvMsg(lsb->native_handle(), &msg, 0);
|
||||
me->suspend();
|
||||
::io::Result<size_t> recv_sz;
|
||||
if (io_res > 0) {
|
||||
recv_sz = io_res;
|
||||
} else {
|
||||
error_code ec(-io_res, system_category());
|
||||
LOG(ERROR) << "socket error " << lsb->native_handle() << " " << ec;
|
||||
recv_sz = nonstd::make_unexpected(std::move(ec));
|
||||
}
|
||||
// ::io::Result<size_t> recv_sz = peer->Recv(append_buf);
|
||||
last_interaction_ = time(nullptr);
|
||||
|
||||
if (!recv_sz) {
|
||||
ec = recv_sz.error();
|
||||
|
@ -480,6 +541,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
|||
io_buf_.CommitWrite(*recv_sz);
|
||||
stats->io_read_bytes += *recv_sz;
|
||||
++stats->io_read_cnt;
|
||||
SetPhase("process");
|
||||
|
||||
if (redis_parser_)
|
||||
parse_status = ParseRedis();
|
||||
|
@ -554,6 +616,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
|
|||
builder->SetBatchMode(!dispatch_q_.empty());
|
||||
cc_->async_dispatch = true;
|
||||
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
|
||||
last_interaction_ = time(nullptr);
|
||||
cc_->async_dispatch = false;
|
||||
}
|
||||
req->~Request();
|
||||
|
|
|
@ -52,6 +52,16 @@ class Connection : public util::Connection {
|
|||
void SendMsgVecAsync(absl::Span<const std::string_view> msg_vec,
|
||||
util::fibers_ext::BlockingCounter bc);
|
||||
|
||||
void SetName(std::string_view name) {
|
||||
CopyCharBuf(name, sizeof(name_), name_);
|
||||
}
|
||||
|
||||
void SetPhase(std::string_view phase) {
|
||||
CopyCharBuf(phase, sizeof(phase_), phase_);
|
||||
}
|
||||
|
||||
std::string GetClientInfo() const;
|
||||
|
||||
protected:
|
||||
void OnShutdown() override;
|
||||
|
||||
|
@ -60,6 +70,13 @@ class Connection : public util::Connection {
|
|||
|
||||
void HandleRequests() final;
|
||||
|
||||
static void CopyCharBuf(std::string_view src, unsigned dest_len, char* dest) {
|
||||
src = src.substr(0, dest_len - 1);
|
||||
if (!src.empty())
|
||||
memcpy(dest, src.data(), src.size());
|
||||
dest[src.size()] = '\0';
|
||||
}
|
||||
|
||||
//
|
||||
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);
|
||||
|
||||
|
@ -77,6 +94,9 @@ class Connection : public util::Connection {
|
|||
util::HttpListenerBase* http_listener_;
|
||||
SSL_CTX* ctx_;
|
||||
ServiceInterface* service_;
|
||||
time_t creation_time_, last_interaction_;
|
||||
char name_[16];
|
||||
char phase_[16];
|
||||
|
||||
std::unique_ptr<ConnectionContext> cc_;
|
||||
|
||||
|
@ -88,7 +108,10 @@ class Connection : public util::Connection {
|
|||
util::fibers_ext::EventCount evc_;
|
||||
|
||||
unsigned parser_error_ = 0;
|
||||
uint32_t id_;
|
||||
|
||||
Protocol protocol_;
|
||||
|
||||
struct Shutdown;
|
||||
std::unique_ptr<Shutdown> shutdown_;
|
||||
BreakerCb breaker_cb_;
|
||||
|
|
|
@ -38,11 +38,15 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
|
|||
|
||||
Service service(pool);
|
||||
|
||||
service.Init(acceptor);
|
||||
Listener* main_listener = new Listener{Protocol::REDIS, &service};
|
||||
|
||||
Service::InitOpts opts;
|
||||
opts.disable_time_update = true;
|
||||
service.Init(acceptor, main_listener, opts);
|
||||
|
||||
const char* bind_addr = FLAGS_bind.empty() ? nullptr : FLAGS_bind.c_str();
|
||||
|
||||
error_code ec =
|
||||
acceptor->AddListener(bind_addr, FLAGS_port, new Listener{Protocol::REDIS, &service});
|
||||
error_code ec = acceptor->AddListener(bind_addr, FLAGS_port, main_listener);
|
||||
|
||||
LOG_IF(FATAL, ec) << "Cound not open port " << FLAGS_port << ", error: " << ec.message();
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ DECLARE_string(requirepass);
|
|||
DEFINE_uint64(maxmemory, 0,
|
||||
"Limit on maximum-memory that is used by the database."
|
||||
"0 - means the program will automatically determine its maximum memory usage");
|
||||
DEFINE_uint32(thread_shards_num, 0, "");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -307,10 +308,14 @@ Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(th
|
|||
Service::~Service() {
|
||||
}
|
||||
|
||||
void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
|
||||
void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
|
||||
const InitOpts& opts) {
|
||||
InitRedisTables();
|
||||
|
||||
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
|
||||
uint32_t shard_num = FLAGS_thread_shards_num;
|
||||
CHECK_LE(shard_num, pp_.size());
|
||||
if (shard_num == 0)
|
||||
shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
|
||||
shard_set_.Init(shard_num);
|
||||
|
||||
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
|
||||
|
@ -324,7 +329,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
|
|||
request_latency_usec.Init(&pp_);
|
||||
StringFamily::Init(&pp_);
|
||||
GenericFamily::Init(&pp_);
|
||||
server_family_.Init(acceptor);
|
||||
server_family_.Init(acceptor, main_interface);
|
||||
cmd_req.Init(&pp_, {"type"});
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,8 @@ class Service : public facade::ServiceInterface {
|
|||
explicit Service(util::ProactorPool* pp);
|
||||
~Service();
|
||||
|
||||
void Init(util::AcceptServer* acceptor, const InitOpts& opts = InitOpts{});
|
||||
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
|
||||
const InitOpts& opts = InitOpts{});
|
||||
|
||||
void Shutdown();
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
#include <absl/cleanup/cleanup.h>
|
||||
#include <absl/random/random.h> // for master_id_ generation.
|
||||
#include <absl/strings/str_join.h>
|
||||
#include <absl/strings/match.h>
|
||||
#include <mimalloc-types.h>
|
||||
#include <sys/resource.h>
|
||||
|
@ -18,6 +19,7 @@ extern "C" {
|
|||
|
||||
#include "base/logging.h"
|
||||
#include "io/proc_reader.h"
|
||||
#include "facade/dragonfly_connection.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/debugcmd.h"
|
||||
|
@ -74,6 +76,11 @@ error_code CreateDirs(fs::path dir_path) {
|
|||
return ec;
|
||||
}
|
||||
|
||||
string UnknowSubCmd(string_view subcmd, string cmd) {
|
||||
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
|
||||
"'. Try ", cmd, " HELP.");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
|
||||
|
@ -85,9 +92,10 @@ ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service-
|
|||
ServerFamily::~ServerFamily() {
|
||||
}
|
||||
|
||||
void ServerFamily::Init(util::AcceptServer* acceptor) {
|
||||
void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener) {
|
||||
CHECK(acceptor_ == nullptr);
|
||||
acceptor_ = acceptor;
|
||||
main_listener_ = main_listener;
|
||||
|
||||
pb_task_ = ess_.pool()->GetNextProactor();
|
||||
auto cache_cb = [] {
|
||||
|
@ -336,12 +344,27 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
|
|||
ToUpper(&args[1]);
|
||||
string_view sub_cmd = ArgS(args, 1);
|
||||
|
||||
if (sub_cmd == "SETNAME") {
|
||||
if (sub_cmd == "SETNAME" && args.size() == 3) {
|
||||
cntx->owner()->SetName(ArgS(args, 2));
|
||||
return (*cntx)->SendOk();
|
||||
} else if (sub_cmd == "LIST") {
|
||||
vector<string> client_info;
|
||||
fibers::mutex mu;
|
||||
auto cb = [&](Connection* conn) {
|
||||
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
|
||||
string info = dcon->GetClientInfo();
|
||||
lock_guard lk(mu);
|
||||
client_info.push_back(move(info));
|
||||
};
|
||||
|
||||
main_listener_->TraverseConnections(cb);
|
||||
string result = absl::StrJoin(move(client_info), "\n");
|
||||
result.append("\n");
|
||||
return (*cntx)->SendBulkString(result);
|
||||
}
|
||||
|
||||
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
|
||||
(*cntx)->SendError(kSyntaxErr);
|
||||
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
|
||||
}
|
||||
|
||||
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -365,9 +388,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
|
|||
});
|
||||
return (*cntx)->SendOk();
|
||||
} else {
|
||||
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
|
||||
"'. Try CONFIG HELP.");
|
||||
return (*cntx)->SendError(err, kSyntaxErr);
|
||||
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
namespace util {
|
||||
class AcceptServer;
|
||||
class ListenerInterface;
|
||||
} // namespace util
|
||||
|
||||
namespace dfly {
|
||||
|
@ -39,7 +40,7 @@ class ServerFamily {
|
|||
ServerFamily(Service* service);
|
||||
~ServerFamily();
|
||||
|
||||
void Init(util::AcceptServer* acceptor);
|
||||
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener);
|
||||
void Register(CommandRegistry* registry);
|
||||
void Shutdown();
|
||||
|
||||
|
@ -96,6 +97,7 @@ class ServerFamily {
|
|||
EngineShardSet& ess_;
|
||||
|
||||
util::AcceptServer* acceptor_ = nullptr;
|
||||
util::ListenerInterface* main_listener_ = nullptr;
|
||||
util::ProactorBase* pb_task_ = nullptr;
|
||||
|
||||
mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;
|
||||
|
|
|
@ -34,6 +34,8 @@ DEFINE_VARZ(VarzQps, set_qps);
|
|||
DEFINE_VARZ(VarzQps, get_qps);
|
||||
|
||||
constexpr uint32_t kMaxStrLen = 1 << 28;
|
||||
constexpr uint32_t kMinTieredLen = TieredStorage::kMinBlobLen;
|
||||
|
||||
|
||||
string GetString(EngineShard* shard, const PrimeValue& pv) {
|
||||
string res;
|
||||
|
@ -160,7 +162,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
|
|||
EngineShard* shard = db_slice_.shard_owner();
|
||||
|
||||
if (shard->tiered_storage()) { // external storage enabled.
|
||||
if (value.size() >= 64) {
|
||||
if (value.size() >= kMinTieredLen) {
|
||||
shard->tiered_storage()->UnloadItem(params.db_index, it);
|
||||
}
|
||||
}
|
||||
|
@ -205,6 +207,16 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
|
|||
|
||||
// overwrite existing entry.
|
||||
prime_value.SetString(value);
|
||||
|
||||
|
||||
if (value.size() >= kMinTieredLen) { // external storage enabled.
|
||||
EngineShard* shard = db_slice_.shard_owner();
|
||||
|
||||
if (shard->tiered_storage()) {
|
||||
shard->tiered_storage()->UnloadItem(params.db_index, it);
|
||||
}
|
||||
}
|
||||
|
||||
db_slice_.PostUpdate(params.db_index, it);
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -288,13 +300,23 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
std::string_view key = ArgS(args, 1);
|
||||
|
||||
uint64_t inside_usec = 0;
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGet(OpArgs{shard, t->db_index()}, key);
|
||||
uint64_t start = absl::GetCurrentTimeNanos();
|
||||
auto res = OpGet(OpArgs{shard, t->db_index()}, key);
|
||||
inside_usec = (absl::GetCurrentTimeNanos() - start) / 1000;
|
||||
// LOG_IF(INFO, inside_usec > 3000) << "Get took " << usec << " us";
|
||||
|
||||
return res;
|
||||
};
|
||||
|
||||
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
|
||||
Transaction* trans = cntx->transaction;
|
||||
auto* pb = util::ProactorBase::me();
|
||||
uint64_t start = pb->GetMonotonicTimeNs();
|
||||
OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
|
||||
uint64_t usec = (pb->GetMonotonicTimeNs() - start) / 1000;
|
||||
// LOG_IF(INFO, usec > 6000) << "RGet took " << usec << " us, inside " << inside_usec;
|
||||
|
||||
if (result) {
|
||||
DVLOG(1) << "GET " << trans->DebugId() << ": " << key << " " << result.value();
|
||||
|
|
|
@ -6,12 +6,10 @@
|
|||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
#include "base/ring_buffer.h"
|
||||
|
||||
#include "core/external_alloc.h"
|
||||
#include "server/common.h"
|
||||
#include "server/io_mgr.h"
|
||||
#include "server/table.h"
|
||||
|
||||
#include "util/fibers/event_count.h"
|
||||
|
||||
namespace dfly {
|
||||
|
@ -20,6 +18,8 @@ class DbSlice;
|
|||
|
||||
class TieredStorage {
|
||||
public:
|
||||
enum : uint16_t { kMinBlobLen = 64 };
|
||||
|
||||
explicit TieredStorage(DbSlice* db_slice);
|
||||
~TieredStorage();
|
||||
|
||||
|
@ -76,16 +76,16 @@ class TieredStorage {
|
|||
// map of cursor -> pending size
|
||||
// absl::flat_hash_map<uint64_t, size_t> pending_upload;
|
||||
|
||||
|
||||
// multi_cnt_ - counts how many unloaded items exists in the batch at specified page offset.
|
||||
// here multi_cnt_.first is (file_offset in 4k pages) and
|
||||
// multi_cnt_.second is MultiBatch object storing number of allocated records in the batch
|
||||
// and its capacity (/ 4k).
|
||||
struct MultiBatch {
|
||||
uint16_t used; // number of used bytes
|
||||
uint16_t used; // number of used bytes
|
||||
uint16_t reserved; // in 4k pages.
|
||||
|
||||
MultiBatch(uint16_t mem_used) : used(mem_used) {}
|
||||
MultiBatch(uint16_t mem_used) : used(mem_used) {
|
||||
}
|
||||
};
|
||||
absl::flat_hash_map<uint32_t, MultiBatch> multi_cnt_;
|
||||
|
||||
|
|
|
@ -521,6 +521,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
|||
}
|
||||
|
||||
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
|
||||
uint64_t at_cb_run = 0;
|
||||
bool notified = false;
|
||||
if (schedule_fast) { // Single shard (local) optimization.
|
||||
// We never resize shard_data because that would affect MULTI transaction correctness.
|
||||
DCHECK_EQ(1u, shard_data_.size());
|
||||
|
@ -538,6 +540,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
|||
// call PollExecute that runs the callback which calls DecreaseRunCnt.
|
||||
// As a result WaitForShardCallbacks below is unblocked.
|
||||
auto schedule_cb = [&] {
|
||||
auto* pb = util::ProactorBase::me();
|
||||
at_cb_run = pb->GetMonotonicTimeNs();
|
||||
bool run_eager = ScheduleUniqueShard(EngineShard::tlocal());
|
||||
if (run_eager) {
|
||||
// it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned.
|
||||
|
@ -548,7 +552,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
|||
}
|
||||
};
|
||||
|
||||
ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
|
||||
notified = ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
|
||||
// after_sched = pb->GetMonotonicTimeNs();
|
||||
} else {
|
||||
// Transaction spans multiple shards or it's global (like flushdb) or multi.
|
||||
if (!multi_)
|
||||
|
@ -557,7 +562,14 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
|||
}
|
||||
|
||||
DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
|
||||
auto* pb = util::ProactorBase::me();
|
||||
uint64_t start = pb->GetMonotonicTimeNs();
|
||||
WaitForShardCallbacks();
|
||||
uint64_t end = pb->GetMonotonicTimeNs();
|
||||
uint64_t usec = (end - start) / 1000;
|
||||
LOG_IF(INFO, usec > 8000) << "waitcbs took " << usec << " us, since notify "
|
||||
<< (end - notify_ts_) / 1000 << ", since run: " << (end - at_cb_run) / 1000
|
||||
<< " notified:" << notified;
|
||||
DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
|
||||
|
||||
cb_ = nullptr;
|
||||
|
@ -1113,6 +1125,10 @@ inline uint32_t Transaction::DecreaseRunCnt() {
|
|||
// We use release so that no stores will be reordered after.
|
||||
uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
|
||||
if (res == 1) {
|
||||
auto* pb = util::ProactorBase::me();
|
||||
uint64_t start = pb->GetMonotonicTimeNs();
|
||||
|
||||
notify_ts_.store(start, memory_order_relaxed);
|
||||
run_ec_.notify();
|
||||
}
|
||||
return res;
|
||||
|
|
|
@ -268,6 +268,7 @@ class Transaction {
|
|||
|
||||
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
|
||||
util::fibers_ext::EventCount run_ec_;
|
||||
std::atomic_uint64_t notify_ts_{0};
|
||||
|
||||
// shard_data spans all the shards in ess_.
|
||||
// I wish we could use a dense array of size [0..uniq_shards] but since
|
||||
|
|
|
@ -32,7 +32,6 @@ def fill_hset(args, redis):
|
|||
|
||||
|
||||
def main():
|
||||
# Check processor architecture
|
||||
parser = argparse.ArgumentParser(description='fill hset entities')
|
||||
parser.add_argument(
|
||||
'-p', type=int, help='redis port', dest='port', default=6380)
|
||||
|
|
Loading…
Reference in New Issue