
feat(server): introduce rss oom limit (#3702)

* introduce rss denyoom limit

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2024-09-22 13:28:24 +03:00 committed by GitHub
parent 5cf917871c
commit 4d38271efa
12 changed files with 209 additions and 64 deletions
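In short, this change adds an RSS-based guard next to the existing used-memory OOM check: once the process RSS exceeds maxmemory * rss_oom_deny_ratio, commands marked DENYOOM fail with an OOM error and new connections on non-admin ports are rejected; a negative ratio disables the check. The flag is registered as mutable, so it can be adjusted at runtime. A minimal usage sketch (the client setup and port are assumptions, not part of this commit):

# Sketch: tune the new ratio at runtime and watch RSS via INFO MEMORY.
# Assumes a Dragonfly instance on localhost:6379 and the redis-py client.
import redis

r = redis.Redis(port=6379, decode_responses=True)

# rss_oom_deny_ratio is registered as a mutable config in this commit;
# a negative value disables the RSS check entirely.
r.config_set("rss_oom_deny_ratio", 1.25)

info = r.info("memory")
print("used_memory:", info["used_memory"])
print("used_memory_rss:", info["used_memory_rss"])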


@@ -457,9 +457,12 @@ TEST_F(DflyEngineTest, OOM) {
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
max_memory_limit = 300000;
shard_set->TEST_EnableCacheMode();
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
ResetService();
shard_set->TEST_EnableCacheMode();
ssize_t i = 0;
RespExpr resp;
@@ -486,11 +489,11 @@ TEST_F(DflyEngineTest, Bug207) {
}
TEST_F(DflyEngineTest, StickyEviction) {
shard_set->TEST_EnableCacheMode();
max_memory_limit = 300000;
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
max_memory_limit = 300000;
ResetService();
shard_set->TEST_EnableCacheMode();
string tmp_val(100, '.');


@@ -699,9 +699,6 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";
bool runs_global_periodic = (shard_id() == 0); // Only shard 0 runs global periodic.
unsigned global_count = 0;
int64_t last_stats_time = time(nullptr);
int64_t last_heartbeat_ms = INT64_MAX;
int64_t last_handler_ms = 0;
@@ -721,33 +718,6 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
last_handler_ms = last_heartbeat_ms;
shard_handler();
}
if (runs_global_periodic) {
++global_count;
// Every 8 runs, update the global stats.
if (global_count % 8 == 0) {
DVLOG(2) << "Global periodic";
uint64_t mem_current = used_mem_current.load(std::memory_order_relaxed);
// Single writer, so no races.
if (mem_current > used_mem_peak.load(memory_order_relaxed))
used_mem_peak.store(mem_current, memory_order_relaxed);
int64_t cur_time = time(nullptr);
if (cur_time != last_stats_time) {
last_stats_time = cur_time;
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
rss_mem_current.store(total_rss, memory_order_relaxed);
if (rss_mem_peak.load(memory_order_relaxed) < total_rss)
rss_mem_peak.store(total_rss, memory_order_relaxed);
}
}
}
}
}
}


@@ -103,6 +103,11 @@ ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");
ABSL_FLAG(double, rss_oom_deny_ratio, 1.25,
"When the ratio between maxmemory and RSS memory exceeds this value, commands marked as "
"DENYOOM will fail with OOM error and new connections to non-admin port will be "
"rejected. Negative value disables this feature.");
ABSL_FLAG(size_t, serialization_max_chunk_size, 0,
"Maximum size of a value that may be serialized at once during snapshotting or full "
"sync. Values bigger than this threshold will be serialized using streaming "
@@ -859,6 +864,16 @@ Service::Service(ProactorPool* pp)
engine_varz.emplace("engine", [this] { return GetVarzStats(); });
}
void SetOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
}
void SetRssOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
}
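The two helpers above push the flag values into every worker thread's ServerState, so the per-command check reads a thread-local copy instead of re-reading the flag. A rough sketch of that broadcast pattern (all names below are hypothetical; the real mechanism is AwaitBrief over the proactor pool):

# Illustrative only: fan a new ratio out to per-thread state.
import threading

class ServerStateLocal(threading.local):
    oom_deny_ratio = 1.1
    rss_oom_deny_ratio = 1.25

tlocal = ServerStateLocal()

def set_rss_oom_deny_ratio_on_all_threads(worker_pool, ratio):
    # worker_pool.await_brief is a stand-in for AwaitBrief: run the callback
    # once on every worker thread and wait for all of them to finish.
    worker_pool.await_brief(lambda: setattr(tlocal, "rss_oom_deny_ratio", ratio))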
Service::~Service() {
delete shard_set;
shard_set = nullptr;
@@ -884,7 +899,22 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("masteruser");
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");
config_registry.RegisterMutable("oom_deny_ratio");
config_registry.RegisterMutable("oom_deny_ratio", [](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<double>();
if (res.has_value()) {
SetOomDenyRatioOnAllThreads(*res);
}
return res.has_value();
});
config_registry.RegisterMutable("rss_oom_deny_ratio", [](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<double>();
if (res.has_value()) {
SetRssOomDenyRatioOnAllThreads(*res);
}
return res.has_value();
});
config_registry.RegisterMutable("pipeline_squash");
config_registry.RegisterMutable("pipeline_queue_limit",
[pool = &pp_](const absl::CommandLineFlag& flag) {
@@ -925,7 +955,12 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
}
// Initialize shard_set with a global callback running once in a while in the shard threads.
shard_set->Init(shard_num, [this] { server_family_.GetDflyCmd()->BreakStalledFlowsInShard(); });
shard_set->Init(shard_num, [this] {
server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
server_family_.UpdateMemoryGlobalStats();
});
SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));
// Requires that shard_set will be initialized before because server_family_.Init might
// load the snapshot.
@@ -1046,21 +1081,29 @@ static optional<ErrorReply> VerifyConnectionAclStatus(const CommandId* cid,
return nullopt;
}
bool ShouldDenyOnOOM(const CommandId* cid) {
ServerState& etl = *ServerState::tlocal();
if ((cid->opt_mask() & CO::DENYOOM) && etl.is_master) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);
if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
<< memory_stats.rss_mem << " ,limit " << max_memory_limit;
etl.stats.oom_error_cmd_cnt++;
return true;
}
}
return false;
}
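ShouldDenyOnOOM above folds the old used-memory check and the new RSS check into one predicate. A rough Python restatement of the same logic (names are illustrative; the authoritative version is the C++ shown here):

# Illustrative restatement of ShouldDenyOnOOM; not the actual implementation.
def should_deny_on_oom(is_denyoom_cmd, is_master, used_mem, rss_mem,
                       max_memory_limit, oom_deny_ratio, rss_oom_deny_ratio):
    if not (is_denyoom_cmd and is_master):
        return False
    if used_mem > max_memory_limit * oom_deny_ratio:
        return True
    # A non-positive ratio disables the RSS-based check.
    if rss_oom_deny_ratio > 0 and rss_mem > max_memory_limit * rss_oom_deny_ratio:
        return True
    return False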
optional<ErrorReply> Service::VerifyCommandExecution(const CommandId* cid,
const ConnectionContext* cntx,
CmdArgList tail_args) {
ServerState& etl = *ServerState::tlocal();
if ((cid->opt_mask() & CO::DENYOOM) && etl.is_master) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
uint64_t used_memory = etl.GetUsedMemory(start_ns);
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
DLOG(WARNING) << "Out of memory, used " << used_memory << " vs limit " << max_memory_limit;
etl.stats.oom_error_cmd_cnt++;
return facade::ErrorReply{kOutOfMemory};
}
if (ShouldDenyOnOOM(cid)) {
return facade::ErrorReply{kOutOfMemory};
}
return VerifyConnectionAclStatus(cid, cntx, "ACL rules changed between the MULTI and EXEC",
@@ -1383,7 +1426,9 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
}
std::string reason = cntx->reply_builder()->ConsumeLastError();
if (!reason.empty()) {
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason);
}
@@ -2295,12 +2340,12 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}
if (scheduled) {
VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands";
VLOG(2) << "Exec unlocking " << exec_info.body.size() << " commands";
cntx->transaction->UnlockMulti();
}
cntx->cid = exec_cid_;
VLOG(1) << "Exec completed";
VLOG(2) << "Exec completed";
}
void Service::Publish(CmdArgList args, ConnectionContext* cntx) {


@@ -134,6 +134,8 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(cond, m) \
@@ -959,11 +961,6 @@ void ServerFamily::Shutdown() {
}
pb_task_->Await([this] {
if (stats_caching_task_) {
pb_task_->CancelPeriodic(stats_caching_task_);
stats_caching_task_ = 0;
}
auto ec = journal_->Close();
LOG_IF(ERROR, ec) << "Error closing journal " << ec;
@@ -978,6 +975,57 @@ void ServerFamily::Shutdown() {
});
}
bool ServerFamily::HasPrivilegedInterface() {
for (auto* listener : listeners_) {
if (listener->IsPrivilegedInterface()) {
return true;
}
}
return false;
}
void ServerFamily::UpdateMemoryGlobalStats() {
ShardId sid = EngineShard::tlocal()->shard_id();
if (sid != 0) { // This function is executed periodically on all shards. To ensure the logic
// below runs only on one shard, we return early if the shard is not 0.
return;
}
uint64_t mem_current = used_mem_current.load(std::memory_order_relaxed);
if (mem_current > used_mem_peak.load(memory_order_relaxed)) {
used_mem_peak.store(mem_current, memory_order_relaxed);
}
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
rss_mem_current.store(total_rss, memory_order_relaxed);
if (rss_mem_peak.load(memory_order_relaxed) < total_rss) {
rss_mem_peak.store(total_rss, memory_order_relaxed);
}
double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
if (rss_oom_deny_ratio > 0) {
size_t memory_limit = max_memory_limit * rss_oom_deny_ratio;
if (total_rss > memory_limit && accepting_connections_ && HasPrivilegedInterface()) {
for (auto* listener : listeners_) {
if (!listener->IsPrivilegedInterface()) {
listener->socket()->proactor()->Await([listener]() { listener->pause_accepting(); });
}
}
accepting_connections_ = false;
} else if (total_rss < memory_limit && !accepting_connections_) {
for (auto* listener : listeners_) {
if (!listener->IsPrivilegedInterface()) {
listener->socket()->proactor()->Await([listener]() { listener->resume_accepting(); });
}
}
accepting_connections_ = true;
}
}
}
}
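Besides refreshing the RSS counters, UpdateMemoryGlobalStats gates new connections: when total RSS crosses maxmemory * rss_oom_deny_ratio (and a privileged/admin interface exists), non-admin listeners pause accepting, and they resume once RSS drops back below the limit. A compact sketch of that hysteresis (hypothetical names, illustration only):

# Illustrative sketch of the listener gating performed above; the pause/resume
# callables stand in for Listener::pause_accepting / resume_accepting.
def update_accepting_state(total_rss, max_memory_limit, rss_oom_deny_ratio,
                           accepting, has_admin_iface,
                           pause_non_admin, resume_non_admin):
    if rss_oom_deny_ratio <= 0:
        return accepting  # feature disabled
    limit = max_memory_limit * rss_oom_deny_ratio
    if total_rss > limit and accepting and has_admin_iface:
        pause_non_admin()   # stop accepting on non-admin listeners
        return False
    if total_rss < limit and not accepting:
        resume_non_admin()  # RSS recovered; accept connections again
        return True
    return accepting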
struct AggregateLoadResult {
AggregateError first_error;
std::atomic<size_t> keys_read;
@@ -2278,7 +2326,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("garbage_collected", m.events.garbage_collected);
append("bump_ups", m.events.bumpups);
append("stash_unloaded", m.events.stash_unloaded);
append("oom_rejections", m.events.insertion_rejections);
append("oom_rejections", m.events.insertion_rejections + m.coordinator_stats.oom_error_cmd_cnt);
append("traverse_ttl_sec", m.traverse_ttl_per_sec);
append("delete_ttl_sec", m.delete_ttl_per_sec);
append("keyspace_hits", m.events.hits);


@@ -260,7 +260,10 @@ class ServerFamily {
// Sets the server to replicate another instance. Does not flush the database beforehand!
void Replicate(std::string_view host, std::string_view port);
void UpdateMemoryGlobalStats();
private:
bool HasPrivilegedInterface();
void JoinSnapshotSchedule();
void LoadFromSnapshot() ABSL_LOCKS_EXCLUDED(loading_stats_mu_);
@@ -330,11 +333,11 @@ class ServerFamily {
util::fb2::Fiber snapshot_schedule_fb_;
std::optional<util::fb2::Future<GenericError>> load_result_;
uint32_t stats_caching_task_ = 0;
Service& service_;
util::AcceptServer* acceptor_ = nullptr;
std::vector<facade::Listener*> listeners_;
bool accepting_connections_ = true;
util::ProactorBase* pb_task_ = nullptr;
mutable util::fb2::Mutex replicaof_mu_, save_mu_;


@@ -113,13 +113,14 @@ void ServerState::Destroy() {
state_ = nullptr;
}
uint64_t ServerState::GetUsedMemory(uint64_t now_ns) {
ServerState::MemoryUsageStats ServerState::GetMemoryUsage(uint64_t now_ns) {
static constexpr uint64_t kCacheEveryNs = 1000;
if (now_ns > used_mem_last_update_ + kCacheEveryNs) {
used_mem_last_update_ = now_ns;
used_mem_cached_ = used_mem_current.load(std::memory_order_relaxed);
memory_stats_cached_.used_mem = used_mem_current.load(std::memory_order_relaxed);
memory_stats_cached_.rss_mem = rss_mem_current.load(std::memory_order_relaxed);
}
return used_mem_cached_;
return memory_stats_cached_;
}
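GetMemoryUsage now caches both the used and RSS readings per thread, refreshing at most once per kCacheEveryNs (1000 ns), so the hot-path command check stays cheap. The same time-based caching pattern, sketched in Python (names are illustrative):

# Illustrative: cache (used_mem, rss_mem) readings, refreshing at most once
# per interval, mirroring the thread-local cache in GetMemoryUsage above.
import time

class MemoryStatsCache:
    def __init__(self, read_fn, interval_ns=1000):
        self._read_fn = read_fn          # returns a (used_mem, rss_mem) tuple
        self._interval_ns = interval_ns
        self._last_update_ns = 0
        self._cached = (0, 0)

    def get(self, now_ns=None):
        if now_ns is None:
            now_ns = time.monotonic_ns()
        if now_ns > self._last_update_ns + self._interval_ns:
            self._last_update_ns = now_ns
            self._cached = self._read_fn()
        return self._cached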
bool ServerState::AllowInlineScheduling() const {


@@ -174,7 +174,11 @@ class ServerState { // public struct - to allow initialization.
gstate_ = s;
}
uint64_t GetUsedMemory(uint64_t now_ns);
struct MemoryUsageStats {
uint64_t used_mem = 0;
uint64_t rss_mem = 0;
};
MemoryUsageStats GetMemoryUsage(uint64_t now_ns);
bool AllowInlineScheduling() const;
@@ -283,6 +287,8 @@ class ServerState { // public struct - to allow initialization.
// Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count;
double oom_deny_ratio;
double rss_oom_deny_ratio;
private:
int64_t live_transactions_ = 0;
@@ -311,8 +317,9 @@ class ServerState { // public struct - to allow initialization.
absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
uint32_t thread_index_ = 0;
uint64_t used_mem_cached_ = 0; // thread local cache of used_mem_current
uint64_t used_mem_last_update_ = 0;
MemoryUsageStats memory_stats_cached_; // thread local cache of used and rss memory current
static __thread ServerState* state_;
};


@@ -28,6 +28,7 @@ extern "C" {
using namespace std;
ABSL_DECLARE_FLAG(string, dbfilename);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
ABSL_DECLARE_FLAG(uint32_t, num_shards);
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");
ABSL_DECLARE_FLAG(size_t, acllog_max_len);
@@ -152,6 +153,7 @@ BaseFamilyTest::~BaseFamilyTest() {
void BaseFamilyTest::SetUpTestSuite() {
kInitSegmentLog = 1;
absl::SetFlag(&FLAGS_rss_oom_deny_ratio, -1);
absl::SetFlag(&FLAGS_dbfilename, "");
init_zmalloc_threadlocal(mi_heap_get_backing());


@@ -125,11 +125,15 @@ async def test_restricted_commands(df_factory):
@pytest.mark.asyncio
async def test_reply_guard_oom(df_factory, df_seeder_factory):
master = df_factory.create(
proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false"
proactor_threads=1,
cache_mode="true",
maxmemory="256mb",
enable_heartbeat_eviction="false",
rss_oom_deny_ratio=2,
)
df_factory.start_all([master])
c_master = master.client()
await c_master.execute_command("DEBUG POPULATE 6000 size 44000")
await c_master.execute_command("DEBUG POPULATE 6000 size 40000")
seeder = df_seeder_factory.create(
port=master.port, keys=5000, val_size=1000, stop_on_failure=False


@@ -2,6 +2,7 @@ import pytest
from redis import asyncio as aioredis
from .utility import *
import logging
from . import dfly_args
@pytest.mark.opt_only
@@ -47,3 +48,62 @@ async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements):
assert delta < max_unaccounted
await disconnect_clients(client)
@pytest.mark.asyncio
@dfly_args(
{
"maxmemory": "512mb",
"proactor_threads": 2,
"rss_oom_deny_ratio": 0.5,
}
)
@pytest.mark.parametrize("admin_port", [0, 1112])
async def test_rss_oom_ratio(df_factory, admin_port):
"""
Test that dragonfly rejects denyoom commands and new connections when RSS memory is above maxmemory * rss_oom_deny_ratio.
Test that dragonfly stops rejecting them once RSS memory drops back below the threshold.
"""
df_server = df_factory.create(admin_port=admin_port)
df_server.start()
client = aioredis.Redis(port=df_server.port)
await client.execute_command("DEBUG POPULATE 10000 key 40000 RAND")
await asyncio.sleep(1) # Wait for another RSS heartbeat update in Dragonfly
port = df_server.admin_port if admin_port else df_server.port
new_client = aioredis.Redis(port=port)
await new_client.ping()
info = await new_client.info("memory")
logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
reject_limit = 256 * 1024 * 1024 # 256mb
assert info["used_memory_rss"] > reject_limit
# get command from existing connection should not be rejected
await client.execute_command("get x")
# reject set due to oom
with pytest.raises(redis.exceptions.ResponseError):
await client.execute_command("set x y")
if admin_port:
# creating a new client should also fail if an admin port was set
client = aioredis.Redis(port=df_server.port)
with pytest.raises(redis.exceptions.ConnectionError):
await client.ping()
# flush to free memory
await new_client.flushall()
await asyncio.sleep(2) # Wait for another RSS heartbeat update in Dragonfly
info = await new_client.info("memory")
logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
assert info["used_memory_rss"] < reject_limit
# creating a new client should not fail after memory usage decreases
client = aioredis.Redis(port=df_server.port)
await client.execute_command("set x y")


@@ -2085,6 +2085,7 @@ async def test_policy_based_eviction_propagation(df_factory, df_seeder_factory):
maxmemory="512mb",
logtostdout="true",
enable_heartbeat_eviction="false",
rss_oom_deny_ratio=1.3,
)
replica = df_factory.create(proactor_threads=2)
df_factory.start_all([master, replica])


@@ -567,6 +567,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis):
("LIST"),
],
)
@pytest.mark.slow
async def test_big_value_serialization_memory_limit(df_factory, query):
dbfilename = f"dump_{tmp_file_name()}"
instance = df_factory.create(dbfilename=dbfilename)