1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat: add INFO memory section for squashing replies memory consuming (#4147)

* feat: add INFO memory section for squashing replies memory consuming

* refactor: address comments
This commit is contained in:
Borys 2024-11-18 21:16:41 +02:00 committed by GitHub
parent 5e2b48c3f3
commit e16ef838e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 46 additions and 0 deletions

View file

@ -7,6 +7,7 @@
#include <absl/container/inlined_vector.h>
#include "base/logging.h"
#include "core/overloaded.h"
#include "facade/dragonfly_connection.h"
#include "server/cluster/cluster_utility.h"
#include "server/command_registry.h"
@ -30,8 +31,40 @@ void CheckConnStateClean(const ConnectionState& state) {
DCHECK(!state.subscribe_info);
}
size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
size_t payload_size = sizeof(facade::CapturingReplyBuilder::Payload);
return visit(
Overloaded{
[&](monostate) { return payload_size; },
[&](long) { return payload_size; },
[&](double) { return payload_size; },
[&](OpStatus) { return payload_size; },
[&](CapturingReplyBuilder::Null) { return payload_size; },
// ignore SSO because it's insignificant
[&](const CapturingReplyBuilder::SimpleString& data) {
return payload_size + data.size();
},
[&](const CapturingReplyBuilder::BulkString& data) { return payload_size + data.size(); },
[&](const CapturingReplyBuilder::Error& data) {
return payload_size + data.first.size() + data.second.size();
},
[&](const unique_ptr<CapturingReplyBuilder::CollectionPayload>& data) {
if (!data || (data->len == 0 && data->type == RedisReplyBuilder::ARRAY)) {
return payload_size;
}
for (const auto& pl : data->arr) {
payload_size += Size(pl);
}
return payload_size;
},
},
payload);
}
} // namespace
atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
Service* service, bool verify_commands, bool error_abort)
: cmds_{cmds},
@ -159,6 +192,8 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
crb.SendError(std::move(*err));
sinfo.replies.emplace_back(crb.Take());
current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed);
continue;
}
}
@ -171,6 +206,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx);
sinfo.replies.emplace_back(crb.Take());
current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed);
// Assert commands made no persistent state changes to stub context state
const auto& local_state = local_cntx.conn_state;
@ -238,6 +274,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
aborted |= error_abort_ && CapturingReplyBuilder::TryExtractError(replies.back());
CapturingReplyBuilder::Apply(std::move(replies.back()), rb);
current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed);
replies.pop_back();
if (aborted)

View file

@ -28,6 +28,10 @@ class MultiCommandSquasher {
return MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb);
}
static size_t GetRepliesMemSize() {
return current_reply_size_.load(std::memory_order_relaxed);
}
private:
// Per-shard execution info.
struct ShardExecInfo {
@ -85,6 +89,9 @@ class MultiCommandSquasher {
size_t num_shards_ = 0;
std::vector<MutableSlice> tmp_keylist_;
// we increase size in one thread and decrease in another
static atomic_uint64_t current_reply_size_;
};
} // namespace dfly

View file

@ -50,6 +50,7 @@ extern "C" {
#include "server/journal/journal.h"
#include "server/main_service.h"
#include "server/memory_cmd.h"
#include "server/multi_command_squasher.h"
#include "server/protocol_client.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
@ -2314,6 +2315,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity);
append("tls_bytes", m.tls_bytes);
append("snapshot_serialization_bytes", m.serialization_bytes);
append("commands_squashing_replies_bytes", MultiCommandSquasher::GetRepliesMemSize());
if (GetFlag(FLAGS_cache_mode)) {
append("cache_mode", "cache");