mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
INFO: Add command and error stats
This commit is contained in:
parent
abec283247
commit
5dcb50dbaa
10 changed files with 115 additions and 40 deletions
|
@ -20,7 +20,7 @@
|
|||
#include "util/tls/tls_socket.h"
|
||||
#include "util/uring/uring_socket.h"
|
||||
|
||||
DEFINE_bool(tcp_nodelay, true, "Configures dragonfly connections with socket option TCP_NODELAY");
|
||||
DEFINE_bool(tcp_nodelay, false, "Configures dragonfly connections with socket option TCP_NODELAY");
|
||||
|
||||
using namespace util;
|
||||
using namespace std;
|
||||
|
@ -53,6 +53,16 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) {
|
|||
}
|
||||
}
|
||||
|
||||
void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) {
|
||||
stats->io_write_cnt += builder->io_write_cnt();
|
||||
stats->io_write_bytes += builder->io_write_bytes();
|
||||
|
||||
for (const auto& k_v : builder->err_count()) {
|
||||
stats->err_count[k_v.first] += k_v.second;
|
||||
}
|
||||
builder->reset_io_stats();
|
||||
}
|
||||
|
||||
// TODO: to implement correct matcher according to HTTP spec
|
||||
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
|
||||
// One place to find a good implementation would be https://github.com/h2o/picohttpparser
|
||||
|
@ -413,15 +423,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
|||
error_code ec;
|
||||
ParserStatus parse_status = OK;
|
||||
|
||||
auto fetch_builder_stats = [&] {
|
||||
stats->io_write_cnt += builder->io_write_cnt();
|
||||
stats->io_write_bytes += builder->io_write_bytes();
|
||||
|
||||
builder->reset_io_stats();
|
||||
};
|
||||
|
||||
do {
|
||||
fetch_builder_stats();
|
||||
FetchBuilderStats(stats, builder);
|
||||
|
||||
io::MutableBytes append_buf = io_buf_.AppendBuffer();
|
||||
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
|
||||
|
@ -470,7 +473,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
|||
ec = builder->GetError();
|
||||
} while (peer->IsOpen() && !ec);
|
||||
|
||||
fetch_builder_stats();
|
||||
FetchBuilderStats(stats, builder);
|
||||
|
||||
if (ec)
|
||||
return ec;
|
||||
|
|
|
@ -23,4 +23,7 @@ extern const char kScriptNotFound[];
|
|||
extern const char kAuthRejected[];
|
||||
extern const char kExpiryOutOfRange[];
|
||||
|
||||
extern const char kSyntaxErrType[];
|
||||
extern const char kScriptErrType[];
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -17,9 +17,11 @@ using namespace std;
|
|||
|
||||
#define ADD(x) (x) += o.x
|
||||
|
||||
constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
|
||||
|
||||
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
|
||||
// To break this code deliberately if we add/remove a field to this struct.
|
||||
static_assert(sizeof(ConnectionStats) == 64);
|
||||
static_assert(kSizeConnStats == 144);
|
||||
|
||||
ADD(num_conns);
|
||||
ADD(num_replicas);
|
||||
|
@ -31,6 +33,14 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
|
|||
ADD(pipelined_cmd_cnt);
|
||||
ADD(command_cnt);
|
||||
|
||||
for (const auto& k_v : o.err_count) {
|
||||
err_count[k_v.first] += k_v.second;
|
||||
}
|
||||
|
||||
for (const auto& k_v : o.cmd_count) {
|
||||
cmd_count[k_v.first] += k_v.second;
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -52,6 +62,9 @@ const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL.";
|
|||
const char kAuthRejected[] = "-WRONGPASS invalid username-password pair or user is disabled.";
|
||||
const char kExpiryOutOfRange[] = "expiry is out of range";
|
||||
|
||||
const char kSyntaxErrType[] = "syntax_error";
|
||||
const char kScriptErrType[] = "script_error";
|
||||
|
||||
const char* RespExpr::TypeName(Type t) {
|
||||
switch (t) {
|
||||
case STRING:
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <absl/types/span.h>
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
namespace facade {
|
||||
|
||||
|
@ -19,6 +20,9 @@ using CmdArgVec = std::vector<MutableSlice>;
|
|||
|
||||
|
||||
struct ConnectionStats {
|
||||
absl::flat_hash_map<std::string, uint64_t> err_count;
|
||||
absl::flat_hash_map<std::string, uint64_t> cmd_count;
|
||||
|
||||
uint32_t num_conns = 0;
|
||||
uint32_t num_replicas = 0;
|
||||
size_t read_buf_capacity = 0;
|
||||
|
|
|
@ -112,7 +112,7 @@ void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) {
|
|||
SendDirect("END\r\n");
|
||||
}
|
||||
|
||||
void MCReplyBuilder::SendError(string_view str) {
|
||||
void MCReplyBuilder::SendError(string_view str, std::string_view type) {
|
||||
SendDirect("ERROR\r\n");
|
||||
}
|
||||
|
||||
|
@ -132,7 +132,9 @@ void MCReplyBuilder::SendNotFound() {
|
|||
RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
|
||||
}
|
||||
|
||||
void RedisReplyBuilder::SendError(string_view str) {
|
||||
void RedisReplyBuilder::SendError(string_view str, std::string_view type) {
|
||||
err_count_[type.empty() ? str : type]++;
|
||||
|
||||
if (str[0] == '-') {
|
||||
iovec v[] = {IoVec(str), IoVec(kCRLF)};
|
||||
Send(v, ABSL_ARRAYSIZE(v));
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
#include <optional>
|
||||
#include <string_view>
|
||||
|
||||
|
@ -18,7 +20,7 @@ class ReplyBuilderInterface {
|
|||
virtual void SendStored() = 0;
|
||||
|
||||
// Common for both MC and Redis.
|
||||
virtual void SendError(std::string_view str) = 0;
|
||||
virtual void SendError(std::string_view str, std::string_view type = std::string_view{}) = 0;
|
||||
|
||||
virtual std::error_code GetError() const = 0;
|
||||
|
||||
|
@ -69,6 +71,11 @@ class SinkReplyBuilder : public ReplyBuilderInterface {
|
|||
void reset_io_stats() {
|
||||
io_write_cnt_ = 0;
|
||||
io_write_bytes_ = 0;
|
||||
err_count_.clear();
|
||||
}
|
||||
|
||||
const absl::flat_hash_map<std::string, uint64_t>& err_count() const {
|
||||
return err_count_;
|
||||
}
|
||||
|
||||
//! Sends a string as is without any formatting. raw should be encoded according to the protocol.
|
||||
|
@ -80,8 +87,10 @@ class SinkReplyBuilder : public ReplyBuilderInterface {
|
|||
std::string batch_;
|
||||
::io::Sink* sink_;
|
||||
std::error_code ec_;
|
||||
|
||||
size_t io_write_cnt_ = 0;
|
||||
size_t io_write_bytes_ = 0;
|
||||
absl::flat_hash_map<std::string, uint64_t> err_count_;
|
||||
|
||||
bool should_batch_ = false;
|
||||
};
|
||||
|
@ -90,7 +99,8 @@ class MCReplyBuilder : public SinkReplyBuilder {
|
|||
public:
|
||||
MCReplyBuilder(::io::Sink* stream);
|
||||
|
||||
void SendError(std::string_view str) final;
|
||||
void SendError(std::string_view str, std::string_view type = std::string_view{}) final;
|
||||
|
||||
// void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) final;
|
||||
void SendMGetResponse(const OptResp* resp, uint32_t count) final;
|
||||
|
||||
|
@ -110,7 +120,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
|
|||
SendSimpleString("OK");
|
||||
}
|
||||
|
||||
void SendError(std::string_view str) override;
|
||||
void SendError(std::string_view str, std::string_view type = std::string_view{}) override;
|
||||
void SendMGetResponse(const OptResp* resp, uint32_t count) override;
|
||||
|
||||
void SendStored() override;
|
||||
|
@ -151,4 +161,4 @@ class ReqSerializer {
|
|||
std::error_code ec_;
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
} // namespace facade
|
||||
|
|
|
@ -62,7 +62,7 @@ class InterpreterReplier : public RedisReplyBuilder {
|
|||
InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) {
|
||||
}
|
||||
|
||||
void SendError(std::string_view str) override;
|
||||
void SendError(std::string_view str, std::string_view type = std::string_view{}) override;
|
||||
void SendStored() override;
|
||||
|
||||
void SendSimpleString(std::string_view str) final;
|
||||
|
@ -155,7 +155,7 @@ void InterpreterReplier::PostItem() {
|
|||
}
|
||||
}
|
||||
|
||||
void InterpreterReplier::SendError(string_view str) {
|
||||
void InterpreterReplier::SendError(string_view str, std::string_view type) {
|
||||
DCHECK(array_len_.empty());
|
||||
explr_->OnError(str);
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ bool EvalValidator(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (unsigned(num_keys) > args.size() - 3) {
|
||||
(*cntx)->SendError("Number of keys can't be greater than number of args");
|
||||
(*cntx)->SendError("Number of keys can't be greater than number of args", kSyntaxErr);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -356,7 +356,12 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
};
|
||||
|
||||
if (cid == nullptr) {
|
||||
return (*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`"));
|
||||
(*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`"), "unknown_cmd");
|
||||
|
||||
lock_guard lk(stats_mu_);
|
||||
if (unknown_cmds_.size() < 1024)
|
||||
unknown_cmds_[cmd_str]++;
|
||||
return;
|
||||
}
|
||||
|
||||
if (etl.gstate() == GlobalState::LOADING || etl.gstate() == GlobalState::SHUTTING_DOWN) {
|
||||
|
@ -391,11 +396,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
|
||||
if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
|
||||
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
|
||||
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str));
|
||||
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr);
|
||||
}
|
||||
|
||||
if (cid->key_arg_step() == 2 && (args.size() % 2) == 0) {
|
||||
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str));
|
||||
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr);
|
||||
}
|
||||
|
||||
// Validate more complicated cases with custom validators.
|
||||
|
@ -417,6 +422,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
|
||||
std::move(multi_error).Cancel();
|
||||
|
||||
etl.connection_stats.cmd_count[cmd_name]++;
|
||||
|
||||
if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) {
|
||||
// TODO: protect against aggregating huge transactions.
|
||||
StoredCmd stored_cmd{cid};
|
||||
|
@ -618,6 +625,11 @@ bool Service::IsPassProtected() const {
|
|||
return !FLAGS_requirepass.empty();
|
||||
}
|
||||
|
||||
absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
|
||||
lock_guard lk(stats_mu_);
|
||||
return unknown_cmds_;
|
||||
}
|
||||
|
||||
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
|
||||
if (cntx->protocol() == facade::Protocol::REDIS)
|
||||
(*cntx)->SendOk();
|
||||
|
@ -664,7 +676,7 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
string result;
|
||||
Interpreter::AddResult add_result = script.AddFunction(body, &result);
|
||||
if (add_result == Interpreter::COMPILE_ERR) {
|
||||
return (*cntx)->SendError(result);
|
||||
return (*cntx)->SendError(result, facade::kScriptErrType);
|
||||
}
|
||||
|
||||
if (add_result == Interpreter::ADD_OK) {
|
||||
|
@ -765,7 +777,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
|
|||
|
||||
if (result == Interpreter::RUN_ERR) {
|
||||
string resp = absl::StrCat("Error running script (call to ", eval_args.sha, "): ", error);
|
||||
return (*cntx)->SendError(resp);
|
||||
return (*cntx)->SendError(resp, facade::kScriptErrType);
|
||||
}
|
||||
CHECK(result == Interpreter::RUN_OK);
|
||||
|
||||
|
|
|
@ -65,6 +65,8 @@ class Service : public facade::ServiceInterface {
|
|||
|
||||
bool IsPassProtected() const;
|
||||
|
||||
absl::flat_hash_map<std::string, unsigned> UknownCmdMap() const;
|
||||
|
||||
private:
|
||||
static void Quit(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Multi(CmdArgList args, ConnectionContext* cntx);
|
||||
|
@ -90,6 +92,9 @@ class Service : public facade::ServiceInterface {
|
|||
EngineShardSet shard_set_;
|
||||
ServerFamily server_family_;
|
||||
CommandRegistry registry_;
|
||||
|
||||
mutable ::boost::fibers::mutex stats_mu_;
|
||||
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -45,8 +45,8 @@ using namespace std;
|
|||
using namespace util;
|
||||
namespace fibers = ::boost::fibers;
|
||||
namespace fs = std::filesystem;
|
||||
using strings::HumanReadableNumBytes;
|
||||
using facade::MCReplyBuilder;
|
||||
using strings::HumanReadableNumBytes;
|
||||
|
||||
namespace {
|
||||
|
||||
|
@ -72,11 +72,10 @@ error_code CreateDirs(fs::path dir_path) {
|
|||
|
||||
} // namespace
|
||||
|
||||
ServerFamily::ServerFamily(Service* engine)
|
||||
: engine_(*engine), pp_(engine->proactor_pool()), ess_(engine->shard_set()) {
|
||||
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
|
||||
start_time_ = time(NULL);
|
||||
last_save_.store(start_time_, memory_order_release);
|
||||
script_mgr_.reset(new ScriptMgr(&engine->shard_set()));
|
||||
script_mgr_.reset(new ScriptMgr(&service->shard_set()));
|
||||
}
|
||||
|
||||
ServerFamily::~ServerFamily() {
|
||||
|
@ -90,7 +89,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor) {
|
|||
void ServerFamily::Shutdown() {
|
||||
VLOG(1) << "ServerFamily::Shutdown";
|
||||
|
||||
pp_.GetNextProactor()->Await([this] {
|
||||
service_.proactor_pool().GetNextProactor()->Await([this] {
|
||||
unique_lock lk(replica_of_mu_);
|
||||
if (replica_) {
|
||||
replica_->Stop();
|
||||
|
@ -256,7 +255,8 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
|
|||
return;
|
||||
}
|
||||
|
||||
pp_.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); });
|
||||
auto& pool = service_.proactor_pool();
|
||||
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); });
|
||||
|
||||
unique_ptr<::io::WriteFile> wf(*res);
|
||||
auto start = absl::Now();
|
||||
|
@ -280,7 +280,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
|
|||
return;
|
||||
}
|
||||
|
||||
pp_.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
|
||||
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
|
||||
CHECK_EQ(GlobalState::SAVING, global_state_.Clear());
|
||||
|
||||
absl::Duration dur = absl::Now() - start;
|
||||
|
@ -326,7 +326,7 @@ Metrics ServerFamily::GetMetrics() const {
|
|||
}
|
||||
};
|
||||
|
||||
pp_.AwaitFiberOnAll(std::move(cb));
|
||||
service_.proactor_pool().AwaitFiberOnAll(std::move(cb));
|
||||
result.qps /= 6;
|
||||
|
||||
return result;
|
||||
|
@ -354,8 +354,8 @@ arch_bits:64
|
|||
multiplexing_api:iouring
|
||||
tcp_port:)";
|
||||
|
||||
auto should_enter = [&](string_view name) {
|
||||
bool res = section.empty() || section == name;
|
||||
auto should_enter = [&](string_view name, bool hidden = false) {
|
||||
bool res = (!hidden && section.empty()) || section == "ALL" || section == name;
|
||||
if (res && !info.empty())
|
||||
info.push_back('\n');
|
||||
|
||||
|
@ -404,6 +404,7 @@ tcp_port:)";
|
|||
// known we approximate their allocations by taking 16 bytes per member.
|
||||
absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n");
|
||||
absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n");
|
||||
absl::StrAppend(&info, "num_buckets:", m.db.bucket_count, "\n");
|
||||
absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n");
|
||||
absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n");
|
||||
absl::StrAppend(&info, "small_string_bytes:", m.db.small_string_bytes, "\n");
|
||||
|
@ -453,6 +454,26 @@ tcp_port:)";
|
|||
}
|
||||
}
|
||||
|
||||
if (should_enter("COMMANDSTATS", true)) {
|
||||
absl::StrAppend(&info, "# Commandstats\n");
|
||||
auto unknown_cmd = service_.UknownCmdMap();
|
||||
|
||||
for (const auto& k_v : unknown_cmd) {
|
||||
absl::StrAppend(&info, "unknown_", k_v.first, ":", k_v.second, "\n");
|
||||
}
|
||||
|
||||
for (const auto& k_v : m.conn_stats.cmd_count) {
|
||||
absl::StrAppend(&info, "cmd_", k_v.first, ":", k_v.second, "\n");
|
||||
}
|
||||
}
|
||||
|
||||
if (should_enter("ERRORSTATS", true)) {
|
||||
absl::StrAppend(&info, "# Errorstats\n");
|
||||
for (const auto& k_v : m.conn_stats.err_count) {
|
||||
absl::StrAppend(&info, k_v.first, ":", k_v.second, "\n");
|
||||
}
|
||||
}
|
||||
|
||||
if (should_enter("KEYSPACE")) {
|
||||
absl::StrAppend(&info, "# Keyspace\n");
|
||||
absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO
|
||||
|
@ -464,6 +485,7 @@ tcp_port:)";
|
|||
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view host = ArgS(args, 1);
|
||||
std::string_view port_s = ArgS(args, 2);
|
||||
auto& pool = service_.proactor_pool();
|
||||
|
||||
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
|
||||
// use this lock as critical section to prevent concurrent replicaof commands running.
|
||||
|
@ -474,7 +496,8 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto repl_ptr = replica_;
|
||||
CHECK(repl_ptr);
|
||||
|
||||
pp_.AwaitFiberOnAll([&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; });
|
||||
pool.AwaitFiberOnAll(
|
||||
[&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; });
|
||||
replica_->Stop();
|
||||
replica_.reset();
|
||||
}
|
||||
|
@ -489,7 +512,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
|
|||
return;
|
||||
}
|
||||
|
||||
auto new_replica = make_shared<Replica>(string(host), port, &engine_);
|
||||
auto new_replica = make_shared<Replica>(string(host), port, &service_);
|
||||
|
||||
unique_lock lk(replica_of_mu_);
|
||||
if (replica_) {
|
||||
|
@ -513,8 +536,9 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (!replica_->Run(cntx)) {
|
||||
replica_.reset();
|
||||
}
|
||||
|
||||
bool is_master = !replica_;
|
||||
pp_.AwaitFiberOnAll(
|
||||
pool.AwaitFiberOnAll(
|
||||
[&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
|
||||
}
|
||||
|
||||
|
|
|
@ -77,8 +77,7 @@ class ServerFamily {
|
|||
|
||||
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
|
||||
|
||||
Service& engine_;
|
||||
util::ProactorPool& pp_;
|
||||
Service& service_;
|
||||
EngineShardSet& ess_;
|
||||
|
||||
util::AcceptServer* acceptor_ = nullptr;
|
||||
|
|
Loading…
Reference in a new issue