1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: test metrics for huge value serialization (#4262)

* fix seeder bugs
* add test
* add assertions for huge value metrics

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-12-12 13:19:14 +01:00 committed by GitHub
parent f892d9b7fb
commit b37287bf14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 51 additions and 36 deletions

View file

@ -426,8 +426,7 @@ class DflyInstanceFactory:
args.setdefault("log_dir", self.params.log_dir)
if version >= 1.21 and "serialization_max_chunk_size" not in args:
# Add 1 byte limit for big values
args.setdefault("serialization_max_chunk_size", 1)
args.setdefault("serialization_max_chunk_size", 16384)
for k, v in args.items():
args[k] = v.format(**self.params.env) if isinstance(v, str) else v

View file

@ -44,23 +44,21 @@ Test full replication pipeline. Test full sync with streaming changes and stable
@pytest.mark.parametrize(
"t_master, t_replicas, seeder_config, stream_target, big_value",
"t_master, t_replicas, seeder_config, stream_target",
[
# Quick general test that replication is working
(1, 3 * [1], dict(key_target=1_000), 500, False),
(4, [4, 4], dict(key_target=10_000), 1_000, False),
pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, False, marks=M_OPT),
(1, 3 * [1], dict(key_target=1_000), 500),
# A lot of huge values
(2, 2 * [1], dict(key_target=1_000, huge_value_percentage=2), 500),
(4, [4, 4], dict(key_target=10_000), 1_000),
pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, marks=M_OPT),
# Skewed tests with different thread ratio
pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, False, marks=M_SLOW),
pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, False, marks=M_SLOW),
# Test with big value size
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, False, marks=M_SLOW),
# Test with big value and big value serialization
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, True, marks=M_SLOW),
pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, marks=M_SLOW),
pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, marks=M_SLOW),
# Everything is big because data size is 10k
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, marks=M_SLOW),
# Stress test
pytest.param(
8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, False, marks=M_STRESS
),
pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
],
)
@pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})])
@ -70,7 +68,6 @@ async def test_replication_all(
t_replicas,
seeder_config,
stream_target,
big_value,
mode,
):
args = {}
@ -78,9 +75,6 @@ async def test_replication_all(
args["cache_mode"] = "true"
args["maxmemory"] = str(t_master * 256) + "mb"
if big_value:
args["serialization_max_chunk_size"] = 4096
master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **args)
replicas = [
df_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t)
@ -132,6 +126,26 @@ async def test_replication_all(
# Check data after stable state stream
await check()
info = await c_master.info()
preemptions = info["big_value_preemptions"]
key_target = seeder_config["key_target"]
# Rough estimate
estimated_preemptions = key_target * (0.01)
assert preemptions > estimated_preemptions
# Because data size could be 10k and for that case there will be almost a preemption
# per bucket.
if "data_size" not in seeder_config.keys():
total_buckets = info["num_buckets"]
# We care that we preempt less times than the total buckets such that we can be
# sure that we test both flows (with and without preemptions). Preemptions on 30%
# of buckets seems like a big number but that depends on a few parameters like
# the size of the huge value and the serialization max chunk size. For the test cases here,
# it's usually close to 10% but there are some that are close to 30%.
total_buckets = info["num_buckets"]
logging.debug(f"Buckets {total_buckets}. Preemptions {preemptions}")
assert preemptions <= (total_buckets * 0.3)
async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset):
role = await c_replica.role()

View file

@ -138,10 +138,10 @@ class Seeder(SeederBase):
data_size=100,
collection_size=None,
types: typing.Optional[typing.List[str]] = None,
huge_value_percentage=1,
huge_value_size=1024,
# 1 huge entry per container/key as default
huge_value_csize=1,
huge_value_percentage=0,
huge_value_size=10000,
# 2 huge entries per container/key as default
huge_value_csize=2,
):
SeederBase.__init__(self, types)
self.key_target = key_target
@ -216,6 +216,6 @@ class Seeder(SeederBase):
msg = f"running unit {unit.prefix}/{unit.type} took {time.time() - s}, target {args[4+0]}"
if huge_keys > 0:
msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total extra modified huge entries {huge_entries}."
msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total huge entries {huge_entries}."
logging.debug(msg)

View file

@ -56,22 +56,23 @@ end
function LG_funcs.add_list(key, keys)
local is_huge = keys[key]
redis.apcall('LPUSH', key, unpack(randstr_sequence(is_huge)))
--- TODO -- investigate why second case of replication_test_all fails
--- we somehow create a quicklist that is circular and we deadlock
redis.apcall('LPUSH', key, unpack(randstr_sequence(false)))
end
function LG_funcs.mod_list(key, keys)
-- equally likely pops and pushes, we rely on the list size being large enough
-- to "highly likely" not get emptied out by consecutive pops
local is_huge = keys[key]
local action = math.random(1, 4)
if action == 1 then
redis.apcall('RPOP', key)
elseif action == 2 then
redis.apcall('LPOP', key)
elseif action == 3 then
redis.apcall('LPUSH', key, randstr(is_huge))
redis.apcall('LPUSH', key, randstr(false))
else
redis.apcall('RPUSH', key, randstr(is_huge))
redis.apcall('RPUSH', key, randstr(false))
end
end
@ -101,7 +102,7 @@ function LG_funcs.mod_set(key, keys)
redis.apcall('SPOP', key)
else
local is_huge = keys[key]
redis.apcall('SADD', key, randstr(is_huge))
redis.apcall('SADD', key, randstr(false))
end
end
@ -113,19 +114,21 @@ end
function LG_funcs.add_hash(key, keys)
local blobs
local is_huge = keys[key]
local limit = LG_funcs.csize
if is_huge then
blobs = dragonfly.randstr(LG_funcs.huge_value_size, LG_funcs.csize / 2)
limit = LG_funcs.huge_value_csize
blobs = dragonfly.randstr(LG_funcs.huge_value_size, limit)
huge_entries = huge_entries + 1
else
blobs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize / 2)
end
local htable = {}
for i = 1, LG_funcs.csize, 2 do
for i = 1, limit, 2 do
htable[i * 2 - 1] = tostring(i)
htable[i * 2] = math.random(0, 1000)
end
for i = 2, LG_funcs.csize, 2 do
for i = 2, limit, 2 do
htable[i * 2 - 1] = tostring(i)
htable[i * 2] = blobs[i // 2]
end
@ -137,8 +140,7 @@ function LG_funcs.mod_hash(key, keys)
if idx % 2 == 1 then
redis.apcall('HINCRBY', key, tostring(idx), 1)
else
local is_huge = keys[key]
redis.apcall('HSET', key, tostring(idx), randstr(is_huge))
redis.apcall('HSET', key, tostring(idx), randstr(false))
end
end
@ -166,8 +168,8 @@ end
function LG_funcs.mod_zset(key, keys)
local action = math.random(1, 4)
if action <= 2 then
local is_huge = keys[key]
redis.apcall('ZADD', key, math.random(0, LG_funcs.csize * 2), randstr(is_huge))
local size = LG_funcs.csize * 2
redis.apcall('ZADD', key, math.random(0, size), randstr(false))
elseif action == 3 then
redis.apcall('ZPOPMAX', key)
else