
chore: replace session wide fixtures with scope (#3251)

* chore: replace session wide fixtures with scope
Kostas Kyrimis, 2024-07-02 10:26:26 +03:00, committed by GitHub
parent 50766fdbb3
commit 5956275818
13 changed files with 323 additions and 339 deletions
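
What the change amounts to: the `df_factory` fixture in conftest.py switches from `scope="session"` to `scope="function"`, the function-scoped `df_local_factory` wrapper is deleted, and every test that took `df_local_factory` now takes `df_factory` directly, so each test gets a factory that is created and torn down around that test alone. A minimal, runnable sketch of the pattern (the `FakeFactory` class is a hypothetical stand-in for `DflyInstanceFactory`, not the real implementation):

import pytest


class FakeFactory:
    """Hypothetical stand-in for DflyInstanceFactory; illustration only."""

    def __init__(self):
        self.instances = []

    def create(self, **args):
        self.instances.append(args)
        return args

    def stop_all(self):
        self.instances.clear()


@pytest.fixture(scope="function")  # was scope="session" before this commit
def df_factory():
    factory = FakeFactory()
    yield factory
    factory.stop_all()  # teardown now runs after every test, not once per session


def test_first(df_factory):
    df_factory.create(port=1111)
    assert len(df_factory.instances) == 1


def test_second(df_factory):
    # Fresh factory: nothing leaks over from test_first.
    assert df_factory.instances == []

With the session scope, one factory (and its `stop_all()` teardown) was shared by the whole run; the function scope trades a little setup cost for isolation between tests.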


@@ -129,8 +129,8 @@ async def test_acl_commands(async_client):
 @pytest.mark.asyncio
-async def test_acl_cat_commands_multi_exec_squash(df_local_factory):
-    df = df_local_factory.create(multi_exec_squash=True, port=1111)
+async def test_acl_cat_commands_multi_exec_squash(df_factory):
+    df = df_factory.create(multi_exec_squash=True, port=1111)
     df.start()
@@ -305,10 +305,10 @@ def create_temp_file(content, tmp_dir):
 @pytest.mark.asyncio
 @dfly_args({"port": 1111})
-async def test_bad_acl_file(df_local_factory, tmp_dir):
+async def test_bad_acl_file(df_factory, tmp_dir):
     acl = create_temp_file("ACL SETUSER kostas ON >mypass +@WRONG", tmp_dir)
-    df = df_local_factory.create(aclfile=acl)
+    df = df_factory.create(aclfile=acl)
     df.start()
@@ -322,9 +322,9 @@ async def test_bad_acl_file(df_local_factory, tmp_dir):
 @pytest.mark.asyncio
 @dfly_args({"port": 1111})
-async def test_good_acl_file(df_local_factory, tmp_dir):
+async def test_good_acl_file(df_factory, tmp_dir):
     acl = create_temp_file("USER MrFoo ON >mypass", tmp_dir)
-    df = df_local_factory.create(aclfile=acl)
+    df = df_factory.create(aclfile=acl)
     df.start()
     client = df.client()
@@ -416,8 +416,8 @@ async def test_acl_log(async_client):
 @pytest.mark.asyncio
 @dfly_args({"port": 1111, "admin_port": 1112, "requirepass": "mypass"})
-async def test_require_pass(df_local_factory):
-    df = df_local_factory.create()
+async def test_require_pass(df_factory):
+    df = df_factory.create()
     df.start()
     client = aioredis.Redis(port=df.port)
@@ -539,7 +539,7 @@ async def test_acl_keys(async_client):
 @pytest.mark.asyncio
-async def default_user_bug(df_local_factory):
+async def default_user_bug(df_factory):
     df.start()
     client = aioredis.Redis(port=df.port)


@@ -23,11 +23,11 @@ def run_cluster_mgr(args):
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
-async def test_cluster_mgr(df_local_factory):
+async def test_cluster_mgr(df_factory):
     NODES = 3
-    masters = [df_local_factory.create(port=BASE_PORT + i) for i in range(NODES)]
-    replicas = [df_local_factory.create(port=BASE_PORT + 100 + i) for i in range(NODES)]
-    df_local_factory.start_all([*masters, *replicas])
+    masters = [df_factory.create(port=BASE_PORT + i) for i in range(NODES)]
+    replicas = [df_factory.create(port=BASE_PORT + 100 + i) for i in range(NODES)]
+    df_factory.start_all([*masters, *replicas])

     # Initialize a cluster (all slots belong to node 0)
     assert run_cluster_mgr(["--action=config_single_remote", f"--target_port={BASE_PORT}"])


@@ -250,11 +250,11 @@ def verify_slots_result(port: int, answer: list, replicas) -> bool:
 @dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"})
-async def test_emulated_cluster_with_replicas(df_local_factory):
-    master = df_local_factory.create(port=BASE_PORT)
-    replicas = [df_local_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]
+async def test_emulated_cluster_with_replicas(df_factory):
+    master = df_factory.create(port=BASE_PORT)
+    replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]

-    df_local_factory.start_all([master, *replicas])
+    df_factory.start_all([master, *replicas])
     c_master = aioredis.Redis(port=master.port)
     master_id = (await c_master.execute_command("dflycluster myid")).decode("utf-8")
@@ -381,9 +381,9 @@ Also add keys to each of them that are *not* moved, and see that they are unaffe
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"})
-async def test_cluster_node_id(df_local_factory: DflyInstanceFactory):
-    node = df_local_factory.create(port=BASE_PORT)
-    df_local_factory.start_all([node])
+async def test_cluster_node_id(df_factory: DflyInstanceFactory):
+    node = df_factory.create(port=BASE_PORT)
+    df_factory.start_all([node])

     conn = node.client()
     assert "inigo montoya" == await get_node_id(conn)
@@ -392,14 +392,13 @@ async def test_cluster_node_id(df_local_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFactory):
+async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
     # Start and configure cluster with 2 nodes
     nodes = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(nodes)
+    df_factory.start_all(nodes)

     c_nodes = [node.client() for node in nodes]
     c_nodes_admin = [node.admin_client() for node in nodes]
@@ -523,11 +522,11 @@ async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFact
 # Tests that master commands to the replica are applied regardless of slot ownership
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_replica_sets_non_owned_keys(df_local_factory: DflyInstanceFactory):
+async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFactory):
     # Start and configure cluster with 1 master and 1 replica, both own all slots
-    master = df_local_factory.create(admin_port=BASE_PORT + 1000)
-    replica = df_local_factory.create(admin_port=BASE_PORT + 1001)
-    df_local_factory.start_all([master, replica])
+    master = df_factory.create(admin_port=BASE_PORT + 1000)
+    replica = df_factory.create(admin_port=BASE_PORT + 1001)
+    df_factory.start_all([master, replica])

     async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
         master_id = await get_node_id(c_master_admin)
@@ -633,11 +632,11 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory: DflyInstanc
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_flush_slots_after_config_change(df_local_factory: DflyInstanceFactory):
+async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
     # Start and configure cluster with 1 master and 1 replica, both own all slots
-    master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
-    replica = df_local_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
-    df_local_factory.start_all([master, replica])
+    master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
+    replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
+    df_factory.start_all([master, replica])

     c_master = master.client()
     c_master_admin = master.admin_client()
@@ -790,26 +789,26 @@ async def test_cluster_blocking_command(df_server):
 @pytest.mark.parametrize("set_cluster_node_id", [True, False])
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_native_client(
-    df_local_factory: DflyInstanceFactory,
+    df_factory: DflyInstanceFactory,
     df_seeder_factory: DflySeederFactory,
     set_cluster_node_id: bool,
 ):
     # Start and configure cluster with 3 masters and 3 replicas
     masters = [
-        df_local_factory.create(
+        df_factory.create(
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
             cluster_node_id=f"master{i}" if set_cluster_node_id else "",
         )
         for i in range(3)
     ]
-    df_local_factory.start_all(masters)
+    df_factory.start_all(masters)
     c_masters = [aioredis.Redis(port=master.port) for master in masters]
     c_masters_admin = [master.admin_client() for master in masters]
     master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))

     replicas = [
-        df_local_factory.create(
+        df_factory.create(
             port=BASE_PORT + 100 + i,
             admin_port=BASE_PORT + i + 1100,
             cluster_node_id=f"replica{i}" if set_cluster_node_id else "",
@@ -817,7 +816,7 @@ async def test_cluster_native_client(
         )
         for i in range(3)
     ]
-    df_local_factory.start_all(replicas)
+    df_factory.start_all(replicas)
     c_replicas = [replica.client() for replica in replicas]
     await asyncio.gather(*(wait_available_async(c) for c in c_replicas))
     c_replicas_admin = [replica.admin_client() for replica in replicas]
@@ -986,14 +985,13 @@ async def test_cluster_native_client(
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_config_consistency(df_local_factory: DflyInstanceFactory):
+async def test_config_consistency(df_factory: DflyInstanceFactory):
     # Check slot migration from one node to another
     instances = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 5259)]
@@ -1036,11 +1034,11 @@ async def test_config_consistency(df_local_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_flushall_during_migration(
-    df_local_factory: DflyInstanceFactory, df_seeder_factory
+    df_factory: DflyInstanceFactory, df_seeder_factory
 ):
     # Check data migration from one node to another
     instances = [
-        df_local_factory.create(
+        df_factory.create(
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
             vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9",
@@ -1049,7 +1047,7 @@ async def test_cluster_flushall_during_migration(
         for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 16383)]
@@ -1088,14 +1086,13 @@ async def test_cluster_flushall_during_migration(
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
+async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
     # Check data migration from one node to another
     instances = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 9000)]
@@ -1150,13 +1147,12 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
-async def test_migration_with_key_ttl(df_local_factory):
+async def test_migration_with_key_ttl(df_factory):
     instances = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 16383)]
@@ -1202,13 +1198,12 @@ async def test_migration_with_key_ttl(df_local_factory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_network_disconnect_during_migration(df_local_factory, df_seeder_factory):
+async def test_network_disconnect_during_migration(df_factory, df_seeder_factory):
     instances = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 16383)]
@@ -1260,21 +1255,21 @@ async def test_network_disconnect_during_migration(df_local_factory, df_seeder_f
 )
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_fuzzymigration(
-    df_local_factory: DflyInstanceFactory,
+    df_factory: DflyInstanceFactory,
     df_seeder_factory,
     node_count: int,
     segments: int,
     keys: int,
 ):
     instances = [
-        df_local_factory.create(
+        df_factory.create(
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
             vmodule="cluster_family=9,cluster_slot_migration=9",
         )
         for i in range(node_count)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [(await create_node_info(instance)) for instance in instances]
@@ -1406,13 +1401,12 @@ async def test_cluster_fuzzymigration(
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory):
+async def test_cluster_config_reapply(df_factory: DflyInstanceFactory):
     """Check data migration from one node to another."""
     instances = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [await create_node_info(instance) for instance in instances]
     nodes[0].slots = [(0, 8000)]
@@ -1458,13 +1452,12 @@ async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
+async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
     """Check data migration from one node to another."""
     instances = [
-        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
-        for i in range(2)
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
-    df_local_factory.start_all(instances)
+    df_factory.start_all(instances)

     nodes = [await create_node_info(instance) for instance in instances]
     nodes[0].slots = [(0, 8000)]
@@ -1537,7 +1530,7 @@ async def await_no_lag(client: aioredis.Redis, timeout=10):
 @dfly_args({"proactor_threads": 4})
-async def test_replicate_cluster(df_local_factory: DflyInstanceFactory, df_seeder_factory):
+async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_factory):
     """
     Create dragonfly cluster of 2 nodes.
     Create additional dragonfly server in emulated mode.
@@ -1545,13 +1538,13 @@ async def test_replicate_cluster(df_local_factory: DflyInstanceFactory, df_seede
     Send traffic before replication start and while replicating.
     Promote the replica to master and check data consistency between cluster and single node.
     """
-    replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
     cluster_nodes = [
-        df_local_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
+        df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
     ]

     # Start instances and connect clients
-    df_local_factory.start_all(cluster_nodes + [replica])
+    df_factory.start_all(cluster_nodes + [replica])
     c_nodes = [node.client() for node in cluster_nodes]
     c_replica = replica.client()
@@ -1625,9 +1618,7 @@ async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10):
 @dfly_args({"proactor_threads": 4})
-async def test_replicate_disconnect_cluster(
-    df_local_factory: DflyInstanceFactory, df_seeder_factory
-):
+async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_seeder_factory):
     """
     Create dragonfly cluster of 2 nodes and additional dragonfly server in emulated mode.
     Populate the cluster with data
@@ -1636,13 +1627,13 @@ async def test_replicate_disconnect_cluster(
     Promote replica to master
     Compare cluster data and replica data
     """
-    replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
     cluster_nodes = [
-        df_local_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
+        df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
    ]

     # Start instances and connect clients
-    df_local_factory.start_all(cluster_nodes + [replica])
+    df_factory.start_all(cluster_nodes + [replica])
     c_nodes = [node.client() for node in cluster_nodes]
     c_replica = replica.client()
@@ -1743,7 +1734,7 @@ async def await_eq_offset(client: aioredis.Redis, timeout=20):
 @dfly_args({"proactor_threads": 4})
-async def test_replicate_redis_cluster(redis_cluster, df_local_factory, df_seeder_factory):
+async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_factory):
     """
     Create redis cluster of 3 nodes.
     Create dragonfly server in emulated mode.
@@ -1751,10 +1742,10 @@ async def test_replicate_redis_cluster(redis_cluster, df_local_factory, df_seede
     Send traffic before replication start and while replicating.
     Promote the replica to master and check data consistency between cluster and single dragonfly node.
     """
-    replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")

     # Start instances and connect clients
-    df_local_factory.start_all([replica])
+    df_factory.start_all([replica])

     redis_cluster_nodes = redis_cluster
     node_clients = [
@@ -1801,9 +1792,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_local_factory, df_seede
 @dfly_args({"proactor_threads": 4})
-async def test_replicate_disconnect_redis_cluster(
-    redis_cluster, df_local_factory, df_seeder_factory
-):
+async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_seeder_factory):
     """
     Create redis cluster of 3 nodes.
     Create dragonfly server in emulated mode.
@@ -1813,10 +1802,10 @@ async def test_replicate_disconnect_redis_cluster(
     Send more traffic
     Promote the replica to master and check data consistency between cluster and single dragonfly node.
     """
-    replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")

     # Start instances and connect clients
-    df_local_factory.start_all([replica])
+    df_factory.start_all([replica])

     redis_cluster_nodes = redis_cluster
     node_clients = [


@@ -77,7 +77,7 @@ def parse_args(args: List[str]) -> Dict[str, Union[str, None]]:
     return args_dict


-@pytest.fixture(scope="session", params=[{}])
+@pytest.fixture(scope="function", params=[{}])
 def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
     """
     Create an instance factory with supplied params.
@@ -106,15 +106,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
     factory.stop_all()


-# Differs from df_factory in that its scope is function
-@pytest.fixture(scope="function")
-def df_local_factory(df_factory: DflyInstanceFactory):
-    factory = DflyInstanceFactory(df_factory.params, df_factory.args)
-    yield factory
-    factory.stop_all()


 @pytest.fixture(scope="session")
 def df_server(df_factory: DflyInstanceFactory) -> DflyInstance:
     """
     Start the default Dragonfly server that will be used for the default pools
@@ -149,7 +141,7 @@ def df_server(df_factory: DflyInstanceFactory) -> DflyInstance:
     print("Cluster clients left: ", len(clients_left))


-@pytest.fixture(scope="class")
+@pytest.fixture(scope="function")
 def connection(df_server: DflyInstance):
     return redis.Connection(port=df_server.port)
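
One detail worth noting in the hunk above: `df_factory` keeps `params=[{}]` while changing scope. In pytest, a fixture declared with `params` runs once per parameter and exposes the active value as `request.param`, and a test can override the parametrization with `indirect=True`. A generic sketch of that idiom, under those assumptions (`instance_args` is an invented name, not part of this conftest.py):

import pytest


@pytest.fixture(scope="function", params=[{}])
def instance_args(request):
    # request.param is {} unless a test overrides the parametrization.
    return dict(request.param)


def test_default(instance_args):
    assert instance_args == {}


@pytest.mark.parametrize("instance_args", [{"proactor_threads": 4}], indirect=True)
def test_override(instance_args):
    assert instance_args["proactor_threads"] == 4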


@@ -577,8 +577,8 @@ async def test_large_cmd(async_client: aioredis.Redis):
     assert len(res) == MAX_ARR_SIZE


-async def test_reject_non_tls_connections_on_tls(with_tls_server_args, df_local_factory):
-    server: DflyInstance = df_local_factory.create(
+async def test_reject_non_tls_connections_on_tls(with_tls_server_args, df_factory):
+    server: DflyInstance = df_factory.create(
         no_tls_on_admin_port="true",
         admin_port=1111,
         port=1211,
@@ -597,8 +597,8 @@ async def test_reject_non_tls_connections_on_tls(with_tls_server_args, df_local_
     await client.close()


-async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_local_factory):
-    server = df_local_factory.create(port=BASE_PORT, **with_ca_tls_server_args)
+async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_factory):
+    server = df_factory.create(port=BASE_PORT, **with_ca_tls_server_args)
     server.start()

     client = aioredis.Redis(port=server.port, **with_tls_client_args, ssl_cert_reqs=None)
@@ -606,8 +606,8 @@ async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_lo
     await client.close()


-async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, df_local_factory):
-    server = df_local_factory.create(port=BASE_PORT, **with_ca_tls_server_args)
+async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, df_factory):
+    server = df_factory.create(port=BASE_PORT, **with_ca_tls_server_args)
     server.start()

     client = aioredis.Redis(port=server.port, **with_ca_tls_client_args)
@@ -616,9 +616,9 @@ async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, d
 async def test_tls_reject(
-    with_ca_tls_server_args, with_tls_client_args, df_local_factory: DflyInstanceFactory
+    with_ca_tls_server_args, with_tls_client_args, df_factory: DflyInstanceFactory
 ):
-    server: DflyInstance = df_local_factory.create(port=BASE_PORT, **with_ca_tls_server_args)
+    server: DflyInstance = df_factory.create(port=BASE_PORT, **with_ca_tls_server_args)
     server.start()

     client = server.client(**with_tls_client_args, ssl_cert_reqs=None)
@@ -678,8 +678,8 @@ async def test_squashed_pipeline_multi(async_client: aioredis.Redis):
     await p.execute()


-async def test_unix_domain_socket(df_local_factory, tmp_dir):
-    server = df_local_factory.create(proactor_threads=1, port=BASE_PORT, unixsocket="./df.sock")
+async def test_unix_domain_socket(df_factory, tmp_dir):
+    server = df_factory.create(proactor_threads=1, port=BASE_PORT, unixsocket="./df.sock")
     server.start()

     await asyncio.sleep(0.5)
@@ -688,8 +688,8 @@ async def test_unix_domain_socket(df_local_factory, tmp_dir):
     assert await r.ping()


-async def test_unix_socket_only(df_local_factory, tmp_dir):
-    server = df_local_factory.create(proactor_threads=1, port=0, unixsocket="./df.sock")
+async def test_unix_socket_only(df_factory, tmp_dir):
+    server = df_factory.create(proactor_threads=1, port=0, unixsocket="./df.sock")
     server._start()

     await asyncio.sleep(1)
@@ -789,14 +789,14 @@ async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Re
 async def test_tls_when_read_write_is_interleaved(
-    with_ca_tls_server_args, with_ca_tls_client_args, df_local_factory
+    with_ca_tls_server_args, with_ca_tls_client_args, df_factory
 ):
     """
     This test covers a deadlock bug in helio and TlsSocket when a client connection renegotiated a
     handshake without reading its pending data from the socket.
     This is a weak test case and from our local experiments it deadlocked 30% of the test runs
     """
-    server: DflyInstance = df_local_factory.create(
+    server: DflyInstance = df_factory.create(
         port=1211, **with_ca_tls_server_args, proactor_threads=1
     )

     # TODO(kostas): to fix the deadlock in the test


@@ -27,8 +27,8 @@ def export_dfly_password() -> str:
     yield pwd


-async def test_password(df_local_factory, export_dfly_password):
-    with df_local_factory.create() as dfly:
+async def test_password(df_factory, export_dfly_password):
+    with df_factory.create() as dfly:
         # Expect password form environment variable
         with pytest.raises(redis.exceptions.AuthenticationError):
             async with aioredis.Redis(port=dfly.port) as client:
@@ -38,7 +38,7 @@ async def test_password(df_local_factory, export_dfly_password):
     # --requirepass should take precedence over environment variable
     requirepass = "requirepass"
-    with df_local_factory.create(requirepass=requirepass) as dfly:
+    with df_factory.create(requirepass=requirepass) as dfly:
         # Expect password form flag
         with pytest.raises(redis.exceptions.AuthenticationError):
             async with aioredis.Redis(port=dfly.port, password=export_dfly_password) as client:
@@ -79,16 +79,16 @@ async def test_txq_ooo(async_client: aioredis.Redis, df_server):
 )
-async def test_arg_from_environ_overwritten_by_cli(df_local_factory):
+async def test_arg_from_environ_overwritten_by_cli(df_factory):
     with EnvironCntx(DFLY_port="6378"):
-        with df_local_factory.create(port=6377):
+        with df_factory.create(port=6377):
             client = aioredis.Redis(port=6377)
             await client.ping()


-async def test_arg_from_environ(df_local_factory):
+async def test_arg_from_environ(df_factory):
     with EnvironCntx(DFLY_requirepass="pass"):
-        with df_local_factory.create() as dfly:
+        with df_factory.create() as dfly:
             # Expect password from environment variable
             with pytest.raises(redis.exceptions.AuthenticationError):
                 client = aioredis.Redis(port=dfly.port)
@@ -98,17 +98,17 @@ async def test_arg_from_environ(df_local_factory):
             await client.ping()


-async def test_unknown_dfly_env(df_local_factory, export_dfly_password):
+async def test_unknown_dfly_env(df_factory, export_dfly_password):
     with EnvironCntx(DFLY_abcdef="xyz"):
         with pytest.raises(DflyStartException):
-            dfly = df_local_factory.create()
+            dfly = df_factory.create()
             dfly.start()


-async def test_restricted_commands(df_local_factory):
+async def test_restricted_commands(df_factory):
     # Restrict GET and SET, then verify non-admin clients are blocked from
     # using these commands, though admin clients can use them.
-    with df_local_factory.create(restricted_commands="get,set", admin_port=1112) as server:
+    with df_factory.create(restricted_commands="get,set", admin_port=1112) as server:
         async with aioredis.Redis(port=server.port) as client:
             with pytest.raises(redis.exceptions.ResponseError):
                 await client.get("foo")
@@ -122,11 +122,11 @@ async def test_restricted_commands(df_local_factory):
 @pytest.mark.asyncio
-async def test_reply_guard_oom(df_local_factory, df_seeder_factory):
-    master = df_local_factory.create(
+async def test_reply_guard_oom(df_factory, df_seeder_factory):
+    master = df_factory.create(
         proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false"
     )
-    df_local_factory.start_all([master])
+    df_factory.start_all([master])
     c_master = master.client()
     await c_master.execute_command("DEBUG POPULATE 6000 size 44000")


@@ -15,14 +15,14 @@ from .utility import *
         ("STRING", 6_000_000, 1000, 1),
     ],
 )
-async def test_rss_used_mem_gap(df_local_factory, type, keys, val_size, elements):
+async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements):
     # Create a Dragonfly and fill it up with `type` until it reaches `min_rss`, then make sure that
     # the gap between used_memory and rss is no more than `max_unaccounted_ratio`.
     min_rss = 5 * 1024 * 1024 * 1024  # 5gb
     max_unaccounted = 200 * 1024 * 1024  # 200mb
-    df_server = df_local_factory.create()
-    df_local_factory.start_all([df_server])
+    df_server = df_factory.create()
+    df_factory.start_all([df_server])

     client = df_server.client()
     await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly


@@ -53,7 +53,7 @@ full_sync_replication_specs = [
 @pytest.mark.parametrize("t_replicas, seeder_config", full_sync_replication_specs)
 async def test_replication_full_sync(
-    df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker
+    df_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker
 ):
     master = redis_server
     c_master = aioredis.Redis(port=master.port)
@@ -62,7 +62,7 @@ async def test_replication_full_sync(
     seeder = df_seeder_factory.create(port=master.port, **seeder_config)
     await seeder.run(target_deviation=0.1)

-    replica = df_local_factory.create(
+    replica = df_factory.create(
         port=port_picker.get_available_port(), proactor_threads=t_replicas[0]
     )
     replica.start()
@@ -88,13 +88,13 @@ stable_sync_replication_specs = [
 @pytest.mark.parametrize("t_replicas, seeder_config", stable_sync_replication_specs)
 async def test_replication_stable_sync(
-    df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker
+    df_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker
 ):
     master = redis_server
     c_master = aioredis.Redis(port=master.port)
     assert await c_master.ping()

-    replica = df_local_factory.create(
+    replica = df_factory.create(
         port=port_picker.get_available_port(), proactor_threads=t_replicas[0]
     )
     replica.start()
@@ -126,7 +126,7 @@ replication_specs = [
 @pytest.mark.parametrize("t_replicas, seeder_config", replication_specs)
 async def test_redis_replication_all(
-    df_local_factory: DflyInstanceFactory,
+    df_factory: DflyInstanceFactory,
     df_seeder_factory,
     redis_server,
     t_replicas,
@@ -138,7 +138,7 @@ async def test_redis_replication_all(
     assert await c_master.ping()

     replicas = [
-        df_local_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
+        df_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
         for i, t in enumerate(t_replicas)
     ]
@@ -147,7 +147,7 @@ async def test_redis_replication_all(
     await seeder.run(target_deviation=0.1)

     # Start replicas
-    df_local_factory.start_all(replicas)
+    df_factory.start_all(replicas)

     c_replicas = [replica.client() for replica in replicas]
@@ -189,7 +189,7 @@ master_disconnect_cases = [
 @pytest.mark.parametrize("t_replicas, t_disconnect, seeder_config", master_disconnect_cases)
 async def test_redis_master_restart(
-    df_local_factory,
+    df_factory,
     df_seeder_factory,
     redis_server,
     t_replicas,
@@ -202,7 +202,7 @@ async def test_redis_master_restart(
     assert await c_master.ping()

     replicas = [
-        df_local_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
+        df_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
         for i, t in enumerate(t_replicas)
     ]
@@ -211,7 +211,7 @@ async def test_redis_master_restart(
     await seeder.run(target_deviation=0.1)

     # Start replicas
-    df_local_factory.start_all(replicas)
+    df_factory.start_all(replicas)

     c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
@@ -259,7 +259,7 @@ master_disconnect_cases = [
 @pytest.mark.parametrize("t_replicas, seeder_config", master_disconnect_cases)
 async def test_disconnect_master(
-    df_local_factory,
+    df_factory,
     df_seeder_factory,
     redis_server,
     t_replicas,
@@ -275,7 +275,7 @@ async def test_disconnect_master(
     proxy_task = asyncio.create_task(proxy.serve())

     replicas = [
-        df_local_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
+        df_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
         for i, t in enumerate(t_replicas)
     ]
@@ -284,7 +284,7 @@ async def test_disconnect_master(
     await seeder.run(target_deviation=0.1)

     # Start replicas
-    df_local_factory.start_all(replicas)
+    df_factory.start_all(replicas)

     c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]


@ -55,21 +55,21 @@ Test full replication pipeline. Test full sync with streaming changes and stable
)
@pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})])
async def test_replication_all(
df_local_factory: DflyInstanceFactory, t_master, t_replicas, seeder_config, stream_target, mode
df_factory: DflyInstanceFactory, t_master, t_replicas, seeder_config, stream_target, mode
):
if mode:
mode["maxmemory"] = str(t_master * 256) + "mb"
master = df_local_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **mode)
master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **mode)
replicas = [
df_local_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t)
df_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t)
for i, t in enumerate(t_replicas)
]
from_admin_port = random.choice([True, False])
# Start instances and connect clients
df_local_factory.start_all([master] + replicas)
df_factory.start_all([master] + replicas)
c_master = master.client()
c_replicas = [replica.client() for replica in replicas]
@ -180,7 +180,7 @@ disconnect_cases = [
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases)
async def test_disconnect_replica(
df_local_factory: DflyInstanceFactory,
df_factory: DflyInstanceFactory,
df_seeder_factory,
t_master,
t_crash_fs,
@ -188,9 +188,9 @@ async def test_disconnect_replica(
t_disonnect,
n_keys,
):
master = df_local_factory.create(proactor_threads=t_master)
master = df_factory.create(proactor_threads=t_master)
replicas = [
(df_local_factory.create(proactor_threads=t), crash_fs)
(df_factory.create(proactor_threads=t), crash_fs)
for i, (t, crash_fs) in enumerate(
chain(
zip(t_crash_fs, repeat(DISCONNECT_CRASH_FULL_SYNC)),
@ -205,7 +205,7 @@ async def test_disconnect_replica(
c_master = master.client(single_connection_client=True)
# Start replicas and create clients
df_local_factory.start_all([replica for replica, _ in replicas])
df_factory.start_all([replica for replica, _ in replicas])
c_replicas = [(replica, replica.client(), crash_type) for replica, crash_type in replicas]
@ -317,12 +317,12 @@ master_crash_cases = [
@pytest.mark.slow
@pytest.mark.parametrize("t_master, t_replicas, n_random_crashes, n_keys", master_crash_cases)
async def test_disconnect_master(
df_local_factory, df_seeder_factory, t_master, t_replicas, n_random_crashes, n_keys
df_factory, df_seeder_factory, t_master, t_replicas, n_random_crashes, n_keys
):
master = df_local_factory.create(port=1111, proactor_threads=t_master)
replicas = [df_local_factory.create(proactor_threads=t) for i, t in enumerate(t_replicas)]
master = df_factory.create(port=1111, proactor_threads=t_master)
replicas = [df_factory.create(proactor_threads=t) for i, t in enumerate(t_replicas)]
df_local_factory.start_all(replicas)
df_factory.start_all(replicas)
c_replicas = [replica.client() for replica in replicas]
seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2)
@ -386,12 +386,10 @@ rotating_master_cases = [(4, [4, 4, 4, 4], dict(keys=2_000, dbcount=4))]
@pytest.mark.asyncio
@pytest.mark.slow
@pytest.mark.parametrize("t_replica, t_masters, seeder_config", rotating_master_cases)
async def test_rotating_masters(
df_local_factory, df_seeder_factory, t_replica, t_masters, seeder_config
):
replica = df_local_factory.create(proactor_threads=t_replica)
masters = [df_local_factory.create(proactor_threads=t) for i, t in enumerate(t_masters)]
df_local_factory.start_all([replica] + masters)
async def test_rotating_masters(df_factory, df_seeder_factory, t_replica, t_masters, seeder_config):
replica = df_factory.create(proactor_threads=t_replica)
masters = [df_factory.create(proactor_threads=t) for i, t in enumerate(t_masters)]
df_factory.start_all([replica] + masters)
seeders = [df_seeder_factory.create(port=m.port, **seeder_config) for m in masters]
@ -423,9 +421,7 @@ async def test_rotating_masters(
@pytest.mark.asyncio
@pytest.mark.slow
async def test_cancel_replication_immediately(
df_local_factory, df_seeder_factory: DflySeederFactory
):
async def test_cancel_replication_immediately(df_factory, df_seeder_factory: DflySeederFactory):
"""
Issue 100 replication commands. This checks that the replication state
machine can handle cancellation well.
@ -435,9 +431,9 @@ async def test_cancel_replication_immediately(
"""
COMMANDS_TO_ISSUE = 100
replica = df_local_factory.create()
master = df_local_factory.create()
df_local_factory.start_all([replica, master])
replica = df_factory.create()
master = df_factory.create()
df_factory.start_all([replica, master])
seeder = df_seeder_factory.create(port=master.port)
c_replica = replica.client(socket_timeout=80)
@ -483,9 +479,9 @@ Check replica keys at the end.
@pytest.mark.asyncio
async def test_flushall(df_local_factory):
master = df_local_factory.create(proactor_threads=4)
replica = df_local_factory.create(proactor_threads=2)
async def test_flushall(df_factory):
master = df_factory.create(proactor_threads=4)
replica = df_factory.create(proactor_threads=2)
master.start()
replica.start()
@ -534,12 +530,12 @@ Test journal rewrites.
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_rewrites(df_local_factory):
async def test_rewrites(df_factory):
CLOSE_TIMESTAMP = int(time.time()) + 100
CLOSE_TIMESTAMP_MS = CLOSE_TIMESTAMP * 1000
master = df_local_factory.create()
replica = df_local_factory.create()
master = df_factory.create()
replica = df_factory.create()
master.start()
replica.start()
@ -710,11 +706,11 @@ Test automatic replication of expiry.
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
master = df_local_factory.create()
replica = df_local_factory.create()
async def test_expiry(df_factory: DflyInstanceFactory, n_keys=1000):
master = df_factory.create()
replica = df_factory.create()
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -780,10 +776,10 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
@dfly_args({"proactor_threads": 4})
async def test_simple_scripts(df_local_factory: DflyInstanceFactory):
master = df_local_factory.create()
replicas = [df_local_factory.create() for _ in range(2)]
df_local_factory.start_all([master] + replicas)
async def test_simple_scripts(df_factory: DflyInstanceFactory):
master = df_factory.create()
replicas = [df_factory.create() for _ in range(2)]
df_factory.start_all([master] + replicas)
c_replicas = [replica.client() for replica in replicas]
c_master = master.client()
@ -851,11 +847,11 @@ return 'OK'
@pytest.mark.skip(reason="Failing")
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, num_ops, num_keys, num_par, flags", script_cases)
async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys, num_par, flags):
master = df_local_factory.create(proactor_threads=t_master)
replicas = [df_local_factory.create(proactor_threads=t) for i, t in enumerate(t_replicas)]
async def test_scripts(df_factory, t_master, t_replicas, num_ops, num_keys, num_par, flags):
master = df_factory.create(proactor_threads=t_master)
replicas = [df_factory.create(proactor_threads=t) for i, t in enumerate(t_replicas)]
df_local_factory.start_all([master] + replicas)
df_factory.start_all([master] + replicas)
c_master = master.client()
c_replicas = [replica.client() for replica in replicas]
@ -884,15 +880,13 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_auth_master(df_local_factory, n_keys=20):
async def test_auth_master(df_factory, n_keys=20):
masterpass = "requirepass"
replicapass = "replicapass"
master = df_local_factory.create(requirepass=masterpass)
replica = df_local_factory.create(
logtostdout=True, masterauth=masterpass, requirepass=replicapass
)
master = df_factory.create(requirepass=masterpass)
replica = df_factory.create(logtostdout=True, masterauth=masterpass, requirepass=replicapass)
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client(password=masterpass)
c_replica = replica.client(password=replicapass)
@ -918,11 +912,11 @@ SCRIPT_TEMPLATE = "return {}"
@dfly_args({"proactor_threads": 2})
async def test_script_transfer(df_local_factory):
master = df_local_factory.create()
replica = df_local_factory.create()
async def test_script_transfer(df_factory):
master = df_factory.create()
replica = df_factory.create()
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -952,11 +946,11 @@ async def test_script_transfer(df_local_factory):
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_role_command(df_local_factory, n_keys=20):
master = df_local_factory.create()
replica = df_local_factory.create()
async def test_role_command(df_factory, n_keys=20):
master = df_factory.create()
replica = df_factory.create()
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1029,12 +1023,10 @@ async def assert_lag_condition(inst, client, condition):
@dfly_args({"proactor_threads": 2})
@pytest.mark.asyncio
async def test_replication_info(
df_local_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000
):
master = df_local_factory.create()
replica = df_local_factory.create(logtostdout=True, replication_acks_interval=100)
df_local_factory.start_all([master, replica])
async def test_replication_info(df_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000):
master = df_factory.create()
replica = df_factory.create(logtostdout=True, replication_acks_interval=100)
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1064,11 +1056,11 @@ More details in https://github.com/dragonflydb/dragonfly/issues/1231
@pytest.mark.asyncio
@pytest.mark.slow
async def test_flushall_in_full_sync(df_local_factory):
master = df_local_factory.create(proactor_threads=4)
replica = df_local_factory.create(proactor_threads=2)
async def test_flushall_in_full_sync(df_factory):
master = df_factory.create(proactor_threads=4)
replica = df_factory.create(proactor_threads=2)
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1125,11 +1117,11 @@ redis.call('SET', 'A', 'ErrroR')
@pytest.mark.asyncio
async def test_readonly_script(df_local_factory):
master = df_local_factory.create(proactor_threads=2, logtostdout=True)
replica = df_local_factory.create(proactor_threads=2, logtostdout=True)
async def test_readonly_script(df_factory):
master = df_factory.create(proactor_threads=2, logtostdout=True)
replica = df_factory.create(proactor_threads=2, logtostdout=True)
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1158,15 +1150,15 @@ take_over_cases = [
@pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
@pytest.mark.asyncio
async def test_take_over_counters(df_local_factory, master_threads, replica_threads):
master = df_local_factory.create(
async def test_take_over_counters(df_factory, master_threads, replica_threads):
master = df_factory.create(
proactor_threads=master_threads,
logtostderr=True,
)
replica1 = df_local_factory.create(proactor_threads=replica_threads)
replica2 = df_local_factory.create(proactor_threads=replica_threads)
replica3 = df_local_factory.create(proactor_threads=replica_threads)
df_local_factory.start_all([master, replica1, replica2, replica3])
replica1 = df_factory.create(proactor_threads=replica_threads)
replica2 = df_factory.create(proactor_threads=replica_threads)
replica3 = df_factory.create(proactor_threads=replica_threads)
df_factory.start_all([master, replica1, replica2, replica3])
c_master = master.client()
c1 = replica1.client()
c_blocking = master.client()
@ -1219,16 +1211,16 @@ async def test_take_over_counters(df_local_factory, master_threads, replica_thre
@pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
@pytest.mark.asyncio
async def test_take_over_seeder(
request, df_local_factory, df_seeder_factory, master_threads, replica_threads
request, df_factory, df_seeder_factory, master_threads, replica_threads
):
tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
master = df_local_factory.create(
master = df_factory.create(
proactor_threads=master_threads,
dbfilename=f"dump_{tmp_file_name}",
logtostderr=True,
)
replica = df_local_factory.create(proactor_threads=replica_threads)
df_local_factory.start_all([master, replica])
replica = df_factory.create(proactor_threads=replica_threads)
df_factory.start_all([master, replica])
seeder = df_seeder_factory.create(port=master.port, keys=1000, dbcount=5, stop_on_failure=False)
@ -1265,13 +1257,13 @@ async def test_take_over_seeder(
@pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]])
@pytest.mark.asyncio
async def test_take_over_read_commands(df_local_factory, master_threads, replica_threads):
master = df_local_factory.create(
async def test_take_over_read_commands(df_factory, master_threads, replica_threads):
master = df_factory.create(
proactor_threads=master_threads,
logtostderr=True,
)
replica = df_local_factory.create(proactor_threads=replica_threads)
df_local_factory.start_all([master, replica])
replica = df_factory.create(proactor_threads=replica_threads)
df_factory.start_all([master, replica])
c_master = master.client()
await c_master.execute_command("SET foo bar")
@ -1304,10 +1296,10 @@ async def test_take_over_read_commands(df_local_factory, master_threads, replica
@pytest.mark.asyncio
async def test_take_over_timeout(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=2, logtostderr=True)
replica = df_local_factory.create(proactor_threads=2)
df_local_factory.start_all([master, replica])
async def test_take_over_timeout(df_factory, df_seeder_factory):
master = df_factory.create(proactor_threads=2, logtostderr=True)
replica = df_factory.create(proactor_threads=2)
df_factory.start_all([master, replica])
seeder = df_seeder_factory.create(port=master.port, keys=1000, dbcount=5, stop_on_failure=False)
@ -1354,14 +1346,14 @@ replication_cases = [(8, 8)]
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replica", replication_cases)
async def test_no_tls_on_admin_port(
df_local_factory: DflyInstanceFactory,
df_factory: DflyInstanceFactory,
df_seeder_factory,
t_master,
t_replica,
with_tls_server_args,
):
# 1. Spin up dragonfly without tls, debug populate
master = df_local_factory.create(
master = df_factory.create(
no_tls_on_admin_port="true",
admin_port=ADMIN_PORT,
**with_tls_server_args,
@ -1375,7 +1367,7 @@ async def test_no_tls_on_admin_port(
assert 100 == db_size
# 2. Spin up a replica and initiate a REPLICAOF
replica = df_local_factory.create(
replica = df_factory.create(
no_tls_on_admin_port="true",
admin_port=ADMIN_PORT + 1,
**with_tls_server_args,
@ -1405,7 +1397,7 @@ replication_cases = [(8, 8, False), (8, 8, True)]
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replica, test_admin_port", replication_cases)
async def test_tls_replication(
df_local_factory,
df_factory,
df_seeder_factory,
t_master,
t_replica,
@ -1414,7 +1406,7 @@ async def test_tls_replication(
with_ca_tls_client_args,
):
# 1. Spin up dragonfly tls enabled, debug populate
master = df_local_factory.create(
master = df_factory.create(
tls_replication="true",
**with_ca_tls_server_args,
port=1111,
@ -1434,7 +1426,7 @@ async def test_tls_replication(
proxy_task = asyncio.create_task(proxy.serve())
# 2. Spin up a replica and initiate a REPLICAOF
replica = df_local_factory.create(
replica = df_factory.create(
tls_replication="true",
**with_ca_tls_server_args,
proactor_threads=t_replica,
@ -1469,12 +1461,12 @@ async def test_tls_replication(
await proxy.close(proxy_task)
async def test_ipv6_replication(df_local_factory: DflyInstanceFactory):
async def test_ipv6_replication(df_factory: DflyInstanceFactory):
"""Test that IPV6 addresses work for replication, ::1 is 127.0.0.1 localhost"""
master = df_local_factory.create(proactor_threads=1, bind="::1", port=1111)
replica = df_local_factory.create(proactor_threads=1, bind="::1", port=1112)
master = df_factory.create(proactor_threads=1, bind="::1", port=1111)
replica = df_factory.create(proactor_threads=1, bind="::1", port=1112)
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1498,9 +1490,9 @@ async def wait_for_replica_status(
@pytest.mark.asyncio
async def test_replicaof_flag(df_local_factory):
async def test_replicaof_flag(df_factory):
# tests --replicaof works under normal conditions
master = df_local_factory.create(
master = df_factory.create(
proactor_threads=2,
)
@ -1511,7 +1503,7 @@ async def test_replicaof_flag(df_local_factory):
db_size = await c_master.dbsize()
assert 1 == db_size
replica = df_local_factory.create(
replica = df_factory.create(
proactor_threads=2,
replicaof=f"localhost:{master.port}", # start to replicate master
)
@ -1533,10 +1525,10 @@ async def test_replicaof_flag(df_local_factory):
@pytest.mark.asyncio
async def test_replicaof_flag_replication_waits(df_local_factory):
async def test_replicaof_flag_replication_waits(df_factory):
# tests --replicaof works when we launch replication before the master
BASE_PORT = 1111
replica = df_local_factory.create(
replica = df_factory.create(
proactor_threads=2,
replicaof=f"localhost:{BASE_PORT}", # start to replicate master
)
@ -1554,7 +1546,7 @@ async def test_replicaof_flag_replication_waits(df_local_factory):
assert info["master_link_status"] == "down"
# set up master
master = df_local_factory.create(
master = df_factory.create(
port=BASE_PORT,
proactor_threads=2,
)
@ -1577,9 +1569,9 @@ async def test_replicaof_flag_replication_waits(df_local_factory):
@pytest.mark.asyncio
async def test_replicaof_flag_disconnect(df_local_factory):
async def test_replicaof_flag_disconnect(df_factory):
# test stopping replication when started using --replicaof
master = df_local_factory.create(
master = df_factory.create(
proactor_threads=2,
)
@ -1592,7 +1584,7 @@ async def test_replicaof_flag_disconnect(df_local_factory):
db_size = await c_master.dbsize()
assert 1 == db_size
replica = df_local_factory.create(
replica = df_factory.create(
proactor_threads=2,
replicaof=f"localhost:{master.port}", # start to replicate master
)
@ -1618,13 +1610,13 @@ async def test_replicaof_flag_disconnect(df_local_factory):
@pytest.mark.asyncio
async def test_df_crash_on_memcached_error(df_local_factory):
master = df_local_factory.create(
async def test_df_crash_on_memcached_error(df_factory):
master = df_factory.create(
memcached_port=11211,
proactor_threads=2,
)
replica = df_local_factory.create(
replica = df_factory.create(
memcached_port=master.mc_port + 1,
proactor_threads=2,
)
@ -1650,13 +1642,13 @@ async def test_df_crash_on_memcached_error(df_local_factory):
@pytest.mark.asyncio
async def test_df_crash_on_replicaof_flag(df_local_factory):
master = df_local_factory.create(
async def test_df_crash_on_replicaof_flag(df_factory):
master = df_factory.create(
proactor_threads=2,
)
master.start()
replica = df_local_factory.create(proactor_threads=2, replicaof=f"127.0.0.1:{master.port}")
replica = df_factory.create(proactor_threads=2, replicaof=f"127.0.0.1:{master.port}")
replica.start()
c_master = master.client()
@ -1675,11 +1667,11 @@ async def test_df_crash_on_replicaof_flag(df_local_factory):
replica.stop()
async def test_network_disconnect(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=6)
replica = df_local_factory.create(proactor_threads=4)
async def test_network_disconnect(df_factory, df_seeder_factory):
master = df_factory.create(proactor_threads=6)
replica = df_factory.create(proactor_threads=4)
df_local_factory.start_all([replica, master])
df_factory.start_all([replica, master])
seeder = df_seeder_factory.create(port=master.port)
async with replica.client() as c_replica:
@ -1709,11 +1701,11 @@ async def test_network_disconnect(df_local_factory, df_seeder_factory):
replica.stop()
async def test_network_disconnect_active_stream(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=4, shard_repl_backlog_len=4000)
replica = df_local_factory.create(proactor_threads=4)
async def test_network_disconnect_active_stream(df_factory, df_seeder_factory):
master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=4000)
replica = df_factory.create(proactor_threads=4)
df_local_factory.start_all([replica, master])
df_factory.start_all([replica, master])
seeder = df_seeder_factory.create(port=master.port)
async with replica.client() as c_replica, master.client() as c_master:
@ -1751,11 +1743,11 @@ async def test_network_disconnect_active_stream(df_local_factory, df_seeder_fact
replica.stop()
async def test_network_disconnect_small_buffer(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=4, shard_repl_backlog_len=1)
replica = df_local_factory.create(proactor_threads=4)
async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory):
master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=1)
replica = df_factory.create(proactor_threads=4)
df_local_factory.start_all([replica, master])
df_factory.start_all([replica, master])
seeder = df_seeder_factory.create(port=master.port)
async with replica.client() as c_replica, master.client() as c_master:
@ -1798,11 +1790,11 @@ async def test_network_disconnect_small_buffer(df_local_factory, df_seeder_facto
assert master.is_in_logs("Partial sync requested from stale LSN")
async def test_search(df_local_factory):
master = df_local_factory.create(proactor_threads=4)
replica = df_local_factory.create(proactor_threads=4)
async def test_search(df_factory):
master = df_factory.create(proactor_threads=4)
replica = df_factory.create(proactor_threads=4)
df_local_factory.start_all([master, replica])
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1848,10 +1840,10 @@ async def test_search(df_local_factory):
# @pytest.mark.slow
@pytest.mark.asyncio
async def test_client_pause_with_replica(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=4)
replica = df_local_factory.create(proactor_threads=4)
df_local_factory.start_all([master, replica])
async def test_client_pause_with_replica(df_factory, df_seeder_factory):
master = df_factory.create(proactor_threads=4)
replica = df_factory.create(proactor_threads=4)
df_factory.start_all([master, replica])
seeder = df_seeder_factory.create(port=master.port)
@ -1891,11 +1883,11 @@ async def test_client_pause_with_replica(df_local_factory, df_seeder_factory):
await disconnect_clients(c_master, c_replica)
async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory):
async def test_replicaof_reject_on_load(df_factory, df_seeder_factory):
tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
master = df_local_factory.create()
replica = df_local_factory.create(dbfilename=f"dump_{tmp_file_name}")
df_local_factory.start_all([master, replica])
master = df_factory.create()
replica = df_factory.create(dbfilename=f"dump_{tmp_file_name}")
df_factory.start_all([master, replica])
seeder = SeederV2(key_target=40000)
c_replica = replica.client()
@ -1928,12 +1920,12 @@ async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory):
@pytest.mark.asyncio
async def test_heartbeat_eviction_propagation(df_local_factory):
master = df_local_factory.create(
async def test_heartbeat_eviction_propagation(df_factory):
master = df_factory.create(
proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false"
)
replica = df_local_factory.create(proactor_threads=1)
df_local_factory.start_all([master, replica])
replica = df_factory.create(proactor_threads=1)
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -1966,16 +1958,16 @@ async def test_heartbeat_eviction_propagation(df_local_factory):
@pytest.mark.skip(reason="Test is flaky")
@pytest.mark.asyncio
async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory):
master = df_local_factory.create(
async def test_policy_based_eviction_propagation(df_factory, df_seeder_factory):
master = df_factory.create(
proactor_threads=2,
cache_mode="true",
maxmemory="512mb",
logtostdout="true",
enable_heartbeat_eviction="false",
)
replica = df_local_factory.create(proactor_threads=2)
df_local_factory.start_all([master, replica])
replica = df_factory.create(proactor_threads=2)
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -2004,15 +1996,15 @@ async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_fac
@pytest.mark.asyncio
async def test_journal_doesnt_yield_issue_2500(df_local_factory, df_seeder_factory):
async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory):
"""
Issues many SETEX commands through a Lua script so that no yields are done between them.
In parallel, connect a replica, so that these SETEX commands write their custom journal log.
This makes sure that no Fiber context switch while inside a shard callback.
"""
master = df_local_factory.create()
replica = df_local_factory.create()
df_local_factory.start_all([master, replica])
master = df_factory.create()
replica = df_factory.create()
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -2059,12 +2051,12 @@ async def test_journal_doesnt_yield_issue_2500(df_local_factory, df_seeder_facto
@pytest.mark.asyncio
async def test_saving_replica(df_local_factory):
async def test_saving_replica(df_factory):
tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
master = df_local_factory.create(proactor_threads=1)
replica = df_local_factory.create(proactor_threads=1, dbfilename=f"dump_{tmp_file_name}")
df_local_factory.start_all([master, replica])
master = df_factory.create(proactor_threads=1)
replica = df_factory.create(proactor_threads=1, dbfilename=f"dump_{tmp_file_name}")
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -2088,12 +2080,12 @@ async def test_saving_replica(df_local_factory):
@pytest.mark.asyncio
async def test_start_replicating_while_save(df_local_factory):
async def test_start_replicating_while_save(df_factory):
tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
master = df_local_factory.create(proactor_threads=4)
replica = df_local_factory.create(proactor_threads=4, dbfilename=f"dump_{tmp_file_name}")
df_local_factory.start_all([master, replica])
master = df_factory.create(proactor_threads=4)
replica = df_factory.create(proactor_threads=4, dbfilename=f"dump_{tmp_file_name}")
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -2115,10 +2107,10 @@ async def test_start_replicating_while_save(df_local_factory):
@pytest.mark.asyncio
async def test_user_acl_replication(df_local_factory):
master = df_local_factory.create(proactor_threads=4)
replica = df_local_factory.create(proactor_threads=4)
df_local_factory.start_all([master, replica])
async def test_user_acl_replication(df_factory):
master = df_factory.create(proactor_threads=4)
replica = df_factory.create(proactor_threads=4)
df_factory.start_all([master, replica])
c_master = master.client()
await c_master.execute_command("ACL SETUSER tmp >tmp ON +ping +dfly +replconf")
@ -2149,7 +2141,7 @@ async def test_user_acl_replication(df_local_factory):
@pytest.mark.parametrize("break_conn", [False, True])
@pytest.mark.asyncio
async def test_replica_reconnect(df_local_factory, break_conn):
async def test_replica_reconnect(df_factory, break_conn):
"""
Test that the replica does not reconnect to the master if the master restarted
step1: create master and replica
@ -2159,11 +2151,9 @@ async def test_replica_reconnect(df_local_factory, break_conn):
step5: check replica replicates master
"""
# Connect replica to master
master = df_local_factory.create(proactor_threads=1)
replica = df_local_factory.create(
proactor_threads=1, break_replication_on_master_restart=break_conn
)
df_local_factory.start_all([master, replica])
master = df_factory.create(proactor_threads=1)
replica = df_factory.create(proactor_threads=1, break_replication_on_master_restart=break_conn)
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
@ -2178,8 +2168,8 @@ async def test_replica_reconnect(df_local_factory, break_conn):
master.stop()
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "down"
master = df_local_factory.create(proactor_threads=1, port=master_port)
df_local_factory.start_all([master])
master = df_factory.create(proactor_threads=1, port=master_port)
df_factory.start_all([master])
await asyncio.sleep(1) # We sleep for 0.5s in replica.cc before reconnecting
# Assert that the replica did not reconnect to a master with a different repl_id
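A sketch of how the no-reconnect assertion can be polled, using only the master_link_status field that already appears above (the polling loop itself is an assumption):

async def assert_link_stays_down(c_replica, attempts=5):
    # Hedged sketch: with break_replication_on_master_restart=True, the link
    # must remain down even after the master comes back on the same port.
    for _ in range(attempts):
        info = await c_replica.info("REPLICATION")
        assert info["master_link_status"] == "down"
        await asyncio.sleep(0.2)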

View file

@ -140,9 +140,9 @@ def sentinel(tmp_dir, port_picker) -> Sentinel:
@pytest.mark.asyncio
@pytest.mark.slow
async def test_failover(df_local_factory, sentinel, port_picker):
master = df_local_factory.create(port=sentinel.initial_master_port)
replica = df_local_factory.create(port=port_picker.get_available_port())
async def test_failover(df_factory, sentinel, port_picker):
master = df_factory.create(port=sentinel.initial_master_port)
replica = df_factory.create(port=port_picker.get_available_port())
master.start()
replica.start()
@ -206,9 +206,9 @@ async def test_failover(df_local_factory, sentinel, port_picker):
@pytest.mark.asyncio
@pytest.mark.slow
async def test_master_failure(df_local_factory, sentinel, port_picker):
master = df_local_factory.create(port=sentinel.initial_master_port)
replica = df_local_factory.create(port=port_picker.get_available_port())
async def test_master_failure(df_factory, sentinel, port_picker):
master = df_factory.create(port=sentinel.initial_master_port)
replica = df_factory.create(port=port_picker.get_available_port())
master.start()
replica.start()
@ -245,16 +245,16 @@ async def test_master_failure(df_local_factory, sentinel, port_picker):
@dfly_args({"info_replication_valkey_compatible": True})
@pytest.mark.asyncio
async def test_priority_on_failover(df_local_factory, sentinel, port_picker):
master = df_local_factory.create(port=sentinel.initial_master_port)
async def test_priority_on_failover(df_factory, sentinel, port_picker):
master = df_factory.create(port=sentinel.initial_master_port)
# lower priority is the best candidate for sentinel
low_priority_repl = df_local_factory.create(
low_priority_repl = df_factory.create(
port=port_picker.get_available_port(), replica_priority=20
)
mid_priority_repl = df_local_factory.create(
mid_priority_repl = df_factory.create(
port=port_picker.get_available_port(), replica_priority=60
)
high_priority_repl = df_local_factory.create(
high_priority_repl = df_factory.create(
port=port_picker.get_available_port(), replica_priority=80
)
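Sentinel promotes the replica with the lowest replica-priority, which is why the test treats 20 as the best candidate; a hedged sketch of reading the priorities back (SENTINEL REPLICAS is standard Sentinel; the monitored master name, decode_responses=True, and the slave-priority field name are assumptions):

async def replica_priorities(sentinel_client, master_name="default"):
    # Hedged sketch: each reply entry is a flat field/value list.
    replicas = await sentinel_client.execute_command("SENTINEL", "REPLICAS", master_name)
    priorities = []
    for entry in replicas:
        fields = dict(zip(entry[::2], entry[1::2]))
        priorities.append(int(fields["slave-priority"]))
    return sorted(priorities)  # the lowest value is the preferred candidate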

View file

@ -143,8 +143,8 @@ def configure_slowlog_parsing(async_client: aioredis.Redis):
@pytest.mark.asyncio
@dfly_args({"slowlog_log_slower_than": 0, "slowlog_max_len": 3})
async def test_slowlog_client_name_and_ip(df_local_factory, async_client: aioredis.Redis):
df = df_local_factory.create()
async def test_slowlog_client_name_and_ip(df_factory, async_client: aioredis.Redis):
df = df_factory.create()
df.start()
expected_clientname = "dragonfly"
@ -162,10 +162,10 @@ async def test_slowlog_client_name_and_ip(df_local_factory, async_client: aiored
@pytest.mark.asyncio
@dfly_args({"slowlog_log_slower_than": 0, "slowlog_max_len": 3})
async def test_blocking_commands_should_not_show_up_in_slow_log(
df_local_factory, async_client: aioredis.Redis
df_factory, async_client: aioredis.Redis
):
await async_client.slowlog_reset()
df = df_local_factory.create()
df = df_factory.create()
df.start()
async_client = configure_slowlog_parsing(async_client)

View file

@ -20,10 +20,10 @@ class TestDflyAutoLoadSnapshot:
"""
@pytest.mark.asyncio
async def test_gracefull_shutdown(self, df_local_factory):
async def test_gracefull_shutdown(self, df_factory):
df_args = {"dbfilename": "dump", **BASIC_ARGS, "port": 1111}
df_server = df_local_factory.create(**df_args)
df_server = df_factory.create(**df_args)
df_server.start()
client = aioredis.Redis(port=df_server.port)

View file

@ -9,6 +9,7 @@ from redis import asyncio as aioredis
from pathlib import Path
import boto3
from .instance import RedisServer
from random import randint as rand
from . import dfly_args
from .utility import wait_available_async, chunked, is_saving
@ -37,11 +38,15 @@ def find_main_file(path: Path, pattern):
dict(key_target=1000, data_size=5_000, variance=10, samples=10),
],
)
@dfly_args({**BASIC_ARGS, "proactor_threads": 4, "dbfilename": "test-consistency"})
async def test_consistency(async_client: aioredis.Redis, format: str, seeder_opts: dict):
@dfly_args({**BASIC_ARGS, "proactor_threads": 4})
async def test_consistency(df_factory, format: str, seeder_opts: dict):
"""
Test consistency over a large variety of data with different sizes
"""
dbfilename = f"test-consistency{rand(0, 5000)}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
async_client = instance.client()
await StaticSeeder(**seeder_opts).run(async_client)
start_capture = await StaticSeeder.capture(async_client)
@ -52,21 +57,25 @@ async def test_consistency(async_client: aioredis.Redis, format: str, seeder_opt
await async_client.execute_command(
"DEBUG",
"LOAD",
"test-consistency.rdb" if format == "RDB" else "test-consistency-summary.dfs",
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
)
assert (await StaticSeeder.capture(async_client)) == start_capture
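The randomized dbfilename is what lets per-test server instances share a working directory without clobbering each other's snapshots; a compressed sketch of the save/load round trip above (assuming SAVE accepts the same format token as FILE_FORMATS):

async def round_trip(async_client, dbfilename, fmt):
    # Hedged sketch: seeding happens before this; save, wipe, reload,
    # then compare captures afterwards.
    await async_client.execute_command("SAVE", fmt)  # "RDB" or "DF"
    await async_client.execute_command("FLUSHALL")
    target = f"{dbfilename}.rdb" if fmt == "RDB" else f"{dbfilename}-summary.dfs"
    await async_client.execute_command("DEBUG", "LOAD", target)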
@pytest.mark.parametrize("format", FILE_FORMATS)
@dfly_args({**BASIC_ARGS, "proactor_threads": 4, "dbfilename": "test-multidb"})
async def test_multidb(async_client: aioredis.Redis, df_server, format: str):
@dfly_args({**BASIC_ARGS, "proactor_threads": 4})
async def test_multidb(df_factory, format: str):
"""
Test serialization of multiple logical databases
"""
dbfilename = f"test-multidb{rand(0, 5000)}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
async_client = instance.client()
start_captures = []
for dbid in range(10):
db_client = df_server.client(db=dbid)
db_client = instance.client(db=dbid)
await StaticSeeder(key_target=1000).run(db_client)
start_captures.append(await StaticSeeder.capture(db_client))
@ -76,11 +85,11 @@ async def test_multidb(async_client: aioredis.Redis, df_server, format: str):
await async_client.execute_command(
"DEBUG",
"LOAD",
"test-multidb.rdb" if format == "RDB" else "test-multidb-summary.dfs",
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
)
for dbid in range(10):
db_client = df_server.client(db=dbid)
db_client = instance.client(db=dbid)
assert (await StaticSeeder.capture(db_client)) == start_captures[dbid]
@ -97,7 +106,7 @@ async def test_multidb(async_client: aioredis.Redis, df_server, format: str):
],
)
async def test_dbfilenames(
df_local_factory, tmp_dir: Path, save_type: str, dbfilename: str, pattern: str
df_factory, tmp_dir: Path, save_type: str, dbfilename: str, pattern: str
):
df_args = {**BASIC_ARGS, "dbfilename": dbfilename, "port": 1111}
@ -106,7 +115,7 @@ async def test_dbfilenames(
start_capture = None
with df_local_factory.create(**df_args) as df_server:
with df_factory.create(**df_args) as df_server:
async with df_server.client() as client:
await wait_available_async(client)
@ -120,7 +129,7 @@ async def test_dbfilenames(
assert file is not None
assert os.path.basename(file).startswith(dbfilename.split("{{")[0])
with df_local_factory.create(**df_args) as df_server:
with df_factory.create(**df_args) as df_server:
async with df_server.client() as client:
await wait_available_async(client)
assert await StaticSeeder.capture(client) == start_capture
@ -220,11 +229,11 @@ async def test_parallel_snapshot(async_client):
assert save_successes == 1, "Only one SAVE must be successful"
async def test_path_escapes(df_local_factory):
async def test_path_escapes(df_factory):
"""Test that we don't allow path escapes. We just check that df_server.start()
fails because we don't have a better way to test it."""
df_server = df_local_factory.create(dbfilename="../../../../etc/passwd")
df_server = df_factory.create(dbfilename="../../../../etc/passwd")
try:
df_server.start()
assert False, "Server should not start correctly"
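The same guard reads a bit tighter with pytest.raises, assuming df_server.start() raises an Exception subclass on the rejected filename:

def test_path_escape_rejected_sketch(df_factory):
    # Hedged alternative to the try/assert-False pattern above.
    df_server = df_factory.create(dbfilename="../../../../etc/passwd")
    with pytest.raises(Exception):
        df_server.start()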
@ -366,7 +375,11 @@ class TestDflySnapshotOnShutdown:
@pytest.mark.parametrize("format", FILE_FORMATS)
@dfly_args({**BASIC_ARGS, "dbfilename": "info-while-snapshot"})
async def test_infomemory_while_snapshoting(async_client: aioredis.Redis, format: str):
async def test_infomemory_while_snapshoting(df_factory, format: str):
dbfilename = f"test-consistency{rand(0, 5000)}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
async_client = instance.client()
await async_client.execute_command("DEBUG POPULATE 10000 key 4048 RAND")
async def save():