1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00
dragonflydb-dragonfly/tests/dragonfly/memory_test.py
adiholden 03d679ac31
fix(server) : dont apply eviction on rss over limit (#4276)
Signed-off-by: adi_holden <adi@dragonflydb.io>
2024-12-09 14:19:25 +02:00

211 lines
7.7 KiB
Python

import pytest
from redis import asyncio as aioredis
from .utility import *
import logging
from . import dfly_args
from .instance import DflyInstance, DflyInstanceFactory
@pytest.mark.opt_only
@pytest.mark.parametrize(
    "type, keys, val_size, elements",
    [
        ("JSON", 200_000, 100, 100),
        ("SET", 280_000, 100, 100),
        ("HASH", 250_000, 100, 100),
        ("ZSET", 250_000, 100, 100),
        ("LIST", 300_000, 100, 100),
        ("STRING", 3_500_000, 1000, 1),
        ("STREAM", 260_000, 100, 100),
    ],
)
# maxmemory is capped at 5gb as a sanity guard for the gh runner: asking for too much memory
# could exhaust the runner before the OOM killer even gets a chance to run.
@dfly_args({"proactor_threads": 4, "maxmemory": "5gb"})
async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements):
    """
    Fill Dragonfly with `keys` entries of `type` via DEBUG POPULATE, require that used_memory
    ends up above 3gb, and then check that the gap between used_memory_rss and used_memory is
    positive and smaller than the allowed unaccounted budget.
    """
    mib = 1024 * 1024
    used_memory_floor = 3 * 1024 * mib  # 3gb
    # The base budget is 200mb. One of the gh runners (not the self hosted one) shows a big rss
    # spike for STREAM that is not observed locally, so that type gets a 3x budget, mostly for CI.
    unaccounted_budget = (3 if type == "STREAM" else 1) * 200 * mib

    client = df_server.client()
    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly

    cmd = f"DEBUG POPULATE {keys} k {val_size} RAND TYPE {type} ELEMENTS {elements}"
    print(f"Running {cmd}")
    await client.execute_command(cmd)

    await asyncio.sleep(2)  # Wait for another RSS heartbeat update in Dragonfly

    info = await client.info("memory")
    logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
    assert info["used_memory"] > used_memory_floor, "Weak testcase: too little used memory"

    unaccounted = info["used_memory_rss"] - info["used_memory"]
    # If this fails, the machine may be configured to use swap
    assert unaccounted > 0
    assert unaccounted < unaccounted_budget

    if type not in ("STRING", "JSON"):
        # STRINGs keep some of the data inline, so not all of it is accounted in
        # object_used_memory. JSON has a very small over-accounting bug.
        # NOTE(review): both checks are assumed to live under this guard - confirm against the
        # original indentation.
        assert info["object_used_memory"] > keys * elements * val_size
        assert info["used_memory"] > info["object_used_memory"]
@pytest.mark.asyncio
@dfly_args(
    {
        "maxmemory": "512mb",
        "proactor_threads": 2,
        "rss_oom_deny_ratio": 0.5,
    }
)
@pytest.mark.parametrize("admin_port", [0, 1112])
async def test_rss_oom_ratio(df_factory: DflyInstanceFactory, admin_port):
    """
    Test that dragonfly rejects denyoom commands and new connections while rss memory is above
    maxmemory * rss_oom_deny_ratio.
    Test that dragonfly stops rejecting once rss memory goes back below that threshold.
    """
    reject_limit = 256 * 1024 * 1024  # 256mb

    df_server = df_factory.create(admin_port=admin_port)
    df_server.start()

    client = df_server.client()
    await client.execute_command("DEBUG POPULATE 10000 key 40000 RAND")

    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly

    # When an admin port is configured, the follow-up checks go through the admin client
    if admin_port:
        probe = df_server.admin_client()
    else:
        probe = df_server.client()
    await probe.ping()

    info = await probe.info("memory")
    logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
    assert info["used_memory_rss"] > reject_limit

    # A read on the already existing connection must still be served
    await client.execute_command("get x")

    # A write is expected to be refused because of oom
    with pytest.raises(redis.exceptions.ResponseError):
        await client.execute_command("set x y")

    if admin_port:
        # With an admin port set, creating a new regular client should fail as well
        client = df_server.client()
        with pytest.raises(redis.exceptions.ConnectionError):
            await client.ping()

    # Flush to free memory
    await probe.flushall()

    await asyncio.sleep(2)  # Wait for another RSS heartbeat update in Dragonfly

    info = await probe.info("memory")
    logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
    assert info["used_memory_rss"] < reject_limit

    # Creating a new client should not fail after the memory usage decrease
    client = df_server.client()
    await client.execute_command("set x y")
