
fix(eviction): Tune eviction threshold in cache mode (#4142)

* fix(eviction): Tune eviction threshold in cache mode

fixes #4139

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: small fix in tiered_storage_test

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* chore(dragonfly_test): Remove ResetService

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: fix test_cache_eviction_with_rss_deny_oom test

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* fix(dragonfly_test): Fix DflyEngineTest.Bug207

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* fix(dragonfly_test): Increase string size in the test Bug207

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments 3

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments 4

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* fix: Fix failing tests

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments 5

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: resolve conflicts

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

---------

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
This commit is contained in:
Stepan Bagritsevich 2024-12-05 16:26:59 +04:00 committed by GitHub
parent 267d5ab370
commit 5483d1d05e
11 changed files with 226 additions and 43 deletions

View file

@@ -122,6 +122,10 @@ size_t max_memory_limit = 0;
size_t serialization_max_chunk_size = 0;
Namespaces* namespaces = nullptr;
size_t FetchRssMemory(io::StatusData sdata) {
return sdata.vm_rss + sdata.hugetlb_pages;
}
const char* GlobalStateName(GlobalState s) {
switch (s) {
case GlobalState::ACTIVE:
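
As an aside on what the new helper measures: a minimal standalone sketch, assuming Linux and using a hypothetical ReadRssBytes in place of helio's io::ReadStatusInfo, would sum the same two /proc/self/status fields:

#include <cstddef>
#include <fstream>
#include <sstream>
#include <string>

// Hypothetical sketch, not Dragonfly/helio code: sum VmRSS and HugetlbPages
// from /proc/self/status; both fields are reported in kB.
size_t ReadRssBytes() {
  std::ifstream status("/proc/self/status");
  std::string line;
  size_t total_kb = 0;
  while (std::getline(status, line)) {
    if (line.rfind("VmRSS:", 0) == 0 || line.rfind("HugetlbPages:", 0) == 0) {
      std::istringstream field(line.substr(line.find(':') + 1));
      size_t kb = 0;
      field >> kb;  // the number precedes the "kB" unit suffix
      total_kb += kb;
    }
  }
  return total_kb * 1024;
}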

View file

@@ -17,6 +17,7 @@
#include "core/compact_object.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "helio/io/proc_reader.h"
#include "util/fibers/fibers.h"
#include "util/fibers/synchronization.h"
@@ -132,6 +133,8 @@ extern std::atomic_uint64_t rss_mem_peak;
extern size_t max_memory_limit;
size_t FetchRssMemory(io::StatusData sdata);
extern Namespaces* namespaces;
// version 5.11 maps to 511 etc.

View file

@@ -1214,6 +1214,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
if (ttl <= 0) {
auto prime_it = db.prime.Find(it->first);
CHECK(!prime_it.is_done());
result.deleted_bytes += prime_it->first.MallocUsed() + prime_it->second.MallocUsed();
ExpireIfNeeded(cntx, prime_it, false);
++result.deleted;
} else {
@@ -1290,9 +1291,6 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
auto time_finish = absl::GetCurrentTimeNanos();
events_.evicted_keys += evicted_items;
DVLOG(2) << "Evicted: " << evicted_bytes;
DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted_items << "/"
<< max_eviction_per_hb;
DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
return pair<uint64_t, size_t>{evicted_items, evicted_bytes};
};
@@ -1326,7 +1324,7 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
if (record_keys)
keys_to_journal.emplace_back(key);
- evicted_bytes += evict_it->second.MallocUsed();
+ evicted_bytes += evict_it->first.MallocUsed() + evict_it->second.MallocUsed();
++evicted_items;
PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());

View file

@@ -442,6 +442,7 @@ class DbSlice {
struct DeleteExpiredStats {
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
uint32_t deleted_bytes = 0; // total bytes of deleted items.
uint32_t traversed = 0; // number of traversed items that have ttl bit
size_t survivor_ttl_sum = 0; // total sum of ttl of survivors (traversed - deleted).
};

View file

@@ -26,8 +26,8 @@ ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold);
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
- ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);
+ ABSL_DECLARE_FLAG(double, eviction_memory_budget_threshold);
namespace dfly {
@@ -456,19 +456,22 @@ TEST_F(DflyEngineTest, OOM) {
/// Reproduces the case where items with expiry data were evicted,
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
- max_memory_limit = 300000;
+ max_memory_limit = 300000 * 4;
+ // The threshold is set to 0.3 to trigger eviction earlier and prevent OOM.
absl::FlagSaver fs;
- absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
- ResetService();
+ absl::SetFlag(&FLAGS_eviction_memory_budget_threshold, 0.3);
shard_set->TEST_EnableCacheMode();
+ /* The value should be large enough to avoid being inlined. Heartbeat evicts only objects for
+  * which HasAllocated() returns true. */
+ std::string value(1000, '.');
ssize_t i = 0;
RespExpr resp;
- for (; i < 10000; ++i) {
-   resp = Run({"setex", StrCat("key", i), "30", "bar"});
- // we evict some items because 5000 is too much when max_memory_limit is 300000.
+ for (; i < 1000; ++i) {
+   resp = Run({"setex", StrCat("key", i), "30", value});
ASSERT_EQ(resp, "OK");
}
@@ -489,10 +492,7 @@ TEST_F(DflyEngineTest, Bug207) {
}
TEST_F(DflyEngineTest, StickyEviction) {
- max_memory_limit = 300000;
- absl::FlagSaver fs;
- absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
- ResetService();
+ max_memory_limit = 600000;  // 0.6mb
shard_set->TEST_EnableCacheMode();
string tmp_val(100, '.');

View file

@@ -60,6 +60,12 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5,
ABSL_FLAG(bool, enable_heartbeat_eviction, true,
"Enable eviction during heartbeat when memory is under pressure.");
ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
"Eviction starts when the free memory (including RSS memory) drops below "
"eviction_memory_budget_threshold * max_memory_limit.");
ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);
namespace dfly {
using absl::GetFlag;
@@ -198,6 +204,52 @@ optional<uint32_t> GetPeriodicCycleMs() {
return clock_cycle_ms;
}
size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t global_used_memory,
size_t shard_memory_threshold) {
if (global_used_memory > global_memory_limit) {
// Used memory is above the limit: evict the per-shard share of the overshoot plus the threshold
return (global_used_memory - global_memory_limit) / shard_set->size() + shard_memory_threshold;
}
const size_t shard_budget = (global_memory_limit - global_used_memory) / shard_set->size();
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
}
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
/* If rss_oom_deny_ratio is set, we should evict depending on rss memory too */
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* Start eviction when free RSS memory drops below eviction_memory_budget_threshold *
 * max_rss_memory */
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
// Try to evict more bytes if we are close to the rss memory limit
goal_bytes = std::max(
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
return goal_bytes;
}
} // namespace
__thread EngineShard* EngineShard::shard_ = nullptr;
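
To make the new budget arithmetic concrete, here is a hedged, self-contained sketch with made-up numbers; BytesToEvictOnShard mirrors CalculateHowManyBytesToEvictOnShard above but is not the actual Dragonfly function:

#include <algorithm>
#include <cstddef>
#include <iostream>

// Per-shard eviction goal: if global usage already exceeds the limit, evict the
// per-shard share of the overshoot plus the threshold; otherwise evict only the
// shortfall between the threshold and the remaining per-shard budget.
size_t BytesToEvictOnShard(size_t limit, size_t used, size_t shard_threshold, size_t shards) {
  if (used > limit)
    return (used - limit) / shards + shard_threshold;
  const size_t shard_budget = (limit - used) / shards;
  return shard_budget < shard_threshold ? shard_threshold - shard_budget : 0;
}

int main() {
  // Assumed setup: 256 MiB maxmemory, 4 shards, the default
  // eviction_memory_budget_threshold of 0.1, rss_oom_deny_ratio of 0.5.
  const size_t limit = 256u << 20, shards = 4;
  const size_t threshold = size_t(limit * 0.1) / shards;  // ~6.7 MB per shard

  // 250 MiB used: the free budget per shard (~1.6 MB) is below the threshold,
  // so each shard is asked to free the difference (~5.1 MB).
  size_t goal = BytesToEvictOnShard(limit, 250u << 20, threshold, shards);

  // RSS branch: the limit becomes max_memory_limit * rss_oom_deny_ratio and the
  // larger of the two goals wins, as in CalculateEvictionBytes above.
  const size_t rss_limit = size_t(limit * 0.5);
  const size_t rss_threshold = size_t(rss_limit * 0.1) / shards;
  goal = std::max(goal, BytesToEvictOnShard(rss_limit, 120u << 20, rss_threshold, shards));
  std::cout << goal << " bytes to evict on this shard\n";
}
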
@@ -706,7 +758,6 @@ void EngineShard::RetireExpiredAndEvict() {
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
constexpr double kTtlDeleteLimit = 200;
- constexpr double kRedLimitFactor = 0.1;
uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
@@ -720,11 +771,11 @@ void EngineShard::RetireExpiredAndEvict() {
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}
- ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
+ size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
@@ -734,15 +785,22 @@ void EngineShard::RetireExpiredAndEvict() {
if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);
+ eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
}
- // if our budget is below the limit
- if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
+ if (eviction_goal) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
- db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
-                                  eviction_redline - db_slice.memory_budget());
+ auto [evicted_items, evicted_bytes] =
+     db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal);
+ DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
+          << " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes
+          << " bytes. Max eviction per heartbeat: "
+          << GetFlag(FLAGS_max_eviction_per_heartbeat);
+ eviction_goal -= std::min(eviction_goal, evicted_bytes);
}
}
}
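
The loop above spreads one heartbeat's goal across databases: bytes freed by expired keys are credited against the goal first, and only the remainder drives an eviction step. A tiny sketch of that saturating accounting, with hypothetical numbers rather than the real DbSlice API:

#include <algorithm>
#include <cstddef>

// Freed bytes are clamped with std::min before subtraction, mirroring
// RetireExpiredAndEvict above, so the remaining goal never wraps around.
size_t CreditFreedBytes(size_t goal, size_t freed) {
  return goal - std::min(goal, freed);
}

int main() {
  size_t goal = 100 * 1024;                  // heartbeat goal: 100 KiB
  goal = CreditFreedBytes(goal, 30 * 1024);  // expirations freed 30 KiB -> 70 KiB left
  goal = CreditFreedBytes(goal, 80 * 1024);  // eviction freed 80 KiB -> clamps to 0
}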

View file

@@ -100,7 +100,8 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");
- ABSL_FLAG(double, oom_deny_ratio, 1.1,
+ ABSL_RETIRED_FLAG(
+     double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");
@@ -722,11 +723,6 @@ string FailedCommandToString(std::string_view command, facade::CmdArgList args,
return result;
}
- void SetOomDenyRatioOnAllThreads(double ratio) {
-   auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->oom_deny_ratio = ratio; };
-   shard_set->pool()->AwaitBrief(cb);
- }
void SetRssOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
@@ -793,9 +789,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");
config_registry.RegisterSetter<double>("oom_deny_ratio",
[](double val) { SetOomDenyRatioOnAllThreads(val); });
config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); });
@@ -873,7 +866,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
});
Transaction::Init(shard_num);
- SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));
// Requires that shard_set will be initialized before because server_family_.Init might
@@ -1001,7 +993,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);
- if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) ||
+ if (memory_stats.used_mem > max_memory_limit ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
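
With oom_deny_ratio retired, used memory is now compared against maxmemory directly and only the RSS side keeps a ratio. A hedged condensed form of the predicate, with example numbers:

#include <cstdint>

// Hypothetical sketch of the check above: deny denyoom commands once used
// memory exceeds the limit outright, or RSS exceeds limit * rss_oom_deny_ratio.
bool ShouldDeny(uint64_t used, uint64_t rss, uint64_t limit, double rss_ratio) {
  return used > limit || (rss_ratio > 0 && rss > uint64_t(limit * rss_ratio));
}
// With limit = 256 MiB and rss_ratio = 0.5: used = 200 MiB alone is allowed,
// but rss = 140 MiB (> 128 MiB) is denied.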

View file

@@ -137,7 +137,6 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);
- ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
@@ -1014,7 +1013,7 @@ void ServerFamily::UpdateMemoryGlobalStats() {
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
- size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
+ size_t total_rss = FetchRssMemory(sdata_res.value());
rss_mem_current.store(total_rss, memory_order_relaxed);
if (rss_mem_peak.load(memory_order_relaxed) < total_rss) {
rss_mem_peak.store(total_rss, memory_order_relaxed);
@@ -1339,7 +1338,7 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
&resp->body());
}
if (sdata_res.has_value()) {
- size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
+ size_t rss = FetchRssMemory(sdata_res.value());
AppendMetricWithoutLabels("used_memory_rss_bytes", "", rss, MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("swap_memory_bytes", "", sdata_res->vm_swap, MetricType::GAUGE,
&resp->body());

View file

@@ -180,6 +180,7 @@ class ServerState {  // public struct - to allow initialization.
uint64_t used_mem = 0;
uint64_t rss_mem = 0;
};
MemoryUsageStats GetMemoryUsage(uint64_t now_ns);
bool AllowInlineScheduling() const;
@@ -296,7 +297,6 @@ class ServerState {  // public struct - to allow initialization.
// Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count;
- double oom_deny_ratio;
double rss_oom_deny_ratio;
private:

View file

@@ -148,14 +148,12 @@ async def test_reply_guard_oom(df_factory, df_seeder_factory):
@pytest.mark.asyncio
async def test_denyoom_commands(df_factory):
- df_server = df_factory.create(
-     proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", oom_deny_ratio=0.7
- )
+ df_server = df_factory.create(proactor_threads=1, maxmemory="256mb", oom_deny_commands="get")
df_server.start()
client = df_server.client()
await client.execute_command("DEBUG POPULATE 7000 size 44000")
- min_deny = 250 * 1024 * 1024  # 250mb
+ min_deny = 256 * 1024 * 1024  # 256mb
info = await client.info("memory")
print(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
assert info["used_memory"] > min_deny, "Weak testcase: too little used memory"

View file

@@ -6,6 +6,31 @@ from . import dfly_args
from .instance import DflyInstance, DflyInstanceFactory
async def calculate_estimated_connection_memory(
async_client: aioredis.Redis, df_server: DflyInstance
):
memory_info = await async_client.info("memory")
already_used_rss_memory = memory_info["used_memory_rss"]
connections_number = 100
connections = []
for _ in range(connections_number):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)
await asyncio.sleep(1)  # Wait for RSS update
memory_info = await async_client.info("memory")
estimated_connections_memory = memory_info["used_memory_rss"] - already_used_rss_memory
# Close test connections
for conn in connections:
await conn.close()
return estimated_connections_memory // connections_number
@pytest.mark.opt_only
@pytest.mark.parametrize(
"type, keys, val_size, elements",
@@ -160,3 +185,108 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
info = await client.info("memory")
logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
assert rss_before_eval * 1.01 > info["used_memory_rss"]
@pytest.mark.asyncio
@dfly_args(
{
"proactor_threads": 1,
"cache_mode": "true",
"maxmemory": "256mb",
"rss_oom_deny_ratio": 0.5,
"max_eviction_per_heartbeat": 1000,
}
)
async def test_cache_eviction_with_rss_deny_oom(
async_client: aioredis.Redis,
df_server: DflyInstance,
):
"""
Verify that cache eviction is triggered even when used memory is low but RSS memory is above the limit.
"""
max_memory = 256 * 1024 * 1024 # 256 MB
rss_max_memory = int(max_memory * 0.5) # 50% of max memory
data_fill_size = int(0.55 * rss_max_memory) # 55% of rss_max_memory
rss_increase_size = int(0.55 * rss_max_memory)  # 55% of rss_max_memory
key_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // key_size
await asyncio.sleep(1) # Wait for RSS update
estimated_connection_memory = await calculate_estimated_connection_memory(
async_client, df_server
)
num_connections = rss_increase_size // estimated_connection_memory
logging.info(
f"Estimated connection memory: {estimated_connection_memory}. Number of connections: {num_connections}."
)
# Fill data to 55% of rss max memory
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", key_size)
await asyncio.sleep(1) # Wait for RSS heartbeat update
# First test that eviction is not triggered without connection creation
stats_info = await async_client.info("stats")
assert stats_info["evicted_keys"] == 0, "No eviction should start yet."
# Test that used memory is less than 90% of max memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory."
assert (
memory_info["used_memory_rss"] < rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
# Disable heartbeat eviction
await async_client.execute_command("CONFIG SET enable_heartbeat_eviction false")
# Increase RSS memory by 55% of rss max memory
# We can simulate RSS increase by creating new connections
connections = []
for _ in range(num_connections):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)
await asyncio.sleep(1)
# Check that RSS memory is above rss limit
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory_rss"] >= rss_max_memory * 0.9
), "RSS memory should exceed 90% of the maximum RSS memory limit (max_memory * rss_oom_deny_ratio)."
# Enable heartbeat eviction
await async_client.execute_command("CONFIG SET enable_heartbeat_eviction true")
await asyncio.sleep(1) # Wait for RSS heartbeat update
await async_client.execute_command("MEMORY DECOMMIT")
await asyncio.sleep(1) # Wait for RSS update
# Get RSS memory after creating new connections
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")
logging.info(f'Evicted keys number: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
assert (
memory_info["used_memory"] < data_fill_size
), "Used memory should be less than initial fill size due to eviction."
assert (
memory_info["used_memory_rss"] < rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio) after eviction."
# Check that eviction has occurred
assert (
stats_info["evicted_keys"] > 0
), "Eviction should have occurred due to rss memory pressure."
for conn in connections:
await conn.close()