2022-08-26 10:54:38 +00:00
|
|
|
import pytest
|
2024-06-04 06:28:18 +00:00
|
|
|
import logging
|
2022-10-24 18:22:43 +00:00
|
|
|
import os
|
|
|
|
import glob
|
2023-08-01 08:30:17 +00:00
|
|
|
import asyncio
|
2024-08-13 13:05:15 +00:00
|
|
|
from async_timeout import timeout
|
2024-03-14 11:31:58 +00:00
|
|
|
import redis
|
2023-05-13 07:44:25 +00:00
|
|
|
from redis import asyncio as aioredis
|
2022-08-26 10:54:38 +00:00
|
|
|
from pathlib import Path
|
2023-10-06 09:24:56 +00:00
|
|
|
import boto3
|
2024-06-04 06:28:18 +00:00
|
|
|
from .instance import RedisServer
|
2024-07-02 07:26:26 +00:00
|
|
|
from random import randint as rand
|
2024-07-24 13:07:27 +00:00
|
|
|
import string
|
|
|
|
import random
|
2024-08-21 15:31:03 +00:00
|
|
|
from pymemcache.client.base import Client as MCClient
|
2022-11-06 14:27:43 +00:00
|
|
|
|
|
|
|
from . import dfly_args
|
2024-09-30 06:54:41 +00:00
|
|
|
from .utility import wait_available_async, is_saving, tmp_file_name
|
2022-08-26 10:54:38 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
from .seeder import StaticSeeder
|
2023-01-09 20:31:15 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
BASIC_ARGS = {"dir": "{DRAGONFLY_TMP}/", "proactor_threads": 4}
|
|
|
|
FILE_FORMATS = ["RDB", "DF"]
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
# Should be used where text auxiliary mechanisms like filenames
|
|
|
|
LIGHTWEIGHT_SEEDER_ARGS = dict(key_target=100, data_size=100, variance=1, samples=1)
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2022-08-26 10:54:38 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
def find_main_file(path: Path, pattern):
|
|
|
|
return next(iter(glob.glob(str(path) + "/" + pattern)), None)
|
2023-07-17 10:13:12 +00:00
|
|
|
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-08-23 21:36:31 +00:00
|
|
|
async def get_metric_value(inst, metric_name, sample_index=0):
|
|
|
|
return (await inst.metrics())[metric_name].samples[sample_index].value
|
|
|
|
|
|
|
|
|
|
|
|
async def assert_metric_value(inst, metric_name, expected_value):
|
|
|
|
actual_value = await get_metric_value(inst, metric_name)
|
|
|
|
assert (
|
|
|
|
actual_value == expected_value
|
|
|
|
), f"Expected {metric_name} to be {expected_value}, got ${actual_value}"
|
|
|
|
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
@pytest.mark.opt_only
|
|
|
|
@pytest.mark.parametrize("format", FILE_FORMATS)
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"seeder_opts",
|
|
|
|
[
|
|
|
|
# Many small keys, high variance
|
|
|
|
dict(key_target=50_000, data_size=100, variance=10, samples=50),
|
|
|
|
# A few large keys, high variance
|
|
|
|
dict(key_target=1000, data_size=5_000, variance=10, samples=10),
|
|
|
|
],
|
|
|
|
)
|
2024-07-02 07:26:26 +00:00
|
|
|
@dfly_args({**BASIC_ARGS, "proactor_threads": 4})
|
|
|
|
async def test_consistency(df_factory, format: str, seeder_opts: dict):
|
2024-02-14 09:13:52 +00:00
|
|
|
"""
|
|
|
|
Test consistency over a large variety of data with different sizes
|
|
|
|
"""
|
2024-07-15 08:01:02 +00:00
|
|
|
dbfilename = f"dump_{tmp_file_name()}"
|
2024-07-02 07:26:26 +00:00
|
|
|
instance = df_factory.create(dbfilename=dbfilename)
|
|
|
|
instance.start()
|
|
|
|
async_client = instance.client()
|
2024-02-14 09:13:52 +00:00
|
|
|
await StaticSeeder(**seeder_opts).run(async_client)
|
|
|
|
|
|
|
|
start_capture = await StaticSeeder.capture(async_client)
|
|
|
|
|
|
|
|
# save + flush + load
|
|
|
|
await async_client.execute_command("SAVE", format)
|
|
|
|
assert await async_client.flushall()
|
|
|
|
await async_client.execute_command(
|
2024-08-15 11:56:40 +00:00
|
|
|
"DFLY",
|
2024-02-14 09:13:52 +00:00
|
|
|
"LOAD",
|
2024-07-02 07:26:26 +00:00
|
|
|
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
|
2024-02-14 09:13:52 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
assert (await StaticSeeder.capture(async_client)) == start_capture
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("format", FILE_FORMATS)
|
2024-07-02 07:26:26 +00:00
|
|
|
@dfly_args({**BASIC_ARGS, "proactor_threads": 4})
|
|
|
|
async def test_multidb(df_factory, format: str):
|
2024-02-14 09:13:52 +00:00
|
|
|
"""
|
|
|
|
Test serialization of multiple logical databases
|
|
|
|
"""
|
2024-09-02 06:27:22 +00:00
|
|
|
dbfilename = f"dump_{tmp_file_name()}"
|
2024-07-02 07:26:26 +00:00
|
|
|
instance = df_factory.create(dbfilename=dbfilename)
|
|
|
|
instance.start()
|
|
|
|
async_client = instance.client()
|
2024-02-14 09:13:52 +00:00
|
|
|
start_captures = []
|
|
|
|
for dbid in range(10):
|
2024-07-02 07:26:26 +00:00
|
|
|
db_client = instance.client(db=dbid)
|
2024-02-14 09:13:52 +00:00
|
|
|
await StaticSeeder(key_target=1000).run(db_client)
|
|
|
|
start_captures.append(await StaticSeeder.capture(db_client))
|
|
|
|
|
|
|
|
# save + flush + load
|
|
|
|
await async_client.execute_command("SAVE", format)
|
|
|
|
assert await async_client.flushall()
|
|
|
|
await async_client.execute_command(
|
2024-08-15 11:56:40 +00:00
|
|
|
"DFLY",
|
2024-02-14 09:13:52 +00:00
|
|
|
"LOAD",
|
2024-07-02 07:26:26 +00:00
|
|
|
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
|
2024-02-14 09:13:52 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
for dbid in range(10):
|
2024-07-02 07:26:26 +00:00
|
|
|
db_client = instance.client(db=dbid)
|
2024-02-14 09:13:52 +00:00
|
|
|
assert (await StaticSeeder.capture(db_client)) == start_captures[dbid]
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"save_type, dbfilename, pattern",
|
|
|
|
[
|
|
|
|
("rdb", "test-autoload1-{{timestamp}}", "test-autoload1-*.rdb"),
|
|
|
|
("df", "test-autoload2-{{timestamp}}", "test-autoload2-*-summary.dfs"),
|
|
|
|
("rdb", "test-autoload3-{{timestamp}}.rdb", "test-autoload3-*.rdb"),
|
|
|
|
("rdb", "test-autoload4", "test-autoload4.rdb"),
|
|
|
|
("df", "test-autoload5", "test-autoload5-summary.dfs"),
|
|
|
|
("rdb", "test-autoload6.rdb", "test-autoload6.rdb"),
|
|
|
|
],
|
|
|
|
)
|
|
|
|
async def test_dbfilenames(
|
2024-07-02 07:26:26 +00:00
|
|
|
df_factory, tmp_dir: Path, save_type: str, dbfilename: str, pattern: str
|
2024-02-14 09:13:52 +00:00
|
|
|
):
|
|
|
|
df_args = {**BASIC_ARGS, "dbfilename": dbfilename, "port": 1111}
|
2023-01-09 20:31:15 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
if save_type == "rdb":
|
|
|
|
df_args["nodf_snapshot_format"] = None
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
start_capture = None
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2024-07-02 07:26:26 +00:00
|
|
|
with df_factory.create(**df_args) as df_server:
|
2024-02-14 09:13:52 +00:00
|
|
|
async with df_server.client() as client:
|
|
|
|
await wait_available_async(client)
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
# We use the seeder just to check we don't loose any files (and thus keys)
|
|
|
|
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(client)
|
|
|
|
start_capture = await StaticSeeder.capture(client)
|
2023-07-17 10:13:12 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await client.execute_command("SAVE " + save_type)
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
file = find_main_file(tmp_dir, pattern)
|
|
|
|
assert file is not None
|
|
|
|
assert os.path.basename(file).startswith(dbfilename.split("{{")[0])
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2024-07-02 07:26:26 +00:00
|
|
|
with df_factory.create(**df_args) as df_server:
|
2024-02-14 09:13:52 +00:00
|
|
|
async with df_server.client() as client:
|
|
|
|
await wait_available_async(client)
|
|
|
|
assert await StaticSeeder.capture(client) == start_capture
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-06-04 06:28:18 +00:00
|
|
|
@pytest.mark.asyncio
|
2024-10-15 10:55:26 +00:00
|
|
|
@dfly_args(
|
|
|
|
{
|
|
|
|
**BASIC_ARGS,
|
|
|
|
"proactor_threads": 4,
|
|
|
|
"dbfilename": "test-redis-load-rdb",
|
|
|
|
"list_rdb_encode_v2": "false", # Needed for compatibility with Redis 6
|
|
|
|
}
|
|
|
|
)
|
2024-06-04 06:28:18 +00:00
|
|
|
async def test_redis_load_snapshot(
|
|
|
|
async_client: aioredis.Redis, df_server, redis_local_server: RedisServer, tmp_dir: Path
|
|
|
|
):
|
|
|
|
"""
|
|
|
|
Test redis server loading dragonfly snapshot rdb format
|
|
|
|
"""
|
|
|
|
await StaticSeeder(
|
|
|
|
**LIGHTWEIGHT_SEEDER_ARGS, types=["STRING", "LIST", "SET", "HASH", "ZSET"]
|
|
|
|
).run(async_client)
|
|
|
|
|
2024-10-15 10:55:26 +00:00
|
|
|
await async_client.lpush("list", "A" * 10_000)
|
|
|
|
|
2024-06-04 06:28:18 +00:00
|
|
|
await async_client.execute_command("SAVE", "rdb")
|
|
|
|
dbsize = await async_client.dbsize()
|
|
|
|
|
|
|
|
await async_client.connection_pool.disconnect()
|
|
|
|
df_server.stop()
|
|
|
|
|
|
|
|
redis_local_server.start(dir=tmp_dir, dbfilename="test-redis-load-rdb.rdb")
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
c_master = aioredis.Redis(port=redis_local_server.port)
|
|
|
|
await c_master.ping()
|
|
|
|
|
|
|
|
assert await c_master.dbsize() == dbsize
|
|
|
|
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
@pytest.mark.slow
|
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-cron", "snapshot_cron": "* * * * *"})
|
2024-02-19 15:20:48 +00:00
|
|
|
async def test_cron_snapshot(tmp_dir: Path, async_client: aioredis.Redis):
|
2024-02-14 09:13:52 +00:00
|
|
|
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
file = None
|
2024-08-13 13:05:15 +00:00
|
|
|
async with timeout(65):
|
2024-02-14 09:13:52 +00:00
|
|
|
while file is None:
|
|
|
|
await asyncio.sleep(1)
|
2024-02-19 15:20:48 +00:00
|
|
|
file = find_main_file(tmp_dir, "test-cron-summary.dfs")
|
2022-10-31 14:39:20 +00:00
|
|
|
|
2024-02-19 15:20:48 +00:00
|
|
|
assert file is not None, os.listdir(tmp_dir)
|
2023-07-17 10:13:12 +00:00
|
|
|
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-08-26 12:20:34 +00:00
|
|
|
@pytest.mark.skip("Fails and also causes all TLS tests to fail")
|
2024-08-23 21:36:31 +00:00
|
|
|
@pytest.mark.slow
|
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-failed-saving", "snapshot_cron": "* * * * *"})
|
|
|
|
async def test_cron_snapshot_failed_saving(df_server, tmp_dir: Path, async_client: aioredis.Redis):
|
|
|
|
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
|
|
|
|
|
|
|
|
backups_total = await get_metric_value(df_server, "dragonfly_backups")
|
|
|
|
failed_backups_total = await get_metric_value(df_server, "dragonfly_failed_backups")
|
|
|
|
|
|
|
|
file = None
|
|
|
|
async with timeout(65):
|
|
|
|
while file is None:
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
file = find_main_file(tmp_dir, "test-failed-saving-summary.dfs")
|
|
|
|
|
|
|
|
assert file is not None, os.listdir(tmp_dir)
|
|
|
|
|
|
|
|
await assert_metric_value(df_server, "dragonfly_backups", backups_total + 1)
|
|
|
|
await assert_metric_value(df_server, "dragonfly_failed_backups", failed_backups_total)
|
|
|
|
|
|
|
|
# Remove all files from directory
|
|
|
|
for dir_file in tmp_dir.iterdir():
|
|
|
|
os.unlink(dir_file)
|
|
|
|
|
|
|
|
# Make directory read-only
|
|
|
|
os.chmod(tmp_dir, 0o555)
|
|
|
|
|
|
|
|
# Wait for the next SAVE command
|
|
|
|
await asyncio.sleep(65)
|
|
|
|
file = find_main_file(tmp_dir, "test-failed-saving-summary.dfs")
|
|
|
|
|
|
|
|
# Make directory writable again
|
|
|
|
os.chmod(tmp_dir, 0o777)
|
|
|
|
|
|
|
|
assert file is None, os.listdir(tmp_dir)
|
|
|
|
|
|
|
|
await assert_metric_value(df_server, "dragonfly_backups", backups_total + 2)
|
|
|
|
await assert_metric_value(df_server, "dragonfly_failed_backups", failed_backups_total + 1)
|
|
|
|
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
@pytest.mark.slow
|
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-cron-set"})
|
2024-02-19 15:20:48 +00:00
|
|
|
async def test_set_cron_snapshot(tmp_dir: Path, async_client: aioredis.Redis):
|
2024-02-14 09:13:52 +00:00
|
|
|
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
|
2023-01-09 20:31:15 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await async_client.config_set("snapshot_cron", "* * * * *")
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
file = None
|
2024-08-13 13:05:15 +00:00
|
|
|
async with timeout(65):
|
2024-02-14 09:13:52 +00:00
|
|
|
while file is None:
|
|
|
|
await asyncio.sleep(1)
|
2024-02-19 15:20:48 +00:00
|
|
|
file = find_main_file(tmp_dir, "test-cron-set-summary.dfs")
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
assert file is not None
|
2022-10-24 18:22:43 +00:00
|
|
|
|
2023-07-17 10:13:12 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
@dfly_args(
|
|
|
|
{**BASIC_ARGS, "dbfilename": "test-save-rename-command", "rename_command": "save=save-foo"}
|
|
|
|
)
|
|
|
|
async def test_shutdown_save_with_rename(df_server):
|
|
|
|
"""Checks that on shutdown we save snapshot"""
|
|
|
|
client = df_server.client()
|
2023-04-20 04:30:42 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(client)
|
|
|
|
start_capture = await StaticSeeder.capture(client)
|
2023-04-20 04:30:42 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await client.connection_pool.disconnect()
|
|
|
|
df_server.stop()
|
|
|
|
df_server.start()
|
|
|
|
client = df_server.client()
|
2023-07-17 10:13:12 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await wait_available_async(client)
|
|
|
|
assert await StaticSeeder.capture(client) == start_capture
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await client.connection_pool.disconnect()
|
2023-04-17 11:59:44 +00:00
|
|
|
|
2023-07-31 17:26:01 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
@pytest.mark.slow
|
|
|
|
async def test_parallel_snapshot(async_client):
|
|
|
|
"""Dragonfly does not allow simultaneous save operations, send 2 save operations and make sure one is rejected"""
|
2023-07-31 17:26:01 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await async_client.execute_command("debug", "populate", "1000000", "askldjh", "1000", "RAND")
|
2023-07-31 17:26:01 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
async def save():
|
|
|
|
try:
|
|
|
|
await async_client.execute_command("save", "rdb", "dump")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
|
|
return False
|
2023-07-31 17:26:01 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
save_successes = sum(await asyncio.gather(*(save() for _ in range(2))), 0)
|
|
|
|
assert save_successes == 1, "Only one SAVE must be successful"
|
2023-07-31 17:26:01 +00:00
|
|
|
|
|
|
|
|
2024-07-02 07:26:26 +00:00
|
|
|
async def test_path_escapes(df_factory):
|
2024-02-14 09:13:52 +00:00
|
|
|
"""Test that we don't allow path escapes. We just check that df_server.start()
|
|
|
|
fails because we don't have a much better way to test that."""
|
2023-11-03 08:10:16 +00:00
|
|
|
|
2024-07-02 07:26:26 +00:00
|
|
|
df_server = df_factory.create(dbfilename="../../../../etc/passwd")
|
2024-02-14 09:13:52 +00:00
|
|
|
try:
|
|
|
|
df_server.start()
|
|
|
|
assert False, "Server should not start correctly"
|
|
|
|
except Exception as e:
|
|
|
|
pass
|
2023-11-03 08:10:16 +00:00
|
|
|
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"})
|
|
|
|
async def test_info_persistence_field(async_client):
|
|
|
|
"""Test is_loading field on INFO PERSISTENCE during snapshot loading"""
|
2023-11-03 08:10:16 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
|
2023-11-03 08:10:16 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
# Wait for snapshot to finish loading and try INFO PERSISTENCE
|
|
|
|
await wait_available_async(async_client)
|
|
|
|
assert "loading:0" in (await async_client.execute_command("INFO PERSISTENCE"))
|
2023-11-03 08:10:16 +00:00
|
|
|
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
# If DRAGONFLY_S3_BUCKET is configured, AWS credentials must also be
|
|
|
|
# configured.
|
|
|
|
@pytest.mark.skipif(
|
2024-09-17 14:35:53 +00:00
|
|
|
"DRAGONFLY_S3_BUCKET" not in os.environ or os.environ["DRAGONFLY_S3_BUCKET"] == "",
|
|
|
|
reason="AWS S3 snapshots bucket is not configured",
|
2024-01-07 10:21:09 +00:00
|
|
|
)
|
2024-02-14 09:13:52 +00:00
|
|
|
@dfly_args({**BASIC_ARGS, "dir": "s3://{DRAGONFLY_S3_BUCKET}{DRAGONFLY_TMP}", "dbfilename": ""})
|
2024-09-17 14:35:53 +00:00
|
|
|
async def test_s3_snapshot(async_client, tmp_dir):
|
2024-02-14 09:13:52 +00:00
|
|
|
seeder = StaticSeeder(key_target=10_000)
|
|
|
|
await seeder.run(async_client)
|
2024-01-07 10:21:09 +00:00
|
|
|
|
2024-09-17 14:35:53 +00:00
|
|
|
start_capture = await StaticSeeder.capture(async_client)
|
2024-01-07 10:21:09 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
try:
|
|
|
|
# save + flush + load
|
|
|
|
await async_client.execute_command("SAVE DF snapshot")
|
|
|
|
assert await async_client.flushall()
|
2023-11-21 11:56:27 +00:00
|
|
|
await async_client.execute_command(
|
2024-08-15 11:56:40 +00:00
|
|
|
"DFLY LOAD "
|
2024-02-14 09:13:52 +00:00
|
|
|
+ os.environ["DRAGONFLY_S3_BUCKET"]
|
2024-09-17 14:35:53 +00:00
|
|
|
+ str(tmp_dir)
|
2024-02-14 09:13:52 +00:00
|
|
|
+ "/snapshot-summary.dfs"
|
2023-11-21 11:56:27 +00:00
|
|
|
)
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
assert await StaticSeeder.capture(async_client) == start_capture
|
2023-11-21 11:56:27 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
finally:
|
2023-11-21 11:56:27 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
def delete_objects(bucket, prefix):
|
|
|
|
client = boto3.client("s3")
|
|
|
|
resp = client.list_objects_v2(
|
|
|
|
Bucket=bucket,
|
|
|
|
Prefix=prefix,
|
|
|
|
)
|
|
|
|
keys = []
|
|
|
|
for obj in resp["Contents"]:
|
|
|
|
keys.append({"Key": obj["Key"]})
|
|
|
|
client.delete_objects(
|
|
|
|
Bucket=bucket,
|
|
|
|
Delete={"Objects": keys},
|
|
|
|
)
|
2023-11-21 11:56:27 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
delete_objects(
|
|
|
|
os.environ["DRAGONFLY_S3_BUCKET"],
|
2024-09-17 14:35:53 +00:00
|
|
|
str(tmp_dir)[1:],
|
2024-02-14 09:13:52 +00:00
|
|
|
)
|
2023-04-20 04:30:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-shutdown"})
|
2024-02-14 09:13:52 +00:00
|
|
|
class TestDflySnapshotOnShutdown:
|
|
|
|
SEEDER_ARGS = dict(key_target=10_000)
|
2023-07-17 10:13:12 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
"""Test multi file snapshot"""
|
2023-04-20 04:30:42 +00:00
|
|
|
|
2023-12-25 07:49:57 +00:00
|
|
|
async def _get_info_memory_fields(self, client):
|
|
|
|
res = await client.execute_command("INFO MEMORY")
|
|
|
|
fields = {}
|
2024-02-14 09:13:52 +00:00
|
|
|
for line in res.splitlines():
|
2023-12-25 07:49:57 +00:00
|
|
|
if line.startswith("#"):
|
|
|
|
continue
|
|
|
|
k, v = line.split(":")
|
|
|
|
if k == "object_used_memory" or k.startswith("type_used_memory_"):
|
|
|
|
fields.update({k: int(v)})
|
|
|
|
return fields
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
async def _delete_all_keys(self, client: aioredis.Redis):
|
|
|
|
while True:
|
|
|
|
keys = await client.keys()
|
|
|
|
if len(keys) == 0:
|
|
|
|
break
|
|
|
|
await client.delete(*keys)
|
2023-12-25 07:49:57 +00:00
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
2024-02-14 09:13:52 +00:00
|
|
|
async def test_memory_counters(self, async_client: aioredis.Redis):
|
|
|
|
memory_counters = await self._get_info_memory_fields(async_client)
|
2023-12-25 07:49:57 +00:00
|
|
|
assert memory_counters == {"object_used_memory": 0}
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
seeder = StaticSeeder(**self.SEEDER_ARGS)
|
|
|
|
await seeder.run(async_client)
|
2023-12-25 07:49:57 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
memory_counters = await self._get_info_memory_fields(async_client)
|
2023-12-25 07:49:57 +00:00
|
|
|
assert all(value > 0 for value in memory_counters.values())
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await self._delete_all_keys(async_client)
|
|
|
|
memory_counters = await self._get_info_memory_fields(async_client)
|
2023-12-25 07:49:57 +00:00
|
|
|
assert memory_counters == {"object_used_memory": 0}
|
|
|
|
|
2023-04-20 04:30:42 +00:00
|
|
|
@pytest.mark.asyncio
|
2023-08-23 10:04:49 +00:00
|
|
|
@pytest.mark.slow
|
2024-02-14 09:13:52 +00:00
|
|
|
async def test_snapshot(self, df_server, async_client):
|
2023-12-25 07:49:57 +00:00
|
|
|
"""Checks that:
|
|
|
|
1. After reloading the snapshot file the data is the same
|
|
|
|
2. Memory counters after loading from snapshot is similar to before creating a snapshot
|
|
|
|
3. Memory counters after deleting all keys loaded by snapshot - this validates the memory
|
|
|
|
counting when loading from snapshot."""
|
2024-02-14 09:13:52 +00:00
|
|
|
seeder = StaticSeeder(**self.SEEDER_ARGS)
|
|
|
|
await seeder.run(async_client)
|
|
|
|
start_capture = await StaticSeeder.capture(async_client)
|
2023-04-20 04:30:42 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
memory_before = await self._get_info_memory_fields(async_client)
|
2023-04-20 04:30:42 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await async_client.connection_pool.disconnect()
|
2023-04-20 04:30:42 +00:00
|
|
|
df_server.stop()
|
|
|
|
df_server.start()
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
async_client = df_server.client()
|
|
|
|
await wait_available_async(async_client)
|
|
|
|
|
|
|
|
assert await StaticSeeder.capture(async_client) == start_capture
|
2023-04-20 04:30:42 +00:00
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
memory_after = await self._get_info_memory_fields(async_client)
|
2023-12-25 07:49:57 +00:00
|
|
|
for counter, value in memory_before.items():
|
|
|
|
# Unfortunately memory usage sometimes depends on order of insertion / deletion, so
|
|
|
|
# it's usually not exactly the same. For the test to be stable we check that it's
|
|
|
|
# at least 50% that of the original value.
|
|
|
|
assert memory_after[counter] >= 0.5 * value
|
|
|
|
|
2024-02-14 09:13:52 +00:00
|
|
|
await self._delete_all_keys(async_client)
|
|
|
|
memory_empty = await self._get_info_memory_fields(async_client)
|
2023-12-25 07:49:57 +00:00
|
|
|
assert memory_empty == {"object_used_memory": 0}
|
2024-02-21 13:39:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("format", FILE_FORMATS)
|
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "info-while-snapshot"})
|
2024-07-02 07:26:26 +00:00
|
|
|
async def test_infomemory_while_snapshoting(df_factory, format: str):
|
2024-07-15 08:01:02 +00:00
|
|
|
instance = df_factory.create(dbfilename=f"dump_{tmp_file_name()}")
|
2024-07-02 07:26:26 +00:00
|
|
|
instance.start()
|
|
|
|
async_client = instance.client()
|
2024-02-21 13:39:57 +00:00
|
|
|
await async_client.execute_command("DEBUG POPULATE 10000 key 4048 RAND")
|
|
|
|
|
|
|
|
async def save():
|
|
|
|
await async_client.execute_command("SAVE", format)
|
|
|
|
|
|
|
|
save_finished = False
|
|
|
|
|
|
|
|
async def info_in_loop():
|
|
|
|
while not save_finished:
|
|
|
|
await async_client.execute_command("INFO MEMORY")
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
|
|
save_task = asyncio.create_task(save())
|
|
|
|
info_task = asyncio.create_task(info_in_loop())
|
|
|
|
|
|
|
|
await save_task
|
|
|
|
save_finished = True
|
|
|
|
await info_task
|
2024-03-14 11:31:58 +00:00
|
|
|
|
|
|
|
|
|
|
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-bgsave"})
|
|
|
|
async def test_bgsave_and_save(async_client: aioredis.Redis):
|
2024-10-01 12:03:12 +00:00
|
|
|
await async_client.execute_command("DEBUG POPULATE 200000")
|
2024-03-14 11:31:58 +00:00
|
|
|
|
|
|
|
await async_client.execute_command("BGSAVE")
|
|
|
|
with pytest.raises(redis.exceptions.ResponseError):
|
|
|
|
await async_client.execute_command("BGSAVE")
|
|
|
|
|
|
|
|
while await is_saving(async_client):
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
await async_client.execute_command("BGSAVE")
|
|
|
|
with pytest.raises(redis.exceptions.ResponseError):
|
|
|
|
await async_client.execute_command("SAVE")
|
|
|
|
|
|
|
|
while await is_saving(async_client):
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
await async_client.execute_command("SAVE")
|
2024-06-04 14:15:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
@dfly_args(
|
|
|
|
{
|
|
|
|
**BASIC_ARGS,
|
|
|
|
"proactor_threads": 4,
|
|
|
|
"dbfilename": "tiered-entries",
|
2024-07-04 14:10:23 +00:00
|
|
|
"tiered_prefix": "tiering-test-backing",
|
2024-06-04 14:15:21 +00:00
|
|
|
"tiered_offload_threshold": "0.0", # ask offloading loop to offload as much as possible
|
|
|
|
}
|
|
|
|
)
|
|
|
|
async def test_tiered_entries(async_client: aioredis.Redis):
|
|
|
|
"""This test makes sure tieried entries are correctly persisted"""
|
|
|
|
|
|
|
|
# With variance 4: 512 - 8192 we include small and large values
|
|
|
|
await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run(
|
|
|
|
async_client
|
|
|
|
)
|
|
|
|
|
|
|
|
# Compute the capture, this brings all items back to memory... so we'll wait for offloading
|
|
|
|
start_capture = await StaticSeeder.capture(async_client)
|
|
|
|
|
|
|
|
# Wait until the total_stashes counter stops increasing, meaning offloading finished
|
|
|
|
last_writes, current_writes = 0, -1
|
|
|
|
while last_writes != current_writes:
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
last_writes = current_writes
|
|
|
|
current_writes = (await async_client.info("TIERED"))["tiered_total_stashes"]
|
|
|
|
|
|
|
|
# Save + flush + load
|
|
|
|
await async_client.execute_command("SAVE", "DF")
|
|
|
|
assert await async_client.flushall()
|
|
|
|
await async_client.execute_command(
|
2024-08-15 11:56:40 +00:00
|
|
|
"DFLY",
|
2024-06-04 14:15:21 +00:00
|
|
|
"LOAD",
|
|
|
|
"tiered-entries-summary.dfs",
|
|
|
|
)
|
|
|
|
|
|
|
|
# Compare captures
|
|
|
|
assert await StaticSeeder.capture(async_client) == start_capture
|
2024-07-04 14:10:23 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skip("Too heavy")
|
|
|
|
@pytest.mark.opt_only
|
|
|
|
@dfly_args(
|
|
|
|
{
|
|
|
|
**BASIC_ARGS,
|
|
|
|
"proactor_threads": 4,
|
|
|
|
"maxmemory": "1G",
|
|
|
|
"dbfilename": "tiered-entries",
|
|
|
|
"tiered_prefix": "tiering-test-backing",
|
|
|
|
"tiered_offload_threshold": "0.5", # ask to keep below 0.5 * 1G
|
|
|
|
"tiered_storage_write_depth": 50,
|
|
|
|
}
|
|
|
|
)
|
|
|
|
async def test_tiered_entries_throttle(async_client: aioredis.Redis):
|
|
|
|
"""This test makes sure tieried entries are correctly persisted"""
|
|
|
|
await StaticSeeder(key_target=600_000, data_size=4096, variance=1, types=["STRING"]).run(
|
|
|
|
async_client
|
|
|
|
)
|
|
|
|
|
|
|
|
# Compute the capture, this brings all items back to memory... so we'll wait for offloading
|
|
|
|
start_capture = await StaticSeeder.capture(async_client)
|
|
|
|
|
|
|
|
# Save + flush + load
|
|
|
|
await async_client.execute_command("SAVE", "DF")
|
|
|
|
assert await async_client.flushall()
|
|
|
|
|
|
|
|
load_task = asyncio.create_task(
|
|
|
|
async_client.execute_command(
|
2024-08-15 11:56:40 +00:00
|
|
|
"DFLY",
|
2024-07-04 14:10:23 +00:00
|
|
|
"LOAD",
|
|
|
|
"tiered-entries-summary.dfs",
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
while not load_task.done():
|
|
|
|
info = await async_client.info("ALL")
|
|
|
|
# print(info["used_memory_human"], info["used_memory_rss_human"])
|
|
|
|
assert info["used_memory"] < 600e6 # less than 600mb,
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
|
|
await load_task
|
|
|
|
assert await StaticSeeder.capture(async_client) == start_capture
|
2024-07-24 13:07:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
@dfly_args({"proactor_threads": 1})
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"query",
|
|
|
|
[
|
|
|
|
("HSET"),
|
|
|
|
("SADD"),
|
|
|
|
("ZSET"),
|
|
|
|
("LIST"),
|
|
|
|
],
|
|
|
|
)
|
2024-09-22 10:28:24 +00:00
|
|
|
@pytest.mark.slow
|
2024-07-24 13:07:27 +00:00
|
|
|
async def test_big_value_serialization_memory_limit(df_factory, query):
|
|
|
|
dbfilename = f"dump_{tmp_file_name()}"
|
|
|
|
instance = df_factory.create(dbfilename=dbfilename)
|
|
|
|
instance.start()
|
|
|
|
client = instance.client()
|
|
|
|
|
|
|
|
ten_mb = 10_000_000
|
|
|
|
|
|
|
|
def ten_mb_random_string():
|
|
|
|
return "".join(random.choices(string.ascii_letters, k=ten_mb))
|
|
|
|
|
|
|
|
one_gb = 1_000_000_000 # 1GB
|
|
|
|
|
|
|
|
upper_limit = one_gb * 1.1 # 1GB + 100MB
|
|
|
|
|
|
|
|
i = 0
|
|
|
|
|
|
|
|
while instance.rss < one_gb:
|
|
|
|
if query == "HSET":
|
|
|
|
i = i + 1
|
|
|
|
await client.execute_command(f"HSET foo_key foo_{i} {ten_mb_random_string()}")
|
|
|
|
elif query == "SADD":
|
|
|
|
await client.execute_command(f"SADD foo_key {ten_mb_random_string()}")
|
|
|
|
elif query == "ZSET":
|
|
|
|
await client.execute_command(f"ZADD foo_key {i} {ten_mb_random_string()}")
|
|
|
|
elif query == "LIST":
|
|
|
|
await client.execute_command(f"LPUSH foo_key {ten_mb_random_string()}")
|
|
|
|
|
|
|
|
async def check_memory_usage(instance):
|
|
|
|
while True:
|
|
|
|
assert instance.rss < upper_limit
|
|
|
|
await asyncio.sleep(0.01)
|
|
|
|
|
|
|
|
checker = asyncio.create_task(check_memory_usage(instance))
|
|
|
|
|
|
|
|
await client.execute_command("SAVE")
|
|
|
|
|
|
|
|
checker.cancel()
|
2024-07-25 18:38:09 +00:00
|
|
|
await client.execute_command("FLUSHALL")
|
2024-07-24 13:07:27 +00:00
|
|
|
await client.close()
|
2024-08-21 15:31:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
@dfly_args(
|
|
|
|
{
|
|
|
|
"dir": "{DRAGONFLY_TMP}/",
|
|
|
|
"memcached_port": 11211,
|
|
|
|
"proactor_threads": 4,
|
|
|
|
"dbfilename": "test-MC-flags",
|
|
|
|
}
|
|
|
|
)
|
|
|
|
async def test_mc_flags_saving(memcached_client: MCClient, async_client: aioredis.Redis):
|
|
|
|
async def check_flag(key, flag):
|
|
|
|
res = memcached_client.raw_command("get " + key, "END\r\n").split()
|
|
|
|
# workaround sometimes memcached_client.raw_command returns empty str
|
|
|
|
if len(res) > 2:
|
|
|
|
assert res[2].decode() == str(flag)
|
|
|
|
|
|
|
|
assert memcached_client.set("key1", "value1", noreply=True)
|
|
|
|
assert memcached_client.set("key2", "value1", noreply=True, expire=3600, flags=123456)
|
|
|
|
assert memcached_client.replace("key1", "value2", expire=4000, flags=2, noreply=True)
|
|
|
|
|
|
|
|
await check_flag("key1", 2)
|
|
|
|
await check_flag("key2", 123456)
|
|
|
|
|
|
|
|
await async_client.execute_command("SAVE", "DF")
|
|
|
|
assert await async_client.flushall()
|
|
|
|
|
|
|
|
await async_client.execute_command(
|
|
|
|
"DFLY",
|
|
|
|
"LOAD",
|
|
|
|
"test-MC-flags-summary.dfs",
|
|
|
|
)
|
|
|
|
|
|
|
|
await check_flag("key1", 2)
|
|
|
|
await check_flag("key2", 123456)
|