From 330d007d56311fc6fab2a45409350d026a9a8b53 Mon Sep 17 00:00:00 2001 From: adiholden Date: Mon, 9 Dec 2024 09:12:18 +0200 Subject: [PATCH] feat server: support config set serialization_max_chunk_size (#4274) Signed-off-by: adi_holden --- src/server/common.cc | 5 ++--- src/server/common.h | 2 -- src/server/journal/cmd_serializer.cc | 23 +++++++++++++++-------- src/server/journal/cmd_serializer.h | 3 ++- src/server/journal/streamer.cc | 4 +++- src/server/main_service.cc | 9 ++++++++- src/server/rdb_save.cc | 2 +- src/server/server_state.h | 1 + src/server/snapshot.cc | 2 +- 9 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index 39d4acaff..e16d0d79f 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -119,7 +119,6 @@ atomic_uint64_t rss_mem_peak(0); unsigned kernel_version = 0; size_t max_memory_limit = 0; -size_t serialization_max_chunk_size = 0; Namespaces* namespaces = nullptr; size_t FetchRssMemory(io::StatusData sdata) { @@ -438,7 +437,7 @@ ThreadLocalMutex::~ThreadLocalMutex() { } void ThreadLocalMutex::lock() { - if (serialization_max_chunk_size != 0) { + if (ServerState::tlocal()->serialization_max_chunk_size != 0) { DCHECK_EQ(EngineShard::tlocal(), shard_); util::fb2::NoOpLock noop_lk_; if (locked_fiber_ != nullptr) { @@ -452,7 +451,7 @@ void ThreadLocalMutex::lock() { } void ThreadLocalMutex::unlock() { - if (serialization_max_chunk_size != 0) { + if (ServerState::tlocal()->serialization_max_chunk_size != 0) { DCHECK_EQ(EngineShard::tlocal(), shard_); flag_ = false; cond_var_.notify_one(); diff --git a/src/server/common.h b/src/server/common.h index ac890b225..2b66b29ee 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -388,8 +388,6 @@ struct BorrowedInterpreter { bool owned_ = false; }; -extern size_t serialization_max_chunk_size; - class LocalBlockingCounter { public: void lock() { diff --git a/src/server/journal/cmd_serializer.cc b/src/server/journal/cmd_serializer.cc index 18674e4f4..f9c98b99f 100644 --- a/src/server/journal/cmd_serializer.cc +++ b/src/server/journal/cmd_serializer.cc @@ -17,7 +17,8 @@ class CommandAggregator { public: using WriteCmdCallback = std::function)>; - CommandAggregator(string_view key, WriteCmdCallback cb) : key_(key), cb_(cb) { + CommandAggregator(string_view key, WriteCmdCallback cb, size_t max_agg_bytes) + : key_(key), cb_(cb), max_aggragation_bytes_(max_agg_bytes) { } ~CommandAggregator() { @@ -29,7 +30,7 @@ class CommandAggregator { agg_bytes_ += arg.size(); members_.push_back(std::move(arg)); - if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= serialization_max_chunk_size) { + if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) { CommitPending(); } } @@ -55,18 +56,20 @@ class CommandAggregator { vector members_; absl::InlinedVector args_; size_t agg_bytes_ = 0; + size_t max_aggragation_bytes_; }; } // namespace -CmdSerializer::CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) { +CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size) + : cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) { } void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms) { // We send RESTORE commands for small objects, or objects we don't support breaking. bool use_restore_serialization = true; - if (serialization_max_chunk_size > 0 && pv.MallocUsed() > serialization_max_chunk_size) { + if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) { switch (pv.ObjType()) { case OBJ_SET: SerializeSet(key, pv); @@ -138,7 +141,8 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms) void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) { CommandAggregator aggregator( - key, [&](absl::Span args) { SerializeCommand("SADD", args); }); + key, [&](absl::Span args) { SerializeCommand("SADD", args); }, + max_serialization_buffer_size_); container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) { aggregator.AddArg(ce.ToString()); @@ -148,7 +152,8 @@ void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) { CommandAggregator aggregator( - key, [&](absl::Span args) { SerializeCommand("ZADD", args); }); + key, [&](absl::Span args) { SerializeCommand("ZADD", args); }, + max_serialization_buffer_size_); container_utils::IterateSortedSet( pv.GetRobjWrapper(), @@ -162,7 +167,8 @@ void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) { CommandAggregator aggregator( - key, [&](absl::Span args) { SerializeCommand("HSET", args); }); + key, [&](absl::Span args) { SerializeCommand("HSET", args); }, + max_serialization_buffer_size_); container_utils::IterateMap( pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) { @@ -174,7 +180,8 @@ void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) { CommandAggregator aggregator( - key, [&](absl::Span args) { SerializeCommand("RPUSH", args); }); + key, [&](absl::Span args) { SerializeCommand("RPUSH", args); }, + max_serialization_buffer_size_); container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) { aggregator.AddArg(ce.ToString()); diff --git a/src/server/journal/cmd_serializer.h b/src/server/journal/cmd_serializer.h index 14e08088a..6c9e2a51d 100644 --- a/src/server/journal/cmd_serializer.h +++ b/src/server/journal/cmd_serializer.h @@ -21,7 +21,7 @@ class CmdSerializer { public: using FlushSerialized = std::function; - explicit CmdSerializer(FlushSerialized cb); + explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size); void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms); @@ -39,6 +39,7 @@ class CmdSerializer { uint64_t expire_ms); FlushSerialized cb_; + size_t max_serialization_buffer_size_; }; } // namespace dfly diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 5677c5487..bc2657a68 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -10,6 +10,7 @@ #include "base/logging.h" #include "server/cluster/cluster_defs.h" #include "server/journal/cmd_serializer.h" +#include "server/server_state.h" #include "util/fibers/synchronization.h" using namespace facade; @@ -319,7 +320,8 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms) { - CmdSerializer serializer([&](std::string s) { Write(std::move(s)); }); + CmdSerializer serializer([&](std::string s) { Write(std::move(s)); }, + ServerState::tlocal()->serialization_max_chunk_size); serializer.SerializeEntry(key, pk, pv, expire_ms); } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index b7dd7a55c..956d0273e 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -728,6 +728,11 @@ void SetRssOomDenyRatioOnAllThreads(double ratio) { shard_set->pool()->AwaitBrief(cb); } +void SetSerializationMaxChunkSize(size_t val) { + auto cb = [val](unsigned, auto*) { ServerState::tlocal()->serialization_max_chunk_size = val; }; + shard_set->pool()->AwaitBrief(cb); +} + } // namespace Service::Service(ProactorPool* pp) @@ -791,6 +796,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterSetter("rss_oom_deny_ratio", [](double val) { SetRssOomDenyRatioOnAllThreads(val); }); + config_registry.RegisterSetter("serialization_max_chunk_size", + [](size_t val) { SetSerializationMaxChunkSize(val); }); config_registry.RegisterMutable("pipeline_squash"); @@ -835,7 +842,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector [val](auto index, auto* context) { ServerState::tlocal()->acl_log.SetTotalEntries(val); }); }); - serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size); uint32_t shard_num = GetFlag(FLAGS_num_shards); if (shard_num == 0 || shard_num > pp_.size()) { LOG_IF(WARNING, shard_num > pp_.size()) @@ -867,6 +873,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector Transaction::Init(shard_num); SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio)); + SetSerializationMaxChunkSize(absl::GetFlag(FLAGS_serialization_max_chunk_size)); // Requires that shard_set will be initialized before because server_family_.Init might // load the snapshot. diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 7327ad49b..c9b7dbbf1 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -160,7 +160,7 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) { dfly::CompressionMode GetDefaultCompressionMode() { const auto flag = absl::GetFlag(FLAGS_compression_mode); - if (serialization_max_chunk_size == 0) { + if (ServerState::tlocal()->serialization_max_chunk_size == 0) { return flag; } diff --git a/src/server/server_state.h b/src/server/server_state.h index 9aed8901c..c5ca6adf9 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -298,6 +298,7 @@ class ServerState { // public struct - to allow initialization. // Exec descriptor frequency count for this thread. absl::flat_hash_map exec_freq_count; double rss_oom_deny_ratio; + size_t serialization_max_chunk_size; private: int64_t live_transactions_ = 0; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 528843ef1..e23bfe821 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -78,7 +78,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb)); } - const auto flush_threshold = serialization_max_chunk_size; + const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size; std::function flush_fun; if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) { flush_fun = [this, flush_threshold](size_t bytes_serialized,