diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 38156e9a2..1f1e22a17 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -22,6 +22,17 @@ from . import dfly_args BASE_PORT = 30001 +def monotonically_increasing_port_number(): + port = BASE_PORT + while True: + yield port + port = port + 1 + + +# Create a generator object +next_port = monotonically_increasing_port_number() + + class RedisClusterNode: def __init__(self, port): self.port = port @@ -279,8 +290,8 @@ def verify_slots_result(port: int, answer: list, replicas) -> bool: # are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173 @dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"}) async def test_emulated_cluster_with_replicas(df_factory): - master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000) - replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)] + master = df_factory.create(port=next(next_port), admin_port=next(next_port)) + replicas = [df_factory.create(port=next(next_port), logtostdout=True) for i in range(1, 3)] df_factory.start_all([master, *replicas]) @@ -379,8 +390,8 @@ async def test_emulated_cluster_with_replicas(df_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_managed_service_info(df_factory): - master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100) - replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101) + master = df_factory.create(port=next(next_port), admin_port=next(next_port)) + replica = df_factory.create(port=next(next_port), admin_port=next(next_port)) df_factory.start_all([master, replica]) @@ -561,7 +572,7 @@ Also add keys to each of them that are *not* moved, and see that they are unaffe @dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"}) async def test_cluster_node_id(df_factory: DflyInstanceFactory): - node = df_factory.create(port=BASE_PORT) + node = df_factory.create(port=next(next_port)) df_factory.start_all([node]) conn = node.client() @@ -571,9 +582,7 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): # Start and configure cluster with 2 nodes - nodes = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) - ] + nodes = [df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)] df_factory.start_all(nodes) @@ -640,7 +649,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): await c_nodes[1].set("KEY1", "value") assert False, "Should not be able to set key on non-owner cluster node" except redis.exceptions.ResponseError as e: - assert e.args[0] == "MOVED 5259 localhost:30001" + assert e.args[0] == f"MOVED 5259 localhost:{nodes[0].port}" # And that node1 only has 1 key ("KEY2") assert await c_nodes[1].execute_command("DBSIZE") == 1 @@ -664,7 +673,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): await c_nodes[0].set("KEY1", "value") assert False, "Should not be able to set key on non-owner cluster node" except redis.exceptions.ResponseError as e: - assert e.args[0] == "MOVED 5259 localhost:30002" + assert e.args[0] == f"MOVED 5259 localhost:{nodes[1].port}" # And node1 should own it and allow using it assert await c_nodes[1].set("KEY1", "value") @@ -699,8 +708,8 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFactory): # Start and configure cluster with 1 master and 1 replica, both own all slots - master = df_factory.create(admin_port=BASE_PORT + 1000) - replica = df_factory.create(admin_port=BASE_PORT + 1001) + master = df_factory.create(admin_port=next(next_port)) + replica = df_factory.create(admin_port=next(next_port)) df_factory.start_all([master, replica]) async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin: @@ -807,8 +816,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory): # Start and configure cluster with 1 master and 1 replica, both own all slots - master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000) - replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001) + master = df_factory.create(port=next(next_port), admin_port=next(next_port)) + replica = df_factory.create(port=next(next_port), admin_port=next(next_port)) df_factory.start_all([master, replica]) c_master = master.client() @@ -958,7 +967,7 @@ async def test_cluster_blocking_command(df_server): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_blocking_commands_cancel(df_factory, df_seeder_factory): instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -987,11 +996,11 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory): with pytest.raises(aioredis.ResponseError) as set_e_info: await set_task - assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value) + assert f"MOVED 3037 127.0.0.1:{instances[1].port}" == str(set_e_info.value) with pytest.raises(aioredis.ResponseError) as list_e_info: await list_task - assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value) + assert f"MOVED 7141 127.0.0.1:{instances[1].port}" == str(list_e_info.value) @pytest.mark.parametrize("set_cluster_node_id", [True, False]) @@ -1004,8 +1013,8 @@ async def test_cluster_native_client( # Start and configure cluster with 3 masters and 3 replicas masters = [ df_factory.create( - port=BASE_PORT + i, - admin_port=BASE_PORT + i + 1000, + port=next(next_port), + admin_port=next(next_port), cluster_node_id=f"master{i}" if set_cluster_node_id else "", ) for i in range(3) @@ -1017,10 +1026,10 @@ async def test_cluster_native_client( replicas = [ df_factory.create( - port=BASE_PORT + 100 + i, - admin_port=BASE_PORT + i + 1100, + port=next(next_port), + admin_port=next(next_port), cluster_node_id=f"replica{i}" if set_cluster_node_id else "", - replicaof=f"localhost:{BASE_PORT + i}", + replicaof=f"localhost:{masters[i].port}", ) for i in range(3) ] @@ -1195,7 +1204,7 @@ async def test_cluster_native_client( async def test_config_consistency(df_factory: DflyInstanceFactory): # Check slot migration from one node to another instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -1245,8 +1254,8 @@ async def test_cluster_flushall_during_migration( # Check data migration from one node to another instances = [ df_factory.create( - port=BASE_PORT + i, - admin_port=BASE_PORT + i + 1000, + port=next(next_port), + admin_port=next(next_port), vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9", logtostdout=True, ) @@ -1298,8 +1307,8 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt # Check data migration from one node to another instances = [ df_factory.create( - port=BASE_PORT + i, - admin_port=BASE_PORT + i + 1000, + port=next(next_port), + admin_port=next(next_port), vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9", ) for i in range(2) @@ -1378,7 +1387,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt @dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"}) async def test_migration_with_key_ttl(df_factory): instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -1427,7 +1436,7 @@ async def test_migration_with_key_ttl(df_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) async def test_network_disconnect_during_migration(df_factory): instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -1496,8 +1505,8 @@ async def test_cluster_fuzzymigration( ): instances = [ df_factory.create( - port=BASE_PORT + i, - admin_port=BASE_PORT + i + 1000, + port=next(next_port), + admin_port=next(next_port), vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9", serialization_max_chunk_size=huge_values, replication_stream_output_limit=10, @@ -1632,7 +1641,7 @@ async def test_cluster_fuzzymigration( async def test_cluster_config_reapply(df_factory: DflyInstanceFactory): """Check data migration from one node to another.""" instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -1690,7 +1699,7 @@ async def test_cluster_replication_migration( and make sure the captures on the replicas are equal. """ instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(4) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(4) ] df_factory.start_all(instances) @@ -1767,7 +1776,7 @@ async def test_start_replication_during_migration( in the end master_1 and replica_1 should have the same data """ instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(3) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(3) ] df_factory.start_all(instances) @@ -1834,7 +1843,7 @@ async def test_snapshoting_during_migration( The result should be the same: snapshot contains all the data that existed before migration """ instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -1904,7 +1913,7 @@ async def test_snapshoting_during_migration( async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory): """Check data migration from one node to another.""" instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -1965,7 +1974,7 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory): @pytest.mark.asyncio async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory): instances = [ - df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) ] df_factory.start_all(instances) @@ -2027,9 +2036,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact Send traffic before replication start and while replicating. Promote the replica to master and check data consistency between cluster and single node. """ - replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated") + replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated") cluster_nodes = [ - df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2) + df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2) ] # Start instances and connect clients @@ -2114,9 +2123,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_ Promote replica to master Compare cluster data and replica data """ - replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated") + replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated") cluster_nodes = [ - df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2) + df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2) ] # Start instances and connect clients @@ -2228,7 +2237,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact Send traffic before replication start and while replicating. Promote the replica to master and check data consistency between cluster and single dragonfly node. """ - replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated") + replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated") # Start instances and connect clients df_factory.start_all([replica]) @@ -2286,7 +2295,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_ Send more traffic Promote the replica to master and check data consistency between cluster and single dragonfly node. """ - replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated") + replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated") # Start instances and connect clients df_factory.start_all([replica]) @@ -2371,8 +2380,8 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact instances = [ df_factory.create( maxmemory="15G", - port=BASE_PORT + i, - admin_port=BASE_PORT + i + 1000, + port=next(next_port), + admin_port=next(next_port), vmodule="streamer=9", ) for i in range(3) @@ -2429,8 +2438,8 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see # Timeout set to 3 seconds because we must first saturate the socket before we get the timeout instances = [ df_factory.create( - port=BASE_PORT + i, - admin_port=BASE_PORT + i + 1000, + port=next(next_port), + admin_port=next(next_port), replication_timeout=3000, vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=2", )