mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
New test for cluster migration: connection issue (#3102)
* test: update test_config_consistency, update test_cluster_data_migration, new cluster migration test for network issues
This commit is contained in:
parent
b3ca27068a
commit
644dc3f139
3 changed files with 191 additions and 228 deletions
|
@ -156,7 +156,7 @@ void OutgoingMigration::SyncFb() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (auto ec = SendCommandAndReadResponse(cmd); ec) {
|
if (auto ec = SendCommandAndReadResponse(cmd); ec) {
|
||||||
cntx_.ReportError(GenericError(ec, "Could send INIT command."));
|
cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ import pytest
|
||||||
import re
|
import re
|
||||||
import json
|
import json
|
||||||
import redis
|
import redis
|
||||||
|
from binascii import crc_hqx
|
||||||
from redis import asyncio as aioredis
|
from redis import asyncio as aioredis
|
||||||
import asyncio
|
import asyncio
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
@ -68,6 +69,63 @@ def redis_cluster(port_picker):
|
||||||
node.stop()
|
node.stop()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MigrationInfo:
|
||||||
|
ip: str
|
||||||
|
port: int
|
||||||
|
slots: list
|
||||||
|
node_id: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NodeInfo:
|
||||||
|
instance: DflyInstance
|
||||||
|
client: aioredis.Redis
|
||||||
|
admin_client: aioredis.Redis
|
||||||
|
slots: list
|
||||||
|
next_slots: list
|
||||||
|
migrations: list
|
||||||
|
id: str
|
||||||
|
|
||||||
|
|
||||||
|
async def create_node_info(instance):
|
||||||
|
admin_client = instance.admin_client()
|
||||||
|
ninfo = NodeInfo(
|
||||||
|
instance=instance,
|
||||||
|
client=instance.client(),
|
||||||
|
admin_client=admin_client,
|
||||||
|
slots=[],
|
||||||
|
next_slots=[],
|
||||||
|
migrations=[],
|
||||||
|
id=await get_node_id(admin_client),
|
||||||
|
)
|
||||||
|
return ninfo
|
||||||
|
|
||||||
|
|
||||||
|
def generate_config(nodes):
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"slot_ranges": [{"start": s, "end": e} for (s, e) in node.slots],
|
||||||
|
"master": {
|
||||||
|
"id": node.id,
|
||||||
|
"ip": "127.0.0.1",
|
||||||
|
"port": node.instance.port,
|
||||||
|
},
|
||||||
|
"replicas": [],
|
||||||
|
"migrations": [
|
||||||
|
{
|
||||||
|
"slot_ranges": [{"start": s, "end": e} for (s, e) in m.slots],
|
||||||
|
"node_id": m.node_id,
|
||||||
|
"ip": m.ip,
|
||||||
|
"port": m.port,
|
||||||
|
}
|
||||||
|
for m in node.migrations
|
||||||
|
],
|
||||||
|
}
|
||||||
|
for node in nodes
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
async def push_config(config, admin_connections):
|
async def push_config(config, admin_connections):
|
||||||
logging.debug("Pushing config %s", config)
|
logging.debug("Pushing config %s", config)
|
||||||
res = await asyncio.gather(
|
res = await asyncio.gather(
|
||||||
|
@ -76,16 +134,31 @@ async def push_config(config, admin_connections):
|
||||||
assert all([r == "OK" for r in res])
|
assert all([r == "OK" for r in res])
|
||||||
|
|
||||||
|
|
||||||
async def wait_for_status(admin_client, node_id, status):
|
async def wait_for_status(admin_client, node_id, status, timeout=10):
|
||||||
while True:
|
start = time.time()
|
||||||
|
while (time.time() - start) < timeout:
|
||||||
response = await admin_client.execute_command(
|
response = await admin_client.execute_command(
|
||||||
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
|
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
|
||||||
)
|
)
|
||||||
if status in response:
|
if status in response:
|
||||||
break
|
return
|
||||||
else:
|
else:
|
||||||
logging.debug(f"SLOT-MIGRATION-STATUS is {response}, not {status}")
|
logging.debug(f"SLOT-MIGRATION-STATUS is {response}, not {status}")
|
||||||
await asyncio.sleep(0.05)
|
await asyncio.sleep(0.05)
|
||||||
|
raise RuntimeError("Timeout to achieve migrations status")
|
||||||
|
|
||||||
|
|
||||||
|
async def check_for_no_state_status(admin_clients):
|
||||||
|
for client in admin_clients:
|
||||||
|
state = await client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
||||||
|
if state != "NO_STATE":
|
||||||
|
logging.debug(f"SLOT-MIGRATION-STATUS is {state}, instead of NO_STATE")
|
||||||
|
assert False
|
||||||
|
|
||||||
|
|
||||||
|
def key_slot(key_str) -> int:
|
||||||
|
key = str.encode(key_str)
|
||||||
|
return crc_hqx(key, 0) % 16384
|
||||||
|
|
||||||
|
|
||||||
async def get_node_id(admin_connection):
|
async def get_node_id(admin_connection):
|
||||||
|
@ -907,273 +980,163 @@ async def test_cluster_native_client(
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_config_consistency(df_local_factory: DflyInstanceFactory):
|
async def test_config_consistency(df_local_factory: DflyInstanceFactory):
|
||||||
# Check slot migration from one node to another
|
# Check slot migration from one node to another
|
||||||
nodes = [
|
instances = [
|
||||||
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
|
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
|
||||||
for i in range(2)
|
for i in range(2)
|
||||||
]
|
]
|
||||||
|
|
||||||
df_local_factory.start_all(nodes)
|
df_local_factory.start_all(instances)
|
||||||
|
|
||||||
c_nodes = [node.client() for node in nodes]
|
nodes = [(await create_node_info(instance)) for instance in instances]
|
||||||
c_nodes_admin = [node.admin_client() for node in nodes]
|
nodes[0].slots = [(0, 5259)]
|
||||||
|
nodes[1].slots = [(5260, 16383)]
|
||||||
|
|
||||||
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
config = f"""
|
await check_for_no_state_status([node.admin_client for node in nodes])
|
||||||
[
|
|
||||||
{{
|
|
||||||
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
|
|
||||||
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
|
|
||||||
"replicas": []
|
|
||||||
}},
|
|
||||||
{{
|
|
||||||
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
|
|
||||||
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
|
|
||||||
"replicas": []
|
|
||||||
}}
|
|
||||||
]
|
|
||||||
"""
|
|
||||||
|
|
||||||
await push_config(
|
nodes[0].migrations.append(
|
||||||
config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
|
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(5200, 5259)], nodes[1].id)
|
||||||
c_nodes_admin,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for node in c_nodes_admin:
|
|
||||||
assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
|
|
||||||
|
|
||||||
migation_config = f"""
|
|
||||||
[
|
|
||||||
{{
|
|
||||||
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
|
|
||||||
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
|
|
||||||
"replicas": [],
|
|
||||||
"migrations": [{{ "slot_ranges": [ {{ "start": 5200, "end": 5259 }} ]
|
|
||||||
, "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "node_id": "{node_ids[1]}" }}]
|
|
||||||
}},
|
|
||||||
{{
|
|
||||||
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
|
|
||||||
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
|
|
||||||
"replicas": []
|
|
||||||
}}
|
|
||||||
]
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Push config to source node. Migration will not start until target node gets the config as well.
|
# Push config to source node. Migration will not start until target node gets the config as well.
|
||||||
await push_config(
|
logging.debug("Push migration config to source node")
|
||||||
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
|
await push_config(json.dumps(generate_config(nodes)), [nodes[0].admin_client])
|
||||||
[c_nodes_admin[0]],
|
|
||||||
)
|
|
||||||
await wait_for_status(c_nodes_admin[0], node_ids[1], "CONNECTING")
|
|
||||||
await wait_for_status(c_nodes_admin[1], node_ids[0], "NO_STATE")
|
|
||||||
|
|
||||||
await push_config(
|
# some delay to check that migration isn't started until we send config to target node
|
||||||
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
|
await asyncio.sleep(0.2)
|
||||||
[c_nodes_admin[1]],
|
|
||||||
)
|
|
||||||
|
|
||||||
await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED")
|
await wait_for_status(nodes[0].admin_client, nodes[1].id, "CONNECTING")
|
||||||
await wait_for_status(c_nodes_admin[0], node_ids[1], "FINISHED")
|
await wait_for_status(nodes[1].admin_client, nodes[0].id, "NO_STATE")
|
||||||
|
|
||||||
# remove finished migrations
|
logging.debug("Push migration config to target node")
|
||||||
await push_config(
|
await push_config(json.dumps(generate_config(nodes)), [nodes[1].admin_client])
|
||||||
config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
|
|
||||||
c_nodes_admin,
|
|
||||||
)
|
|
||||||
|
|
||||||
for node in c_nodes_admin:
|
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
|
||||||
assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
|
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
|
||||||
|
|
||||||
await close_clients(*c_nodes, *c_nodes_admin)
|
nodes[0].migrations = []
|
||||||
|
nodes[0].slots = [(0, 5199)]
|
||||||
|
nodes[1].slots = [(5200, 16383)]
|
||||||
|
|
||||||
|
logging.debug("remove finished migrations")
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
await check_for_no_state_status([node.admin_client for node in nodes])
|
||||||
|
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
|
async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
|
||||||
# Check data migration from one node to another
|
# Check data migration from one node to another
|
||||||
nodes = [
|
instances = [
|
||||||
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
|
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
|
||||||
for i in range(2)
|
for i in range(2)
|
||||||
]
|
]
|
||||||
|
|
||||||
df_local_factory.start_all(nodes)
|
df_local_factory.start_all(instances)
|
||||||
|
|
||||||
c_nodes = [node.client() for node in nodes]
|
nodes = [(await create_node_info(instance)) for instance in instances]
|
||||||
c_nodes_admin = [node.admin_client() for node in nodes]
|
nodes[0].slots = [(0, 9000)]
|
||||||
|
nodes[1].slots = [(9001, 16383)]
|
||||||
|
|
||||||
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
config = f"""
|
for i in range(20):
|
||||||
[
|
key = "KEY" + str(i)
|
||||||
{{
|
assert await nodes[key_slot(key) // 9001].client.set(key, "value")
|
||||||
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
|
|
||||||
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
|
|
||||||
"replicas": []
|
|
||||||
}},
|
|
||||||
{{
|
|
||||||
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
|
|
||||||
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
|
|
||||||
"replicas": []
|
|
||||||
}}
|
|
||||||
]
|
|
||||||
"""
|
|
||||||
|
|
||||||
await push_config(
|
assert await nodes[0].client.execute_command("DBSIZE") == 10
|
||||||
config.replace("LAST_SLOT_CUTOFF", "9000").replace("NEXT_SLOT_CUTOFF", "9001"),
|
|
||||||
c_nodes_admin,
|
nodes[0].migrations.append(
|
||||||
|
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(3000, 9000)], nodes[1].id)
|
||||||
)
|
)
|
||||||
|
|
||||||
assert await c_nodes[0].set("KEY0", "value")
|
logging.debug("Start migration")
|
||||||
assert await c_nodes[0].set("KEY1", "value")
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
assert await c_nodes[1].set("KEY2", "value")
|
|
||||||
assert await c_nodes[1].set("KEY3", "value")
|
|
||||||
assert await c_nodes[0].set("KEY4", "value")
|
|
||||||
assert await c_nodes[0].set("KEY5", "value")
|
|
||||||
assert await c_nodes[1].set("KEY6", "value")
|
|
||||||
assert await c_nodes[1].set("KEY7", "value")
|
|
||||||
assert await c_nodes[0].set("KEY8", "value")
|
|
||||||
assert await c_nodes[0].set("KEY9", "value")
|
|
||||||
assert await c_nodes[1].set("KEY10", "value")
|
|
||||||
assert await c_nodes[1].set("KEY11", "value")
|
|
||||||
assert await c_nodes[0].set("KEY12", "value")
|
|
||||||
assert await c_nodes[0].set("KEY13", "value")
|
|
||||||
assert await c_nodes[1].set("KEY14", "value")
|
|
||||||
assert await c_nodes[1].set("KEY15", "value")
|
|
||||||
assert await c_nodes[0].set("KEY16", "value")
|
|
||||||
assert await c_nodes[0].set("KEY17", "value")
|
|
||||||
assert await c_nodes[1].set("KEY18", "value")
|
|
||||||
assert await c_nodes[1].set("KEY19", "value")
|
|
||||||
|
|
||||||
assert await c_nodes[0].execute_command("DBSIZE") == 10
|
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
|
||||||
|
|
||||||
migation_config = f"""
|
for i in range(20, 22):
|
||||||
[
|
key = "KEY" + str(i)
|
||||||
{{
|
assert await nodes[0 if (key_slot(key) // 3000) == 0 else 1].client.set(key, "value")
|
||||||
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
|
|
||||||
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
|
|
||||||
"replicas": [],
|
|
||||||
"migrations": [{{ "slot_ranges": [ {{ "start": 3000, "end": 9000 }} ]
|
|
||||||
, "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "node_id": "{node_ids[1]}" }}]
|
|
||||||
}},
|
|
||||||
{{
|
|
||||||
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
|
|
||||||
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
|
|
||||||
"replicas": []
|
|
||||||
}}
|
|
||||||
]
|
|
||||||
"""
|
|
||||||
|
|
||||||
await push_config(
|
|
||||||
migation_config.replace("LAST_SLOT_CUTOFF", "9000").replace("NEXT_SLOT_CUTOFF", "9001"),
|
|
||||||
c_nodes_admin,
|
|
||||||
)
|
|
||||||
|
|
||||||
await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED")
|
|
||||||
|
|
||||||
assert await c_nodes[1].set("KEY20", "value")
|
|
||||||
assert await c_nodes[1].set("KEY21", "value")
|
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[1])
|
await nodes[0].admin_client.execute_command(
|
||||||
).startswith(f"""out {node_ids[1]} FINISHED keys:7""")
|
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
|
||||||
|
)
|
||||||
|
).startswith(f"""out {nodes[1].id} FINISHED keys:7""")
|
||||||
assert (
|
assert (
|
||||||
await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0])
|
await nodes[1].admin_client.execute_command(
|
||||||
).startswith(f"""in {node_ids[0]} FINISHED keys:7""")
|
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
|
||||||
|
)
|
||||||
|
).startswith(f"""in {nodes[0].id} FINISHED keys:7""")
|
||||||
|
|
||||||
await push_config(
|
nodes[0].migrations = []
|
||||||
config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"),
|
nodes[0].slots = [(0, 2999)]
|
||||||
c_nodes_admin,
|
nodes[1].slots = [(3000, 16383)]
|
||||||
)
|
logging.debug("remove finished migrations")
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
assert await c_nodes[0].get("KEY0") == "value"
|
for i in range(22):
|
||||||
assert await c_nodes[1].get("KEY1") == "value"
|
key = "KEY" + str(i)
|
||||||
assert await c_nodes[1].get("KEY2") == "value"
|
assert await nodes[0 if (key_slot(key) // 3000) == 0 else 1].client.set(key, "value")
|
||||||
assert await c_nodes[1].get("KEY3") == "value"
|
|
||||||
assert await c_nodes[0].get("KEY4") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY5") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY6") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY7") == "value"
|
|
||||||
assert await c_nodes[0].get("KEY8") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY9") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY10") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY11") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY12") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY13") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY14") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY15") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY16") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY17") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY18") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY19") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY20") == "value"
|
|
||||||
assert await c_nodes[1].get("KEY21") == "value"
|
|
||||||
assert await c_nodes[1].execute_command("DBSIZE") == 19
|
|
||||||
|
|
||||||
assert (
|
assert await nodes[1].client.execute_command("DBSIZE") == 19
|
||||||
await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
|
|
||||||
)
|
|
||||||
assert (
|
|
||||||
await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
|
|
||||||
)
|
|
||||||
|
|
||||||
await close_clients(*c_nodes, *c_nodes_admin)
|
await check_for_no_state_status([node.admin_client for node in nodes])
|
||||||
|
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
class MigrationInfo:
|
async def test_network_disconnect_during_migration(df_local_factory, df_seeder_factory):
|
||||||
ip: str
|
instances = [
|
||||||
port: int
|
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
|
||||||
slots: list
|
for i in range(2)
|
||||||
node_id: str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class NodeInfo:
|
|
||||||
instance: DflyInstance
|
|
||||||
client: aioredis.Redis
|
|
||||||
admin_client: aioredis.Redis
|
|
||||||
slots: list
|
|
||||||
next_slots: list
|
|
||||||
migrations: list
|
|
||||||
id: str
|
|
||||||
|
|
||||||
|
|
||||||
async def create_node_info(instance):
|
|
||||||
admin_client = instance.admin_client()
|
|
||||||
ninfo = NodeInfo(
|
|
||||||
instance=instance,
|
|
||||||
client=instance.client(),
|
|
||||||
admin_client=admin_client,
|
|
||||||
slots=[],
|
|
||||||
next_slots=[],
|
|
||||||
migrations=[],
|
|
||||||
id=await get_node_id(admin_client),
|
|
||||||
)
|
|
||||||
return ninfo
|
|
||||||
|
|
||||||
|
|
||||||
def generate_config(nodes):
|
|
||||||
return [
|
|
||||||
{
|
|
||||||
"slot_ranges": [{"start": s, "end": e} for (s, e) in node.slots],
|
|
||||||
"master": {
|
|
||||||
"id": node.id,
|
|
||||||
"ip": "127.0.0.1",
|
|
||||||
"port": node.instance.port,
|
|
||||||
},
|
|
||||||
"replicas": [],
|
|
||||||
"migrations": [
|
|
||||||
{
|
|
||||||
"slot_ranges": [{"start": s, "end": e} for (s, e) in m.slots],
|
|
||||||
"node_id": m.node_id,
|
|
||||||
"ip": m.ip,
|
|
||||||
"port": m.port,
|
|
||||||
}
|
|
||||||
for m in node.migrations
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for node in nodes
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
df_local_factory.start_all(instances)
|
||||||
|
|
||||||
|
nodes = [(await create_node_info(instance)) for instance in instances]
|
||||||
|
nodes[0].slots = [(0, 16383)]
|
||||||
|
nodes[1].slots = []
|
||||||
|
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
seeder = df_seeder_factory.create(keys=30000, port=nodes[0].instance.port, cluster_mode=True)
|
||||||
|
|
||||||
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
|
proxy = Proxy("127.0.0.1", 1111, "127.0.0.1", nodes[1].instance.admin_port)
|
||||||
|
await proxy.start()
|
||||||
|
task = asyncio.create_task(proxy.serve())
|
||||||
|
|
||||||
|
nodes[0].migrations.append(MigrationInfo("127.0.0.1", proxy.port, [(0, 16383)], nodes[1].id))
|
||||||
|
try:
|
||||||
|
logging.debug("Start migration")
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
for _ in range(10):
|
||||||
|
await asyncio.sleep(random.randint(0, 10) / 20)
|
||||||
|
logging.debug("drop connections")
|
||||||
|
proxy.drop_connection()
|
||||||
|
logging.debug(
|
||||||
|
await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
await proxy.close(task)
|
||||||
|
|
||||||
|
nodes[0].migrations = []
|
||||||
|
nodes[0].slots = []
|
||||||
|
nodes[1].slots = [(0, 16383)]
|
||||||
|
logging.debug("remove finished migrations")
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
capture = await seeder.capture()
|
||||||
|
assert await seeder.compare(capture, nodes[1].instance.port)
|
||||||
|
|
||||||
|
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"node_count, segments, keys",
|
"node_count, segments, keys",
|
||||||
|
|
|
@ -364,7 +364,7 @@ class DflySeeder:
|
||||||
if cluster_mode:
|
if cluster_mode:
|
||||||
max_multikey = 1
|
max_multikey = 1
|
||||||
multi_transaction_probability = 0
|
multi_transaction_probability = 0
|
||||||
unsupported_types = [ValueType.JSON] # Cluster aio client doesn't support JSON
|
unsupported_types.append(ValueType.JSON) # Cluster aio client doesn't support JSON
|
||||||
|
|
||||||
self.cluster_mode = cluster_mode
|
self.cluster_mode = cluster_mode
|
||||||
self.gen = CommandGenerator(keys, val_size, batch_size, max_multikey, unsupported_types)
|
self.gen = CommandGenerator(keys, val_size, batch_size, max_multikey, unsupported_types)
|
||||||
|
|
Loading…
Reference in a new issue