
chore: monotonically increasing ports for cluster tests (#4268)

We have cascading failures in cluster tests: on assertion failures the nodes are not properly cleaned up, so subsequent test cases that reuse the same ports fail. I added a monotonically increasing port generator to mitigate this effect.
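
The fix relies on a plain Python generator: every call to `next(next_port)` hands out a port number that no earlier test case has used, so a node left behind by a failed test can no longer collide with the next one. Below is a minimal standalone sketch of the pattern, mirroring the names in the diff that follows (the `itertools.count` variant at the end is an equivalent standard-library formulation, not what the patch itself uses):

import itertools

BASE_PORT = 30001


def monotonically_increasing_port_number():
    port = BASE_PORT
    while True:
        yield port  # hand out the current port, then advance on the next call
        port = port + 1


# One shared generator object for the whole test module
next_port = monotonically_increasing_port_number()

assert next(next_port) == 30001
assert next(next_port) == 30002

# Equivalent one-liner from the standard library:
next_port_alt = itertools.count(BASE_PORT)
assert next(next_port_alt) == 30001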
Kostas Kyrimis 2024-12-06 11:07:23 +01:00 committed by GitHub
parent 63ccbbc0a7
commit f9f93b108c


@@ -22,6 +22,17 @@ from . import dfly_args
 BASE_PORT = 30001
+def monotonically_increasing_port_number():
+    port = BASE_PORT
+    while True:
+        yield port
+        port = port + 1
+# Create a generator object
+next_port = monotonically_increasing_port_number()
 class RedisClusterNode:
     def __init__(self, port):
         self.port = port
@@ -279,8 +290,8 @@ def verify_slots_result(port: int, answer: list, replicas) -> bool:
 # are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
 @dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
 async def test_emulated_cluster_with_replicas(df_factory):
-    master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
-    replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]
+    master = df_factory.create(port=next(next_port), admin_port=next(next_port))
+    replicas = [df_factory.create(port=next(next_port), logtostdout=True) for i in range(1, 3)]
     df_factory.start_all([master, *replicas])
@@ -379,8 +390,8 @@ async def test_emulated_cluster_with_replicas(df_factory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_managed_service_info(df_factory):
-    master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100)
-    replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101)
+    master = df_factory.create(port=next(next_port), admin_port=next(next_port))
+    replica = df_factory.create(port=next(next_port), admin_port=next(next_port))
     df_factory.start_all([master, replica])
@@ -561,7 +572,7 @@ Also add keys to each of them that are *not* moved, and see that they are unaffe
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"})
 async def test_cluster_node_id(df_factory: DflyInstanceFactory):
-    node = df_factory.create(port=BASE_PORT)
+    node = df_factory.create(port=next(next_port))
     df_factory.start_all([node])
     conn = node.client()
@@ -571,9 +582,7 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
     # Start and configure cluster with 2 nodes
-    nodes = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
-    ]
+    nodes = [df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)]
     df_factory.start_all(nodes)
@@ -640,7 +649,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
         await c_nodes[1].set("KEY1", "value")
         assert False, "Should not be able to set key on non-owner cluster node"
     except redis.exceptions.ResponseError as e:
-        assert e.args[0] == "MOVED 5259 localhost:30001"
+        assert e.args[0] == f"MOVED 5259 localhost:{nodes[0].port}"
     # And that node1 only has 1 key ("KEY2")
     assert await c_nodes[1].execute_command("DBSIZE") == 1
@@ -664,7 +673,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
         await c_nodes[0].set("KEY1", "value")
         assert False, "Should not be able to set key on non-owner cluster node"
     except redis.exceptions.ResponseError as e:
-        assert e.args[0] == "MOVED 5259 localhost:30002"
+        assert e.args[0] == f"MOVED 5259 localhost:{nodes[1].port}"
     # And node1 should own it and allow using it
     assert await c_nodes[1].set("KEY1", "value")
@@ -699,8 +708,8 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFactory):
     # Start and configure cluster with 1 master and 1 replica, both own all slots
-    master = df_factory.create(admin_port=BASE_PORT + 1000)
-    replica = df_factory.create(admin_port=BASE_PORT + 1001)
+    master = df_factory.create(admin_port=next(next_port))
+    replica = df_factory.create(admin_port=next(next_port))
     df_factory.start_all([master, replica])
     async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
@@ -807,8 +816,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
     # Start and configure cluster with 1 master and 1 replica, both own all slots
-    master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
-    replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
+    master = df_factory.create(port=next(next_port), admin_port=next(next_port))
+    replica = df_factory.create(port=next(next_port), admin_port=next(next_port))
     df_factory.start_all([master, replica])
     c_master = master.client()
@@ -958,7 +967,7 @@ async def test_cluster_blocking_command(df_server):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -987,11 +996,11 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
     with pytest.raises(aioredis.ResponseError) as set_e_info:
         await set_task
-    assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)
+    assert f"MOVED 3037 127.0.0.1:{instances[1].port}" == str(set_e_info.value)
     with pytest.raises(aioredis.ResponseError) as list_e_info:
         await list_task
-    assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)
+    assert f"MOVED 7141 127.0.0.1:{instances[1].port}" == str(list_e_info.value)
 @pytest.mark.parametrize("set_cluster_node_id", [True, False])
@@ -1004,8 +1013,8 @@ async def test_cluster_native_client(
     # Start and configure cluster with 3 masters and 3 replicas
     masters = [
         df_factory.create(
-            port=BASE_PORT + i,
-            admin_port=BASE_PORT + i + 1000,
+            port=next(next_port),
+            admin_port=next(next_port),
             cluster_node_id=f"master{i}" if set_cluster_node_id else "",
         )
         for i in range(3)
@@ -1017,10 +1026,10 @@ async def test_cluster_native_client(
     replicas = [
         df_factory.create(
-            port=BASE_PORT + 100 + i,
-            admin_port=BASE_PORT + i + 1100,
+            port=next(next_port),
+            admin_port=next(next_port),
             cluster_node_id=f"replica{i}" if set_cluster_node_id else "",
-            replicaof=f"localhost:{BASE_PORT + i}",
+            replicaof=f"localhost:{masters[i].port}",
         )
         for i in range(3)
     ]
@@ -1195,7 +1204,7 @@ async def test_cluster_native_client(
 async def test_config_consistency(df_factory: DflyInstanceFactory):
     # Check slot migration from one node to another
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -1245,8 +1254,8 @@ async def test_cluster_flushall_during_migration(
     # Check data migration from one node to another
     instances = [
         df_factory.create(
-            port=BASE_PORT + i,
-            admin_port=BASE_PORT + i + 1000,
+            port=next(next_port),
+            admin_port=next(next_port),
             vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9",
             logtostdout=True,
         )
@@ -1298,8 +1307,8 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
     # Check data migration from one node to another
     instances = [
         df_factory.create(
-            port=BASE_PORT + i,
-            admin_port=BASE_PORT + i + 1000,
+            port=next(next_port),
+            admin_port=next(next_port),
             vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
         )
         for i in range(2)
@@ -1378,7 +1387,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
 async def test_migration_with_key_ttl(df_factory):
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -1427,7 +1436,7 @@ async def test_migration_with_key_ttl(df_factory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0})
 async def test_network_disconnect_during_migration(df_factory):
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -1496,8 +1505,8 @@ async def test_cluster_fuzzymigration(
 ):
     instances = [
         df_factory.create(
-            port=BASE_PORT + i,
-            admin_port=BASE_PORT + i + 1000,
+            port=next(next_port),
+            admin_port=next(next_port),
             vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
             serialization_max_chunk_size=huge_values,
             replication_stream_output_limit=10,
@@ -1632,7 +1641,7 @@ async def test_cluster_fuzzymigration(
 async def test_cluster_config_reapply(df_factory: DflyInstanceFactory):
     """Check data migration from one node to another."""
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -1690,7 +1699,7 @@ async def test_cluster_replication_migration(
     and make sure the captures on the replicas are equal.
     """
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(4)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(4)
     ]
     df_factory.start_all(instances)
@@ -1767,7 +1776,7 @@ async def test_start_replication_during_migration(
     in the end master_1 and replica_1 should have the same data
     """
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(3)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(3)
     ]
     df_factory.start_all(instances)
@@ -1834,7 +1843,7 @@ async def test_snapshoting_during_migration(
     The result should be the same: snapshot contains all the data that existed before migration
     """
    instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -1904,7 +1913,7 @@ async def test_snapshoting_during_migration(
 async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
     """Check data migration from one node to another."""
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -1965,7 +1974,7 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
 @pytest.mark.asyncio
 async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
     df_factory.start_all(instances)
@@ -2027,9 +2036,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
     Send traffic before replication start and while replicating.
     Promote the replica to master and check data consistency between cluster and single node.
     """
-    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
     cluster_nodes = [
-        df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
+        df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
     ]
     # Start instances and connect clients
@@ -2114,9 +2123,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
     Promote replica to master
     Compare cluster data and replica data
     """
-    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
     cluster_nodes = [
-        df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
+        df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
     ]
     # Start instances and connect clients
@@ -2228,7 +2237,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact
     Send traffic before replication start and while replicating.
     Promote the replica to master and check data consistency between cluster and single dragonfly node.
     """
-    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
     # Start instances and connect clients
     df_factory.start_all([replica])
@@ -2286,7 +2295,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
     Send more traffic
     Promote the replica to master and check data consistency between cluster and single dragonfly node.
     """
-    replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
+    replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
     # Start instances and connect clients
     df_factory.start_all([replica])
@@ -2371,8 +2380,8 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
     instances = [
         df_factory.create(
             maxmemory="15G",
-            port=BASE_PORT + i,
-            admin_port=BASE_PORT + i + 1000,
+            port=next(next_port),
+            admin_port=next(next_port),
             vmodule="streamer=9",
         )
         for i in range(3)
@@ -2429,8 +2438,8 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
     # Timeout set to 3 seconds because we must first saturate the socket before we get the timeout
     instances = [
         df_factory.create(
-            port=BASE_PORT + i,
-            admin_port=BASE_PORT + i + 1000,
+            port=next(next_port),
+            admin_port=next(next_port),
             replication_timeout=3000,
             vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=2",
         )