1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat: track request sizes histograms (#3951)

This PR introduces "DEBUG RECVSIZE ENABLE|DISABLE|tid"
command that allows tracking of request sizes.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-20 19:54:34 +03:00 committed by GitHub
parent 32a31cf1d8
commit f0c30a6d59
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 93 additions and 42 deletions

View file

@ -13,6 +13,7 @@
#include <variant>
#include "base/flags.h"
#include "base/histogram.h"
#include "base/io_buf.h"
#include "base/logging.h"
#include "core/heap_size.h"
@ -153,7 +154,8 @@ bool TrafficLogger::Write(iovec* blobs, size_t len) {
return true;
}
thread_local TrafficLogger tl_traffic_logger{}; // nullopt while disabled
thread_local TrafficLogger tl_traffic_logger{};
thread_local base::Histogram* io_req_size_hist = nullptr;
void OpenTrafficLogger(string_view base_path) {
unique_lock lk{tl_traffic_logger.mutex};
@ -528,11 +530,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
http_listener_(http_listener),
ssl_ctx_(ctx),
service_(service),
tracking_enabled_(false),
skip_next_squashing_(false),
migration_enabled_(false),
migration_in_process_(false),
is_http_(false) {
flags_(0) {
static atomic_uint32_t next_id{1};
protocol_ = protocol;
@ -1084,11 +1082,14 @@ Connection::ParserStatus Connection::ParseRedis() {
do {
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &parse_args);
request_consumed_bytes_ += consumed;
if (result == RedisParser::OK && !parse_args.empty()) {
if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING)
DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
if (io_req_size_hist)
io_req_size_hist->Add(request_consumed_bytes_);
request_consumed_bytes_ = 0;
bool has_more = consumed < io_buf_.InputLen();
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
@ -1824,6 +1825,20 @@ void Connection::SetMaxQueueLenThreadLocal(uint32_t val) {
tl_queue_backpressure_.pipeline_queue_max_len = val;
}
void Connection::GetRequestSizeHistogramThreadLocal(std::string* hist) {
if (io_req_size_hist)
*hist = io_req_size_hist->ToString();
}
void Connection::TrackRequestSize(bool enable) {
if (enable && !io_req_size_hist) {
io_req_size_hist = new base::Histogram;
} else if (!enable && io_req_size_hist) {
delete io_req_size_hist;
io_req_size_hist = nullptr;
}
}
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
@ -1870,4 +1885,20 @@ bool Connection::WeakRef::operator==(const WeakRef& other) const {
return client_id_ == other.client_id_;
}
void ResetStats() {
auto& cstats = tl_facade_stats->conn_stats;
cstats.command_cnt = 0;
cstats.pipelined_cmd_cnt = 0;
cstats.conn_received_cnt = 0;
cstats.pipelined_cmd_cnt = 0;
cstats.command_cnt = 0;
cstats.io_read_cnt = 0;
cstats.io_read_bytes = 0;
tl_facade_stats->reply_stats = {};
if (io_req_size_hist)
io_req_size_hist->Clear();
}
} // namespace facade

View file

@ -312,6 +312,8 @@ class Connection : public util::Connection {
// Sets max queue length locally in the calling thread.
static void SetMaxQueueLenThreadLocal(uint32_t val);
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
static void TrackRequestSize(bool enable);
protected:
void OnShutdown() override;
@ -388,7 +390,10 @@ class Connection : public util::Connection {
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
size_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q
uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q
// how many bytes of the current request have been consumed
size_t request_consumed_bytes_ = 0;
io::IoBuf io_buf_; // used in io loop and parsers
std::unique_ptr<RedisParser> redis_parser_;
@ -441,14 +446,19 @@ class Connection : public util::Connection {
// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;
// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ : 1;
bool skip_next_squashing_ : 1; // Forcefully skip next squashing
union {
uint16_t flags_;
struct {
// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ : 1;
bool skip_next_squashing_ : 1; // Forcefully skip next squashing
// Connection migration vars, see RequestAsyncMigration() above.
bool migration_enabled_ : 1;
bool migration_in_process_ : 1;
bool is_http_ : 1;
// Connection migration vars, see RequestAsyncMigration() above.
bool migration_enabled_ : 1;
bool migration_in_process_ : 1;
bool is_http_ : 1;
};
};
};
} // namespace facade

View file

@ -193,6 +193,8 @@ constexpr inline unsigned long long operator""_KB(unsigned long long x) {
extern __thread FacadeStats* tl_facade_stats;
void ResetStats();
} // namespace facade
namespace std {

View file

@ -98,10 +98,6 @@ void SinkReplyBuilder::CloseConnection() {
ec_ = std::make_error_code(std::errc::connection_aborted);
}
void SinkReplyBuilder::ResetThreadLocalStats() {
tl_facade_stats->reply_stats = {};
}
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
has_replied_ = true;
DCHECK(sink_);

View file

@ -147,8 +147,6 @@ class SinkReplyBuilder {
return tl_facade_stats->reply_stats;
}
static void ResetThreadLocalStats();
virtual void StartAggregate();
virtual void StopAggregate();
@ -237,10 +235,6 @@ class SinkReplyBuilder2 {
return tl_facade_stats->reply_stats;
}
static void ResetThreadLocalStats() {
tl_facade_stats->reply_stats = {};
}
public: // High level interface
virtual void SendLong(long val) = 0;
virtual void SendSimpleString(std::string_view str) = 0;

View file

@ -84,7 +84,7 @@ class RedisReplyBuilderTest : public testing::Test {
void SetUp() {
sink_.Clear();
builder_.reset(new RedisReplyBuilder2(&sink_));
SinkReplyBuilder2::ResetThreadLocalStats();
ResetStats();
}
static void SetUpTestSuite() {

View file

@ -9,6 +9,7 @@ extern "C" {
#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include <zstd.h>
@ -413,6 +414,8 @@ void DebugCmd::Run(CmdArgList args) {
"TRAFFIC <path> | [STOP]",
" Starts traffic logging to the specified path. If path is not specified,"
" traffic logging is stopped.",
"RECVSIZE [<tid> | ENABLE | DISABLE]",
" Prints the histogram of the received request sizes on the given thread",
"HELP",
" Prints this help.",
};
@ -468,6 +471,10 @@ void DebugCmd::Run(CmdArgList args) {
return LogTraffic(args.subspan(1));
}
if (subcmd == "RECVSIZE" && args.size() == 2) {
return RecvSize(ArgS(args, 1));
}
string reply = UnknownSubCmd(subcmd, "DEBUG");
return cntx_->SendError(reply, kSyntaxErrType);
}
@ -957,4 +964,29 @@ void DebugCmd::Shards() {
rb->SendVerbatimString(out);
}
void DebugCmd::RecvSize(string_view param) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
uint8_t enable = 2;
if (absl::EqualsIgnoreCase(param, "ENABLE"))
enable = 1;
else if (absl::EqualsIgnoreCase(param, "DISABLE"))
enable = 0;
if (enable < 2) {
shard_set->pool()->AwaitBrief(
[enable](auto, auto*) { facade::Connection::TrackRequestSize(enable == 1); });
return rb->SendOk();
}
unsigned tid;
if (!absl::SimpleAtoi(param, &tid) || tid >= shard_set->pool()->size()) {
return rb->SendError(kUintErr);
}
string hist;
shard_set->pool()->at(tid)->AwaitBrief(
[&]() { facade::Connection::GetRequestSizeHistogramThreadLocal(&hist); });
rb->SendVerbatimString(hist);
}
} // namespace dfly

View file

@ -48,6 +48,7 @@ class DebugCmd {
void Stacktrace();
void Shards();
void LogTraffic(CmdArgList);
void RecvSize(std::string_view param);
ServerFamily& sf_;
ConnectionContext* cntx_;

View file

@ -2050,23 +2050,8 @@ void ServerFamily::ResetStat(Namespace* ns) {
shard_set->pool()->AwaitBrief(
[registry = service_.mutable_registry(), this, ns](unsigned index, auto*) {
registry->ResetCallStats(index);
SinkReplyBuilder::ResetThreadLocalStats();
auto& stats = tl_facade_stats->conn_stats;
stats.command_cnt = 0;
stats.pipelined_cmd_cnt = 0;
ns->GetCurrentDbSlice().ResetEvents();
tl_facade_stats->conn_stats.conn_received_cnt = 0;
tl_facade_stats->conn_stats.pipelined_cmd_cnt = 0;
tl_facade_stats->conn_stats.command_cnt = 0;
tl_facade_stats->conn_stats.io_read_cnt = 0;
tl_facade_stats->conn_stats.io_read_bytes = 0;
tl_facade_stats->reply_stats.io_write_bytes = 0;
tl_facade_stats->reply_stats.io_write_cnt = 0;
tl_facade_stats->reply_stats.send_stats = {};
tl_facade_stats->reply_stats.script_error_count = 0;
tl_facade_stats->reply_stats.err_count.clear();
facade::ResetStats();
ServerState::tlocal()->exec_freq_count.clear();
});
}