@pytest.mark.asyncio
@dfly_args(
    {
        "maxmemory": "512mb",
        "proactor_threads": 1,
    }
)
async def test_eval_with_oom(df_factory: DflyInstanceFactory):
    """
    Run eval commands while dragonfly answers write commands with OOM, then check rss memory.
    This test was written after a memory leak was detected in script runs in the OOM state.
    """
    df_server = df_factory.create()
    df_server.start()

    client = df_server.client()
    await client.execute_command("DEBUG POPULATE 20000 key 40000 RAND")

    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly

    info = await client.info("memory")
    logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')

    used_memory_floor = 512 * 1024 * 1024  # 512mb, the same value as the maxmemory flag
    assert info["used_memory"] > used_memory_floor
    baseline_rss = info["used_memory_rss"]

    mset_script = """
redis.call('MSET', KEYS[1], ARGV[1], KEYS[2], ARGV[2])
"""
    pipe = client.pipeline(transaction=False)

    for _batch in range(20):
        for _ in range(8000):
            pipe.eval(mset_script, 2, "x1", "y1", "x2", "y2")
        # The mset inside the script is expected to be rejected due to oom
        with pytest.raises(redis.exceptions.ResponseError):
            await pipe.execute()

    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly

    info = await client.info("memory")
    logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
    # rss may grow by less than 1% over the pre-eval reading; more than that fails the test
    assert info["used_memory_rss"] < baseline_rss * 1.01
@pytest.mark.skip("rss eviction disabled")
@pytest.mark.asyncio
@dfly_args(
    {
        "proactor_threads": 1,
        "cache_mode": "true",
        "maxmemory": "5gb",
        "rss_oom_deny_ratio": 0.8,
        "max_eviction_per_heartbeat": 100,
    }
)
async def test_cache_eviction_with_rss_deny_oom(
    async_client: aioredis.Redis,
):
    """
    Test to verify that cache eviction is triggered even if used memory is small but rss memory
    is above limit.

    The test populates the store, asserts that used_memory is below 90% of max memory while
    used_memory_rss is above 90% of the rss limit (max_memory * rss_oom_deny_ratio), and then
    polls once per second until rss drops to that threshold or below, logging rss and the
    evicted_keys counter on every iteration.
    """
    max_memory = 5 * 1024 * 1024 * 1024  # 5G, mirrors the "maxmemory" flag above
    rss_max_memory = int(max_memory * 0.8)  # 0.8 mirrors the "rss_oom_deny_ratio" flag above
    # rss level the test first expects to exceed and then waits to fall back under
    rss_threshold = rss_max_memory * 0.9

    data_fill_size = int(0.9 * rss_max_memory)  # 90% of rss_max_memory

    val_size = 1024 * 5  # 5 kb
    num_keys = data_fill_size // val_size

    await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)

    # Test that used memory is less than 90% of max memory while rss is above its threshold
    memory_info = await async_client.info("memory")
    assert (
        memory_info["used_memory"] < max_memory * 0.9
    ), "Used memory should be less than 90% of max memory."
    assert (
        memory_info["used_memory_rss"] > rss_threshold
    ), "RSS memory should be more than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."

    # Re-read the memory stats and wait for rss to fall back to the threshold or below.
    # NOTE(review): this loop has no deadline of its own - it relies on rss eventually dropping
    # (or on an external test timeout).
    memory_info = await async_client.info("memory")
    while memory_info["used_memory_rss"] > rss_threshold:
        await asyncio.sleep(1)
        memory_info = await async_client.info("memory")
        logging.info(
            f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_threshold}.'
        )
        stats_info = await async_client.info("stats")
        logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')