From 5483d1d05eddf1d3bff643d163a356925e7407c3 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:26:59 +0400 Subject: [PATCH] fix(eviction): Tune eviction threshold in cache mode (#4142) * fix(eviction): Tune eviction threshold in cache mode fixes #4139 Signed-off-by: Stepan Bagritsevich * refactor: small fix in tiered_storage_test Signed-off-by: Stepan Bagritsevich * refactor: address comments Signed-off-by: Stepan Bagritsevich * chore(dragonfly_test): Remove ResetService Signed-off-by: Stepan Bagritsevich * refactor: fix test_cache_eviction_with_rss_deny_oom test Signed-off-by: Stepan Bagritsevich * refactor: address comments Signed-off-by: Stepan Bagritsevich * fix(dragonfly_test): Fix DflyEngineTest.Bug207 Signed-off-by: Stepan Bagritsevich * fix(dragonfly_test): Increase string size in the test Bug207 Signed-off-by: Stepan Bagritsevich * refactor: address comments 3 Signed-off-by: Stepan Bagritsevich * refactor: address comments 4 Signed-off-by: Stepan Bagritsevich * fix: Fix failing tests Signed-off-by: Stepan Bagritsevich * refactor: address comments 5 Signed-off-by: Stepan Bagritsevich * refactor: resolve conficts Signed-off-by: Stepan Bagritsevich --------- Signed-off-by: Stepan Bagritsevich --- src/server/common.cc | 4 + src/server/common.h | 3 + src/server/db_slice.cc | 6 +- src/server/db_slice.h | 1 + src/server/dragonfly_test.cc | 22 +++--- src/server/engine_shard.cc | 72 ++++++++++++++++-- src/server/main_service.cc | 18 ++--- src/server/server_family.cc | 5 +- src/server/server_state.h | 2 +- tests/dragonfly/generic_test.py | 6 +- tests/dragonfly/memory_test.py | 130 ++++++++++++++++++++++++++++++++ 11 files changed, 226 insertions(+), 43 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index 603455c82..39d4acaff 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -122,6 +122,10 @@ size_t max_memory_limit = 0; size_t serialization_max_chunk_size = 0; Namespaces* namespaces = nullptr; +size_t FetchRssMemory(io::StatusData sdata) { + return sdata.vm_rss + sdata.hugetlb_pages; +} + const char* GlobalStateName(GlobalState s) { switch (s) { case GlobalState::ACTIVE: diff --git a/src/server/common.h b/src/server/common.h index 28c41da4e..ac890b225 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -17,6 +17,7 @@ #include "core/compact_object.h" #include "facade/facade_types.h" #include "facade/op_status.h" +#include "helio/io/proc_reader.h" #include "util/fibers/fibers.h" #include "util/fibers/synchronization.h" @@ -132,6 +133,8 @@ extern std::atomic_uint64_t rss_mem_peak; extern size_t max_memory_limit; +size_t FetchRssMemory(io::StatusData sdata); + extern Namespaces* namespaces; // version 5.11 maps to 511 etc. diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index ad7aed04e..f9d65f447 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1214,6 +1214,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx if (ttl <= 0) { auto prime_it = db.prime.Find(it->first); CHECK(!prime_it.is_done()); + result.deleted_bytes += prime_it->first.MallocUsed() + prime_it->second.MallocUsed(); ExpireIfNeeded(cntx, prime_it, false); ++result.deleted; } else { @@ -1290,9 +1291,6 @@ pair DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s auto time_finish = absl::GetCurrentTimeNanos(); events_.evicted_keys += evicted_items; - DVLOG(2) << "Evicted: " << evicted_bytes; - DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted_items << "/" - << max_eviction_per_hb; DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000; return pair{evicted_items, evicted_bytes}; }; @@ -1326,7 +1324,7 @@ pair DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s if (record_keys) keys_to_journal.emplace_back(key); - evicted_bytes += evict_it->second.MallocUsed(); + evicted_bytes += evict_it->first.MallocUsed() + evict_it->second.MallocUsed(); ++evicted_items; PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get()); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index a5e97f044..47d179722 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -442,6 +442,7 @@ class DbSlice { struct DeleteExpiredStats { uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed). + uint32_t deleted_bytes = 0; // total bytes of deleted items. uint32_t traversed = 0; // number of traversed items that have ttl bit size_t survivor_ttl_sum = 0; // total sum of ttl of survivors (traversed - deleted). }; diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 276bcd87b..83f8c3bdb 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -26,8 +26,8 @@ ABSL_DECLARE_FLAG(float, mem_defrag_threshold); ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold); ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval); ABSL_DECLARE_FLAG(std::vector, rename_command); -ABSL_DECLARE_FLAG(double, oom_deny_ratio); ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float); +ABSL_DECLARE_FLAG(double, eviction_memory_budget_threshold); namespace dfly { @@ -456,19 +456,22 @@ TEST_F(DflyEngineTest, OOM) { /// Reproduces the case where items with expiry data were evicted, /// and then written with the same key. TEST_F(DflyEngineTest, Bug207) { - max_memory_limit = 300000; + max_memory_limit = 300000 * 4; + // The threshold is set to 0.3 to trigger eviction earlier and prevent OOM. absl::FlagSaver fs; - absl::SetFlag(&FLAGS_oom_deny_ratio, 4); - ResetService(); + absl::SetFlag(&FLAGS_eviction_memory_budget_threshold, 0.3); shard_set->TEST_EnableCacheMode(); + /* The value should be large enough to avoid being inlined. Heartbeat evicts only objects for + * which HasAllocated() returns true. */ + std::string value(1000, '.'); + ssize_t i = 0; RespExpr resp; - for (; i < 10000; ++i) { - resp = Run({"setex", StrCat("key", i), "30", "bar"}); - // we evict some items because 5000 is too much when max_memory_limit is 300000. + for (; i < 1000; ++i) { + resp = Run({"setex", StrCat("key", i), "30", value}); ASSERT_EQ(resp, "OK"); } @@ -489,10 +492,7 @@ TEST_F(DflyEngineTest, Bug207) { } TEST_F(DflyEngineTest, StickyEviction) { - max_memory_limit = 300000; - absl::FlagSaver fs; - absl::SetFlag(&FLAGS_oom_deny_ratio, 4); - ResetService(); + max_memory_limit = 600000; // 0.6mb shard_set->TEST_EnableCacheMode(); string tmp_val(100, '.'); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 467cd32ac..70d95ca58 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -60,6 +60,12 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5, ABSL_FLAG(bool, enable_heartbeat_eviction, true, "Enable eviction during heartbeat when memory is under pressure."); +ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1, + "Eviction starts when the free memory (including RSS memory) drops below " + "eviction_memory_budget_threshold * max_memory_limit."); + +ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat); + namespace dfly { using absl::GetFlag; @@ -198,6 +204,52 @@ optional GetPeriodicCycleMs() { return clock_cycle_ms; } +size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t global_used_memory, + size_t shard_memory_threshold) { + if (global_used_memory > global_memory_limit) { + // Used memory is above the limit, we need to evict all bytes + return (global_used_memory - global_memory_limit) / shard_set->size() + shard_memory_threshold; + } + + const size_t shard_budget = (global_memory_limit - global_used_memory) / shard_set->size(); + return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0; +} + +/* Calculates the number of bytes to evict based on memory and rss memory usage. */ +size_t CalculateEvictionBytes() { + const size_t shards_count = shard_set->size(); + const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold); + + const size_t shard_memory_budget_threshold = + size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count; + + const size_t global_used_memory = used_mem_current.load(memory_order_relaxed); + + // Calculate how many bytes we need to evict on this shard + size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory, + shard_memory_budget_threshold); + + const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio; + + /* If rss_oom_deny_ratio is set, we should evict depending on rss memory too */ + if (rss_oom_deny_ratio > 0.0) { + const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit); + /* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss + * memory */ + const size_t shard_rss_memory_budget_threshold = + size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count; + + // Calculate how much rss memory is used by all shards + const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed); + + // Try to evict more bytes if we are close to the rss memory limit + goal_bytes = std::max( + goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory, + shard_rss_memory_budget_threshold)); + } + return goal_bytes; +} + } // namespace __thread EngineShard* EngineShard::shard_ = nullptr; @@ -706,7 +758,6 @@ void EngineShard::RetireExpiredAndEvict() { // TODO: iterate over all namespaces DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); constexpr double kTtlDeleteLimit = 200; - constexpr double kRedLimitFactor = 0.1; uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); uint32_t deleted = GetMovingSum6(TTL_DELETE); @@ -720,11 +771,11 @@ void EngineShard::RetireExpiredAndEvict() { ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); } - ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size(); - DbContext db_cntx; db_cntx.time_now_ms = GetCurrentTimeMs(); + size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0; + for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { if (!db_slice.IsDbValid(i)) continue; @@ -734,15 +785,22 @@ void EngineShard::RetireExpiredAndEvict() { if (expt->size() > pt->size() / 4) { DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); + eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes)); counter_[TTL_TRAVERSE].IncBy(stats.traversed); counter_[TTL_DELETE].IncBy(stats.deleted); } - // if our budget is below the limit - if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) { + if (eviction_goal) { uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); - db_slice.FreeMemWithEvictionStep(i, starting_segment_id, - eviction_redline - db_slice.memory_budget()); + auto [evicted_items, evicted_bytes] = + db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal); + + DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal + << " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes + << " bytes. Max eviction per heartbeat: " + << GetFlag(FLAGS_max_eviction_per_heartbeat); + + eviction_goal -= std::min(eviction_goal, evicted_bytes); } } } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index d1cb33853..b7dd7a55c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -100,9 +100,10 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{}, "0 - means the program will automatically determine its maximum memory usage. " "default: 0"); -ABSL_FLAG(double, oom_deny_ratio, 1.1, - "commands with flag denyoom will return OOM when the ratio between maxmemory and used " - "memory is above this value"); +ABSL_RETIRED_FLAG( + double, oom_deny_ratio, 1.1, + "commands with flag denyoom will return OOM when the ratio between maxmemory and used " + "memory is above this value"); ABSL_FLAG(double, rss_oom_deny_ratio, 1.25, "When the ratio between maxmemory and RSS memory exceeds this value, commands marked as " @@ -722,11 +723,6 @@ string FailedCommandToString(std::string_view command, facade::CmdArgList args, return result; } -void SetOomDenyRatioOnAllThreads(double ratio) { - auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->oom_deny_ratio = ratio; }; - shard_set->pool()->AwaitBrief(cb); -} - void SetRssOomDenyRatioOnAllThreads(double ratio) { auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; }; shard_set->pool()->AwaitBrief(cb); @@ -793,9 +789,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("max_eviction_per_heartbeat"); config_registry.RegisterMutable("max_segment_to_consider"); - config_registry.RegisterSetter("oom_deny_ratio", - [](double val) { SetOomDenyRatioOnAllThreads(val); }); - config_registry.RegisterSetter("rss_oom_deny_ratio", [](double val) { SetRssOomDenyRatioOnAllThreads(val); }); @@ -873,7 +866,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector }); Transaction::Init(shard_num); - SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio)); SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio)); // Requires that shard_set will be initialized before because server_family_.Init might @@ -1001,7 +993,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) { uint64_t start_ns = absl::GetCurrentTimeNanos(); auto memory_stats = etl.GetMemoryUsage(start_ns); - if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) || + if (memory_stats.used_mem > max_memory_limit || (etl.rss_oom_deny_ratio > 0 && memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) { DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss " diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b3ff9c167..7e9638c10 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -137,7 +137,6 @@ ABSL_DECLARE_FLAG(bool, tls); ABSL_DECLARE_FLAG(string, tls_ca_cert_file); ABSL_DECLARE_FLAG(string, tls_ca_cert_dir); ABSL_DECLARE_FLAG(int, replica_priority); -ABSL_DECLARE_FLAG(double, oom_deny_ratio); ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio); bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) { @@ -1014,7 +1013,7 @@ void ServerFamily::UpdateMemoryGlobalStats() { io::Result sdata_res = io::ReadStatusInfo(); if (sdata_res) { - size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; + size_t total_rss = FetchRssMemory(sdata_res.value()); rss_mem_current.store(total_rss, memory_order_relaxed); if (rss_mem_peak.load(memory_order_relaxed) < total_rss) { rss_mem_peak.store(total_rss, memory_order_relaxed); @@ -1339,7 +1338,7 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd &resp->body()); } if (sdata_res.has_value()) { - size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; + size_t rss = FetchRssMemory(sdata_res.value()); AppendMetricWithoutLabels("used_memory_rss_bytes", "", rss, MetricType::GAUGE, &resp->body()); AppendMetricWithoutLabels("swap_memory_bytes", "", sdata_res->vm_swap, MetricType::GAUGE, &resp->body()); diff --git a/src/server/server_state.h b/src/server/server_state.h index 0cfc48be1..9aed8901c 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -180,6 +180,7 @@ class ServerState { // public struct - to allow initialization. uint64_t used_mem = 0; uint64_t rss_mem = 0; }; + MemoryUsageStats GetMemoryUsage(uint64_t now_ns); bool AllowInlineScheduling() const; @@ -296,7 +297,6 @@ class ServerState { // public struct - to allow initialization. // Exec descriptor frequency count for this thread. absl::flat_hash_map exec_freq_count; - double oom_deny_ratio; double rss_oom_deny_ratio; private: diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py index c4ceef3d6..25da08a42 100644 --- a/tests/dragonfly/generic_test.py +++ b/tests/dragonfly/generic_test.py @@ -148,14 +148,12 @@ async def test_reply_guard_oom(df_factory, df_seeder_factory): @pytest.mark.asyncio async def test_denyoom_commands(df_factory): - df_server = df_factory.create( - proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", oom_deny_ratio=0.7 - ) + df_server = df_factory.create(proactor_threads=1, maxmemory="256mb", oom_deny_commands="get") df_server.start() client = df_server.client() await client.execute_command("DEBUG POPULATE 7000 size 44000") - min_deny = 250 * 1024 * 1024 # 250mb + min_deny = 256 * 1024 * 1024 # 256mb info = await client.info("memory") print(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}') assert info["used_memory"] > min_deny, "Weak testcase: too little used memory" diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index 1105f1d83..438f5296f 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -6,6 +6,31 @@ from . import dfly_args from .instance import DflyInstance, DflyInstanceFactory +async def calculate_estimated_connection_memory( + async_client: aioredis.Redis, df_server: DflyInstance +): + memory_info = await async_client.info("memory") + already_used_rss_memory = memory_info["used_memory_rss"] + + connections_number = 100 + connections = [] + for _ in range(connections_number): + conn = aioredis.Redis(port=df_server.port) + await conn.ping() + connections.append(conn) + + await asyncio.sleep(1) # Wait RSS update + + memory_info = await async_client.info("memory") + estimated_connections_memory = memory_info["used_memory_rss"] - already_used_rss_memory + + # Close test connection + for conn in connections: + await conn.close() + + return estimated_connections_memory // connections_number + + @pytest.mark.opt_only @pytest.mark.parametrize( "type, keys, val_size, elements", @@ -160,3 +185,108 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory): info = await client.info("memory") logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}') assert rss_before_eval * 1.01 > info["used_memory_rss"] + + +@pytest.mark.asyncio +@dfly_args( + { + "proactor_threads": 1, + "cache_mode": "true", + "maxmemory": "256mb", + "rss_oom_deny_ratio": 0.5, + "max_eviction_per_heartbeat": 1000, + } +) +async def test_cache_eviction_with_rss_deny_oom( + async_client: aioredis.Redis, + df_server: DflyInstance, +): + """ + Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit + """ + + max_memory = 256 * 1024 * 1024 # 256 MB + rss_max_memory = int(max_memory * 0.5) # 50% of max memory + + data_fill_size = int(0.55 * rss_max_memory) # 55% of rss_max_memory + rss_increase_size = int(0.55 * rss_max_memory) # 55% of max rss_max_memory + + key_size = 1024 * 5 # 5 kb + num_keys = data_fill_size // key_size + + await asyncio.sleep(1) # Wait for RSS update + + estimated_connection_memory = await calculate_estimated_connection_memory( + async_client, df_server + ) + num_connections = rss_increase_size // estimated_connection_memory + + logging.info( + f"Estimated connection memory: {estimated_connection_memory}. Number of connections: {num_connections}." + ) + + # Fill data to 55% of rss max memory + await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", key_size) + + await asyncio.sleep(1) # Wait for RSS heartbeat update + + # First test that eviction is not triggered without connection creation + stats_info = await async_client.info("stats") + assert stats_info["evicted_keys"] == 0, "No eviction should start yet." + + # Test that used memory is less than 90% of max memory + memory_info = await async_client.info("memory") + assert ( + memory_info["used_memory"] < max_memory * 0.9 + ), "Used memory should be less than 90% of max memory." + assert ( + memory_info["used_memory_rss"] < rss_max_memory * 0.9 + ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)." + + # Disable heartbeat eviction + await async_client.execute_command("CONFIG SET enable_heartbeat_eviction false") + + # Increase RSS memory by 55% of rss max memory + # We can simulate RSS increase by creating new connections + connections = [] + for _ in range(num_connections): + conn = aioredis.Redis(port=df_server.port) + await conn.ping() + connections.append(conn) + + await asyncio.sleep(1) + + # Check that RSS memory is above rss limit + memory_info = await async_client.info("memory") + assert ( + memory_info["used_memory_rss"] >= rss_max_memory * 0.9 + ), "RSS memory should exceed 90% of the maximum RSS memory limit (max_memory * rss_oom_deny_ratio)." + + # Enable heartbeat eviction + await async_client.execute_command("CONFIG SET enable_heartbeat_eviction true") + + await asyncio.sleep(1) # Wait for RSS heartbeat update + await async_client.execute_command("MEMORY DECOMMIT") + await asyncio.sleep(1) # Wait for RSS update + + # Get RSS memory after creating new connections + memory_info = await async_client.info("memory") + stats_info = await async_client.info("stats") + + logging.info(f'Evicted keys number: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') + + assert ( + memory_info["used_memory"] < data_fill_size + ), "Used memory should be less than initial fill size due to eviction." + + assert ( + memory_info["used_memory_rss"] < rss_max_memory * 0.9 + ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio) after eviction." + + # Check that eviction has occurred + assert ( + stats_info["evicted_keys"] > 0 + ), "Eviction should have occurred due to rss memory pressure." + + for conn in connections: + await conn.close()