mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix: fix cmd latency units in /metrics (#2217)
Also, 1. rebase helio dependency 2. get rid of varz counters that are superseded by commands_total/commands_duration_seconds_total metrics Resolves #2213 Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
d6292ba6fd
commit
92614477b7
8 changed files with 22 additions and 40 deletions
2
helio
2
helio
|
@ -1 +1 @@
|
||||||
Subproject commit 88514b9270edc194ad9ea018e614e2d1b17b3962
|
Subproject commit b786e8270fe2a02cdc259d3ea14f93cff0b8b72b
|
|
@ -514,11 +514,11 @@ uint32_t Connection::GetClientId() const {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Connection::IsPrivileged() const {
|
bool Connection::IsPrivileged() const {
|
||||||
return static_cast<Listener*>(owner())->IsPrivilegedInterface();
|
return static_cast<Listener*>(listener())->IsPrivilegedInterface();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Connection::IsMain() const {
|
bool Connection::IsMain() const {
|
||||||
return static_cast<Listener*>(owner())->IsMainInterface();
|
return static_cast<Listener*>(listener())->IsMainInterface();
|
||||||
}
|
}
|
||||||
|
|
||||||
io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
|
io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
|
||||||
|
@ -1159,7 +1159,7 @@ void Connection::Migrate(util::fb2::ProactorBase* dest) {
|
||||||
CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches
|
CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches
|
||||||
CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started
|
CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started
|
||||||
|
|
||||||
owner()->Migrate(this, dest);
|
listener()->Migrate(this, dest);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::ShutdownThreadLocal() {
|
void Connection::ShutdownThreadLocal() {
|
||||||
|
@ -1203,7 +1203,7 @@ void Connection::LaunchDispatchFiberIfNeeded() {
|
||||||
|
|
||||||
void Connection::SendAsync(MessageHandle msg) {
|
void Connection::SendAsync(MessageHandle msg) {
|
||||||
DCHECK(cc_);
|
DCHECK(cc_);
|
||||||
DCHECK(owner());
|
DCHECK(listener());
|
||||||
DCHECK_EQ(ProactorBase::me(), socket_->proactor());
|
DCHECK_EQ(ProactorBase::me(), socket_->proactor());
|
||||||
|
|
||||||
// "Closing" connections might be still processing commands, as we don't interrupt them.
|
// "Closing" connections might be still processing commands, as we don't interrupt them.
|
||||||
|
|
|
@ -62,19 +62,19 @@ void CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
|
||||||
int64_t after = absl::GetCurrentTimeNanos();
|
int64_t after = absl::GetCurrentTimeNanos();
|
||||||
|
|
||||||
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
|
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
|
||||||
int64_t execution_time_micro_s = (after - before) / 1000;
|
int64_t execution_time_usec = (after - before) / 1000;
|
||||||
|
|
||||||
const auto* conn = cntx->conn();
|
const auto* conn = cntx->conn();
|
||||||
auto& ent = command_stats_[ss->thread_index()];
|
auto& ent = command_stats_[ss->thread_index()];
|
||||||
// TODO: we should probably discard more commands here,
|
// TODO: we should probably discard more commands here,
|
||||||
// not just the blocking ones
|
// not just the blocking ones
|
||||||
if (!(opt_mask_ & CO::BLOCKING) && conn != nullptr && ss->GetSlowLog().Capacity() > 0 &&
|
if (!(opt_mask_ & CO::BLOCKING) && conn != nullptr && ss->GetSlowLog().Capacity() > 0 &&
|
||||||
execution_time_micro_s > ss->log_slower_than_usec) {
|
execution_time_usec > ss->log_slower_than_usec) {
|
||||||
ss->GetSlowLog().Add(name(), args, conn->GetName(), conn->RemoteEndpointStr(),
|
ss->GetSlowLog().Add(name(), args, conn->GetName(), conn->RemoteEndpointStr(),
|
||||||
execution_time_micro_s, after);
|
execution_time_usec, after);
|
||||||
}
|
}
|
||||||
++ent.first;
|
++ent.first;
|
||||||
ent.second += execution_time_micro_s;
|
ent.second += execution_time_usec;
|
||||||
}
|
}
|
||||||
|
|
||||||
optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
|
optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
|
||||||
|
|
|
@ -60,7 +60,7 @@ static_assert(!IsEvalKind(""));
|
||||||
|
|
||||||
}; // namespace CO
|
}; // namespace CO
|
||||||
|
|
||||||
// Per thread vector of command stats. Each entry is {cmd_calls, cmd_sum}.
|
// Per thread vector of command stats. Each entry is {cmd_calls, cmd_latency_agg in usec}.
|
||||||
using CmdCallStats = std::pair<uint64_t, uint64_t>;
|
using CmdCallStats = std::pair<uint64_t, uint64_t>;
|
||||||
|
|
||||||
class CommandId : public facade::CommandId {
|
class CommandId : public facade::CommandId {
|
||||||
|
|
|
@ -141,8 +141,6 @@ namespace h2 = boost::beast::http;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
DEFINE_VARZ(VarzMapAverage, request_latency_usec);
|
|
||||||
|
|
||||||
std::optional<VarzFunction> engine_varz;
|
std::optional<VarzFunction> engine_varz;
|
||||||
|
|
||||||
constexpr size_t kMaxThreadSize = 1024;
|
constexpr size_t kMaxThreadSize = 1024;
|
||||||
|
@ -805,7 +803,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
|
||||||
if (!tcp_disabled && !listeners.empty()) {
|
if (!tcp_disabled && !listeners.empty()) {
|
||||||
acl_family_.Init(listeners.front(), &user_registry_);
|
acl_family_.Init(listeners.front(), &user_registry_);
|
||||||
}
|
}
|
||||||
request_latency_usec.Init(&pp_);
|
|
||||||
StringFamily::Init(&pp_);
|
StringFamily::Init(&pp_);
|
||||||
GenericFamily::Init(&pp_);
|
GenericFamily::Init(&pp_);
|
||||||
server_family_.Init(acceptor, std::move(listeners));
|
server_family_.Init(acceptor, std::move(listeners));
|
||||||
|
@ -833,7 +831,6 @@ void Service::Shutdown() {
|
||||||
GenericFamily::Shutdown();
|
GenericFamily::Shutdown();
|
||||||
|
|
||||||
engine_varz.reset();
|
engine_varz.reset();
|
||||||
request_latency_usec.Shutdown();
|
|
||||||
|
|
||||||
ChannelStore::Destroy();
|
ChannelStore::Destroy();
|
||||||
|
|
||||||
|
@ -1088,9 +1085,9 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
return cntx->SendSimpleString("QUEUED");
|
return cntx->SendSimpleString("QUEUED");
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t start_ns = absl::GetCurrentTimeNanos();
|
|
||||||
|
|
||||||
if (cid->opt_mask() & CO::DENYOOM && etl.is_master) {
|
if (cid->opt_mask() & CO::DENYOOM && etl.is_master) {
|
||||||
|
uint64_t start_ns = absl::GetCurrentTimeNanos();
|
||||||
|
|
||||||
uint64_t used_memory = etl.GetUsedMemory(start_ns);
|
uint64_t used_memory = etl.GetUsedMemory(start_ns);
|
||||||
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
|
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
|
||||||
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
|
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
|
||||||
|
@ -1143,9 +1140,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
dfly_cntx->reply_builder()->CloseConnection();
|
dfly_cntx->reply_builder()->CloseConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t end_ns = ProactorBase::GetMonotonicTimeNs();
|
|
||||||
request_latency_usec.IncBy(cid->name(), (end_ns - start_ns) / 1000);
|
|
||||||
|
|
||||||
if (!dispatching_in_multi) {
|
if (!dispatching_in_multi) {
|
||||||
dfly_cntx->transaction = nullptr;
|
dfly_cntx->transaction = nullptr;
|
||||||
}
|
}
|
||||||
|
|
|
@ -916,7 +916,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
|
||||||
&command_metrics);
|
&command_metrics);
|
||||||
for (const auto& [name, stat] : m.cmd_stats_map) {
|
for (const auto& [name, stat] : m.cmd_stats_map) {
|
||||||
const auto calls = stat.first;
|
const auto calls = stat.first;
|
||||||
const double duration_seconds = stat.second * 0.001;
|
const double duration_seconds = stat.second * 1e-6;
|
||||||
AppendMetricValue("commands_total", calls, {"cmd"}, {name}, &command_metrics);
|
AppendMetricValue("commands_total", calls, {"cmd"}, {name}, &command_metrics);
|
||||||
AppendMetricValue("commands_duration_seconds_total", duration_seconds, {"cmd"}, {name},
|
AppendMetricValue("commands_duration_seconds_total", duration_seconds, {"cmd"}, {name},
|
||||||
&command_metrics);
|
&command_metrics);
|
||||||
|
@ -939,13 +939,13 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
|
||||||
|
|
||||||
AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER,
|
AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER,
|
||||||
&resp->body());
|
&resp->body());
|
||||||
double delay_seconds = m.fiber_switch_delay_ns * 1e-9;
|
double delay_seconds = m.fiber_switch_delay_usec * 1e-6;
|
||||||
AppendMetricWithoutLabels("fiber_switch_delay_seconds_total", "", delay_seconds,
|
AppendMetricWithoutLabels("fiber_switch_delay_seconds_total", "", delay_seconds,
|
||||||
MetricType::COUNTER, &resp->body());
|
MetricType::COUNTER, &resp->body());
|
||||||
|
|
||||||
AppendMetricWithoutLabels("fiber_longrun_total", "", m.fiber_longrun_cnt, MetricType::COUNTER,
|
AppendMetricWithoutLabels("fiber_longrun_total", "", m.fiber_longrun_cnt, MetricType::COUNTER,
|
||||||
&resp->body());
|
&resp->body());
|
||||||
double longrun_seconds = m.fiber_longrun_ns * 1e-9;
|
double longrun_seconds = m.fiber_longrun_usec * 1e-6;
|
||||||
AppendMetricWithoutLabels("fiber_longrun_seconds_total", "", longrun_seconds, MetricType::COUNTER,
|
AppendMetricWithoutLabels("fiber_longrun_seconds_total", "", longrun_seconds, MetricType::COUNTER,
|
||||||
&resp->body());
|
&resp->body());
|
||||||
|
|
||||||
|
@ -1508,9 +1508,9 @@ Metrics ServerFamily::GetMetrics() const {
|
||||||
lock_guard lk(mu);
|
lock_guard lk(mu);
|
||||||
|
|
||||||
result.fiber_switch_cnt += fb2::FiberSwitchEpoch();
|
result.fiber_switch_cnt += fb2::FiberSwitchEpoch();
|
||||||
result.fiber_switch_delay_ns += fb2::FiberSwitchDelay();
|
result.fiber_switch_delay_usec += fb2::FiberSwitchDelayUsec();
|
||||||
result.fiber_longrun_cnt += fb2::FiberLongRunCnt();
|
result.fiber_longrun_cnt += fb2::FiberLongRunCnt();
|
||||||
result.fiber_longrun_ns += fb2::FiberLongRunSum();
|
result.fiber_longrun_usec += fb2::FiberLongRunSumUsec();
|
||||||
|
|
||||||
result.coordinator_stats += ss->stats;
|
result.coordinator_stats += ss->stats;
|
||||||
result.conn_stats += ss->connection_stats;
|
result.conn_stats += ss->connection_stats;
|
||||||
|
|
|
@ -87,13 +87,14 @@ struct Metrics {
|
||||||
uint32_t traverse_ttl_per_sec = 0;
|
uint32_t traverse_ttl_per_sec = 0;
|
||||||
uint32_t delete_ttl_per_sec = 0;
|
uint32_t delete_ttl_per_sec = 0;
|
||||||
uint64_t fiber_switch_cnt = 0;
|
uint64_t fiber_switch_cnt = 0;
|
||||||
uint64_t fiber_switch_delay_ns = 0;
|
uint64_t fiber_switch_delay_usec = 0;
|
||||||
|
|
||||||
// Statistics about fibers running for a long time (more than 1ms).
|
// Statistics about fibers running for a long time (more than 1ms).
|
||||||
uint64_t fiber_longrun_cnt = 0;
|
uint64_t fiber_longrun_cnt = 0;
|
||||||
uint64_t fiber_longrun_ns = 0;
|
uint64_t fiber_longrun_usec = 0;
|
||||||
|
|
||||||
std::map<std::string, std::pair<uint64_t, uint64_t>> cmd_stats_map; // command call frequencies
|
// command call frequencies (count, aggregated latency in usec).
|
||||||
|
std::map<std::string, std::pair<uint64_t, uint64_t>> cmd_stats_map;
|
||||||
|
|
||||||
bool is_master = true;
|
bool is_master = true;
|
||||||
std::vector<ReplicaRoleInfo> replication_metrics;
|
std::vector<ReplicaRoleInfo> replication_metrics;
|
||||||
|
|
|
@ -26,7 +26,6 @@ extern "C" {
|
||||||
#include "server/journal/journal.h"
|
#include "server/journal/journal.h"
|
||||||
#include "server/tiered_storage.h"
|
#include "server/tiered_storage.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
#include "util/varz.h"
|
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
|
@ -36,8 +35,6 @@ using namespace std;
|
||||||
using namespace facade;
|
using namespace facade;
|
||||||
|
|
||||||
using CI = CommandId;
|
using CI = CommandId;
|
||||||
DEFINE_VARZ(VarzQps, set_qps);
|
|
||||||
DEFINE_VARZ(VarzQps, get_qps);
|
|
||||||
|
|
||||||
constexpr uint32_t kMaxStrLen = 1 << 28;
|
constexpr uint32_t kMaxStrLen = 1 << 28;
|
||||||
constexpr size_t kMinTieredLen = TieredStorage::kMinBlobLen;
|
constexpr size_t kMinTieredLen = TieredStorage::kMinBlobLen;
|
||||||
|
@ -722,8 +719,6 @@ void SetCmd::RecordJournal(const SetParams& params, string_view key, string_view
|
||||||
}
|
}
|
||||||
|
|
||||||
void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
||||||
set_qps.Inc();
|
|
||||||
|
|
||||||
string_view key = ArgS(args, 0);
|
string_view key = ArgS(args, 0);
|
||||||
string_view value = ArgS(args, 1);
|
string_view value = ArgS(args, 1);
|
||||||
|
|
||||||
|
@ -850,8 +845,6 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
||||||
get_qps.Inc();
|
|
||||||
|
|
||||||
string_view key = ArgS(args, 0);
|
string_view key = ArgS(args, 0);
|
||||||
|
|
||||||
auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); };
|
auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); };
|
||||||
|
@ -876,8 +869,6 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
|
void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
|
||||||
get_qps.Inc();
|
|
||||||
|
|
||||||
string_view key = ArgS(args, 0);
|
string_view key = ArgS(args, 0);
|
||||||
|
|
||||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||||
|
@ -1477,13 +1468,9 @@ void StringFamily::ClThrottle(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void StringFamily::Init(util::ProactorPool* pp) {
|
void StringFamily::Init(util::ProactorPool* pp) {
|
||||||
set_qps.Init(pp);
|
|
||||||
get_qps.Init(pp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void StringFamily::Shutdown() {
|
void StringFamily::Shutdown() {
|
||||||
set_qps.Shutdown();
|
|
||||||
get_qps.Shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#define HFUNC(x) SetHandler(&StringFamily::x)
|
#define HFUNC(x) SetHandler(&StringFamily::x)
|
||||||
|
|
Loading…
Reference in a new issue