From 644dc3f1399820d7daf8f52dd74888a4ca3b4ed8 Mon Sep 17 00:00:00 2001
From: Borys
Date: Sun, 2 Jun 2024 09:16:03 +0300
Subject: [PATCH] New test for cluster migration: connection issue (#3102)

* test: update test_config_consistency and test_cluster_data_migration; add a
  new cluster migration test for network issues
---
 src/server/cluster/outgoing_slot_migration.cc |   2 +-
 tests/dragonfly/cluster_test.py               | 415 ++++++++----------
 tests/dragonfly/utility.py                    |   2 +-
 3 files changed, 191 insertions(+), 228 deletions(-)

diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 89d5fe483..a538f3abc 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -156,7 +156,7 @@ void OutgoingMigration::SyncFb() {
     }
 
     if (auto ec = SendCommandAndReadResponse(cmd); ec) {
-      cntx_.ReportError(GenericError(ec, "Could send INIT command."));
+      cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
       continue;
     }
 
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index ac2902dfb..c25887ff7 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -2,6 +2,7 @@ import pytest
 import re
 import json
 import redis
+from binascii import crc_hqx
 from redis import asyncio as aioredis
 import asyncio
 from dataclasses import dataclass
@@ -68,6 +69,63 @@ def redis_cluster(port_picker):
         node.stop()
 
 
+@dataclass
+class MigrationInfo:
+    ip: str
+    port: int
+    slots: list
+    node_id: str
+
+
+@dataclass
+class NodeInfo:
+    instance: DflyInstance
+    client: aioredis.Redis
+    admin_client: aioredis.Redis
+    slots: list
+    next_slots: list
+    migrations: list
+    id: str
+
+
+async def create_node_info(instance):
+    admin_client = instance.admin_client()
+    ninfo = NodeInfo(
+        instance=instance,
+        client=instance.client(),
+        admin_client=admin_client,
+        slots=[],
+        next_slots=[],
+        migrations=[],
+        id=await get_node_id(admin_client),
+    )
+    return ninfo
+
+
+def generate_config(nodes):
+    return [
+        {
+            "slot_ranges": [{"start": s, "end": e} for (s, e) in node.slots],
+            "master": {
+                "id": node.id,
+                "ip": "127.0.0.1",
+                "port": node.instance.port,
+            },
+            "replicas": [],
+            "migrations": [
+                {
+                    "slot_ranges": [{"start": s, "end": e} for (s, e) in m.slots],
+                    "node_id": m.node_id,
+                    "ip": m.ip,
+                    "port": m.port,
+                }
+                for m in node.migrations
+            ],
+        }
+        for node in nodes
+    ]
+
+
 async def push_config(config, admin_connections):
     logging.debug("Pushing config %s", config)
     res = await asyncio.gather(
@@ -76,16 +134,31 @@ async def push_config(config, admin_connections):
     assert all([r == "OK" for r in res])
 
 
-async def wait_for_status(admin_client, node_id, status):
-    while True:
+async def wait_for_status(admin_client, node_id, status, timeout=10):
+    start = time.time()
+    while (time.time() - start) < timeout:
         response = await admin_client.execute_command(
             "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
         )
         if status in response:
-            break
+            return
         else:
             logging.debug(f"SLOT-MIGRATION-STATUS is {response}, not {status}")
             await asyncio.sleep(0.05)
+    raise RuntimeError("Timed out waiting for migration status")
+
+
+async def check_for_no_state_status(admin_clients):
+    for client in admin_clients:
+        state = await client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
+        assert state == "NO_STATE", f"SLOT-MIGRATION-STATUS is {state} instead of NO_STATE"
+
+
+def key_slot(key_str) -> int:
+    # Cluster slot of a key: CRC16 (XMODEM variant, as computed by crc_hqx) mod 16384.
+    key = str.encode(key_str)
+    return crc_hqx(key, 0) % 16384
 
 
 async def get_node_id(admin_connection):
@@ -907,273 +980,163 @@ async def test_cluster_native_client(
 
 
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_config_consistency(df_local_factory: DflyInstanceFactory):
     # Check slot migration from one node to another
-    nodes = [
+    instances = [
         df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
         for i in range(2)
     ]
 
-    df_local_factory.start_all(nodes)
+    df_local_factory.start_all(instances)
 
-    c_nodes = [node.client() for node in nodes]
-    c_nodes_admin = [node.admin_client() for node in nodes]
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 5259)]
+    nodes[1].slots = [(5260, 16383)]
 
-    node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    config = f"""
-    [
-      {{
-        "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
-        "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
-        "replicas": []
-      }},
-      {{
-        "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
-        "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
-        "replicas": []
-      }}
-    ]
-    """
+    await check_for_no_state_status([node.admin_client for node in nodes])
 
-    await push_config(
-        config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
-        c_nodes_admin,
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(5200, 5259)], nodes[1].id)
     )
 
-    for node in c_nodes_admin:
-        assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
-
-    migation_config = f"""
-    [
-      {{
-        "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
-        "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
-        "replicas": [],
-        "migrations": [{{ "slot_ranges": [ {{ "start": 5200, "end": 5259 }} ]
-        , "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "node_id": "{node_ids[1]}" }}]
-      }},
-      {{
-        "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
-        "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
-        "replicas": []
-      }}
-    ]
-    """
-    # Push config to source node. Migration will not start until target node gets the config as well.
-    await push_config(
-        migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
-        [c_nodes_admin[0]],
-    )
-    await wait_for_status(c_nodes_admin[0], node_ids[1], "CONNECTING")
-    await wait_for_status(c_nodes_admin[1], node_ids[0], "NO_STATE")
+    logging.debug("Push migration config to source node")
+    await push_config(json.dumps(generate_config(nodes)), [nodes[0].admin_client])
 
-    await push_config(
-        migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
-        [c_nodes_admin[1]],
-    )
+    # Brief delay to verify that the migration does not start until the target
+    # node receives the config as well.
+    await asyncio.sleep(0.2)
 
-    await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED")
-    await wait_for_status(c_nodes_admin[0], node_ids[1], "FINISHED")
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "CONNECTING")
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "NO_STATE")
 
-    # remove finished migrations
-    await push_config(
-        config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
-        c_nodes_admin,
-    )
+    logging.debug("Push migration config to target node")
+    await push_config(json.dumps(generate_config(nodes)), [nodes[1].admin_client])
 
-    for node in c_nodes_admin:
-        assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
 
-    await close_clients(*c_nodes, *c_nodes_admin)
+    nodes[0].migrations = []
+    nodes[0].slots = [(0, 5199)]
+    nodes[1].slots = [(5200, 16383)]
+
+    logging.debug("Remove finished migrations")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await check_for_no_state_status([node.admin_client for node in nodes])
+    await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
 
 
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
     # Check data migration from one node to another
-    nodes = [
+    instances = [
         df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
         for i in range(2)
     ]
 
-    df_local_factory.start_all(nodes)
+    df_local_factory.start_all(instances)
 
-    c_nodes = [node.client() for node in nodes]
-    c_nodes_admin = [node.admin_client() for node in nodes]
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 9000)]
+    nodes[1].slots = [(9001, 16383)]
 
-    node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    config = f"""
-    [
-      {{
-        "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
-        "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
-        "replicas": []
-      }},
-      {{
-        "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
-        "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
-        "replicas": []
-      }}
-    ]
-    """
+    # nodes[0] owns slots 0-9000 and nodes[1] owns 9001-16383, so
+    # key_slot(key) // 9001 picks the owning node.
+    for i in range(20):
+        key = "KEY" + str(i)
+        assert await nodes[key_slot(key) // 9001].client.set(key, "value")
 
-    await push_config(
-        config.replace("LAST_SLOT_CUTOFF", "9000").replace("NEXT_SLOT_CUTOFF", "9001"),
-        c_nodes_admin,
+    assert await nodes[0].client.execute_command("DBSIZE") == 10
+
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(3000, 9000)], nodes[1].id)
     )
 
-    assert await c_nodes[0].set("KEY0", "value")
-    assert await c_nodes[0].set("KEY1", "value")
-    assert await c_nodes[1].set("KEY2", "value")
-    assert await c_nodes[1].set("KEY3", "value")
-    assert await c_nodes[0].set("KEY4", "value")
-    assert await c_nodes[0].set("KEY5", "value")
-    assert await c_nodes[1].set("KEY6", "value")
-    assert await c_nodes[1].set("KEY7", "value")
-    assert await c_nodes[0].set("KEY8", "value")
-    assert await c_nodes[0].set("KEY9", "value")
-    assert await c_nodes[1].set("KEY10", "value")
-    assert await c_nodes[1].set("KEY11", "value")
-    assert await c_nodes[0].set("KEY12", "value")
-    assert await c_nodes[0].set("KEY13", "value")
-    assert await c_nodes[1].set("KEY14", "value")
-    assert await c_nodes[1].set("KEY15", "value")
-    assert await c_nodes[0].set("KEY16", "value")
-    assert await c_nodes[0].set("KEY17", "value")
-    assert await c_nodes[1].set("KEY18", "value")
-    assert await c_nodes[1].set("KEY19", "value")
+    logging.debug("Start migration")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    assert await c_nodes[0].execute_command("DBSIZE") == 10
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
 
-    migation_config = f"""
-    [
-      {{
-        "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
-        "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
-        "replicas": [],
-        "migrations": [{{ "slot_ranges": [ {{ "start": 3000, "end": 9000 }} ]
-        , "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "node_id": "{node_ids[1]}" }}]
-      }},
-      {{
-        "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
-        "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
-        "replicas": []
-      }}
-    ]
-    """
-
-    await push_config(
-        migation_config.replace("LAST_SLOT_CUTOFF", "9000").replace("NEXT_SLOT_CUTOFF", "9001"),
-        c_nodes_admin,
-    )
-
-    await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED")
-
-    assert await c_nodes[1].set("KEY20", "value")
-    assert await c_nodes[1].set("KEY21", "value")
+    # Slots 3000-9000 have been migrated to nodes[1]; only keys whose slot is
+    # below 3000 are still written on nodes[0].
+    for i in range(20, 22):
+        key = "KEY" + str(i)
+        assert await nodes[0 if (key_slot(key) // 3000) == 0 else 1].client.set(key, "value")
 
     assert (
-        await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[1])
-    ).startswith(f"""out {node_ids[1]} FINISHED keys:7""")
+        await nodes[0].admin_client.execute_command(
+            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
+        )
+    ).startswith(f"""out {nodes[1].id} FINISHED keys:7""")
     assert (
-        await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0])
-    ).startswith(f"""in {node_ids[0]} FINISHED keys:7""")
+        await nodes[1].admin_client.execute_command(
+            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
+        )
+    ).startswith(f"""in {nodes[0].id} FINISHED keys:7""")
 
-    await push_config(
-        config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"),
-        c_nodes_admin,
-    )
+    nodes[0].migrations = []
+    nodes[0].slots = [(0, 2999)]
+    nodes[1].slots = [(3000, 16383)]
+    logging.debug("Remove finished migrations")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    assert await c_nodes[0].get("KEY0") == "value"
-    assert await c_nodes[1].get("KEY1") == "value"
-    assert await c_nodes[1].get("KEY2") == "value"
-    assert await c_nodes[1].get("KEY3") == "value"
-    assert await c_nodes[0].get("KEY4") == "value"
-    assert await c_nodes[1].get("KEY5") == "value"
-    assert await c_nodes[1].get("KEY6") == "value"
"value" - assert await c_nodes[1].get("KEY7") == "value" - assert await c_nodes[0].get("KEY8") == "value" - assert await c_nodes[1].get("KEY9") == "value" - assert await c_nodes[1].get("KEY10") == "value" - assert await c_nodes[1].get("KEY11") == "value" - assert await c_nodes[1].get("KEY12") == "value" - assert await c_nodes[1].get("KEY13") == "value" - assert await c_nodes[1].get("KEY14") == "value" - assert await c_nodes[1].get("KEY15") == "value" - assert await c_nodes[1].get("KEY16") == "value" - assert await c_nodes[1].get("KEY17") == "value" - assert await c_nodes[1].get("KEY18") == "value" - assert await c_nodes[1].get("KEY19") == "value" - assert await c_nodes[1].get("KEY20") == "value" - assert await c_nodes[1].get("KEY21") == "value" - assert await c_nodes[1].execute_command("DBSIZE") == 19 + for i in range(22): + key = "KEY" + str(i) + assert await nodes[0 if (key_slot(key) // 3000) == 0 else 1].client.set(key, "value") - assert ( - await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE" - ) - assert ( - await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE" - ) + assert await nodes[1].client.execute_command("DBSIZE") == 19 - await close_clients(*c_nodes, *c_nodes_admin) + await check_for_no_state_status([node.admin_client for node in nodes]) + await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) -@dataclass -class MigrationInfo: - ip: str - port: int - slots: list - node_id: str - - -@dataclass -class NodeInfo: - instance: DflyInstance - client: aioredis.Redis - admin_client: aioredis.Redis - slots: list - next_slots: list - migrations: list - id: str - - -async def create_node_info(instance): - admin_client = instance.admin_client() - ninfo = NodeInfo( - instance=instance, - client=instance.client(), - admin_client=admin_client, - slots=[], - next_slots=[], - migrations=[], - id=await get_node_id(admin_client), - ) - return ninfo - - -def generate_config(nodes): - return [ - { - "slot_ranges": [{"start": s, "end": e} for (s, e) in node.slots], - "master": { - "id": node.id, - "ip": "127.0.0.1", - "port": node.instance.port, - }, - "replicas": [], - "migrations": [ - { - "slot_ranges": [{"start": s, "end": e} for (s, e) in m.slots], - "node_id": m.node_id, - "ip": m.ip, - "port": m.port, - } - for m in node.migrations - ], - } - for node in nodes +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_network_disconnect_during_migration(df_local_factory, df_seeder_factory): + instances = [ + df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) + for i in range(2) ] + df_local_factory.start_all(instances) + + nodes = [(await create_node_info(instance)) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + seeder = df_seeder_factory.create(keys=30000, port=nodes[0].instance.port, cluster_mode=True) + + await seeder.run(target_deviation=0.1) + + proxy = Proxy("127.0.0.1", 1111, "127.0.0.1", nodes[1].instance.admin_port) + await proxy.start() + task = asyncio.create_task(proxy.serve()) + + nodes[0].migrations.append(MigrationInfo("127.0.0.1", proxy.port, [(0, 16383)], nodes[1].id)) + try: + logging.debug("Start migration") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + for _ in range(10): + await asyncio.sleep(random.randint(0, 10) / 20) + 
+            logging.debug("Drop connections")
+            proxy.drop_connection()
+            logging.debug(
+                await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
+            )
+
+        # The dropped connections only delay the migration; it must still
+        # complete once the link is re-established, so wait for FINISHED
+        # before reconfiguring the cluster.
+        await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=300)
+    finally:
+        await proxy.close(task)
+
+    nodes[0].migrations = []
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16383)]
+    logging.debug("Remove finished migrations")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    capture = await seeder.capture()
+    assert await seeder.compare(capture, nodes[1].instance.port)
+
+    await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
+
+
 @pytest.mark.parametrize(
     "node_count, segments, keys",
diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py
index bee241022..8e2cfd61d 100644
--- a/tests/dragonfly/utility.py
+++ b/tests/dragonfly/utility.py
@@ -364,7 +364,7 @@ class DflySeeder:
         if cluster_mode:
             max_multikey = 1
             multi_transaction_probability = 0
-            unsupported_types = [ValueType.JSON]  # Cluster aio client doesn't support JSON
+            unsupported_types.append(ValueType.JSON)  # Cluster aio client doesn't support JSON
 
         self.cluster_mode = cluster_mode
         self.gen = CommandGenerator(keys, val_size, batch_size, max_multikey, unsupported_types)
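Reviewer note: the new test injects failures through the test-suite's Proxy
helper (tests/dragonfly/proxy.py), which this patch does not touch, so the
helper's code is not visible in the diff. As a reading aid, here is a minimal
sketch of the interface the test relies on: start(), serve(),
drop_connection(), and close(task). This is an illustrative asyncio TCP
forwarder written for this note under those assumptions, not the actual
helper's implementation:

import asyncio


class Proxy:
    # Illustrative sketch: mirrors only the calls made in
    # test_network_disconnect_during_migration, not the real
    # tests/dragonfly/proxy.py code.

    def __init__(self, host, port, remote_host, remote_port):
        self.host = host
        self.port = port
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.server = None
        self.writers = []  # stream writers of all live piped connections

    async def _pipe(self, reader, writer):
        # Forward bytes one way until EOF, or until drop_connection()
        # closes the writer underneath us.
        try:
            while data := await reader.read(4096):
                writer.write(data)
                await writer.drain()
        except ConnectionError:
            pass

    async def _handle(self, client_reader, client_writer):
        # For every inbound connection, open an upstream connection and
        # pipe both directions concurrently.
        remote_reader, remote_writer = await asyncio.open_connection(
            self.remote_host, self.remote_port
        )
        self.writers += [client_writer, remote_writer]
        await asyncio.gather(
            self._pipe(client_reader, remote_writer),
            self._pipe(remote_reader, client_writer),
            return_exceptions=True,
        )

    async def start(self):
        self.server = await asyncio.start_server(self._handle, self.host, self.port)

    async def serve(self):
        async with self.server:
            await self.server.serve_forever()

    def drop_connection(self):
        # Abruptly close every live connection; the listening socket stays
        # up, so clients may reconnect.
        for writer in self.writers:
            writer.close()
        self.writers.clear()

    async def close(self, serve_task):
        serve_task.cancel()
        self.drop_connection()

The property the test depends on is that drop_connection() severs the live
migration socket while the listener keeps accepting, which forces the source
node to reconnect and resume rather than fail the migration outright.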