mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix(pytest): timed ticker for simpler conditions (#3242)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
b34d3ba6da
commit
4cc9834d89
4 changed files with 60 additions and 26 deletions
|
@ -12,6 +12,7 @@ import async_timeout
|
|||
from dataclasses import dataclass
|
||||
from aiohttp import ClientSession
|
||||
|
||||
from .utility import tick_timer
|
||||
from . import dfly_args
|
||||
from .instance import DflyInstance, DflyInstanceFactory
|
||||
|
||||
|
@ -309,9 +310,9 @@ async def test_pubsub_subcommand_for_numsub(async_client: aioredis.Redis):
|
|||
await asyncio.gather(*(resub(s, False, "chan1") for s in subs1))
|
||||
|
||||
# Make sure numsub drops to 0
|
||||
with async_timeout.timeout(1):
|
||||
while (await async_client.pubsub_numsub("chan1"))[0][1] > 0:
|
||||
await asyncio.sleep(0.05)
|
||||
async for numsub, breaker in tick_timer(lambda: async_client.pubsub_numsub("chan1")):
|
||||
with breaker:
|
||||
assert numsub[0][1] == 0
|
||||
|
||||
# Check empty numsub
|
||||
assert await async_client.pubsub_numsub() == []
|
||||
|
|
|
@ -2114,13 +2114,6 @@ async def test_start_replicating_while_save(df_local_factory):
|
|||
await disconnect_clients(c_master, *[c_replica])
|
||||
|
||||
|
||||
async def is_replicaiton_conn_down(conn):
|
||||
role = await conn.execute_command("INFO REPLICATION")
|
||||
# fancy of way of extracting the field master_link_status
|
||||
is_down = role.split("\r\n")[4].split(":")[1]
|
||||
return is_down == "down"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_acl_replication(df_local_factory):
|
||||
master = df_local_factory.create(proactor_threads=4)
|
||||
|
@ -2142,11 +2135,9 @@ async def test_user_acl_replication(df_local_factory):
|
|||
|
||||
# revoke acl's from tmp
|
||||
await c_master.execute_command("ACL SETUSER tmp -replconf")
|
||||
async with async_timeout.timeout(5):
|
||||
while True:
|
||||
if await is_replicaiton_conn_down(c_replica):
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
async for info, breaker in info_tick_timer(c_replica, section="REPLICATION"):
|
||||
with breaker:
|
||||
assert info["master_link_status"] == "down"
|
||||
|
||||
await c_master.execute_command("SET bar foo")
|
||||
|
||||
|
@ -2180,13 +2171,12 @@ async def test_replica_reconnect(df_local_factory, break_conn):
|
|||
await c_master.execute_command("set k 12345")
|
||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
await wait_available_async(c_replica)
|
||||
|
||||
assert not await is_replicaiton_conn_down(c_replica)
|
||||
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "up"
|
||||
|
||||
# kill existing master, create master with different repl_id but same port
|
||||
master_port = master.port
|
||||
master.stop()
|
||||
assert await is_replicaiton_conn_down(c_replica)
|
||||
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "down"
|
||||
|
||||
master = df_local_factory.create(proactor_threads=1, port=master_port)
|
||||
df_local_factory.start_all([master])
|
||||
|
@ -2198,14 +2188,14 @@ async def test_replica_reconnect(df_local_factory, break_conn):
|
|||
assert await c_replica.execute_command("get k") == "12345"
|
||||
assert await c_master.execute_command("set k 6789")
|
||||
assert await c_replica.execute_command("get k") == "12345"
|
||||
assert await is_replicaiton_conn_down(c_replica)
|
||||
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "down"
|
||||
else:
|
||||
assert await c_master.execute_command("get k") == None
|
||||
assert await c_replica.execute_command("get k") == None
|
||||
assert await c_master.execute_command("set k 6789")
|
||||
await check_all_replicas_finished([c_replica], c_master)
|
||||
assert await c_replica.execute_command("get k") == "6789"
|
||||
assert not await is_replicaiton_conn_down(c_replica)
|
||||
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "up"
|
||||
|
||||
# Force re-replication, assert that it worked
|
||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
|
|
|
@ -7,6 +7,7 @@ import redis.asyncio as aioredis
|
|||
|
||||
from . import dfly_args
|
||||
from .seeder import StaticSeeder
|
||||
from .utility import info_tick_timer
|
||||
|
||||
|
||||
BASIC_ARGS = {"port": 6379, "proactor_threads": 4, "tiered_prefix": "/tmp/tiering_test_backing"}
|
||||
|
@ -26,12 +27,9 @@ async def test_basic_memory_usage(async_client: aioredis.Redis):
|
|||
await seeder.run(async_client)
|
||||
|
||||
# Wait for tiering stashes
|
||||
with async_timeout.timeout(5):
|
||||
while True:
|
||||
info = await async_client.info("ALL")
|
||||
if info["tiered_entries"] > 195_000:
|
||||
break
|
||||
await asyncio.sleep(0.2)
|
||||
async for info, breaker in info_tick_timer(async_client, section="TIERED"):
|
||||
with breaker:
|
||||
assert info["tiered_entries"] > 195_000
|
||||
|
||||
info = await async_client.info("ALL")
|
||||
assert info["num_entries"] == 200_000
|
||||
|
|
|
@ -40,6 +40,51 @@ def batch_fill_data(client, gen, batch_size=100):
|
|||
client.mset({k: v for k, v, in group})
|
||||
|
||||
|
||||
async def tick_timer(func, timeout=5, step=0.1):
|
||||
"""
|
||||
Async generator with automatic break when all asserts pass
|
||||
|
||||
for object, breaker in tick_timer():
|
||||
with breaker:
|
||||
assert conditions on object
|
||||
|
||||
If the generator times out, the last failed assert is raised
|
||||
"""
|
||||
|
||||
class ticker_breaker:
|
||||
def __init__(self):
|
||||
self.exc = None
|
||||
self.entered = False
|
||||
|
||||
def __enter__(self):
|
||||
self.entered = True
|
||||
|
||||
def __exit__(self, exc_type, exc_value, trace):
|
||||
if exc_value:
|
||||
self.exc = exc_value
|
||||
return True
|
||||
|
||||
last_error = None
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
breaker = ticker_breaker()
|
||||
yield (await func(), breaker)
|
||||
if breaker.entered and not breaker.exc:
|
||||
return
|
||||
|
||||
last_error = breaker.exc
|
||||
await asyncio.sleep(step)
|
||||
|
||||
if last_error:
|
||||
raise RuntimeError("Timed out!") from last_error
|
||||
raise RuntimeError("Timed out!")
|
||||
|
||||
|
||||
async def info_tick_timer(client: aioredis.Redis, section=None, **kwargs):
|
||||
async for x in tick_timer(lambda: client.info(section), **kwargs):
|
||||
yield x
|
||||
|
||||
|
||||
async def wait_available_async(client: aioredis.Redis, timeout=10):
|
||||
"""Block until instance exits loading phase"""
|
||||
its = 0
|
||||
|
|
Loading…
Reference in a new issue