import pytest
import logging
import os
import glob
import asyncio
from async_timeout import timeout
import redis
from redis import asyncio as aioredis
from pathlib import Path
import boto3
from .instance import RedisServer
from random import randint as rand
import string
import random
from pymemcache.client.base import Client as MCClient
from . import dfly_args
from .utility import close_clients, wait_available_async, is_saving, tmp_file_name
from .seeder import StaticSeeder
BASIC_ARGS = {"dir": "{DRAGONFLY_TMP}/", "proactor_threads": 4}
FILE_FORMATS = ["RDB", "DF"]
# Should be used in tests that exercise auxiliary mechanisms like filenames rather than data consistency
LIGHTWEIGHT_SEEDER_ARGS = dict(key_target=100, data_size=100, variance=1, samples=1)
def find_main_file(path: Path, pattern):
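    """Return the first file under `path` matching `pattern`, or None if nothing matched."""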
return next(iter(glob.glob(str(path) + "/" + pattern)), None)
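# Helpers for reading and asserting metric values reported by a server instance.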
async def get_metric_value(inst, metric_name, sample_index=0):
return (await inst.metrics())[metric_name].samples[sample_index].value
async def assert_metric_value(inst, metric_name, expected_value):
actual_value = await get_metric_value(inst, metric_name)
assert (
actual_value == expected_value
), f"Expected {metric_name} to be {expected_value}, got ${actual_value}"
@pytest.mark.opt_only
@pytest.mark.parametrize("format", FILE_FORMATS)
@pytest.mark.parametrize(
"seeder_opts",
[
# Many small keys, high variance
dict(key_target=50_000, data_size=100, variance=10, samples=50),
# A few large keys, high variance
dict(key_target=1000, data_size=5_000, variance=10, samples=10),
],
)
@dfly_args({**BASIC_ARGS, "proactor_threads": 4})
async def test_consistency(df_factory, format: str, seeder_opts: dict):
"""
Test consistency over a large variety of data with different sizes
"""
dbfilename = f"dump_{tmp_file_name()}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
async_client = instance.client()
await StaticSeeder(**seeder_opts).run(async_client)
start_capture = await StaticSeeder.capture(async_client)
# save + flush + load
await async_client.execute_command("SAVE", format)
assert await async_client.flushall()
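    # An RDB save produces a single <dbfilename>.rdb file, while a DF save is loaded
    # through its <dbfilename>-summary.dfs file.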
await async_client.execute_command(
"DFLY",
"LOAD",
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
)
assert (await StaticSeeder.capture(async_client)) == start_capture
@pytest.mark.parametrize("format", FILE_FORMATS)
@dfly_args({**BASIC_ARGS, "proactor_threads": 4})
async def test_multidb(df_factory, format: str):
"""
Test serialization of multiple logical databases
"""
dbfilename = f"test-multidb{rand(0, 5000)}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
async_client = instance.client()
start_captures = []
for dbid in range(10):
db_client = instance.client(db=dbid)
await StaticSeeder(key_target=1000).run(db_client)
start_captures.append(await StaticSeeder.capture(db_client))
# save + flush + load
await async_client.execute_command("SAVE", format)
assert await async_client.flushall()
await async_client.execute_command(
"DFLY",
"LOAD",
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
)
for dbid in range(10):
db_client = instance.client(db=dbid)
assert (await StaticSeeder.capture(db_client)) == start_captures[dbid]
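# The {{timestamp}} placeholder in dbfilename is expanded by the server when saving,
# which is why wildcard patterns are needed to locate the resulting files.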
@pytest.mark.asyncio
@pytest.mark.parametrize(
"save_type, dbfilename, pattern",
[
("rdb", "test-autoload1-{{timestamp}}", "test-autoload1-*.rdb"),
("df", "test-autoload2-{{timestamp}}", "test-autoload2-*-summary.dfs"),
("rdb", "test-autoload3-{{timestamp}}.rdb", "test-autoload3-*.rdb"),
("rdb", "test-autoload4", "test-autoload4.rdb"),
("df", "test-autoload5", "test-autoload5-summary.dfs"),
("rdb", "test-autoload6.rdb", "test-autoload6.rdb"),
],
)
async def test_dbfilenames(
df_factory, tmp_dir: Path, save_type: str, dbfilename: str, pattern: str
):
df_args = {**BASIC_ARGS, "dbfilename": dbfilename, "port": 1111}
if save_type == "rdb":
df_args["nodf_snapshot_format"] = None
start_capture = None
with df_factory.create(**df_args) as df_server:
async with df_server.client() as client:
await wait_available_async(client)
            # We use the seeder just to check we don't lose any files (and thus keys)
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(client)
start_capture = await StaticSeeder.capture(client)
await client.execute_command("SAVE " + save_type)
file = find_main_file(tmp_dir, pattern)
assert file is not None
assert os.path.basename(file).startswith(dbfilename.split("{{")[0])
with df_factory.create(**df_args) as df_server:
async with df_server.client() as client:
await wait_available_async(client)
assert await StaticSeeder.capture(client) == start_capture
@pytest.mark.asyncio
@dfly_args({**BASIC_ARGS, "proactor_threads": 4, "dbfilename": "test-redis-load-rdb"})
async def test_redis_load_snapshot(
async_client: aioredis.Redis, df_server, redis_local_server: RedisServer, tmp_dir: Path
):
"""
Test redis server loading dragonfly snapshot rdb format
"""
await StaticSeeder(
**LIGHTWEIGHT_SEEDER_ARGS, types=["STRING", "LIST", "SET", "HASH", "ZSET"]
).run(async_client)
await async_client.execute_command("SAVE", "rdb")
dbsize = await async_client.dbsize()
await async_client.connection_pool.disconnect()
df_server.stop()
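    # Start a stock Redis server on the RDB file Dragonfly just wrote and verify it loads it.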
redis_local_server.start(dir=tmp_dir, dbfilename="test-redis-load-rdb.rdb")
await asyncio.sleep(1)
c_master = aioredis.Redis(port=redis_local_server.port)
await c_master.ping()
assert await c_master.dbsize() == dbsize
@pytest.mark.slow
@dfly_args({**BASIC_ARGS, "dbfilename": "test-cron", "snapshot_cron": "* * * * *"})
async def test_cron_snapshot(tmp_dir: Path, async_client: aioredis.Redis):
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
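    # snapshot_cron "* * * * *" triggers a save every minute; poll for up to 65 seconds
    # so at least one full cron tick is covered.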
file = None
async with timeout(65):
while file is None:
await asyncio.sleep(1)
file = find_main_file(tmp_dir, "test-cron-summary.dfs")
assert file is not None, os.listdir(tmp_dir)
@pytest.mark.slow
@dfly_args({**BASIC_ARGS, "dbfilename": "test-failed-saving", "snapshot_cron": "* * * * *"})
async def test_cron_snapshot_failed_saving(df_server, tmp_dir: Path, async_client: aioredis.Redis):
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
backups_total = await get_metric_value(df_server, "dragonfly_backups")
failed_backups_total = await get_metric_value(df_server, "dragonfly_failed_backups")
file = None
async with timeout(65):
while file is None:
await asyncio.sleep(1)
file = find_main_file(tmp_dir, "test-failed-saving-summary.dfs")
assert file is not None, os.listdir(tmp_dir)
await assert_metric_value(df_server, "dragonfly_backups", backups_total + 1)
await assert_metric_value(df_server, "dragonfly_failed_backups", failed_backups_total)
# Remove all files from directory
for dir_file in tmp_dir.iterdir():
os.unlink(dir_file)
# Make directory read-only
os.chmod(tmp_dir, 0o555)
# Wait for the next SAVE command
await asyncio.sleep(65)
file = find_main_file(tmp_dir, "test-failed-saving-summary.dfs")
# Make directory writable again
os.chmod(tmp_dir, 0o777)
assert file is None, os.listdir(tmp_dir)
await assert_metric_value(df_server, "dragonfly_backups", backups_total + 2)
await assert_metric_value(df_server, "dragonfly_failed_backups", failed_backups_total + 1)
@pytest.mark.slow
@dfly_args({**BASIC_ARGS, "dbfilename": "test-cron-set"})
async def test_set_cron_snapshot(tmp_dir: Path, async_client: aioredis.Redis):
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
await async_client.config_set("snapshot_cron", "* * * * *")
file = None
async with timeout(65):
while file is None:
await asyncio.sleep(1)
file = find_main_file(tmp_dir, "test-cron-set-summary.dfs")
assert file is not None
@dfly_args(
{**BASIC_ARGS, "dbfilename": "test-save-rename-command", "rename_command": "save=save-foo"}
)
async def test_shutdown_save_with_rename(df_server):
"""Checks that on shutdown we save snapshot"""
client = df_server.client()
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(client)
start_capture = await StaticSeeder.capture(client)
await client.connection_pool.disconnect()
df_server.stop()
df_server.start()
client = df_server.client()
await wait_available_async(client)
assert await StaticSeeder.capture(client) == start_capture
await client.connection_pool.disconnect()
@pytest.mark.slow
async def test_parallel_snapshot(async_client):
"""Dragonfly does not allow simultaneous save operations, send 2 save operations and make sure one is rejected"""
await async_client.execute_command("debug", "populate", "1000000", "askldjh", "1000", "RAND")
async def save():
try:
await async_client.execute_command("save", "rdb", "dump")
return True
except Exception as e:
return False
save_successes = sum(await asyncio.gather(*(save() for _ in range(2))), 0)
assert save_successes == 1, "Only one SAVE must be successful"
async def test_path_escapes(df_factory):
"""Test that we don't allow path escapes. We just check that df_server.start()
fails because we don't have a much better way to test that."""
df_server = df_factory.create(dbfilename="../../../../etc/passwd")
try:
df_server.start()
assert False, "Server should not start correctly"
except Exception as e:
pass
@dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"})
async def test_info_persistence_field(async_client):
"""Test is_loading field on INFO PERSISTENCE during snapshot loading"""
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
# Wait for snapshot to finish loading and try INFO PERSISTENCE
await wait_available_async(async_client)
assert "loading:0" in (await async_client.execute_command("INFO PERSISTENCE"))
# If DRAGONFLY_S3_BUCKET is configured, AWS credentials must also be
# configured.
@pytest.mark.skipif(
"DRAGONFLY_S3_BUCKET" not in os.environ, reason="AWS S3 snapshots bucket is not configured"
)
@dfly_args({**BASIC_ARGS, "dir": "s3://{DRAGONFLY_S3_BUCKET}{DRAGONFLY_TMP}", "dbfilename": ""})
async def test_s3_snapshot(async_client, tmp_dir: Path):
seeder = StaticSeeder(key_target=10_000)
await seeder.run(async_client)
    start_capture = await StaticSeeder.capture(async_client)
try:
# save + flush + load
await async_client.execute_command("SAVE DF snapshot")
assert await async_client.flushall()
await async_client.execute_command(
"DFLY LOAD "
+ os.environ["DRAGONFLY_S3_BUCKET"]
            + str(tmp_dir)
+ "/snapshot-summary.dfs"
)
assert await StaticSeeder.capture(async_client) == start_capture
finally:
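        # Clean up all objects written under this test's prefix in the S3 bucket.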
def delete_objects(bucket, prefix):
client = boto3.client("s3")
resp = client.list_objects_v2(
Bucket=bucket,
Prefix=prefix,
)
keys = []
for obj in resp["Contents"]:
keys.append({"Key": obj["Key"]})
client.delete_objects(
Bucket=bucket,
Delete={"Objects": keys},
)
delete_objects(
os.environ["DRAGONFLY_S3_BUCKET"],
            str(tmp_dir)[1:],
)
@dfly_args({**BASIC_ARGS, "dbfilename": "test-shutdown"})
class TestDflySnapshotOnShutdown:
    """Test multi-file snapshot."""

    SEEDER_ARGS = dict(key_target=10_000)
async def _get_info_memory_fields(self, client):
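        # Collect object_used_memory and the per-type type_used_memory_* counters from INFO MEMORY.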
res = await client.execute_command("INFO MEMORY")
fields = {}
for line in res.splitlines():
if line.startswith("#"):
continue
k, v = line.split(":")
if k == "object_used_memory" or k.startswith("type_used_memory_"):
fields.update({k: int(v)})
return fields
async def _delete_all_keys(self, client: aioredis.Redis):
while True:
keys = await client.keys()
if len(keys) == 0:
break
await client.delete(*keys)
@pytest.mark.asyncio
async def test_memory_counters(self, async_client: aioredis.Redis):
memory_counters = await self._get_info_memory_fields(async_client)
assert memory_counters == {"object_used_memory": 0}
seeder = StaticSeeder(**self.SEEDER_ARGS)
await seeder.run(async_client)
memory_counters = await self._get_info_memory_fields(async_client)
assert all(value > 0 for value in memory_counters.values())
await self._delete_all_keys(async_client)
memory_counters = await self._get_info_memory_fields(async_client)
assert memory_counters == {"object_used_memory": 0}
@pytest.mark.asyncio
@pytest.mark.slow
async def test_snapshot(self, df_server, async_client):
"""Checks that:
1. After reloading the snapshot file the data is the same
2. Memory counters after loading from snapshot is similar to before creating a snapshot
3. Memory counters after deleting all keys loaded by snapshot - this validates the memory
counting when loading from snapshot."""
seeder = StaticSeeder(**self.SEEDER_ARGS)
await seeder.run(async_client)
start_capture = await StaticSeeder.capture(async_client)
memory_before = await self._get_info_memory_fields(async_client)
await async_client.connection_pool.disconnect()
df_server.stop()
df_server.start()
async_client = df_server.client()
await wait_available_async(async_client)
assert await StaticSeeder.capture(async_client) == start_capture
memory_after = await self._get_info_memory_fields(async_client)
for counter, value in memory_before.items():
# Unfortunately memory usage sometimes depends on order of insertion / deletion, so
# it's usually not exactly the same. For the test to be stable we check that it's
# at least 50% that of the original value.
assert memory_after[counter] >= 0.5 * value
await self._delete_all_keys(async_client)
memory_empty = await self._get_info_memory_fields(async_client)
assert memory_empty == {"object_used_memory": 0}
@pytest.mark.parametrize("format", FILE_FORMATS)
@dfly_args({**BASIC_ARGS, "dbfilename": "info-while-snapshot"})
async def test_info_memory_while_snapshotting(df_factory, format: str):
instance = df_factory.create(dbfilename=f"dump_{tmp_file_name()}")
instance.start()
async_client = instance.client()
await async_client.execute_command("DEBUG POPULATE 10000 key 4048 RAND")
async def save():
await async_client.execute_command("SAVE", format)
save_finished = False
async def info_in_loop():
while not save_finished:
await async_client.execute_command("INFO MEMORY")
await asyncio.sleep(0.1)
save_task = asyncio.create_task(save())
info_task = asyncio.create_task(info_in_loop())
await save_task
save_finished = True
await info_task
@dfly_args({**BASIC_ARGS, "dbfilename": "test-bgsave"})
async def test_bgsave_and_save(async_client: aioredis.Redis):
await async_client.execute_command("DEBUG POPULATE 20000")
await async_client.execute_command("BGSAVE")
with pytest.raises(redis.exceptions.ResponseError):
await async_client.execute_command("BGSAVE")
while await is_saving(async_client):
await asyncio.sleep(0.1)
await async_client.execute_command("BGSAVE")
with pytest.raises(redis.exceptions.ResponseError):
await async_client.execute_command("SAVE")
while await is_saving(async_client):
await asyncio.sleep(0.1)
await async_client.execute_command("SAVE")
@dfly_args(
{
**BASIC_ARGS,
"proactor_threads": 4,
"dbfilename": "tiered-entries",
"tiered_prefix": "tiering-test-backing",
"tiered_offload_threshold": "0.0", # ask offloading loop to offload as much as possible
}
)
async def test_tiered_entries(async_client: aioredis.Redis):
"""This test makes sure tieried entries are correctly persisted"""
# With variance 4: 512 - 8192 we include small and large values
await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run(
async_client
)
# Compute the capture, this brings all items back to memory... so we'll wait for offloading
start_capture = await StaticSeeder.capture(async_client)
# Wait until the total_stashes counter stops increasing, meaning offloading finished
last_writes, current_writes = 0, -1
while last_writes != current_writes:
await asyncio.sleep(0.1)
last_writes = current_writes
current_writes = (await async_client.info("TIERED"))["tiered_total_stashes"]
# Save + flush + load
await async_client.execute_command("SAVE", "DF")
assert await async_client.flushall()
await async_client.execute_command(
"DFLY",
"LOAD",
"tiered-entries-summary.dfs",
)
# Compare captures
assert await StaticSeeder.capture(async_client) == start_capture
@pytest.mark.skip("Too heavy")
@pytest.mark.opt_only
@dfly_args(
{
**BASIC_ARGS,
"proactor_threads": 4,
"maxmemory": "1G",
"dbfilename": "tiered-entries",
"tiered_prefix": "tiering-test-backing",
"tiered_offload_threshold": "0.5", # ask to keep below 0.5 * 1G
"tiered_storage_write_depth": 50,
}
)
async def test_tiered_entries_throttle(async_client: aioredis.Redis):
"""This test makes sure tieried entries are correctly persisted"""
await StaticSeeder(key_target=600_000, data_size=4096, variance=1, types=["STRING"]).run(
async_client
)
# Compute the capture, this brings all items back to memory... so we'll wait for offloading
start_capture = await StaticSeeder.capture(async_client)
# Save + flush + load
await async_client.execute_command("SAVE", "DF")
assert await async_client.flushall()
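    # Load in the background and keep checking memory: with offloading enabled during load,
    # used_memory must stay bounded even though the dataset itself is much larger.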
load_task = asyncio.create_task(
async_client.execute_command(
"DFLY",
"LOAD",
"tiered-entries-summary.dfs",
)
)
while not load_task.done():
info = await async_client.info("ALL")
# print(info["used_memory_human"], info["used_memory_rss_human"])
assert info["used_memory"] < 600e6 # less than 600mb,
await asyncio.sleep(0.05)
await load_task
assert await StaticSeeder.capture(async_client) == start_capture
@dfly_args({"proactor_threads": 1})
@pytest.mark.parametrize(
"query",
[
("HSET"),
("SADD"),
("ZSET"),
("LIST"),
],
)
async def test_big_value_serialization_memory_limit(df_factory, query):
dbfilename = f"dump_{tmp_file_name()}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
client = instance.client()
ten_mb = 10_000_000
def ten_mb_random_string():
return "".join(random.choices(string.ascii_letters, k=ten_mb))
one_gb = 1_000_000_000 # 1GB
upper_limit = one_gb * 1.1 # 1GB + 100MB
i = 0
while instance.rss < one_gb:
if query == "HSET":
i = i + 1
await client.execute_command(f"HSET foo_key foo_{i} {ten_mb_random_string()}")
elif query == "SADD":
await client.execute_command(f"SADD foo_key {ten_mb_random_string()}")
elif query == "ZSET":
await client.execute_command(f"ZADD foo_key {i} {ten_mb_random_string()}")
elif query == "LIST":
await client.execute_command(f"LPUSH foo_key {ten_mb_random_string()}")
async def check_memory_usage(instance):
while True:
assert instance.rss < upper_limit
await asyncio.sleep(0.01)
checker = asyncio.create_task(check_memory_usage(instance))
await client.execute_command("SAVE")
checker.cancel()
await client.execute_command("FLUSHALL")
await client.close()
@dfly_args(
{
"dir": "{DRAGONFLY_TMP}/",
"memcached_port": 11211,
"proactor_threads": 4,
"dbfilename": "test-MC-flags",
}
)
async def test_mc_flags_saving(memcached_client: MCClient, async_client: aioredis.Redis):
async def check_flag(key, flag):
res = memcached_client.raw_command("get " + key, "END\r\n").split()
        # workaround: sometimes memcached_client.raw_command returns an empty string
if len(res) > 2:
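            # The reply has the form "VALUE <key> <flags> <bytes> ... END", so after split()
            # index 2 holds the flags field.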
assert res[2].decode() == str(flag)
assert memcached_client.set("key1", "value1", noreply=True)
assert memcached_client.set("key2", "value1", noreply=True, expire=3600, flags=123456)
assert memcached_client.replace("key1", "value2", expire=4000, flags=2, noreply=True)
await check_flag("key1", 2)
await check_flag("key2", 123456)
await async_client.execute_command("SAVE", "DF")
assert await async_client.flushall()
await async_client.execute_command(
"DFLY",
"LOAD",
"test-MC-flags-summary.dfs",
)
await check_flag("key1", 2)
await check_flag("key2", 123456)
await close_clients(async_client)