1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat server: support config set serialization_max_chunk_size (#4274)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-12-09 09:12:18 +02:00 committed by GitHub
parent bafd8b3b8b
commit 330d007d56
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 33 additions and 18 deletions

View file

@ -119,7 +119,6 @@ atomic_uint64_t rss_mem_peak(0);
unsigned kernel_version = 0;
size_t max_memory_limit = 0;
size_t serialization_max_chunk_size = 0;
Namespaces* namespaces = nullptr;
size_t FetchRssMemory(io::StatusData sdata) {
@ -438,7 +437,7 @@ ThreadLocalMutex::~ThreadLocalMutex() {
}
void ThreadLocalMutex::lock() {
if (serialization_max_chunk_size != 0) {
if (ServerState::tlocal()->serialization_max_chunk_size != 0) {
DCHECK_EQ(EngineShard::tlocal(), shard_);
util::fb2::NoOpLock noop_lk_;
if (locked_fiber_ != nullptr) {
@ -452,7 +451,7 @@ void ThreadLocalMutex::lock() {
}
void ThreadLocalMutex::unlock() {
if (serialization_max_chunk_size != 0) {
if (ServerState::tlocal()->serialization_max_chunk_size != 0) {
DCHECK_EQ(EngineShard::tlocal(), shard_);
flag_ = false;
cond_var_.notify_one();

View file

@ -388,8 +388,6 @@ struct BorrowedInterpreter {
bool owned_ = false;
};
extern size_t serialization_max_chunk_size;
class LocalBlockingCounter {
public:
void lock() {

View file

@ -17,7 +17,8 @@ class CommandAggregator {
public:
using WriteCmdCallback = std::function<void(absl::Span<const string_view>)>;
CommandAggregator(string_view key, WriteCmdCallback cb) : key_(key), cb_(cb) {
CommandAggregator(string_view key, WriteCmdCallback cb, size_t max_agg_bytes)
: key_(key), cb_(cb), max_aggragation_bytes_(max_agg_bytes) {
}
~CommandAggregator() {
@ -29,7 +30,7 @@ class CommandAggregator {
agg_bytes_ += arg.size();
members_.push_back(std::move(arg));
if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= serialization_max_chunk_size) {
if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) {
CommitPending();
}
}
@ -55,18 +56,20 @@ class CommandAggregator {
vector<string> members_;
absl::InlinedVector<string_view, 5> args_;
size_t agg_bytes_ = 0;
size_t max_aggragation_bytes_;
};
} // namespace
CmdSerializer::CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) {
CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size)
: cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) {
}
void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
// We send RESTORE commands for small objects, or objects we don't support breaking.
bool use_restore_serialization = true;
if (serialization_max_chunk_size > 0 && pv.MallocUsed() > serialization_max_chunk_size) {
if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) {
switch (pv.ObjType()) {
case OBJ_SET:
SerializeSet(key, pv);
@ -138,7 +141,8 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms)
void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); });
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); },
max_serialization_buffer_size_);
container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
@ -148,7 +152,8 @@ void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); });
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); },
max_serialization_buffer_size_);
container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
@ -162,7 +167,8 @@ void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); });
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); },
max_serialization_buffer_size_);
container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
@ -174,7 +180,8 @@ void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); });
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); },
max_serialization_buffer_size_);
container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());

View file

@ -21,7 +21,7 @@ class CmdSerializer {
public:
using FlushSerialized = std::function<void(std::string)>;
explicit CmdSerializer(FlushSerialized cb);
explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size);
void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);
@ -39,6 +39,7 @@ class CmdSerializer {
uint64_t expire_ms);
FlushSerialized cb_;
size_t max_serialization_buffer_size_;
};
} // namespace dfly

View file

@ -10,6 +10,7 @@
#include "base/logging.h"
#include "server/cluster/cluster_defs.h"
#include "server/journal/cmd_serializer.h"
#include "server/server_state.h"
#include "util/fibers/synchronization.h"
using namespace facade;
@ -319,7 +320,8 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
CmdSerializer serializer([&](std::string s) { Write(std::move(s)); });
CmdSerializer serializer([&](std::string s) { Write(std::move(s)); },
ServerState::tlocal()->serialization_max_chunk_size);
serializer.SerializeEntry(key, pk, pv, expire_ms);
}

View file

@ -728,6 +728,11 @@ void SetRssOomDenyRatioOnAllThreads(double ratio) {
shard_set->pool()->AwaitBrief(cb);
}
void SetSerializationMaxChunkSize(size_t val) {
auto cb = [val](unsigned, auto*) { ServerState::tlocal()->serialization_max_chunk_size = val; };
shard_set->pool()->AwaitBrief(cb);
}
} // namespace
Service::Service(ProactorPool* pp)
@ -791,6 +796,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); });
config_registry.RegisterSetter<size_t>("serialization_max_chunk_size",
[](size_t val) { SetSerializationMaxChunkSize(val); });
config_registry.RegisterMutable("pipeline_squash");
@ -835,7 +842,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
[val](auto index, auto* context) { ServerState::tlocal()->acl_log.SetTotalEntries(val); });
});
serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
uint32_t shard_num = GetFlag(FLAGS_num_shards);
if (shard_num == 0 || shard_num > pp_.size()) {
LOG_IF(WARNING, shard_num > pp_.size())
@ -867,6 +873,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
Transaction::Init(shard_num);
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));
SetSerializationMaxChunkSize(absl::GetFlag(FLAGS_serialization_max_chunk_size));
// Requires that shard_set will be initialized before because server_family_.Init might
// load the snapshot.

View file

@ -160,7 +160,7 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) {
dfly::CompressionMode GetDefaultCompressionMode() {
const auto flag = absl::GetFlag(FLAGS_compression_mode);
if (serialization_max_chunk_size == 0) {
if (ServerState::tlocal()->serialization_max_chunk_size == 0) {
return flag;
}

View file

@ -298,6 +298,7 @@ class ServerState { // public struct - to allow initialization.
// Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count;
double rss_oom_deny_ratio;
size_t serialization_max_chunk_size;
private:
int64_t live_transactions_ = 0;

View file

@ -78,7 +78,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
}
const auto flush_threshold = serialization_max_chunk_size;
const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size;
std::function<void(size_t, RdbSerializer::FlushState)> flush_fun;
if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) {
flush_fun = [this, flush_threshold](size_t bytes_serialized,