Add CONFIG RESETSTAT command. Start working on RPOPLPUSH
This commit is contained in:
parent
72e90bb729
commit
d3764efbca
|
@ -57,7 +57,7 @@ void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) {
|
||||||
stats->io_write_bytes += builder->io_write_bytes();
|
stats->io_write_bytes += builder->io_write_bytes();
|
||||||
|
|
||||||
for (const auto& k_v : builder->err_count()) {
|
for (const auto& k_v : builder->err_count()) {
|
||||||
stats->err_count[k_v.first] += k_v.second;
|
stats->err_count_map[k_v.first] += k_v.second;
|
||||||
}
|
}
|
||||||
builder->reset_io_stats();
|
builder->reset_io_stats();
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,12 +36,12 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
|
||||||
ADD(num_replicas);
|
ADD(num_replicas);
|
||||||
ADD(num_blocked_clients);
|
ADD(num_blocked_clients);
|
||||||
|
|
||||||
for (const auto& k_v : o.err_count) {
|
for (const auto& k_v : o.err_count_map) {
|
||||||
err_count[k_v.first] += k_v.second;
|
err_count_map[k_v.first] += k_v.second;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& k_v : o.cmd_count) {
|
for (const auto& k_v : o.cmd_count_map) {
|
||||||
cmd_count[k_v.first] += k_v.second;
|
cmd_count_map[k_v.first] += k_v.second;
|
||||||
}
|
}
|
||||||
|
|
||||||
return *this;
|
return *this;
|
||||||
|
|
|
@ -20,8 +20,8 @@ using CmdArgVec = std::vector<MutableSlice>;
|
||||||
|
|
||||||
|
|
||||||
struct ConnectionStats {
|
struct ConnectionStats {
|
||||||
absl::flat_hash_map<std::string, uint64_t> err_count;
|
absl::flat_hash_map<std::string, uint64_t> err_count_map;
|
||||||
absl::flat_hash_map<std::string, uint64_t> cmd_count;
|
absl::flat_hash_map<std::string, uint64_t> cmd_count_map;
|
||||||
|
|
||||||
size_t read_buf_capacity = 0;
|
size_t read_buf_capacity = 0;
|
||||||
size_t io_read_cnt = 0;
|
size_t io_read_cnt = 0;
|
||||||
|
|
|
@ -11,7 +11,6 @@ extern "C" {
|
||||||
#include <absl/strings/numbers.h>
|
#include <absl/strings/numbers.h>
|
||||||
|
|
||||||
#include "base/logging.h"
|
#include "base/logging.h"
|
||||||
|
|
||||||
#include "server/blocking_controller.h"
|
#include "server/blocking_controller.h"
|
||||||
#include "server/command_registry.h"
|
#include "server/command_registry.h"
|
||||||
#include "server/conn_context.h"
|
#include "server/conn_context.h"
|
||||||
|
@ -238,6 +237,36 @@ void ListFamily::RPop(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return PopGeneric(ListDir::RIGHT, std::move(args), cntx);
|
return PopGeneric(ListDir::RIGHT, std::move(args), cntx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
string_view src = ArgS(args, 1);
|
||||||
|
string_view dest = ArgS(args, 2);
|
||||||
|
|
||||||
|
OpResult<string> result;
|
||||||
|
if (dest == src) {
|
||||||
|
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||||
|
return OpRPopLPushSingleKey(OpArgs{shard, t->db_index()}, src);
|
||||||
|
};
|
||||||
|
|
||||||
|
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||||
|
} else {
|
||||||
|
return (*cntx)->SendError("tbd: not_implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
return (*cntx)->SendBulkString(*result);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (result.status()) {
|
||||||
|
case OpStatus::KEY_NOTFOUND:
|
||||||
|
(*cntx)->SendNull();
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
(*cntx)->SendError(result.status());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) {
|
void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) {
|
||||||
auto key = ArgS(args, 1);
|
auto key = ArgS(args, 1);
|
||||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||||
|
@ -782,6 +811,20 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
|
||||||
return str_vec;
|
return str_vec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
OpResult<string> ListFamily::OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key) {
|
||||||
|
auto& db_slice = op_args.shard->db_slice();
|
||||||
|
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
|
||||||
|
if (!it_res)
|
||||||
|
return it_res.status();
|
||||||
|
|
||||||
|
PrimeIterator it = *it_res;
|
||||||
|
quicklist* ql = GetQL(it->second);
|
||||||
|
db_slice.PreUpdate(op_args.db_ind, it);
|
||||||
|
string val = ListPop(ListDir::RIGHT, ql);
|
||||||
|
quicklistPushHead(ql, val.data(), val.size());
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
using CI = CommandId;
|
using CI = CommandId;
|
||||||
|
|
||||||
#define HFUNC(x) SetHandler(&ListFamily::x)
|
#define HFUNC(x) SetHandler(&ListFamily::x)
|
||||||
|
@ -793,6 +836,7 @@ void ListFamily::Register(CommandRegistry* registry) {
|
||||||
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
|
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
|
||||||
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
|
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
|
||||||
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
|
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
|
||||||
|
<< CI{"RPOPLPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, 3, 1, 2, 1}.HFUNC(RPopLPush)
|
||||||
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
|
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
|
||||||
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
|
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
|
||||||
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
|
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
|
||||||
|
|
|
@ -35,6 +35,7 @@ class ListFamily {
|
||||||
static void LRange(CmdArgList args, ConnectionContext* cntx);
|
static void LRange(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void LRem(CmdArgList args, ConnectionContext* cntx);
|
static void LRem(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void LSet(CmdArgList args, ConnectionContext* cntx);
|
static void LSet(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
static void RPopLPush(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);
|
static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);
|
||||||
static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args,
|
static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args,
|
||||||
|
@ -61,6 +62,8 @@ class ListFamily {
|
||||||
|
|
||||||
static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long start,
|
static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long start,
|
||||||
long end);
|
long end);
|
||||||
|
|
||||||
|
static OpResult<std::string> OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -443,7 +443,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
|
|
||||||
std::move(multi_error).Cancel();
|
std::move(multi_error).Cancel();
|
||||||
|
|
||||||
etl.connection_stats.cmd_count[cmd_name]++;
|
etl.connection_stats.cmd_count_map[cmd_name]++;
|
||||||
|
|
||||||
if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) {
|
if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) {
|
||||||
// TODO: protect against aggregating huge transactions.
|
// TODO: protect against aggregating huge transactions.
|
||||||
|
|
|
@ -343,6 +343,15 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
|
||||||
string_view res[2] = {param, "tbd"};
|
string_view res[2] = {param, "tbd"};
|
||||||
|
|
||||||
return (*cntx)->SendStringArr(res);
|
return (*cntx)->SendStringArr(res);
|
||||||
|
} else if (sub_cmd == "RESETSTAT") {
|
||||||
|
ess_.pool()->Await([](auto*) {
|
||||||
|
auto* stats = ServerState::tl_connection_stats();
|
||||||
|
stats->cmd_count_map.clear();
|
||||||
|
stats->err_count_map.clear();
|
||||||
|
stats->command_cnt = 0;
|
||||||
|
stats->async_writes_cnt = 0;
|
||||||
|
});
|
||||||
|
return (*cntx)->SendOk();
|
||||||
} else {
|
} else {
|
||||||
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
|
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
|
||||||
"'. Try CONFIG HELP.");
|
"'. Try CONFIG HELP.");
|
||||||
|
@ -561,14 +570,14 @@ tcp_port:)";
|
||||||
append(StrCat("unknown_", k_v.first, ":"), k_v.second);
|
append(StrCat("unknown_", k_v.first, ":"), k_v.second);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& k_v : m.conn_stats.cmd_count) {
|
for (const auto& k_v : m.conn_stats.cmd_count_map) {
|
||||||
append(StrCat("cmd_", k_v.first, ":"), k_v.second);
|
append(StrCat("cmd_", k_v.first, ":"), k_v.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (should_enter("ERRORSTATS", true)) {
|
if (should_enter("ERRORSTATS", true)) {
|
||||||
ADD_HEADER("# Errorstats");
|
ADD_HEADER("# Errorstats");
|
||||||
for (const auto& k_v : m.conn_stats.err_count) {
|
for (const auto& k_v : m.conn_stats.err_count_map) {
|
||||||
append(StrCat(k_v.first, ":"), k_v.second);
|
append(StrCat(k_v.first, ":"), k_v.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
memtier_benchmark -p 6379 --command "sadd __key__ __data__" -n 20 --threads=4 \
|
||||||
|
-c 10 --command-key-pattern=R --distinct-client-seed -c 30 --data-size=64 \
|
||||||
|
--key-prefix="key:" --hide-histogram --random-data --key-maximum=10000
|
||||||
|
|
Loading…
Reference in New Issue